DLRover: An Automatic Distributed Deep Learning System


DLRover makes the distributed training of large AI models easy, stable, fast and green. It can automatically train deep learning models on a distributed cluster. It helps model developers focus on model architecture without worrying about engineering details such as hardware acceleration and distributed execution. It currently provides automated operation and maintenance for deep learning training jobs on K8s/Ray. Its major features are:

  • Fault-Tolerance: The distributed training can continue running in the event of failures.
  • Flash Checkpoint: The distributed training can recover from failures in seconds using the in-memory checkpoint.
  • Auto-Scaling: The distributed training can scale up/down resources to improve the stability, throughput and resource utilization.

In addition, DLRover provides extension libraries for PyTorch and TensorFlow to speed up training.

  • ATorch: an extension library of PyTorch to speed up the training of LLMs.
  • TFPlus: an extension library of TensorFlow to speed up the training of search, recommendation and advertisement models.


Why DLRover?

Fault Tolerance to Reduce the Downtime of a Large Scale Training Job

DLRover can restore the training when the process fails without stopping the training job. The actions to restore training in DLRover are:

  1. Automatically diagnose the failure reason.
  2. Restart the process, not the node, after software errors.
  3. Restart the failed nodes after hardware errors.

For details, see the blog on fault tolerance and elasticity. With fault tolerance, the goodput of GLM-65B training on thousands of GPUs increased from 69% to 95%. Goodput is the time spent computing useful new steps divided by the elapsed time of the training job. The downtime details are shown:

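The goodput definition above can be written as a short calculation. This is a minimal sketch: the function name and the 100-hour job with 5 hours of downtime are illustrative assumptions, not measurements from the GLM-65B run.

```python
def goodput(useful_seconds: float, elapsed_seconds: float) -> float:
    """Fraction of the elapsed job time spent computing useful new steps."""
    if elapsed_seconds <= 0:
        raise ValueError("elapsed_seconds must be positive")
    return useful_seconds / elapsed_seconds


# Hypothetical job: 100 hours elapsed, 5 of them lost to failures and recovery.
elapsed = 100 * 3600.0
downtime = 5 * 3600.0
print(f"goodput = {goodput(elapsed - downtime, elapsed):.0%}")
```

Everything that does not produce new steps (restarts, checkpoint loading, recomputing rolled-back steps) counts as downtime in this view.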

Fault Tolerance and Flash Checkpoint to Reduce Downtime of PyTorch Training

In addition to fault tolerance, DLRover provides flash checkpoint to save and load checkpoints in seconds. With flash checkpoint, a training job can save checkpoints frequently, which reduces the number of steps rolled back when training resumes from the latest checkpoint after a failure. The features of flash checkpoint are:

  1. Asynchronously persist the checkpoint to the storage.
  2. Persist the checkpoint to the storage once the training process fails.
  3. Load the checkpoint from the host memory after the training process restarts.
  4. APIs for DDP, FSDP, DeepSpeed and Megatron-LM (cb995d5).
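The idea behind the first and third features can be sketched in plain Python. This is not DLRover's API: the class `AsyncCheckpointer` and its methods are hypothetical, the state is an ordinary dict, and the snapshot lives in the training process's own memory, so unlike a host-memory checkpoint it would not survive a process restart.

```python
import copy
import os
import pickle
import tempfile
import threading


class AsyncCheckpointer:
    """Hypothetical helper: snapshot in memory, persist in the background."""

    def __init__(self, directory):
        self._directory = directory
        self._memory = None   # latest in-memory snapshot: (step, state)
        self._writer = None   # background thread writing to storage

    def save(self, step, state):
        # Blocking part: copy the state so training can keep mutating it.
        self._memory = (step, copy.deepcopy(state))
        self.wait()  # allow at most one pending write at a time
        self._writer = threading.Thread(target=self._persist, args=self._memory)
        self._writer.start()

    def _persist(self, step, state):
        path = os.path.join(self._directory, f"step-{step}.pkl")
        tmp_path = path + ".tmp"
        with open(tmp_path, "wb") as f:
            pickle.dump(state, f)
        os.replace(tmp_path, path)  # atomic rename: no half-written checkpoint

    def wait(self):
        if self._writer is not None:
            self._writer.join()

    def load_from_memory(self):
        return self._memory


ckpt_dir = tempfile.mkdtemp()
ckpt = AsyncCheckpointer(ckpt_dir)
state = {"weights": [0.1, 0.2], "step": 10}
ckpt.save(10, state)
state["weights"][0] = 9.9  # training continues; the snapshot is unaffected
ckpt.wait()
```

Training is blocked only for the in-memory copy; the slower write to storage overlaps with the following training steps.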

The Performance of DLRover Flash Checkpoint to Save/Load GPT2-1.5B.

The figure shows the I/O time that different DL frameworks take to read checkpoint files when resuming training processes. With DLRover Flash Checkpoint, recovery can complete within seconds by loading checkpoints directly from shared memory, which is much faster than loading checkpoints from SSD or NAS.

Fault Tolerance Improves the Stability of TensorFlow PS Training

DLRover can recover failed parameter servers and workers to resume training.

  1. DLRover can automatically launch a Pod with more memory to recover the OOM node.
  2. DLRover can reassign the training data of a failed worker to other workers.
  3. DLRover can automatically scale up the parameter servers to fit the model size.

At AntGroup, DLRover manages hundreds of DL training jobs every day on a customized Kubernetes cluster. Excluding jobs that fail because of code errors, the job completion rate increased from 89% with tf-operator in KubeFlow to 95%. Other unrecoverable causes of job failure include data errors, NaN loss of the model, network breakdown, and so on.


Auto-Scaling to Improve Training Performance and Resource Utilization

DLRover automatically scales resources (for parameter servers or workers) up or down while a training job is running. By monitoring node workload and throughput, DLRover can diagnose bottlenecks in the resource configuration. Common bottlenecks include straggler nodes, unbalanced workload across PS, insufficient CPU cores on nodes, and an insufficient number of nodes. DLRover improves training performance by adjusting resources dynamically.

To improve training throughput, users tend to over-provision resources for their jobs to avoid any risk from insufficient resources. This usually leads to huge resource waste. DLRover Auto-Scaling allocates resources according to the demand of model training to reduce that waste.


Dynamic Data Sharding For Elasticity and Fault-tolerance

Dynamic data sharding splits the dataset into many small shards, each of which contains only a few batches of training samples. A worker gets a new shard only after it has used up the samples of the previous one. With dynamic sharding, DLRover can

  • recover the shard if the worker fails before using up samples of the shard.
  • mitigate the worker straggler by assigning more shards to the fast worker.
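A minimal sketch of this scheme is shown below. The `ShardManager` class and its method names are hypothetical and only illustrate the bookkeeping; they are not DLRover's implementation. A worker is assumed to call `report_done` before it asks for its next shard.

```python
from collections import deque


class ShardManager:
    """Hypothetical dispatcher of [start, end) index ranges to workers."""

    def __init__(self, dataset_size, shard_size):
        self._todo = deque(
            (start, min(start + shard_size, dataset_size))
            for start in range(0, dataset_size, shard_size)
        )
        self._doing = {}  # worker_id -> shard currently being consumed

    def get_shard(self, worker_id):
        # Called by a worker only after it has finished its previous shard.
        if not self._todo:
            return None
        shard = self._todo.popleft()
        self._doing[worker_id] = shard
        return shard

    def report_done(self, worker_id):
        self._doing.pop(worker_id, None)

    def recover(self, worker_id):
        # The worker failed mid-shard: put its shard back for other workers.
        shard = self._doing.pop(worker_id, None)
        if shard is not None:
            self._todo.appendleft(shard)

    def finished(self):
        return not self._todo and not self._doing


manager = ShardManager(dataset_size=100, shard_size=32)
first = manager.get_shard(worker_id=0)   # (0, 32)
second = manager.get_shard(worker_id=1)  # (32, 64)
manager.recover(worker_id=1)             # worker 1 fails before finishing
manager.report_done(worker_id=0)
retry = manager.get_shard(worker_id=0)   # worker 0 picks up (32, 64)
```

Because workers pull shards at their own pace, a fast worker naturally consumes more shards than a straggler, and no worker is tied to a fixed partition of the dataset.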

Integration to Offline and Online Deep Learning

With the data source transparency provided by dynamic data sharding, DLRover can be integrated with offline training, which consumes batch data, and can also support online learning with real-time streaming data, either fed by a message queue such as RocketMQ/Kafka/Pulsar/... or executed as a training sink node inside Flink/Spark/Ray/...

In practice, DLRover is an ideal component for building an end-to-end industrial online learning system. estimator.md provides a detailed example implemented with tf.estimator.Estimator.

How to Use DLRover to Train Your Models?

Train a PyTorch Model

We can use dlrover-run to run any training script that torchrun or torch.distributed.run can run.

pip install "dlrover[torch]"
dlrover-run --nnodes=1 --nproc_per_node=$NUM_TRAINERS train_scripts.py

More detailed tutorials are:

Train a TensorFlow Model

We can use DLRover to train a TensorFlow model with the following steps:

  • Use TensorFlow estimator to develop the TensorFlow model.
  • Define the input of tf.dataset in a training configuration of DLRover.
  • Define your reader to read samples from the dataset file.

We can refer to estimator.md to train a model with DLRover.

What's Next?

  • Multi-node in-memory redundant backup checkpoints for fast failure recovery.
  • Fine-grained automatic distributed training for synchronous GPU jobs
    • hybrid-parallel mode
    • adaptive hyperparameter adjustment with dynamic resources
    • more strategies for fine-grained scenarios
  • Full-stack solution for online deep learning
  • High-performance extension libraries for TensorFlow/PyTorch to speed up training
  • ...

Contributing

Please refer to DEVELOPMENT.

Quick Start

An Example of Flash Checkpoint.

Train a PyTorch Model on Kubernetes.

Train a GPT Model on Kubernetes.

Train a TensorFlow Estimator on Kubernetes.

Community

Scan the DingTalk QR code or search for "AI Infra" in WeChat (微信) to join the DLRover group. The DingTalk QR code is:


dlrover's Issues

ElasticJob operator fails when applying an existing job.

INFO[0291] jobName: elasticjob-sample, phase Running    
INFO[0291] Master elasticjob-elasticjob-sample-master is deleted and relaunch a new one elasticjob-elasticjob-sample-master 
INFO[0291] Pod elasticjob-sample-edljob-ps-0 is deleted and will be relaunched 
1.670830146192972e+09	INFO	Observed a panic in reconciler: runtime error: invalid memory address or nil pointer dereference	{"controller": "elasticjob", "controllerGroup": "elastic.iml.github.io", "controllerKind": "ElasticJob", "elasticJob": {"name":"elasticjob-sample","namespace":"default"}, "namespace": "default", "name": "elasticjob-sample", "reconcileID": "c5e12fb0-74bb-404a-a2a6-d6ea166b7673"}
panic: runtime error: invalid memory address or nil pointer dereference [recovered]
	panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x2 addr=0x8 pc=0x103d4475c]

goroutine 269 [running]:
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Reconcile.func1()
	/Users/lazylong/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:118 +0x1a0
panic({0x1040bca60, 0x104d06ea0})
	/opt/homebrew/Cellar/go/1.18/libexec/src/runtime/panic.go:838 +0x204
github.com/intelligent-machine-learning/easydl/dlrover/go/operator/pkg/controllers/psstrategy.(*PSTaskManager).getTotalTaskCount(...)
	/Users/lazylong/workspace/easydl/dlrover/go/operator/pkg/controllers/psstrategy/strategy.go:154
github.com/intelligent-machine-learning/easydl/dlrover/go/operator/pkg/controllers/psstrategy.(*PSTaskManager).HandleFaultPods(0x1400045cd50, {0x10429d8f8, 0x14000131220}, 0x140007f04e0)
	/Users/lazylong/workspace/easydl/dlrover/go/operator/pkg/controllers/psstrategy/strategy.go:267 +0x28c
github.com/intelligent-machine-learning/easydl/dlrover/go/operator/pkg/controllers.(*ElasticJobReconciler).handleFaultPods(...)
	/Users/lazylong/workspace/easydl/dlrover/go/operator/pkg/controllers/elasticjob_controller.go:223
github.com/intelligent-machine-learning/easydl/dlrover/go/operator/pkg/controllers.(*ElasticJobReconciler).reconcileJobs(0x140006ff4c0, 0x140007f04e0)
	/Users/lazylong/workspace/easydl/dlrover/go/operator/pkg/controllers/elasticjob_controller.go:134 +0x614
github.com/intelligent-machine-learning/easydl/dlrover/go/operator/pkg/controllers.(*ElasticJobReconciler).Reconcile(0x140006ff4c0, {0x1042997d0?, 0x14000975d10?}, {{{0x140006e0c60, 0x7}, {0x1400016aba0, 0x11}}})
	/Users/lazylong/workspace/easydl/dlrover/go/operator/pkg/controllers/elasticjob_controller.go:96 +0x1e0
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Reconcile(0x104299728?, {0x1042997d0?, 0x14000975d10?}, {{{0x140006e0c60?, 0x1041c3c00?}, {0x1400016aba0?, 0xc0376bcfabc18d3c?}}})
	/Users/lazylong/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:121 +0x8c
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).reconcileHandler(0x14000228f00, {0x104299728, 0x140006ff400}, {0x10410e360?, 0x140001ccda0?})
	/Users/lazylong/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:320 +0x2a8
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).processNextWorkItem(0x14000228f00, {0x104299728, 0x140006ff400})
	/Users/lazylong/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:273 +0x1b0
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Start.func2.2()
	/Users/lazylong/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:234 +0x78
created by sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Start.func2
	/Users/lazylong/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:230 +0x294
exit status 2

Add an online deep learning example.

  1. Add a streaming reader that transfers data from upstream to workers.
  2. Use deeprec to implement delta model export.
  3. Test the streaming data manager/splitter and restoring from checkpoints.
  4. Add an example to illustrate the effects of online deep learning.

Rename _initial_nodes as nodes_queue

The name _initial_nodes is a little confusing. _initial_nodes serves as a queue that stores pod templates. Once a pod is created, its template is removed from _initial_nodes.

Make Job's Brain Relevant Parameters Configurable in Job Yaml

Brain requires a job to specify several parameters when it processes requests from the job, e.g., processor, data store, config retriever. Currently those parameters are constants in the master source code, which makes them inconvenient to update. Furthermore, all jobs share the same configuration. Therefore, it would be better to make those parameters configurable in the job's YAML.

Exception in tensorflow_failover after scaling up PS.

[2023-03-16 15:39:23,737] [INFO][tensorflow_failover.py:127:refresh_env] successfully refresh TF_CONFIFG {"cluster": {"worker": ["deepctr-auto-scale-edljob-worker-1:2222"], "ps": ["deepctr-auto-scale-edljob-ps-0.dlrover.svc:2222", "deepctr-auto-scale-edljob-ps-1.dlrover.svc:2222"], "chief": [""]}, "task": {"type": "worker", "index": 1}}
[2023-03-16 15:39:23,737] [INFO][tensorflow_failover.py:142:refresh_env] global dict is {'executor': <dlrover.trainer.tensorflow.executor.estimator_executor.EstimatorExecutor object at 0x7f7a56f5f790>, 'failover': <dlrover.trainer.tensorflow.failover.tensorflow_failover.TensorflowFailover object at 0x7f7a56f5f7c0>, 'relaunch_for_ps': True}
[2023-03-16 15:39:23,748] [INFO][file_reader.py:88:iterator] shard is name: "iris_training_data"
start: 128
end: 160

[2023-03-16 15:39:23,753] [INFO][elastic_data_shard_report_hook.py:26:after_run] report_batch_done
[2023-03-16 15:39:23,753] [INFO][estimator_util.py:33:after_run] The training thread should stop for due to ps migration/scaling
[2023-03-16 15:39:23,753] [INFO] [master_client.py:319:join_sync]  1:worker join sync relauch_for_ps
[2023-03-16 15:39:23,754] [INFO][estimator_util.py:41:after_run] Before stopping training thread,                  worker should wait for cheif to save checkpoint
[2023-03-16 15:39:34,768] [INFO][estimator_util.py:49:after_run] Training thread stopped because chief had saved checkpoint
[2023-03-16 15:39:34,768] [INFO][global_step_hook.py:42:end] hook end
[2023-03-16 15:39:34,885] [INFO][estimator.py:371:train] Loss for final step: 0.875.
[2023-03-16 15:39:34,886] [INFO][tf_kubernetes_worker.py:77:run] ps is migrating or scaling

[2023-03-16 15:39:34,886] [INFO][tf_kubernetes_worker.py:42:init_executor] init_executor
[2023-03-16 15:39:34,886] [INFO][tensorflow_failover.py:41:__init__] initiating tensorflow_failover and failover level is 1
Traceback (most recent call last):
  File "/usr/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/usr/local/lib/python3.8/dist-packages/dlrover/trainer/entry/local_entry.py", line 27, in <module>
    starter.run()
  File "/usr/local/lib/python3.8/dist-packages/dlrover/trainer/platform/starter.py", line 94, in run
    return execute(args)
  File "/usr/local/lib/python3.8/dist-packages/dlrover/trainer/platform/starter.py", line 85, in execute
    return worker.run()
  File "/usr/local/lib/python3.8/dist-packages/dlrover/trainer/worker/tf_kubernetes_worker.py", line 62, in run
    self.start_failover_monitor()
  File "/usr/local/lib/python3.8/dist-packages/dlrover/trainer/worker/tf_kubernetes_worker.py", line 48, in start_failover_monitor
    self.tensorflow_failover = TensorflowFailover()
  File "/usr/local/lib/python3.8/dist-packages/dlrover/trainer/tensorflow/failover/tensorflow_failover.py", line 48, in __init__
    self.init_for_dynet()
  File "/usr/local/lib/python3.8/dist-packages/dlrover/trainer/tensorflow/failover/tensorflow_failover.py", line 56, in init_for_dynet
    self._address = TF_CONFIG["cluster"][task_type][task_id]
IndexError: list index out of range
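One possible reading of the traceback, assuming TF_CONFIG at the failing line has the same shape as the value logged by refresh_env above: the worker list holds a single address while the task index is 1, so indexing the list by the task index goes out of range. The snippet below reproduces only that lookup with the logged values; it does not use any DLRover code.

```python
import json

# TF_CONFIG as logged by refresh_env in the output above.
tf_config = json.loads(
    '{"cluster": {"worker": ["deepctr-auto-scale-edljob-worker-1:2222"], '
    '"ps": ["deepctr-auto-scale-edljob-ps-0.dlrover.svc:2222", '
    '"deepctr-auto-scale-edljob-ps-1.dlrover.svc:2222"], "chief": [""]}, '
    '"task": {"type": "worker", "index": 1}}'
)

task_type = tf_config["task"]["type"]   # "worker"
task_id = tf_config["task"]["index"]    # 1
workers = tf_config["cluster"][task_type]

try:
    address = workers[task_id]          # only index 0 exists
except IndexError:
    address = None
    print(f"task index {task_id} is out of range for {len(workers)} worker(s)")
```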

Directory design of EasyDL

  1. How to organize directories of EasyDL?
|-brain # Automatically generates the resource plan of the job.
|-operator
  |-controllers
    |-elastic-job   # Creates a k8s Job
    |-resource-scale  # Scales out or in the job resource according to the Custom Resource (CR)
|-elasticdl   # Dispatches data shards to workers and monitors training nodes.
|-easydl  # APIs for the training loop of TensorFlow/PyTorch to use elastic training.
  2. Which framework do we use to implement a trainer to support elastic training?
    We need a trainer that catches the exception and rebuilds the session if the parameter servers change. Currently, a trainer is implemented with the tf.estimator framework in AntGroup. However, Keras is more common than tf.estimator, and TF 2.x supports training a Keras model using ParameterServerStrategy. Alternatively, we can implement a trainer based on tf.estimator and convert a Keras model to an estimator model in TensorFlow.

Support torch elastic with c10d as the rdzv_backend

DLRover can set the service address of the 1st worker as rdzv_endpoint to execute torchrun.

torchrun \
    --nnodes=MIN_SIZE:MAX_SIZE \
    --nproc_per_node=TRAINERS_PER_NODE \
    --max_restarts=NUM_ALLOWED_FAILURES_OR_MEMBERSHIP_CHANGES \
    --rdzv_id=JOB_ID \
    --rdzv_backend=c10d \
    --rdzv_endpoint=HOST_NODE_ADDR \
    YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)
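As a rough sketch of how the command could be assembled once the worker addresses are known, the hypothetical helper below fills the placeholders and uses the first worker's address as the rendezvous endpoint. The function name, the example addresses, and the port are assumptions for illustration; only the torchrun flags come from the command above.

```python
import shlex


def build_torchrun_command(worker_addrs, min_nodes, max_nodes, nproc_per_node,
                           max_restarts, job_id, script, script_args=()):
    """Hypothetical helper: fill in the torchrun placeholders for one job."""
    if not worker_addrs:
        raise ValueError("at least one worker address is required")
    return [
        "torchrun",
        f"--nnodes={min_nodes}:{max_nodes}",
        f"--nproc_per_node={nproc_per_node}",
        f"--max_restarts={max_restarts}",
        f"--rdzv_id={job_id}",
        "--rdzv_backend=c10d",
        f"--rdzv_endpoint={worker_addrs[0]}",  # first worker hosts the rendezvous
        script,
        *script_args,
    ]


cmd = build_torchrun_command(
    worker_addrs=["worker-0.example.svc:29400", "worker-1.example.svc:29400"],
    min_nodes=1,
    max_nodes=2,
    nproc_per_node=8,
    max_restarts=3,
    job_id="demo-job",
    script="train.py",
    script_args=["--epochs", "1"],
)
print(shlex.join(cmd))
```

Every worker would run the same command, so all of them rendezvous at the endpoint hosted by the first worker.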

Dynamic data sharding

Dynamic data sharding can dispatch shards to workers during training and recover the task if a worker fails.
