
dask-marathon's Introduction

Dask-Marathon


Deploy dask-worker processes on Marathon in response to load on a Dask scheduler. This creates a Marathon application of dask-worker processes. It watches a Dask Scheduler object in the local process and, based on current requested load, scales the Marathon application up and down.

This project is a proof of concept only. No guarantee of quality or future maintenance is made.

Run

It's not yet clear how to expose all of the necessary options to a command line interface. For now we're doing everything manually.

Make an IOLoop running in a separate thread:

from threading import Thread
from tornado.ioloop import IOLoop
loop = IOLoop()
thread = Thread(target=loop.start, daemon=True)
thread.start()

Start a Scheduler in that IOLoop:

from distributed import Scheduler
s = Scheduler(loop=loop)
s.start()

Start a dask-marathon cluster. Give it the scheduler, information about the application, and the address of the Marathon master. This example assumes that dask-worker is available in the system environment; see marathon.MarathonApp for possible keyword arguments to define the application, including Docker containers.

from dask_marathon import AdaptiveCluster
cluster = AdaptiveCluster(s, marathon_address='http://localhost:8080',
                          cpus=1, mem=1000, executable='dask-worker',
                          **kwargs)
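For example, to run the workers inside a Docker image instead of the agent's system environment, a container definition can be passed through those keyword arguments. This is only a sketch: it assumes the marathon-python client's container models and that AdaptiveCluster forwards a container keyword on to marathon.MarathonApp; the image name is a placeholder.

from marathon.models.container import MarathonContainer, MarathonDockerContainer

# Placeholder image; swap in an image that has dask-worker installed.
docker = MarathonDockerContainer(image='daskdev/dask', network='HOST')

cluster = AdaptiveCluster(s, marathon_address='http://localhost:8080',
                          cpus=1, mem=1000, executable='dask-worker',
                          container=MarathonContainer(docker=docker))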

Create a Client and submit work to the scheduler. Marathon will scale workers up and down as necessary in response to current workload.

from distributed import Client
c = Client(s.address)

future = c.submit(lambda x: x + 1, 10)
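Submitting a larger batch is a quick way to watch the adaptive behaviour: while the batch is pending the cluster asks Marathon for more workers, and it scales back down once the work drains. This sketch uses only the standard Client API:

# Queue up a batch of tasks, then block until the results arrive.
futures = c.map(lambda x: x ** 2, range(1000))
total = sum(c.gather(futures))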

TODO

  • Deploy the scheduler on the cluster
  • Support a command line interface

Docker Testing Harness

This sets up a docker cluster of one Mesos master and two Mesos agents using docker-compose.

Requires:

  • docker version >= 1.11.1
  • docker-compose version >= 1.7.1

export DOCKER_IP=127.0.0.1
docker-compose up

Run py.test:

py.test dask-marathon

Additional notes

  • Master and Agents have dask.distributed installed from its GitHub repository
  • Mesos container names:
    • mesos_master
    • mesos_slave_one
    • mesos_slave_two

Web UIs

History

The Mesos docker-compose solution was originally forked from https://github.com/bobrik/mesos-compose.

This project was then forked from dask-mesos.

dask-marathon's People

Contributors

kszucs, mrocklin, quasiben


dask-marathon's Issues

How to properly expose Marathon Applications

The current solution in this repository is only really useful for testing. It uses the agent's standard environment. Most people I've spoken to use docker containers instead. What is the right way to expose the idiomatic use of docker containers for this project without tying ourselves to one particular solution favored by one particular company or lab? What is a good and generic way to help people specify docker containers of their choice while still ensuring that dask-worker is installed and runs?

cc @kszucs @hussainsultan

Workers erased from distributed scheduler before they're removed from Marathon

The way the Adaptive class' _retire_workers method works is:

  • it calls the scheduler's retire_workers method, which gives it a list of the addresses of the workers to terminate
  • it then calls the cluster's scale_down method with that list of workers

The problem with that is that inside the scheduler's retire_workers call, the workers are closed and erased from the scheduler's state. However, in the particular case of dask-marathon, the cluster's scale_down method needs to access the scheduler's worker_info dict, which contains the MESOS_URI that Marathon needs to refer to the running container and kill it. This yields error logs such as:

distributed.scheduler - INFO -   Scheduler at:    tcp://10.32.0.13:42805
distributed.scheduler - INFO -       bokeh at:           10.32.0.13:8787
INFO:marathon:Got response from http://172.16.0.5:8080
INFO:daskathon.core:Started marathon workers /daskathon-3e06-workers
INFO:root:Scheduler address: tcp://10.32.0.13:42805
distributed.core - DEBUG - Connection from 'tcp://10.32.0.13:47562' to Scheduler
distributed.core - DEBUG - Message from 'tcp://10.32.0.13:47562': {'op': 'register', 'ncores': 2, 'address': 'tcp://10.32.0.13:4583', 'keys': [], 'name': 'daskathon-3e06-workers.8e07ed66-b8a8-11e7-be28-1251e2c9cfa6', 'nbytes': {}, 'now': 1508841978.2218635, 'host_info': {'time': 1508841978.2218704, 'cpu': 52.6, 'memory': 7296647168, 'memory_percent': 15.4, 'network-send': 0, 'network-recv': 0, 'disk-read': 0, 'disk-write': 0}, 'services': {'nanny': 4584, 'http': 4585, 'bokeh': 4586}, 'memory_limit': 409600000.0, 'local_directory': 'worker-sfasiiho', 'resources': {}, 'pid': 13, 'reply': True}
distributed.core - DEBUG - Calling into handler add_worker
distributed.scheduler - INFO - Register tcp://10.32.0.13:4583
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.32.0.13:4583
distributed.scheduler - INFO - Closing worker tcp://10.32.0.13:4583
distributed.scheduler - INFO - Remove worker tcp://10.32.0.13:4583
distributed.scheduler - INFO - Lost all workers
distributed.scheduler - DEBUG - Removed worker tcp://10.32.0.13:4583
distributed.core - DEBUG - Message from 'tcp://10.32.0.13:47562': {'op': 'unregister', 'address': 'tcp://10.32.0.13:4583', 'reply': True}
distributed.core - DEBUG - Calling into handler remove_worker
distributed.core - DEBUG - Message from 'tcp://10.32.0.13:47562': {'op': 'close', 'reply': False}
distributed.deploy.adaptive - INFO - Retiring workers ['tcp://10.32.0.13:4583']
distributed.utils - ERROR - 'tcp://10.32.0.13:4583'
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/utils.py", line 454, in log_errors
    yield
  File "/opt/conda/lib/python3.6/site-packages/distributed/deploy/adaptive.py", line 162, in _retire_workers
    f = self.cluster.scale_down(workers)
  File "/opt/conda/lib/python3.6/site-packages/daskathon/core.py", line 117, in scale_down
    self.scheduler.worker_info[worker]['name'],
KeyError: 'tcp://10.32.0.13:4583'
tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7f48ab9cb488>, <tornado.concurrent.Future object at 0x7f48aba16da0>)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/tornado/ioloop.py", line 605, in _run_callback
    ret = callback()
  File "/opt/conda/lib/python3.6/site-packages/tornado/stack_context.py", line 277, in null_wrapper
    return fn(*args, **kwargs)
  File "/opt/conda/lib/python3.6/site-packages/tornado/ioloop.py", line 626, in _discard_future_result
    future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/conda/lib/python3.6/site-packages/distributed/deploy/adaptive.py", line 199, in _adapt
    yield self._retire_workers()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/opt/conda/lib/python3.6/site-packages/distributed/deploy/adaptive.py", line 162, in _retire_workers
    f = self.cluster.scale_down(workers)
  File "/opt/conda/lib/python3.6/site-packages/daskathon/core.py", line 117, in scale_down
    self.scheduler.worker_info[worker]['name'],
KeyError: 'tcp://10.32.0.13:4583'

As you can see, that dict key is inaccessible by the time the cluster's scale_down method is called.

I'm thinking of a workaround where I store the needed information elsewhere, but is that the best way? I could also subclass things so that the workers list contains (address, name) tuples instead of only addresses. Does this make sense? Or should the behaviour of the Adaptive class itself be changed?
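For reference, here is a minimal sketch of the first option (caching the needed information elsewhere), using a SchedulerPlugin to record each worker's Marathon task name while the worker is still registered. The plugin hooks are real distributed API; the cluster-side wiring (name_cache, marathon_client, app_id) is a hypothetical stand-in for daskathon's internals, not its actual attributes.

from distributed.diagnostics.plugin import SchedulerPlugin

class WorkerNameCache(SchedulerPlugin):
    """Remember each worker's Marathon task name while the scheduler
    still knows about the worker, so the name survives retire_workers()."""

    def __init__(self):
        self.names = {}  # worker address -> Marathon task name

    def add_worker(self, scheduler=None, worker=None, **kwargs):
        info = scheduler.worker_info.get(worker, {})
        self.names[worker] = info.get('name')

# Hypothetical wiring inside the cluster object:
#
#     self.name_cache = WorkerNameCache()
#     self.scheduler.add_plugin(self.name_cache)
#
#     def scale_down(self, workers):
#         for address in workers:
#             task = self.name_cache.names.pop(address, None)
#             if task:
#                 self.marathon_client.kill_task(self.app_id, task, scale=True)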
