
elasticdl's Introduction

ElasticDL: A Kubernetes-native Deep Learning Framework


ElasticDL is a Kubernetes-native deep learning framework that supports fault-tolerance and elastic scheduling.

Main Features

Elastic Scheduling and Fault-Tolerance

Through Kubernetes-native design, ElasticDL enables fault-tolerance and works with the priority-based preemption of Kubernetes to achieve elastic scheduling for deep learning tasks.

Support for TensorFlow and PyTorch

  • TensorFlow Estimator
  • TensorFlow Keras
  • PyTorch

Minimalist Interface

Given a model defined with the Keras API, you can train it in a distributed fashion with a single command line.

elasticdl train \
  --image_name=elasticdl:mnist \
  --model_zoo=model_zoo \
  --model_def=mnist.mnist_functional_api.custom_model \
  --training_data=/data/mnist/train \
  --job_name=test-mnist \
  --volume="host_path=/data,mount_path=/data"

Quick Start

Please check out our step-by-step tutorials for running ElasticDL on a local laptop, an on-premises cluster, or a public cloud such as Google Kubernetes Engine.

TensorFlow Estimator on MiniKube

TensorFlow Keras on MiniKube

PyTorch on MiniKube

Background

TensorFlow and PyTorch have native distributed computing features that are fault-recoverable: if some processes fail, the distributed job fails, but we can restart it and recover its state from the most recent checkpoint files.

ElasticDL, in contrast, supports fault tolerance during distributed training: if some processes fail, the job keeps running. Therefore, ElasticDL doesn't need to save checkpoints or recover from them.

This fault tolerance allows ElasticDL to work with the priority-based preemption of Kubernetes to achieve elastic scheduling. When Kubernetes kills some processes of a job to free resources for newly arrived jobs with higher priority, the current job doesn't fail but continues with fewer resources.

Elastic scheduling could significantly improve the overall utilization of a cluster. Suppose that a cluster has N GPUs, and a job is using one of them. Without elastic scheduling, a new job claiming N GPUs would have to wait for the first job to complete before starting. This pending time could be hours, days, or even weeks. During this very long time, the utilization of the cluster is 1/N. With elastic scheduling, the new job could start running immediately with N-1 GPUs, and Kubernetes might increase its GPU consumption by 1 after the first job completes. In this case, the overall utilization is 100%.

ElasticDL's elastic scheduling comes from its Kubernetes-native design: it doesn't rely on Kubernetes extensions like Kubeflow to run TensorFlow/PyTorch programs. Instead, the master process of an ElasticDL job calls the Kubernetes API to start workers and parameter servers; it also watches events such as pod termination and reacts to them to realize fault tolerance.
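
To make this concrete, here is a minimal sketch (not ElasticDL's actual master code) of how a master process can start a worker pod and watch pod events with the official Kubernetes Python client; the pod name, image, and labels are illustrative assumptions.

# A hypothetical, simplified master: start one worker pod and watch its lifecycle events.
from kubernetes import client, config, watch

config.load_incluster_config()  # use config.load_kube_config() outside the cluster
v1 = client.CoreV1Api()

worker_pod = client.V1Pod(
    metadata=client.V1ObjectMeta(
        name="elasticdl-worker-0",
        labels={"app": "elasticdl", "job": "test-mnist"},
    ),
    spec=client.V1PodSpec(
        restart_policy="Never",
        containers=[client.V1Container(name="worker", image="elasticdl:mnist")],
    ),
)
v1.create_namespaced_pod(namespace="default", body=worker_pod)

# Watch pod events; when a pod is preempted or fails, the master can relaunch
# it or reassign its work instead of failing the whole job.
for event in watch.Watch().stream(
    v1.list_namespaced_pod,
    namespace="default",
    label_selector="app=elasticdl,job=test-mnist",
):
    pod = event["object"]
    if event["type"] == "DELETED" or (pod.status and pod.status.phase == "Failed"):
        print("worker pod %s is gone; reassigning its tasks" % pod.metadata.name)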

In short, ElasticDL enhances TensorFlow/PyTorch with fault tolerance and elastic scheduling when you have a Kubernetes cluster. We provide a tutorial showing how to set up a Kubernetes cluster on Google Cloud and run ElasticDL jobs there. We respect TensorFlow's native distributed computing feature, which doesn't require a specific computing platform like Kubernetes and allows TensorFlow to run on any platform.

Development Guide

Please refer to this document for the development guide.


elasticdl's Issues

The reading notes of DistBelief paper from Yi

https://papers.nips.cc/paper/4687-large-scale-distributed-deep-networks.pdf

  • One framework: DistBelief

    • model parallelism and data parallelism
  • Two algorithms:

    1. Downpour SGD:

      • a slight variant of asynchronous SGD
      • a minibatch-based method
      • naive failover strategy

        for asynchronous SGD, if one machine in a model replica fails, the other model replicas continue processing their training data and updating the model parameters via the parameter servers.

    2. Sandblaster L-BFGS

      • a batch method
      • workload balancing

        The coordinator assigns each of the N model replicas a small portion of work, much smaller than 1/Nth of the total size of a batch, and assigns replicas new portions whenever they are free.

  • Two experiments:

    1. object recognition in still images
    2. acoustic processing for speech recognition.
  • Metrics:

    1. accuracy vs. training time (Figure 4) -- Downpour SGD + Adagrad reached the highest accuracy
    2. time to converge vs. number of computers -- Downpour SGD converges fastest with the minimal number of computers.

swamp test

  • engine: PyTorch
  • resource spec: 1 trainer; 1 PS with (1, 2, 4, 8) trainers
  • dataset: MNIST
  • net: CNN

Design discussion

Motivation

Users want to write deep learning programs easily. To ease the writing of a certain kind of program, tool developers usually provide libraries that encapsulate common functionality. There are roughly two types of libraries:

  1. the ones providing APIs, so that users write their own main function and ease the programming by calling these APIs, and
  2. the ones providing frameworks, which usually include the main function, so that users implement certain functions to be called by the main function.

A typical framework that eases the development of fault-tolerant, large-scale, offline data processing programs is MapReduce, where users only need to implement the map and reduce functions.

Another framework, MFC, eases GUI programming on Windows; it provides the main function that runs the event loop, so users only need to specify GUI components and the actions to be taken in response to end users' interactions with these components.

It would be great if we could have a framework for deep learning because most users only want to implement the forward pass and expect the framework to do the backward pass and (local/distributed) parameter updates.

Unfortunately, popular deep learning systems, including TensorFlow, PyTorch, Caffe2, PaddlePaddle, and MXNet, provide APIs, but not frameworks.

Swamp: experiment with evolution method (selection, crossover)

Currently, the worker supports pulling the model from the PS with some probability. This is equivalent to selection in evolutionary methods.
We can also add crossover by combining the pulled model with the worker's current model (model averaging is the easiest choice), also with some probability.
Experiment with this to see whether it increases or decreases training performance.
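
A minimal sketch of this idea, assuming the model is a flat, array-like parameter vector and pull_from_ps is a placeholder for the worker's PS-client call; the probability values are arbitrary.

import random

PULL_PROBABILITY = 0.5       # "selection": how often to adopt the PS model
CROSSOVER_PROBABILITY = 0.5  # "crossover": how often to average instead of replace


def maybe_pull(local_model, pull_from_ps):
    """Selection/crossover step a worker runs between training iterations."""
    if random.random() < PULL_PROBABILITY:
        ps_model = pull_from_ps()                 # selection: adopt the leader's model
        if random.random() < CROSSOVER_PROBABILITY:
            return (local_model + ps_model) / 2   # crossover: average the two models
        return ps_model
    return local_model                            # keep exploring with the local model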

Docker image for end user


We are going to provide a Docker image to end user, who can run it locally or deploy it to K8S to launch ElasticDL system.

Usage

Users can use a command line like the following to train a model.

docker run -it --rm -v $WORK_DIR:/work elasticdl:user python /elasticdl/launcher.py \
    --script=/work/mnist.py \
    --class_name=MnistCNN \
    --runner=thread \
    --num_ps=2 \
    --num_worker=2 \
    --input=/data/mnist

Content in docker image

  • tensorflow/tensorflow:1.12.0-py3
  • PyTorch 1.0.0
  • ElasticDL's supporting Python libraries
    • recordio
    • crc32c
    • snappy
  • ElasticDL's Python library
  • Some prebuilt data in RecordIO format

build process

The dockerfiles/elasticdl directory keeps the predefined Dockerfile, which has the definitions, install commands, etc. The directory also has a copy of the RecordIO wheel file.

Run build_docker.sh in the repo root directory, which will do the following:

  1. copy dockerfiles/elasticdl to a staging directory, e.g. /tmp/elasticdl
  2. copy the preprocessed data to /tmp/elasticdl
  3. copy all content of the python directory to /tmp/elasticdl
  4. run docker build -t elasticdl:user /tmp/elasticdl, which builds the end-user image.

Local master

Read the local RecordIO file index, split it into tasks, assign tasks to workers, and keep track of the status of each task.
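
A minimal sketch of such a task queue, assuming we already know the record count of each RecordIO file; how the index is read and how workers report results are left out.

import collections
import queue

Task = collections.namedtuple("Task", ["file_path", "start", "end"])
todo = queue.Queue()   # tasks waiting to be assigned
doing = {}             # worker_id -> Task currently in progress


def create_tasks(file_record_counts, records_per_task):
    """Split each RecordIO file into tasks of at most records_per_task records."""
    for file_path, num_records in file_record_counts.items():
        for start in range(0, num_records, records_per_task):
            todo.put(Task(file_path, start, min(start + records_per_task, num_records)))


def assign_task(worker_id):
    """Pop the next task for an idle worker and remember who is working on it."""
    task = todo.get_nowait()
    doing[worker_id] = task
    return task


def on_worker_failed(worker_id):
    """Put the failed worker's task back so another worker can pick it up."""
    todo.put(doing.pop(worker_id))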

Swamp Optimization

Swamp Optimization for Efficient Distributed Deep Learning

Deep learning is mostly about SGD. Synchronous distributed SGD is not fault-tolerant. Its asynchronous counterpart cannot guarantee convergence.

This proposal is about a new distributed learning algorithm. Instead of extending an algorithm developed for non-concurrent usage into a distributed version, we try to follow first principles and treat optimization as inherently parallel work.

The Metaphor

A biological metaphor for distributed optimization is a swarm of bees looking for areas dense with flowers. Usual optimization algorithms work like a single bee flying around following the scent of sweetness (the gradient). Intuitively, it should be more useful for many bees to search collaboratively -- together they cover a region while moving, whereas a single bee covers a point.

Let's imagine that each thread/process is a bee. The location of the bee is its local copy of the model. Iteratively, the bee takes a sniff, which is a mini-batch, and moves a little bit by updating the local model in the direction of a denser scent.

After every few steps, the bee checks whether it's somewhere with more flowers. If so, it radios its location to a leader bee, the parameter server, by pushing its local copy of the model, not the gradient as distributed SGD does. Otherwise, if the search result is not very useful, it returns to the leader by pulling the "global model" from the parameter server.

This metaphor ensures that bees have some degree of freedom to spread out and cover a region around the leader, while keeping a reasonably close distance from the leader and maintaining a collaborative swarm.

Some key points revealed by the metaphor include:

  • Each process periodically checks its location using a mini-batch as the dev dataset, not the training dataset.
  • A process pushes its location to the parameter server only if it's at an excellent position. It's hard to say how good a location is using only a mini-batch as the dev dataset. So we might
  • Because processes push local copies of the model, not gradients, to the parameter server, the parameter server doesn't need to run an optimization algorithm; it merely computes a weighted sum of the incoming model copies -- the better and newer a copy, the larger its weight.

The Algorithms

The algorithm for each bee is as follows.

steps := 0
oldScore := -Inf  // lowest possible score (pseudocode)
for minibatch := range dataReader {
    if steps < freedom {
        // Free exploration: take a local SGD step on this mini-batch.
        update(localModel, minibatch)
        steps++
    } else {
        // Check in with the leader: push the local model if it scores better on
        // this mini-batch (used as a dev set), then pull the global model.
        if newScore := eval(localModel, minibatch); newScore > oldScore {
            pushToParameterServer(localModel, newScore)
            oldScore = newScore
        }
        localModel = pullFromParameterServer()
        steps = 0
    }
}

The parameter server is an RPC server that implements the remote calls pushToParameterServer and pullFromParameterServer. There are many strategies we could use to merge the pushed local copies into the global model; a simple one to illustrate the idea is as follows:

func pushToParameterServer(localModel, score) {
    // Remember every pushed model copy together with its score.
    models = append(models, localModel)
    scores = append(scores, score)
    // The global model is the score-weighted average of all pushed copies.
    weights = normalize(scores)
    globalModel = weightedSum(models, weights)
}

func pullFromParameterServer() Model {
    return globalModel
}
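
For concreteness, here is one way normalize and weightedSum could be realized in NumPy, assuming each model copy is a flat parameter vector and scores are positive; the "newer gets more weight" decay is left out for brevity.

import numpy as np

models = []         # flat parameter vectors pushed by workers
scores = []         # dev-minibatch scores reported with each push
global_model = None


def push_to_parameter_server(local_model, score):
    """Merge a pushed model copy into the global model, weighted by its score."""
    global global_model
    models.append(np.asarray(local_model, dtype=np.float64))
    scores.append(float(score))
    weights = np.asarray(scores) / sum(scores)                  # normalize(scores)
    global_model = sum(w * m for w, m in zip(weights, models))  # weightedSum


def pull_from_parameter_server():
    return global_model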

An exhaustive survey of distributed optimization algorithms for deep learning

Each entry lists the paper, year, citation count, venue, and reading-notes issues (by Yi and Li):

  • The Google DistBelief paper -- 2012, 1699 citations, NIPS; reading notes: #115, #137
  • The efficient parameter server paper by 李沐 (Mu Li) and 余凯 (Kai Yu) -- 2014, 157 citations, NIPS; reading notes: #138
  • How to scale distributed deep learning? -- 2016, 34 citations; reading notes: #147
  • The DC SGD paper -- 2016, 16 citations; reading notes: #148
  • The CDSGD and CDMSGD paper -- 2017, 15 citations, NIPS; reading notes: #151
  • Toward Understanding the Impact of Staleness in Distributed Machine Learning -- 2017
  • The ChainerMN paper -- 2017, 8 citations, NIPS
  • The PQASGD paper -- 2018, NIPS
  • Demystifying Parallel and Distributed Deep Learning: An In-Depth Concurrency Analysis -- 2018, 17 citations
  • The GoSGD paper -- 2018
  • Revisiting Distributed Synchronous SGD (Google) -- 2017

Change PS to run in graph mode.

Under multi-threading, if the PS enables eager mode, then all threads in the same process end up in eager mode. Eager mode can run graphs, but not the other way around.

ElasticFlow implementation tasks

ParameterServer

  1. RPC interface

    • service definition
    • SparseTensor support
  2. RPC implementation

    • TF/PyTorch tensor to/from Tensor proto bidirectional converters
    • multi-threaded Server
    • Client
  3. PS implementation

    • Graph gradient update
    • large model support: model partition
    • large lookup table support (e.g. use redis)

Data API

  1. data fetcher: Need a unified way for feeding data.
    • ODPS support for TF, PyTorch
      • tf.data.Dataset and torch.utils.data.Dataset
    • Other data source?
  2. data sharding
    • sharding methods
    • shard tracking: re-queue shard when worker dies
      • use etcd?

Worker with PS client/data fetcher/etcd client

  1. PyTorch worker
  2. Estimator Worker
    • The current PoC crashes when the iteration number is high; need to debug this first
  3. Other TF workers

Master

  • divide work, wait for workers to finish

Job launcher

  1. based on user code, decide PS and worker to launch
  2. local launcher (for testing)
  3. kubernetes launcher
    • on-premises
    • AWS
    • GKE

Elasticity, fault tolerance support

  • periodic checkpoint/model saving (to file? redis?)
  • initial model loading (from file? redis? PS?)
  • PS scaling
    • partitioned model support

Kubernetes controller

  1. resource tracking
  2. scaling

Don't push/pull optimizer state_dict to/from parameter server

Ideally, we want to allow each trainer/bee to keep its own momentum. Suppose two trainers both pulled from the parameter server; their "location" would be reset to where the parameter server/leader resides. We want them to have different momentum and other optimizer states, so that they can go on from the leader's location to explore even better optima along different directions and with different momentum.
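
A minimal PyTorch sketch of what "pull the model but keep the local optimizer state" could look like; pull_model_state_dict stands in for the PS client call and is an assumption.

import torch

model = torch.nn.Linear(784, 10)
optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9)


def pull_model_only(pull_model_state_dict):
    # load_state_dict copies the pulled values into the existing parameter
    # tensors; the optimizer's momentum buffers are keyed by those same tensors,
    # so each trainer keeps its own momentum after returning to the leader's
    # location. We deliberately do not touch optimizer.state_dict().
    model.load_state_dict(pull_model_state_dict())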

Learning rate adjustment in async-distributed training

Assume a model converges under 1 worker with learning rate lr. When switching to async-distributed training with n workers, the same learning rate lr might not work. This is because the n workers may get the same version of the model, and each computes a gradient and sends it to the PS. The PS applies the n gradients one by one, which is equivalent to using n*lr as the learning rate in sync-distributed training.

Dividing the learning rate by the number of workers is one way to ease this issue. Below are the test results of one epoch of training on the MNIST data with different worker numbers (n=1, 2, 4, 8), with and without learning rate scaling (1/n). For example, 'w2_scale' means 2 workers with learning rate scaling; 'w4_no_scale' means 4 workers without learning rate scaling.

learning_rate = 0.01
batch_size = 64

[Figure 1: training accuracy for n=1, 2, 4, 8 workers, with and without learning rate scaling]

We can see that with n=2, 4, 8, the training accuracy remains at 0.1 without learning rate scaling.

With learning rate scaling, n=2, 4 converge, but n=8 does not. This is another issue with async-distributed training as the number of workers increases: some gradients are too stale relative to the model version in the PS, which prevents the training accuracy from improving. One way to address this issue is to adjust the learning rate further for the stale gradients, or just drop them.

Below is the test result of staleness-aware learning rate adjustment.
We define

staleness = difference between the worker's gradient version and the PS model version

In the 1-worker scenario, staleness = 1.
In the 2-worker scenario, if workers 0 and 1 use the same version of the model, compute their gradients, and push them to the PS, then the first applied gradient has staleness = 1 and the second has staleness = 2.
The learning rate lr is adjusted by staleness:

lr_used = lr / staleness
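
A minimal sketch of how the PS side could apply this rule, assuming the PS keeps a version counter and every pushed gradient carries the model version it was computed from; the exact bookkeeping and the MAX_STALENESS cutoff are assumptions.

MAX_STALENESS = 8   # hypothetical cutoff beyond which gradients are dropped


def apply_gradient(ps_model, ps_version, grad, grad_version, lr):
    """Apply one pushed gradient with a staleness-scaled learning rate."""
    staleness = ps_version - grad_version + 1   # 1 if no update happened in between
    if staleness > MAX_STALENESS:
        return ps_model, ps_version             # drop gradients that are too stale
    lr_used = lr / staleness                    # lr_used = lr / staleness
    return ps_model - lr_used * grad, ps_version + 1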

[Figure 2: training accuracy with staleness-aware learning rate adjustment for different worker numbers]

Model converges with worker number n=1, 2, 4, 8, but does not with n=16.

Thus, we can see that async-distributed training might not be suitable for CNN models with a large number of workers. That is probably why almost all distributed training for CNN models uses a synchronous strategy.

How trainers connect to parameter servers

class ParameterServerClient:
    pass


class LocalThreadParameterServerClient(ParameterServerClient):
    pass


def createParameterServerClient(**kwargs):
    # Factory: choose the client implementation from the runner configuration
    # and return one client per parameter server.
    if kwargs["..."] is not None:
        return [LocalThreadParameterServerClient(...) for _ in range(NUM_PS)]


class Worker:
    def __init__(self):
        self.ps = createParameterServerClient(...)

        assert len(self.ps) == NUM_PS
        assert self.ps[0].Push(...)
        assert self.ps[1].Pull(...)

        assert isinstance(self.ps[0], elasticflow.ParameterServerClient)

Add metrics to swamp/mnist.py

The DistBelief paper introduces two interesting metrics as described in #115 (comment):

  1. accuracy vs. training time.
  2. time to convergence vs. the number of computers.

We might think about adding these two plots to swamp/mnist.py

Pull probability no longer matters the way it did before the switch from multithreading to multiprocessing

I used the commit right after the merge of #121 for the following experiment:

#!/bin/bash
for (( t = 1; t <= 64; t = t * 2 )); do
    for (( n = 1; n <= 10; n = n + 2 )); do
	p=0$(echo "$n / 10" | bc -l)
	f=loss-t$t-p$p.pdf
	echo "Generating $f ..."
	if [[ ! -f $f ]]; then
	    cmd="docker run --rm -i -v $PWD:/work -w /work swamp python mnist.py --loss-sample-interval 5 --trainer-number $t --pull-probability $p --loss-file $f"
	    echo Running $cmd
	    eval $cmd
	fi
    done
done

This experiment runs the code with --trainer-number=1,2,4,8,16,32,64 and --pull-probability=0.1,0.3,0.5,0.7,0.9. It took a whole night to complete the 35 training jobs on my powerful personal computer.

Then I ran the following ImageMagick command to tile the resulting PDF files:

montage loss*.pdf -background none -tile 5x7 -geometry +0+0 big.pdf

The tiled image is here: big.pdf. Feel free to click the link above to open the PDF file. From left to right, each column corresponds to pull probability p=0.1,0.3,0.5,0.7,0.9. From top to bottom, each row corresponds to trainer number t=1,2,4,8,16,32,64.

It looks to me that the pull probability no longer affects the fluctuation of the loss curve as it did before. Is this because the pull mechanism was changed to use multiprocessing.Manager.Value?

Parameter initialization design

Just recording the discussion conclusions from @ywskycn @yuyicg @zou000:

  • The initialization of model parameters happens on the parameter server.
  • Trainers pull the initialization version from the parameter server.

In the case that a job has one parameter server process

It simply initializes all the parameters.

In the case that a job has multiple parameter server processes

Each process handles only part of the parameters, so there is no duplication in parameter initialization.

Project Goal: DL Framework

A DL framework that fits TensorFlow and PyTorch users' conventions.

The framework takes care of both local and distributed runs.

Submit job

elasticflow --runner=k8s \
   --num_trainer=8 \
   --num_ps=2 \
   --input=/oss/yi/mnist* \
   --output_dir=/oss/yi/experiment \
   MyModel.py \
   --embedding_size=1000

where

  • elasticflow is a command-line tool,
  • MyModel.py includes a class derived from torch.nn.Module, tf.keras.Model, tf.estimator.Estimator, etc. (see the sketch below).
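
Below is a minimal sketch of what MyModel.py could look like with PyTorch; the exact class and constructor contract expected by elasticflow, and the forwarding of --embedding_size, are assumptions.

# MyModel.py -- illustrative only; the vocabulary size and layer shapes are made up.
import torch


class MyModel(torch.nn.Module):
    def __init__(self, embedding_size=1000):
        super().__init__()
        self.embedding = torch.nn.EmbeddingBag(100000, embedding_size)
        self.fc = torch.nn.Linear(embedding_size, 2)

    def forward(self, token_ids, offsets):
        # Users only describe the forward pass; the framework handles the
        # backward pass and (local/distributed) parameter updates.
        return self.fc(self.embedding(token_ids, offsets))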

Similarly, to start a local job

elasticflow --runner=local \
    ....

PoC Combinations

We hope that our users can use the following ways to describe the forward pass

  1. TF graph mode

  2. Keras on TF graph mode

  3. Estimator on TF graph mode

  4. TF eager mode

  5. Keras on TF eager mode

  6. PyTorch low-level API

  7. PyTorch nn.Module

Improve the swamp/mnist example

To be more confident in the effectiveness of the swamp optimization algorithm, I feel it might be good to let each worker and the parameter server thread print

  1. worker id, or "ps" for the parameter server
  2. timestamp
  3. local cost

Then we can draw a plot with

  • x-axis represents the timestamp
  • y-axis represents the local cost
  • each curve corresponds to a worker thread or the parameter server thread

An example plot is something like the following

[screenshot: example local-cost plot]

where there are two trainers and one parameter server.

The plot might be done with gnuplot or any other tool of convenience.
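
For example, a minimal matplotlib sketch, assuming each worker/PS thread appends lines of the form "id timestamp cost" to a shared log file (the log file name and format are assumptions):

import collections
import matplotlib.pyplot as plt

curves = collections.defaultdict(list)    # id -> list of (timestamp, cost)
with open("swamp_log.txt") as f:          # hypothetical log produced by the threads
    for line in f:
        worker_id, timestamp, cost = line.split()
        curves[worker_id].append((float(timestamp), float(cost)))

for worker_id, points in sorted(curves.items()):
    points.sort()
    plt.plot([t for t, _ in points], [c for _, c in points], label=worker_id)

plt.xlabel("timestamp")
plt.ylabel("local cost")
plt.legend()
plt.savefig("local_cost.pdf")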

With this plotting feature, it might be interesting to understand more about the effectiveness, measured as the speed of convergence, with respect to the number of workers.
