Giter Club home page Giter Club logo

celeborn's Introduction

Apache Celeborn

Celeborn CI
Celeborn (/ˈkeləbɔ:n/) is dedicated to improving the efficiency and elasticity of different map-reduce engines and provides an elastic, high-efficient management service for intermediate data including shuffle data, spilled data, result data, etc. Currently, Celeborn is focusing on shuffle data.

Internals

Architecture

Celeborn architecture Celeborn has three primary components: Master, Worker, and Client. Master manages all resources and syncs shared states based on Raft. Worker processes read-write requests and merges data for each reducer. LifecycleManager maintains metadata of each shuffle and runs within the Spark driver.

Feature

  1. Disaggregate Computing and storage.
  2. Push-based shuffle write and merged shuffle read.
  3. High availability and high fault tolerance.

Shuffle Process

Celeborn shuffle

  1. Mappers lazily ask LifecycleManager to registerShuffle.
  2. LifecycleManager requests slots from Master.
  3. Workers reserve slots and create corresponding files.
  4. Mappers get worker locations from LifecycleManager.
  5. Mappers push data to specified workers.
  6. Workers merge and replicate data to its peer.
  7. Workers flush to disk periodically.
  8. Mapper tasks accomplish and trigger MapperEnd event.
  9. When all mapper tasks are complete, workers commit files.
  10. Reducers ask for file locations.
  11. Reducers read shuffle data.

Load Balance

Load Balance

We introduce slots to achieve load balance. We will equally distribute partitions on every Celeborn worker by tracking slot usage. The Slot is a logical concept in Celeborn Worker that represents how many partitions can be allocated to each Celeborn Worker. Celeborn Worker's slot count is decided by total usable disk size / average shuffle file size. Celeborn worker's slot count decreases when a partition is allocated and increments when a partition is freed.

Build

  1. Celeborn supports Spark 2.4/3.0/3.1/3.2/3.3/3.4/3.5, Flink 1.14/1.15/1.17/1.18/1.19 and Hadoop MapReduce 2/3.
  2. Celeborn tested under Scala 2.11/2.12/2.13 and Java 8/11/17 environment.

Build Celeborn via make-distribution.sh:

./build/make-distribution.sh -Pspark-2.4/-Pspark-3.0/-Pspark-3.1/-Pspark-3.2/-Pspark-3.3/-Pspark-3.4/-Pflink-1.14/-Pflink-1.15/-Pflink-1.17/-Pflink-1.18/-Pflink-1.19/-Pmr

Package apache-celeborn-${project.version}-bin.tgz will be generated.

NOTE: The following table indicates the compatibility of Celeborn Spark and Flink clients with different versions of Spark and Flink for various Java and Scala versions.

Java 8/Scala 2.11 Java 8/Scala 2.12 Java 11/Scala 2.12 Java 17/Scala 2.12 Java 8/Scala 2.13 Java 11/Scala 2.13 Java 17/Scala 2.13
Spark 2.4
Spark 3.0
Spark 3.1
Spark 3.2
Spark 3.3
Spark 3.4
Spark 3.5
Flink 1.14
Flink 1.15
Flink 1.17
Flink 1.18
Flink 1.19

To compile the client for Spark 2.4 with Scala 2.12, please use the following command:

  • Scala 2.12.8/2.12.9/2.12.10
./build/make-distribution.sh -DskipTests -Pspark-2.4 -Dscala.version=${scala.version} -Dscala.binary.version=2.12 -Dmaven.plugin.scala.version=3.2.2 -Dmaven.plugin.silencer.version=1.6.0
  • Scala 2.12.13-2.12.18
./build/make-distribution.sh -DskipTests -Pspark-2.4 -Dscala.version=${scala.version} -Dscala.binary.version=2.12

To compile for Spark 3.5 with Java21, please use the following command

./build/make-distribution.sh -Pspark-3.5 -Pjdk-21
./build/make-distribution.sh --sbt-enabled -Pspark-3.5 -Pjdk-21

Package Details

Build procedure will create a compressed package.

General package layout:

    ├── RELEASE                         
    ├── bin                             
    ├── conf                            
    ├── jars           // common jars for master and worker                 
    ├── master-jars                     
    ├── worker-jars                     
    ├── spark          // Spark client jars if spark profiles are activated
    ├── flink          // Flink client jars if flink profiles are activated
    ├── mr             // MapReduce client jars if mr profile is activated
    └── sbin

Compatibility

Celeborn server is compatible with all clients inside various engines. However, Celeborn clients must be consistent with the version of the specified engine. For example, if you are running Spark 2.4, you must compile Celeborn client with -Pspark-2.4; if you are running Spark 3.2, you must compile Celeborn client with -Pspark-3.2; if you are running flink 1.14, you must compile Celeborn client with -Pflink-1.14.

Usage

Celeborn cluster composes of Master and Worker nodes, the Master supports both single and HA mode(Raft-based) deployments.

Deploy Celeborn

Deploy on host

  1. Unzip the tarball to $CELEBORN_HOME.
  2. Modify environment variables in $CELEBORN_HOME/conf/celeborn-env.sh.

EXAMPLE:

#!/usr/bin/env bash
CELEBORN_MASTER_MEMORY=4g
CELEBORN_WORKER_MEMORY=2g
CELEBORN_WORKER_OFFHEAP_MEMORY=4g
  1. Modify configurations in $CELEBORN_HOME/conf/celeborn-defaults.conf.

EXAMPLE: single master cluster

# used by client and worker to connect to master
celeborn.master.endpoints clb-master:9097

# used by master to bootstrap
celeborn.master.host clb-master
celeborn.master.port 9097

celeborn.metrics.enabled true
celeborn.worker.flusher.buffer.size 256k

# If Celeborn workers have local disks and HDFS. Following configs should be added.
# If Celeborn workers have local disks, use following config.
# Disk type is HDD by default.
celeborn.worker.storage.dirs /mnt/disk1:disktype=SSD,/mnt/disk2:disktype=SSD

# If Celeborn workers don't have local disks. You can use HDFS.
# Do not set `celeborn.worker.storage.dirs` and use following configs.
celeborn.storage.activeTypes HDFS
celeborn.worker.sortPartition.threads 64
celeborn.worker.commitFiles.timeout 240s
celeborn.worker.commitFiles.threads 128
celeborn.master.slot.assign.policy roundrobin
celeborn.rpc.askTimeout 240s
celeborn.worker.flusher.hdfs.buffer.size 4m
celeborn.storage.hdfs.dir hdfs://<namenode>/celeborn
celeborn.worker.replicate.fastFail.duration 240s
# Either principal/keytab or valid TGT cache is required to access kerberized HDFS
celeborn.storage.hdfs.kerberos.principal user@REALM
celeborn.storage.hdfs.kerberos.keytab /path/to/user.keytab

# If your hosts have disk raid or use lvm, set `celeborn.worker.monitor.disk.enabled` to false
celeborn.worker.monitor.disk.enabled false

EXAMPLE: HA cluster

# used by client and worker to connect to master
celeborn.master.endpoints clb-1:9097,clb-2:9097,clb-3:9097

# used by master nodes to bootstrap, every node should know the topology of whole cluster, for each node,
# `celeborn.master.ha.node.id` should be unique, and `celeborn.master.ha.node.<id>.host` is required.
celeborn.master.ha.enabled true
celeborn.master.ha.node.id 1
celeborn.master.ha.node.1.host clb-1
celeborn.master.ha.node.1.port 9097
celeborn.master.ha.node.1.ratis.port 9872
celeborn.master.ha.node.2.host clb-2
celeborn.master.ha.node.2.port 9097
celeborn.master.ha.node.2.ratis.port 9872
celeborn.master.ha.node.3.host clb-3
celeborn.master.ha.node.3.port 9097
celeborn.master.ha.node.3.ratis.port 9872
celeborn.master.ha.ratis.raft.server.storage.dir /mnt/disk1/celeborn_ratis/

celeborn.metrics.enabled true
# If you want to use HDFS as shuffle storage, make sure that flush buffer size is at least 4MB or larger.
celeborn.worker.flusher.buffer.size 256k

# If Celeborn workers have local disks and HDFS. Following configs should be added.
# Celeborn will use local disks until local disk become unavailable to gain the best performance.
# Increase Celeborn's off-heap memory if Celeborn write to HDFS.
# If Celeborn workers have local disks, use following config.
# Disk type is HDD by default.
celeborn.worker.storage.dirs /mnt/disk1:disktype=SSD,/mnt/disk2:disktype=SSD

# If Celeborn workers don't have local disks. You can use HDFS.
# Do not set `celeborn.worker.storage.dirs` and use following configs.
celeborn.storage.activeTypes HDFS
celeborn.worker.sortPartition.threads 64
celeborn.worker.commitFiles.timeout 240s
celeborn.worker.commitFiles.threads 128
celeborn.master.slot.assign.policy roundrobin
celeborn.rpc.askTimeout 240s
celeborn.worker.flusher.hdfs.buffer.size 4m
celeborn.storage.hdfs.dir hdfs://<namenode>/celeborn
celeborn.worker.replicate.fastFail.duration 240s

# If your hosts have disk raid or use lvm, set `celeborn.worker.monitor.disk.enabled` to false
celeborn.worker.monitor.disk.enabled false

Flink engine related configurations:

# If you are using Celeborn for flink, these settings will be needed.
celeborn.worker.directMemoryRatioForReadBuffer 0.4
celeborn.worker.directMemoryRatioToResume 0.5
# These setting will affect performance. 
# If there is enough off-heap memory, you can try to increase read buffers.
# Read buffer max memory usage for a data partition is `taskmanager.memory.segment-size * readBuffersMax`
celeborn.worker.partition.initial.readBuffersMin 512
celeborn.worker.partition.initial.readBuffersMax 1024
celeborn.worker.readBuffer.allocationWait 10ms
  1. Copy Celeborn and configurations to all nodes.
  2. Start all services. If you install Celeborn distribution in the same path on every node and your cluster can perform SSH login then you can fill $CELEBORN_HOME/conf/hosts and use $CELEBORN_HOME/sbin/start-all.sh to start all services. If the installation paths are not identical, you will need to start the service manually.
    Start Celeborn master
    $CELEBORN_HOME/sbin/start-master.sh
    Start Celeborn worker
    $CELEBORN_HOME/sbin/start-worker.sh
  3. If Celeborn starts success, the output of the Master's log should be like this:
22/10/08 19:29:11,805 INFO [main] Dispatcher: Dispatcher numThreads: 64
22/10/08 19:29:11,875 INFO [main] TransportClientFactory: mode NIO threads 64
22/10/08 19:29:12,057 INFO [main] Utils: Successfully started service 'MasterSys' on port 9097.
22/10/08 19:29:12,113 INFO [main] Master: Metrics system enabled.
22/10/08 19:29:12,125 INFO [main] HttpServer: master: HttpServer started on port 9098.
22/10/08 19:29:12,126 INFO [main] Master: Master started.
22/10/08 19:29:57,842 INFO [dispatcher-event-loop-19] Master: Registered worker
Host: 192.168.15.140
RpcPort: 37359
PushPort: 38303
FetchPort: 37569
ReplicatePort: 37093
SlotsUsed: 0()
LastHeartbeat: 0
Disks: {/mnt/disk1=DiskInfo(maxSlots: 6679, committed shuffles 0, running applications 0, shuffleAllocations: Map(), mountPoint: /mnt/disk1, usableSpace: 448284381184, avgFlushTime: 0, activeSlots: 0) status: HEALTHY dirs , /mnt/disk3=DiskInfo(maxSlots: 6716, committed shuffles 0, running applications 0, shuffleAllocations: Map(), mountPoint: /mnt/disk3, usableSpace: 450755608576, avgFlushTime: 0, activeSlots: 0) status: HEALTHY dirs , /mnt/disk2=DiskInfo(maxSlots: 6713, committed shuffles 0, running applications 0, shuffleAllocations: Map(), mountPoint: /mnt/disk2, usableSpace: 450532900864, avgFlushTime: 0, activeSlots: 0) status: HEALTHY dirs , /mnt/disk4=DiskInfo(maxSlots: 6712, committed shuffles 0, running applications 0, shuffleAllocations: Map(), mountPoint: /mnt/disk4, usableSpace: 450456805376, avgFlushTime: 0, activeSlots: 0) status: HEALTHY dirs }
WorkerRef: null

Deploy Celeborn on K8S

Please refer to our website

Deploy Spark client

Copy $CELEBORN_HOME/spark/*.jar to $SPARK_HOME/jars/.

Spark Configuration

To use Celeborn, the following spark configurations should be added.

# Shuffle manager class name changed in 0.3.0:
#    before 0.3.0: `org.apache.spark.shuffle.celeborn.RssShuffleManager`
#    since 0.3.0: `org.apache.spark.shuffle.celeborn.SparkShuffleManager`
spark.shuffle.manager org.apache.spark.shuffle.celeborn.SparkShuffleManager
# must use kryo serializer because java serializer do not support relocation
spark.serializer org.apache.spark.serializer.KryoSerializer

# celeborn master
spark.celeborn.master.endpoints clb-1:9097,clb-2:9097,clb-3:9097
# This is not necessary if your Spark external shuffle service is Spark 3.1 or newer
spark.shuffle.service.enabled false

# options: hash, sort
# Hash shuffle writer use (partition count) * (celeborn.push.buffer.max.size) * (spark.executor.cores) memory.
# Sort shuffle writer uses less memory than hash shuffle writer, if your shuffle partition count is large, try to use sort hash writer.  
spark.celeborn.client.spark.shuffle.writer hash

# We recommend setting `spark.celeborn.client.push.replicate.enabled` to true to enable server-side data replication
# If you have only one worker, this setting must be false 
# If your Celeborn is using HDFS, it's recommended to set this setting to false
spark.celeborn.client.push.replicate.enabled true

# Support for Spark AQE only tested under Spark 3
# we recommend setting localShuffleReader to false for getting better performance of Celeborn
spark.sql.adaptive.localShuffleReader.enabled false

# If Celeborn is using HDFS
spark.celeborn.storage.hdfs.dir hdfs://<namenode>/celeborn

# we recommend enabling aqe support to gain better performance
spark.sql.adaptive.enabled true
spark.sql.adaptive.skewJoin.enabled true

# Support Spark Dynamic Resource Allocation
# Required Spark version >= 3.5.0
spark.shuffle.sort.io.plugin.class org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO
# Required Spark version >= 3.4.0, highly recommended to disable
spark.dynamicAllocation.shuffleTracking.enabled false

Deploy Flink client

Copy $CELEBORN_HOME/flink/*.jar to $FLINK_HOME/lib/.

Flink Configuration

To use Celeborn, the following flink configurations should be added.

shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_BLOCKING
celeborn.master.endpoints: clb-1:9097,clb-2:9097,clb-3:9097

celeborn.client.shuffle.batchHandleReleasePartition.enabled: true
celeborn.client.push.maxReqsInFlight: 128

# Network connections between peers
celeborn.data.io.numConnectionsPerPeer: 16
# threads number may vary according to your cluster but do not set to 1
celeborn.data.io.threads: 32
celeborn.client.shuffle.batchHandleCommitPartition.threads: 32
celeborn.rpc.dispatcher.numThreads: 32

# Floating buffers may need to change `taskmanager.network.memory.fraction` and `taskmanager.network.memory.max`
taskmanager.network.memory.floating-buffers-per-gate: 4096
taskmanager.network.memory.buffers-per-channel: 0
taskmanager.memory.task.off-heap.size: 512m

Note: The config option execution.batch-shuffle-mode should configure as ALL_EXCHANGES_BLOCKING.

Deploy MapReduce client

Copy $CELEBORN_HOME/mr/*.jar into mapreduce.application.classpath and yarn.application.classpath. Meanwhile, configure the following settings in YARN and MapReduce config.

-Dyarn.app.mapreduce.am.job.recovery.enable=false
-Dmapreduce.job.reduce.slowstart.completedmaps=1
-Dmapreduce.celeborn.master.endpoints=<master-1-1>:9097
-Dyarn.app.mapreduce.am.command-opts=org.apache.celeborn.mapreduce.v2.app.MRAppMasterWithCeleborn
-Dmapreduce.job.map.output.collector.class=org.apache.hadoop.mapred.CelebornMapOutputCollector
-Dmapreduce.job.reduce.shuffle.consumer.plugin.class=org.apache.hadoop.mapreduce.task.reduce.CelebornShuffleConsumer

Best Practice

If you want to set up a production-ready Celeborn cluster, your cluster should have at least 3 masters and at least 4 workers. Masters and works can be deployed on the same node but should not deploy multiple masters or workers on the same node. See more detail in CONFIGURATIONS

Support Spark Dynamic Allocation

For Spark versions >= 3.5.0, Celeborn can be used with Dynamic Resource Allocation(DRA) when spark.shuffle.sort.io.plugin.class is set to org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO. Check SPARK-42689 and CELEBORN-911 for more details.

For Spark versions < 3.5.0, we provide a patch to enable users to use Spark with DRA and Celeborn.

Metrics

Celeborn has various metrics. METRICS

Community and Support

Subscribe Mailing Lists

Mail List is the most recognized form of communication in the Apache community. Contact us through the following mailing list.

Name Scope
[email protected] Development-related discussions Subscribe Unsubscribe Archives

Report Issues or Submit Pull Request

If you meet any questions, feel free to file a 🔗Jira Ticket or connect us and fix it by submitting a 🔗Pull Request.

IM Contact Info
Slack 🔗Slack
DingTalk 🔗DingTalk

How to Contribute

This is an active open-source project. We are always open to developers who want to use the system or contribute to it. See more detail in Contributing.

celeborn's People

Contributors

angerszhuuuu avatar cfmcgrady avatar codingcat avatar cxzl25 avatar dev-lpq avatar fmx avatar jiaoqingbo avatar kaijchen avatar kerwin-zk avatar leesf avatar lyy-pineapple avatar miyuesc avatar mridulm avatar nafiyaix avatar onebox-li avatar otterc avatar pan3793 avatar rexxiong avatar stenicholas avatar tiny-dust avatar turbofei avatar ulysses-you avatar waitinfuture avatar wangshengjie123 avatar xleoken avatar yaooqinn avatar zhongqiangczq avatar zhouyifan279 avatar zwangsheng avatar zy-jordan avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

celeborn's Issues

[FEATURE] update netty version .

Is your feature request related to a problem? Please describe.

A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]

Describe the solution you'd like

A clear and concise description of what you want to happen.

Describe alternatives you've considered

A clear and concise description of any alternative solutions or features you've considered.

/cc @waitinfuture

/assign @who-can-help-you

[BUG] Compile error with Spark3.2

What is the bug?

A clear and concise description of what the bug is.

How to reproduce the bug?

Steps to reproduce the bug.

Could you share logs or screenshots?

If applicable, add logs/screenshots to help explain your problem.

/cc @who-need-to-know

/assign @who-can-solve-this-bug

[BUG] Filechannel close takes a long time.

What is the bug?

When committing files, channel.close() sometimes takes a long time.

How to reproduce the bug?

  • shuffle write data size: 14.5G
  • shuffle partition num: 200

image

sparkProperties:

spark.executor.instances: 5
spark.kubernetes.executor.request.cores: 6
spark.executor.memory: 30g
spark.shuffle.manager: org.apache.spark.shuffle.rss.RssShuffleManager
spark.rss.master.address: 192.168.x.xx:9097
spark.shuffle.service.enabled: false
spark.rss.push.data.replicate: false
spark.sql.adaptive.enabled: false
spark.sql.adaptive.localShuffleReader.enabled: false
spark.sql.adaptive.skewJoin.enabled: false

rss-defaults.conf

rss.master.address 192.168.5.19:9097
rss.metrics.system.enable true
rss.remove.shuffle.delay 300s
rss.worker.flush.buffer.size 256k
rss.worker.flush.queue.capacity 512
rss.worker.base.dirs /mnt/disk1

ECS as virtual macheine:

16c64g(ecs.g7.4xlarge) x 3 workers, each worker has ESSD 100G PL1 disk.

Could you share logs or screenshots?

The first read took 28s because it was waiting for the completion of the previous stage commit file, and the subsequent shuffle read took a short time (within 0.1s), and the commit file was slow because it was slow when executing channel.close().

image

When I changed FileChannel.write() to DataOutputStream.write(), the above problem disappeared.

image

Note: I added channel.force() methoed before channel.close() and found that the time-consuming is mainly caused by channel.force().

[BUG] Master HttpServlet `workInfo` but

What is the bug?

22/03/03 15:06:34,901 WARN [nioEventLoopGroup-6-1] DefaultChannelPipeline: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.lang.ClassCastException: scala.collection.immutable.$colon$colon cannot be cast to java.util.List
	at com.aliyun.emr.rss.service.deploy.master.Master.$anonfun$getWorkerInfos$1(Master.scala:464)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at com.aliyun.emr.rss.service.deploy.master.Master.getWorkerInfos(Master.scala:460)
	at com.aliyun.emr.rss.service.deploy.master.http.HttpRequestHandler.handleRequest(HttpRequestHandler.scala:66)
	at com.aliyun.emr.rss.service.deploy.master.http.HttpRequestHandler.channelRead0(HttpRequestHandler.scala:43)
	at com.aliyun.emr.rss.service.deploy.master.http.HttpRequestHandler.channelRead0(HttpRequestHandler.scala:31)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)

How to reproduce the bug?

curl -get http://host:{rss.master.prometheus.metric.port}/workerInfo

Could you share logs or screenshots?

[FEATURE] Worker has many indicators, and Prometheus has been connected. Is there any plan to analyze the indicators in detail

我们已经采集指标到普罗米修斯,但是面对这么多指标,比较困扰,是否有计划对指标进行较为详细的剖析,我们也好有针对性地关注这些指标的不同表现,然后去优化RSS服务,或进行隐患诊断。

另外 是否有计划提供grafana的监控模版。

非常感谢!

We have collected indicators to Prometheus, but in the face of so many indicators, it is difficult to know whether there is a plan to carry out a detailed analysis of indicators, so that we can focus on the different performance of these indicators, and then optimize RSS service or carry out hidden trouble diagnosis.
In addition, is there any plan to provide the monitoring template of Grafana?
Thanks very much!

[BUG] NullPointerException for worker on LVM format

What is the bug?

Throw NullPointerException when I start worker on node which use LVM.
The way I see it, com.aliyun.emr.rss.service.deploy.worker.DeviceInfo#getDeviceAndMountInfos not support LVM format, it can't get correct mount information.

How to reproduce the bug?

start worker by $RSS_HOME/sbin/start-worker.sh rss://node01:9097 on node wich use LVM format

Could you share logs or screenshots?

image

/cc @who-need-to-know

/assign @who-can-solve-this-bug

[BUG] WorkRefer can be null cause throw NPE

What is the bug?

Worker in blacklist and WorkerRef is null

22/03/29 15:01:46 WARN LifecycleManager: [reserve buffer] failed due to blacklist:
Host: xxx.xxx.xxx.xx
RpcPort: 45149
PushPort: 46795
FetchPort: 38947
TotalSlots: -1
SlotsUsed: 0
SlotsAvailable: -1
LastHeartBeat: 0
WorkerRef: null

Cause task throw NPE when reserveSlot


22/03/29 15:01:46 ERROR Inbox: Ignoring error
java.lang.NullPointerException
	at com.aliyun.emr.rss.client.write.LifecycleManager.$anonfun$reserveSlotsWithRetry$2(LifecycleManager.scala:803)
	at com.aliyun.emr.rss.client.write.LifecycleManager.$anonfun$reserveSlotsWithRetry$2$adapted(LifecycleManager.scala:787)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at com.aliyun.emr.rss.client.write.LifecycleManager.reserveSlotsWithRetry(LifecycleManager.scala:787)
	at com.aliyun.emr.rss.client.write.LifecycleManager.handleRegisterShuffle(LifecycleManager.scala:287)
	at com.aliyun.emr.rss.client.write.LifecycleManager$$anonfun$receiveAndReply$1.applyOrElse(LifecycleManager.scala:180)
	at com.aliyun.emr.rss.common.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:110)
	at com.aliyun.emr.rss.common.rpc.netty.Inbox.safelyCall(Inbox.scala:214)
	at com.aliyun.emr.rss.common.rpc.netty.Inbox.process(Inbox.scala:107)
	at com.aliyun.emr.rss.common.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:222)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)


How to reproduce the bug?

Close and restart a worker when running application

Could you share logs or screenshots?

If applicable, add logs/screenshots to help explain your problem.

/cc @who-need-to-know

/assign @who-can-solve-this-bug

[FEATURE] Rss shuffle reader choose proper replica to read.

Is your feature request related to a problem? Please describe.

If a disk went wrong, the RSS worker read can take a lot of time.

Describe the solution you'd like

We can optimize the RSS Shuffle reader to pick the faster replica if data replication is active.

Describe alternatives you've considered

A clear and concise description of any alternative solutions or features you've considered.

/cc @waitinfuture @wangshengjie123

/assign @who-can-help-you

[FEATURE] Support Spark3.0 and higher

Is your feature request related to a problem? Please describe.

A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]

Describe the solution you'd like

A clear and concise description of what you want to happen.

Describe alternatives you've considered

A clear and concise description of any alternative solutions or features you've considered.

/cc @who-need-to-know

/assign @who-can-help-you

[FEATURE] Load balance improvements.

Is your feature request related to a problem? Please describe.

A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]

Describe the solution you'd like

A clear and concise description of what you want to happen.

Describe alternatives you've considered

A clear and concise description of any alternative solutions or features you've considered.

/cc @who-need-to-know

/assign @who-can-help-you

[BUG] Reserve slots takes long time, cause executor register shuffle timeout

What is the bug?

Current logic, we reserve slots in serial, if cluster have hundreds and thousands workers, the action will take long time, it will cause register shuffle timeout.

How to reproduce the bug?

large cluster and shot rpc timeout

Could you share logs or screenshots?

image

`2022-02-08,17:33:10,318 INFO com.aliyun.emr.ess.common.meta.EssMetaSystem: OfferSlots for application_1642533366533_495641_1-16 Success!

2022-02-08,17:41:12,963 INFO com.aliyun.emr.ess.common.meta.EssMetaSystem: Destroyed peer partitions for reserve buffer failed workers application_1642533366533_495641_1-16, {

2022-02-08,17:41:13,169 INFO com.aliyun.emr.ess.common.meta.EssMetaSystem: Reserve buffer success for application_1642533366533_495641_1-16

2022-02-08,17:41:13,169 INFO com.aliyun.emr.ess.common.meta.EssMetaSystem: ReserveBuffer for application_1642533366533_495641_1-16 success!
`

/cc @FMX @TonyDoen

/assign @wangshengjie123

[FEATURE] Multiple version cluster compability.

Is your feature request related to a problem? Please describe.

We need different RSS clusters for Spark2 or Spark3. We should be able to run the different clients on a single RSS cluster.

Describe the solution you'd like

A clear and concise description of what you want to happen.

Describe alternatives you've considered

A clear and concise description of any alternative solutions or features you've considered.

/cc @who-need-to-know

/assign @who-can-help-you

[FEATURE] Add rest api to isolate disk.

Is your feature request related to a problem? Please describe.

A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]

Describe the solution you'd like

A clear and concise description of what you want to happen.

Describe alternatives you've considered

A clear and concise description of any alternative solutions or features you've considered.

/cc @waitinfuture @wangshengjie123

/assign @who-can-help-you

[BUG] duplicated tcp_nodelay flag

What is the bug?

there is duplicated tcp_nodelay flag set by sever side
image

How to reproduce the bug?

Steps to reproduce the bug.

Could you share logs or screenshots?

If applicable, add logs/screenshots to help explain your problem.

/cc @who-need-to-know

/assign @who-can-solve-this-bug

[FEATURE] Stage Re-Run when data lost

Is your feature request related to a problem? Please describe.

A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]

Describe the solution you'd like

A clear and concise description of what you want to happen.

Describe alternatives you've considered

A clear and concise description of any alternative solutions or features you've considered.

/cc @who-need-to-know

/assign @who-can-help-you

[FEATURE] Remove the dependency on active shuffle for dynamic allocation for spark 3

Is your feature request related to a problem? Please describe.

Only enabling shuffle tracking for spark 3 to support dynamic allocation will cause the executor to shut down late while waiting for the active shuffle to be consumed. Because the shuffle data is stored in the remote shuffle service, this wait is unnecessary.

Describe the solution you'd like

Add a spark patch like spark 2.x to support dynamic allocation.

[FEATURE] Support checksum

Is your feature request related to a problem? Please describe.

A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]

Describe the solution you'd like

A clear and concise description of what you want to happen.

Describe alternatives you've considered

A clear and concise description of any alternative solutions or features you've considered.

/cc @who-need-to-know

/assign @who-can-help-you

[QUESTION] HA模式下,指定Master服务IP和端口的地方只能配置一个IP

HA模式下,指定Master服务IP和端口的地方只能配置一个IP
如:启动worker时:
sh /opt/ali_rss/rss-1.0.0-bin-release/sbin/start-worker.sh rss://master-ip:9097
如果用类似如下命令则会报错:
sh /opt/ali_rss/rss-1.0.0-bin-release/sbin/start-worker.sh rss://master1-ip:9097,rss://master2-ip:9097,rss://master3-ip:9097

我想配置多个master IP该怎样配置呢,因为,如果某一个master进程挂掉,脚本是无法运行成功的

[Question] Fail to build project

Get a error when build project as follows
./dev/make-distribution.sh -Pspark-3 -Dspark.version=3.0.1 -Dmaven.test.skip=true

error message :
[INFO] Aliyun E-MapReduce Shuffle Service Project Parent POM SUCCESS [ 0.344 s] [INFO] Aliyun E-MapReduce Shuffle Service Common .......... SUCCESS [ 16.039 s] [INFO] Aliyun E-MapReduce Shuffle Service Java Client ..... SUCCESS [ 14.089 s] [INFO] Aliyun E-MapReduce Shuffle Service Shuffle Manager for Spark FAILURE [ 1.814 s] [INFO] Aliyun E-MapReduce Shuffle Service Shuffle Manager for Spark SKIPPED [INFO] Aliyun E-MapReduce Shuffle Service Common .......... SUCCESS [ 13.077 s] [INFO] Aliyun E-MapReduce Shuffle Service Service Master Module SKIPPED [INFO] Aliyun E-MapReduce Shuffle Service Service Worker Module SKIPPED [INFO] ------------------------------------------------------------------------ [INFO] BUILD FAILURE [INFO] ------------------------------------------------------------------------ [INFO] Total time: 32.997 s (Wall Clock) [INFO] Finished at: 2022-01-17T14:14:59+08:00 [INFO] ------------------------------------------------------------------------ [ERROR] Failed to execute goal on project shuffle-manager-common: Could not resolve dependencies for project com.aliyun.emr:shuffle-manager-common:jar:1.0.0: Could not find artifact com.aliyun.emr:client:jar:tests:1.0.0 in XXXX -> [Help 1] [ERROR]

[FEATURE] RSS client supports multiple master addresses.

Is your feature request related to a problem? Please describe.

A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]

Describe the solution you'd like

A clear and concise description of what you want to happen.

Describe alternatives you've considered

A clear and concise description of any alternative solutions or features you've considered.

/cc @waitinfuture @wangshengjie123

/assign @FMX

[BUG] ShuffleClinet limitMaxInFlight does not take effect strictly #3

What is the bug?

A clear and concise description of what the bug is.

How to reproduce the bug?

Steps to reproduce the bug.

Could you share logs or screenshots?

If applicable, add logs/screenshots to help explain your problem.

/cc @who-need-to-know

/assign @who-can-solve-this-bug

[BUG] metric timer will trigger integer overflow .

What is the bug?

RssWorker metric system has a bug that will cause an integer overflow when runs long enough.

How to reproduce the bug?

Steps to reproduce the bug.

Could you share logs or screenshots?

If applicable, add logs/screenshots to help explain your problem.

22/01/06 10:24:53,820 ERROR [push-server-6-10] TransportRequestHandler: Error while invoking RpcHandler#receive() on PushData PushData{requestId=702699486, mode=1, shuffleKey=application_1640695558204_38730_1-2, partitionUniqueId=151-0, body size=15066}
java.lang.ArrayIndexOutOfBoundsException: -4095
at com.aliyun.emr.rss.server.common.metrics.ResettableSlidingWindowReservoir.update(ResettableSlidingWindowReservoir.scala:35)
at com.codahale.metrics.Histogram.update(Histogram.java:39)
at com.codahale.metrics.Timer.update(Timer.java:164)
at com.codahale.metrics.Timer.update(Timer.java:86)
at com.aliyun.emr.rss.server.common.metrics.source.AbstractSource.doStopTimer(AbstractSource.scala:148)
at com.aliyun.emr.rss.server.common.metrics.source.AbstractSource.stopTimer(AbstractSource.scala:131)
at com.aliyun.emr.rss.service.deploy.worker.Worker$$anon$7.onSuccess(Worker.scala:587)
at com.aliyun.emr.rss.service.deploy.worker.Worker.handlePushData(Worker.scala:660)
at com.aliyun.emr.rss.service.deploy.worker.PushDataRpcHandler.receivePushData(PushDataRpcHandler.java:56)

22/01/06 10:25:31,123 WARN [nioEventLoopGroup-11-1] DefaultChannelPipeline: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.lang.NegativeArraySizeException
at com.aliyun.emr.rss.server.common.metrics.ResettableSlidingWindowReservoir.getSnapshot(ResettableSlidingWindowReservoir.scala:40)
at com.codahale.metrics.Histogram.getSnapshot(Histogram.java:54)
at com.codahale.metrics.Timer.getSnapshot(Timer.java:159)
at com.aliyun.emr.rss.server.common.metrics.source.AbstractSource.recordTimer(AbstractSource.scala:251)
at com.aliyun.emr.rss.server.common.metrics.source.AbstractSource$$anonfun$getMetrics$4.apply(AbstractSource.scala:282)

/cc @waitinfuture

/assign @FMX

[QUESTION] Spark job exec failed

软件版本:
spark2.4.5
hadoop2.6.0-cdh5.13.1

spark作业提交核心配置参数:

$SPARK_HOME/bin/spark-submit
--master yarn
--queue low.bigdata_online
--executor-memory 10G
--executor-cores 4
--driver-memory 5g
--conf spark.ui.port=$port
--conf spark.executor.memoryOverhead=4g
--conf spark.dynamicAllocation.minExecutors=1
--conf spark.dynamicAllocation.maxExecutors=24
--conf spark.yarn.maxAppAttempts=1
--conf spark.speculation=true
--conf spark.dynamicAllocation.enabled=true
--conf spark.shuffle.service.enabled=false
--conf spark.shuffle.manager=org.apache.spark.shuffle.rss.RssShuffleManager
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.rss.master.address=master-ip:9097
--conf spark.rss.shuffle.writer.mode=hash
--conf spark.rss.push.data.replicate=true
--conf spark.yarn.archive=hdfs:///data/spark2.4.5_rss_jars
......

问题描述:
RSS 服务运行正常,Spark作业运行时抛如下异常,(Spark Jars目录下是存在rss-shuffle-manager-1.0.0-shaded.jar的)

: java.lang.NoClassDefFoundError: com/aliyun/emr/rss/client/write/LifecycleManager
at org.apache.spark.shuffle.rss.RssShuffleManager.org$apache$spark$shuffle$rss$RssShuffleManager$$initializeLifecycleManager(RssShuffleManager.scala:60)
at org.apache.spark.shuffle.rss.RssShuffleManager$$anonfun$registerShuffle$2.apply(RssShuffleManager.scala:76)
at org.apache.spark.shuffle.rss.RssShuffleManager$$anonfun$registerShuffle$2.apply(RssShuffleManager.scala:76)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.shuffle.rss.RssShuffleManager.registerShuffle(RssShuffleManager.scala:76)
at org.apache.spark.ShuffleDependency.(Dependency.scala:93)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:323)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:151)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2836)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2835)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
at org.apache.spark.sql.Dataset.count(Dataset.scala:2835)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.aliyun.emr.rss.client.write.LifecycleManager
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 48 more

[FEATURE] Add metrics about shuffle fetch send data time.

Is your feature request related to a problem? Please describe.

A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]

Describe the solution you'd like

A clear and concise description of what you want to happen.

Describe alternatives you've considered

A clear and concise description of any alternative solutions or features you've considered.

/cc @waitinfuture @wangshengjie123

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.