Giter Club home page Giter Club logo

dask-drmaa's Introduction

Dask on DRMAA

This project is unmaintained. We recommended that you use dask-jobqueue instead: https://github.com/dask/dask-jobqueue

Build Status PyPI Release conda-forge Release

Deploy a Dask.distributed cluster on top of a cluster running a DRMAA-compliant job scheduler.

Example

Launch from Python

from dask_drmaa import DRMAACluster
cluster = DRMAACluster()

from dask.distributed import Client
client = Client(cluster)
cluster.start_workers(2)

>>> future = client.submit(lambda x: x + 1, 10)
>>> future.result()
11

Or launch from the command line:

$ dask-drmaa 10  # starts local scheduler and ten remote workers

Install

Python packages are available from PyPI and can be installed with pip:

pip install dask-drmaa

Also conda packages are available from conda-forge:

conda install -c conda-forge dask-drmaa

Additionally the package can be installed from GitHub with the latest changes:

pip install git+https://github.com/dask/dask-drmaa.git --upgrade

or:

git clone [email protected]:dask/dask-drmaa.git
cd dask-drmaa
pip install .

You must have the DRMAA system library installed and be able to submit jobs from your local machine. Please make sure to set the environment variable DRMAA_LIBRARY_PATH to point to the location of libdrmaa.so for your system.

Testing

This repository contains a Docker-compose testing harness for a Son of Grid Engine cluster with a master and two slaves. You can initialize this system as follows:

docker-compose build
./start-sge.sh

If you have done this previously and need to refresh your solution you can do the following

docker-compose stop
docker-compose build --no-cache
./start-sge.sh

And run tests with py.test in the master docker container

docker exec -it sge_master /bin/bash -c "cd /dask-drmaa; python setup.py develop"
docker exec -it sge_master /bin/bash -c "cd /dask-drmaa; py.test dask_drmaa --verbose"

Adaptive Load

Dask-drmaa can adapt to scheduler load, deploying more workers on the grid when it has more work, and cleaning up these workers when they are no longer necessary. This can simplify setup (you can just leave a cluster running) and it can reduce load on the cluster, making IT happy.

To enable this, call the adapt method of a DRMAACluster. You can submit computations to the cluster without ever explicitly creating workers.

from dask_drmaa import DRMAACluster
from dask.distributed import Client

cluster = DRMAACluster()
cluster.adapt()
client = Client(cluster)

futures = client.map(func, seq)  # workers will be created as necessary

Extensible

The DRMAA interface is the lowest common denominator among many different job schedulers like SGE, SLURM, LSF, Torque, and others. However, sometimes users need to specify parameters particular to their cluster, such as resource queues, wall times, memory constraints, etc..

DRMAA allows users to pass native specifications either when constructing the cluster or when starting new workers:

cluster = DRMAACluster(template={'nativeSpecification': '-l h_rt=01:00:00'})
# or
cluster.start_workers(10, nativeSpecification='-l h_rt=01:00:00')
  • DRMAA: The Distributed Resource Management Application API, a high level API for general use on traditional job schedulers
  • drmaa-python: The Python bindings for DRMAA
  • DaskSGE: An earlier dask-drmaa implementation
  • Son of Grid Engine: The default implementation used in testing
  • Dask.distributed: The actual distributed computing library this launches

dask-drmaa's People

Contributors

azjps avatar basnijholt avatar edurand avatar jakirkham avatar josephwagner avatar jsignell avatar lesteve avatar maxnoe avatar mivade avatar mrocklin avatar pitrou avatar quasiben avatar tomaugspurger avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

dask-drmaa's Issues

Munge authentication failure on SGE

I am having an issue with submitting tasks on SGE where is appears that cleanup_closed_workers callback is failing to execute correctly due to an authentication issue.

Job submission appears to work, as does job deletion at exit, just the polling from the callback fails.

In [1]: from dask_drmaa import DRMAACluster, Adaptive

In [2]: cluster = DRMAACluster()

In [3]: cluster.start_workers(1)

In [4]: tornado.application - ERROR - Exception in callback <bound method DRMAACluster.cleanup_closed_workers of <DRMAACluster: 1 workers>>
Traceback (most recent call last):
  File "/home/ac1mpt/.conda/envs/condagpu/lib/python3.6/site-packages/tornado/ioloop.py", line 1026, in _run
    return self.callback()
  File "/home/ac1mpt/repos/dask-drmaa/dask_drmaa/core.py", line 251, in cleanup_closed_workers
    if get_session().jobStatus(jid) in ('closed', 'done'):
  File "/home/ac1mpt/.conda/envs/condagpu/lib/python3.6/site-packages/drmaa/session.py", line 520, in jobStatus
    c(drmaa_job_ps, jobId, byref(status))
  File "/home/ac1mpt/.conda/envs/condagpu/lib/python3.6/site-packages/drmaa/helpers.py", line 303, in c
    return f(*(args + (error_buffer, sizeof(error_buffer))))
  File "/home/ac1mpt/.conda/envs/condagpu/lib/python3.6/site-packages/drmaa/errors.py", line 151, in error_check
    raise _ERRORS[code - 1](error_string)
drmaa.errors.DrmCommunicationException: code 2: MUNGE authentication failed: Invalid credential format

Launch Scheduler on remotely or on the local machine?

Do we want to launch the dask-scheduler process on the cluster or do we want to keep it local?

Keeping the scheduler local simplifies things, but makes it harder for multiple users to share the same scheduler.

Launching the scheduler on the cluster is quite doable, but we'll need to learn on which machine the scheduler launched. I know how to do this through the SGE interface, but it's not clear to me that it is exposed through DRMAA. See this stackoverflow question. Alternatively we could pass the location of the scheduler back through some other means. This could be through a file on NFS (do we want to assume the presence of a shared filesystem?) or by setting up a tiny TCP Server to which the scheduler connects with its information.

The current implementation just launches the scheduler on the user's machine. Barring suggestions to the contrary my intention is to move forward with this approach until there is an obvious issue.

SLURM won't replace $JOB_ID placeholder

Hi, I'm using SLURM 17.02.3.
In my environment, I noticed filenames of stdout and stderr are wrong (e.g., worker.$JOB_ID.err/out)

https://github.com/dask/dask-drmaa/blob/master/dask_drmaa/core.py#L42-L43

'outputPath': ':' + worker_out_path_template % dict(jid='$JOB_ID.$drmaa_incr_ph$', kind='out'),
'errorPath': ':' + worker_out_path_template % dict(jid='$JOB_ID.$drmaa_incr_ph$', kind='err'),

Solution

Instead of $JOB_ID, SLURM uses %j placeholder, please see https://slurm.schedmd.com/sbatch.html
jid='$JOB_ID.$drmaa_incr_ph$' should be jid='%j.$drmaa_incr_ph$' in slurm.
I confirmed that when I hard-cord it in slurm but I don't know how to determine slurm or any other grid engines.

test_stop_single_worker failing on CI

Noticed that test_stop_single_worker has started failing on CI. Seems to be a consistent failure. However it wasn't failing on the same code a month ago. So something else has changed (possibly in Distributed) related to how Dask worker directories are managed.

=================================== FAILURES ===================================
___________________________ test_stop_single_worker ____________________________
loop = <tornado.platform.asyncio.AsyncIOLoop object at 0x7f270bb33978>
    def test_stop_single_worker(loop):
        with DRMAACluster(scheduler_port=0) as cluster:
            with Client(cluster, loop=loop) as client:
                cluster.start_workers(2)
                future = client.submit(lambda x: x + 1, 1)
                assert future.result() == 2
                while len(client.ncores()) < 2:
                    sleep(0.1)
    
                a, b = cluster.workers
                local_dir = client.run(lambda dask_worker: dask_worker.local_dir,
                                       workers=[a])[a]
                assert os.path.exists(local_dir)
    
                cluster.stop_workers(a)
                start = time()
                while len(client.ncores()) != 1:
                    sleep(0.2)
                    assert time() < start + 60
>       assert not os.path.exists(local_dir)
E       AssertionError: assert not True
E        +  where True = <function exists at 0x7f27547ba950>('/dask-drmaa/dask-worker-space/worker-c7271lb9')
E        +    where <function exists at 0x7f27547ba950> = <module 'posixpath' from '/opt/anaconda/lib/python3.6/posixpath.py'>.exists
E        +      where <module 'posixpath' from '/opt/anaconda/lib/python3.6/posixpath.py'> = os.path
dask_drmaa/tests/test_core.py:128: AssertionError
=============== 1 failed, 23 passed, 1 xpassed in 120.47 seconds ===============

ref: https://travis-ci.org/dask/dask-drmaa/jobs/396654145#L2188-L2219
ref: https://travis-ci.org/dask/dask-drmaa/builds/385798967

Release?

Planning on doing a 0.2.0 release today. No breaking changes, but a newer version of distributed is required. So thought 0.2.0 was more appropriate. Thoughts?

cc @azjps @lesteve @mrocklin

Support Multi-core job resources

Currently when we create workers in SGE we add the memory used as a dask resource that can be tracked and used within constraints. We might consider doing the same for cores so that tasks that required many cores could be appropriately assigned to larger nodes. Alternatively, we could also allow for arbitrary resources to be passed, so allow people to handle this in their own way.

Start cluster over ssh

I work on a server with a Jupyterhub and have access to a pbs cluster, both machines have the same Python environments.

Right now I do the following (manual work):

  1. I start workers on the cluster with dask-drmaa 10
  2. I create a tunnel from the port at which the scheduler runs to the server on which I run my Juypter notebook with: ssh -NL 39408:localhost:39408 cluster
  3. In my Jupyter notebook I do the following:
import distributed
client = distributed.Client('localhost:39408')

It would be great if this could somehow be automated ๐Ÿ˜„

workers can't reach multi-homed scheduler

Using SGE and wanting to initialize dask fully from python.

My main node is dual-homed, with 1G and 10G interfaces. The 10G is the one that my SGE cluster uses.

from dask_drmaa import DRMAACluster
from dask.distributed import Client

In [9]: cluster = DRMAACluster(hostname='master-10g')
INFO:dask_drmaa.core:Start local scheduler at master-10g

In [10]: cluster.scheduler_address
Out[10]: 'tcp://10.22.150.194:37386' . # this is the master-1g IP, not the one I want 

Meanwhile, the workers are spinning trying to connect to the 1G IP:

tail worker.23523.1.err
distributed.worker - INFO - Trying to connect to scheduler: tcp://10.22.150.194:37386

Can this be extended to allow one to specify the scheduler interface / hostname / IP to give to the workers?

Work stealing is not happening

When the load on all workers is not evenly distributed, no work stealing is performed.

One way to provoke this, is to submit the jobs first and start the workers afterwards. Then the worker that starts first will get all jobs assigned and all other workers will be idle. However, this is also happening if the workers get started first and the execution time of the jobs differs very much.

In the web interface under /stealing, the idle and saturated workers are correctly displayed.

I wrote a little MWE:

import time
import dask.multiprocessing
from dask.distributed import Client
from dask_drmaa import SGECluster

def execute(sleep_time):
    import time
    
    time.sleep(sleep_time)
    return sleep_time

if __name__ == "__main__":
    times = [60, 60, 60, 60, 1, 1, 1, 1]
    lazy_results = [dask.delayed(execute)(sleep_time) for sleep_time in times]
    
    cluster = SGECluster()
    client = Client(cluster)
    cluster.start_workers(2)
    
    results = dask.compute(*lazy_results, get=client.get)
    print(results)
    
    cluster.close()

I don't know if this is really a bug in dask-drmaa or if it is in dask.distributed, but I submitted it here because I can't really reproduce this with LocalCluster.

I use a fully updated version of Anaconda with Python 3.6 on Linux. The packages are installed in the following versions:

dask                      0.16.0           py36h73d177f_0
dask-core                 0.16.0           py36ha827fd6_0
dask-drmaa                0.1.0                     <pip>
distributed               1.20.2                   py36_0

Happy holidays and thanks for your help. Please let me know if you need any further information.

Archive this repository

There hasn't been any activity here for a couple of year. Is this project still used? Has it been replaced with something like dask-jobqueue?

Should we archive it?

How to Handle Scheduler Specific Details

In responses to both #3 and #4 it seems like there are a number of useful options that are not explicitly handled by DRMAA. How should we handle this? Some thoughts:

  1. We pass through options so that people who understand their underlying system aren't throttled by what we provide natively
  2. We might consider making scheduler-specific subclasses and CLIs like dask-sge alongside dask-drmaa

worker_info dict has no key 'name'

On cluster.close(), this error is thrown on current master:

Traceback (most recent call last):
  File "/home/smmanoet/.local/lib/python3.6/site-packages/tornado/gen.py", line 326, in wrapper
    yielded = next(result)
  File "/home/smmanoet/.local/lib/python3.6/site-packages/dask_drmaa/core.py", line 285, in stop_workers
    v['name']: k for k, v in self.scheduler.worker_info.items()
  File "/home/smmanoet/.local/lib/python3.6/site-packages/dask_drmaa/core.py", line 285, in <dictcomp>
    v['name']: k for k, v in self.scheduler.worker_info.items()
KeyError: 'name'

I looked at the entries of worker_info, they only have the keys

cpu
memory
time
read_bytes
write_bytes
num_fds
executing
in_memory
ready
in_flight

Closing all workers

In cases where there are lots of workers stop_workers is slow. This appears to be caused by closing each worker in a for-loop.

As the common case where multiple workers are submitted to be stopped is just submitting all workers to stop, it would be good to special case this and pass JOB_IDS_SESSION_ALL to DRMAA. This would bypass the need for the loop.

There are a few options:

  1. Detect if all workers were passed in (easy, maybe a little hacky)
  2. Make the worker_ids parameter optional and the default value equal to all workers (easy, straightforward)
  3. Integrate this into close (easy, hacky without 1 or 2)
  4. Background the task somehow using threads, dask.delayed, or similar (may need a few tricks depending)

Thoughts?

Amazon ECS?

How would one go about using this with say Amazon ECS?

distributed.utils.ignoring no longer exists - should be replaced with contextlib.suppress

What happened:

Using dask-drmaa with recent versions of dask.distributed fails with the following exception:

~/miniconda3/lib/python3.6/site-packages/dask_drmaa/__init__.py in <module>
----> 1 from .core import DRMAACluster, get_session
      2 from .sge import SGECluster
      3 from .adaptive import Adaptive
      4
      5 from ._version import get_versions

~/miniconda3/lib/python3.6/site-packages/dask_drmaa/core.py in <module>
     14 from distributed import LocalCluster
     15 from distributed.deploy import Cluster
---> 16 from distributed.utils import log_errors, ignoring
     17 from distributed.utils import PeriodicCallback
     18

ImportError: cannot import name 'ignoring'

What you expected to happen:

No exception.

Minimal Complete Verifiable Example:

Install dask.distributed 2.19.0 and dask-drmaa; then run the following code:

import dask_drmaa

Anything else we need to know?:

distributed.utils.ignoring has been removed from dask.distributed as of 5/20/2020; dask-drmaa should be updated to use contextlib.suppress instead.

Environment:

  • Dask version: 2.19.0
  • Python version: 3.6.10
  • Operating System: CentOS 6.10
  • Install method (conda, pip, source): conda

Where to find dask-worker

Currently we assume that the dask-worker executable is in the same absolute location on all slaves as it is on the client machine. I'm not sure what the standard assumption is. It seems that there are a few options:

  1. Use absolute location of client process
  2. Use dask-worker and trust PATH on the slaves to find the right location
  3. Use dask-worker and convey the local user environment to jobs
  4. ?

test_stop_workers_politely CI failure

Seeing this test failure on the CI.

Test failure snippet
_________________________ test_stop_workers_politely __________________________
self = <Client: not connected>
futures = [<Future: status: cancelled, type: int, key: int-5c8a950061aa331153f4a172bbcbfd1b>, <Future: status: cancelled, type: ...9541ea58b401f115b751e79eabbff>, <Future: status: cancelled, type: int, key: int-ce9a05dd6ec76c6a6d171b0c055f3127>, ...]
errors = 'raise', direct = False, local_worker = None
    @gen.coroutine
    def _gather(self, futures, errors='raise', direct=None, local_worker=None):
        futures2, keys = unpack_remotedata(futures, byte_keys=True)
        keys = [tokey(key) for key in keys]
        bad_data = dict()
    
        if direct is None:
            try:
                w = get_worker()
            except Exception:
                direct = False
            else:
                if w.scheduler.address == self.scheduler.address:
                    direct = True
    
        @gen.coroutine
        def wait(k):
            """ Want to stop the All(...) early if we find an error """
            st = self.futures[k]
            yield st.event.wait()
            if st.status != 'finished':
                raise AllExit()
    
        while True:
            logger.debug("Waiting on futures to clear before gather")
    
            with ignoring(AllExit):
                yield All([wait(key) for key in keys if key in self.futures])
    
            failed = ('error', 'cancelled')
    
            exceptions = set()
            bad_keys = set()
            for key in keys:
                if (key not in self.futures or
                        self.futures[key].status in failed):
                    exceptions.add(key)
                    if errors == 'raise':
                        try:
                            st = self.futures[key]
>                           exception = st.exception
E                           AttributeError: exception
/opt/anaconda/lib/python3.6/site-packages/distributed/client.py:1233: AttributeError
During handling of the above exception, another exception occurred:
loop = <tornado.platform.epoll.EPollIOLoop object at 0x7f3505aedf28>
    def test_stop_workers_politely(loop):
        with DRMAACluster(scheduler_port=0) as cluster:
            with Client(cluster, loop=loop) as client:
                cluster.start_workers(2)
    
                while len(client.ncores()) < 2:
                    sleep(0.1)
    
                futures = client.scatter(list(range(10)))
    
                a, b = cluster.workers
                cluster.stop_workers(a)
    
>               data = client.gather(futures)
dask_drmaa/tests/test_core.py:131: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/opt/anaconda/lib/python3.6/site-packages/distributed/client.py:1364: in gather
    asynchronous=asynchronous)
/opt/anaconda/lib/python3.6/site-packages/distributed/client.py:547: in sync
    return sync(self.loop, func, *args, **kwargs)
/opt/anaconda/lib/python3.6/site-packages/distributed/utils.py:270: in sync
    six.reraise(*error[0])
/opt/anaconda/lib/python3.6/site-packages/six.py:693: in reraise
    raise value
/opt/anaconda/lib/python3.6/site-packages/distributed/utils.py:258: in f
    result[0] = yield make_coro()
/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py:1055: in run
    value = future.result()
/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py:238: in result
    raise_exc_info(self._exc_info)
<string>:4: in raise_exc_info
    ???
/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py:1063: in run
    yielded = self.gen.throw(*exc_info)
/opt/anaconda/lib/python3.6/site-packages/distributed/client.py:1238: in _gather
    None)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
tp = <class 'concurrent.futures._base.CancelledError'>, value = None, tb = None
    def reraise(tp, value, tb=None):
        try:
            if value is None:
                value = tp()
            if value.__traceback__ is not tb:
                raise value.with_traceback(tb)
>           raise value
E           concurrent.futures._base.CancelledError: int-7ec5d3339274cee5cb507a4e4d28e791
/opt/anaconda/lib/python3.6/site-packages/six.py:693: CancelledError
----------------------------- Captured stderr call -----------------------------
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7f3505aef9b0> exception was never retrieved: Traceback (most recent call last):
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/comm/tcp.py", line 305, in connect
    **kwargs)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 183, in connect
    af, addr, stream = yield connector.start()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 82, in start
    self.try_connect(iter(self.primary_addrs))
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 97, in try_connect
    future = self.connect(af, addr)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 224, in _create_stream
    return stream.connect(addr)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/iostream.py", line 1128, in connect
    self._add_io_state(self.io_loop.WRITE)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/iostream.py", line 971, in _add_io_state
    self.fileno(), self._handle_events, self._state)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/ioloop.py", line 728, in add_handler
    self._impl.register(fd, events | self.ERROR)
ValueError: I/O operation on closed epoll object
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7f350c075da0> exception was never retrieved: Traceback (most recent call last):
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 1233, in _gather
    exception = st.exception
AttributeError: exception
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/dask-drmaa/dask_drmaa/tests/test_core.py", line 131, in test_stop_workers_politely
    data = client.gather(futures)
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 1364, in gather
    asynchronous=asynchronous)
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 547, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/utils.py", line 270, in sync
    six.reraise(*error[0])
  File "/opt/anaconda/lib/python3.6/site-packages/six.py", line 693, in reraise
    raise value
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/utils.py", line 258, in f
    result[0] = yield make_coro()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 1238, in _gather
    None)
  File "/opt/anaconda/lib/python3.6/site-packages/six.py", line 693, in reraise
    raise value
concurrent.futures._base.CancelledError: int-7ec5d3339274cee5cb507a4e4d28e791
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda/lib/python3.6/site-packages/dask_drmaa-0.1.0-py3.6.egg/dask_drmaa/core.py", line 193, in stop_workers
    close_workers=True)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/scheduler.py", line 1951, in retire_workers
    yield [self.close_worker(worker=w) for w in workers]
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 828, in callback
    result_list.append(f.result())
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/scheduler.py", line 564, in close_worker
    yield r.terminate(report=False)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/core.py", line 453, in send_recv_from_rpc
    comm = yield self.live_comm()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/core.py", line 429, in live_comm
    connection_args=self.connection_args)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 307, in wrapper
    yielded = next(result)
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/comm/core.py", line 185, in connect
    quiet_exceptions=EnvironmentError)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 921, in with_timeout
    timeout, timeout_callback)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/ioloop.py", line 505, in add_timeout
    callback, *args, **kwargs)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/ioloop.py", line 921, in call_at
    heapq.heappush(self._timeouts, timeout)
TypeError: heap argument must be a list
--------------------------- Captured stderr teardown ---------------------------
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7f350c09f080> exception was never retrieved: Traceback (most recent call last):
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 1233, in _gather
    exception = st.exception
AttributeError: exception
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/opt/anaconda/lib/python3.6/site-packages/_pytest/runner.py", line 196, in __init__
    self.result = func()
  File "/opt/anaconda/lib/python3.6/site-packages/_pytest/runner.py", line 182, in <lambda>
    return CallInfo(lambda: ihook(item=item, **kwds), when=when)
  File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 745, in __call__
    return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
  File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 339, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 334, in <lambda>
    _MultiCall(methods, kwargs, hook.spec_opts).execute()
  File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 613, in execute
    return _wrapped_call(hook_impl.function(*args), self.execute)
  File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 254, in _wrapped_call
    return call_outcome.get_result()
  File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 279, in get_result
    raise ex[1].with_traceback(ex[2])
  File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 265, in __init__
    self.result = func()
  File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 614, in execute
    res = hook_impl.function(*args)
  File "/opt/anaconda/lib/python3.6/site-packages/_pytest/runner.py", line 112, in pytest_runtest_call
    item.runtest()
  File "/opt/anaconda/lib/python3.6/site-packages/_pytest/python.py", line 1169, in runtest
    self.ihook.pytest_pyfunc_call(pyfuncitem=self)
  File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 745, in __call__
    return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
  File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 339, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 334, in <lambda>
    _MultiCall(methods, kwargs, hook.spec_opts).execute()
  File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 613, in execute
    return _wrapped_call(hook_impl.function(*args), self.execute)
  File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 254, in _wrapped_call
    return call_outcome.get_result()
  File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 279, in get_result
    raise ex[1].with_traceback(ex[2])
  File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 265, in __init__
    self.result = func()
  File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 614, in execute
    res = hook_impl.function(*args)
  File "/opt/anaconda/lib/python3.6/site-packages/_pytest/python.py", line 143, in pytest_pyfunc_call
    testfunction(**testargs)
  File "/dask-drmaa/dask_drmaa/tests/test_core.py", line 131, in test_stop_workers_politely
    data = client.gather(futures)
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 1364, in gather
    asynchronous=asynchronous)
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 547, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/utils.py", line 270, in sync
    six.reraise(*error[0])
  File "/opt/anaconda/lib/python3.6/site-packages/six.py", line 693, in reraise
    raise value
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/utils.py", line 258, in f
    result[0] = yield make_coro()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 1238, in _gather
    None)
  File "/opt/anaconda/lib/python3.6/site-packages/six.py", line 693, in reraise
    raise value
concurrent.futures._base.CancelledError: int-7ec5d3339274cee5cb507a4e4d28e791
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/comm/tcp.py", line 305, in connect
    **kwargs)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 183, in connect
    af, addr, stream = yield connector.start()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 82, in start
    self.try_connect(iter(self.primary_addrs))
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 97, in try_connect
    future = self.connect(af, addr)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 224, in _create_stream
    return stream.connect(addr)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/iostream.py", line 1128, in connect
    self._add_io_state(self.io_loop.WRITE)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/iostream.py", line 971, in _add_io_state
    self.fileno(), self._handle_events, self._state)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/ioloop.py", line 728, in add_handler
    self._impl.register(fd, events | self.ERROR)
ValueError: I/O operation on closed epoll object

ref: https://travis-ci.org/dask/dask-drmaa/jobs/296857549

very inefficient use of `dask-worker`s

I am running a calculation that has the following structure:

r = []
for i in range(5):
    syst = delayed(make_2d_scatter)(i)
    for j in range(3000):
        r.append(delayed(transmission)(syst, j))

res = client.compute(r)

make_2d_scatter is a fast function, so most time is spent in transmission.

I have started 250 workers on a PBS cluster with dask-drmaa 250.

Now I check the output of qstat -ea | grep dask-worker and it gives me the following output (I truncated it):

8568.hpc05.hpc          basnijholt  q1       dask-worker       21262   --     --     --  9999:00:0 R  00:55:23
8569.hpc05.hpc          basnijholt  q1       dask-worker       21292   --     --     --  9999:00:0 R  00:56:16
8570.hpc05.hpc          basnijholt  q1       dask-worker       21334   --     --     --  9999:00:0 R  00:47:05
8571.hpc05.hpc          basnijholt  q1       dask-worker       21403   --     --     --  9999:00:0 R  01:49:45
8572.hpc05.hpc          basnijholt  q1       dask-worker       21461   --     --     --  9999:00:0 R  01:06:57
8573.hpc05.hpc          basnijholt  q1       dask-worker       21518   --     --     --  9999:00:0 R  01:58:19
8574.hpc05.hpc          basnijholt  q1       dask-worker       21544   --     --     --  9999:00:0 R  01:44:40
8575.hpc05.hpc          basnijholt  q1       dask-worker       21593   --     --     --  9999:00:0 R  01:56:53
8576.hpc05.hpc          basnijholt  q1       dask-worker       21655   --     --     --  9999:00:0 R  01:28:54
8577.hpc05.hpc          basnijholt  q1       dask-worker       21704   --     --     --  9999:00:0 R  01:13:07
8578.hpc05.hpc          basnijholt  q1       dask-worker       21721   --     --     --  9999:00:0 R  00:53:04
8579.hpc05.hpc          basnijholt  q1       dask-worker       21751   --     --     --  9999:00:0 R  01:17:29
8580.hpc05.hpc          basnijholt  q1       dask-worker        8048   --     --     --  9999:00:0 R  01:29:38
8581.hpc05.hpc          basnijholt  q1       dask-worker        8079   --     --     --  9999:00:0 R  00:00:25
8582.hpc05.hpc          basnijholt  q1       dask-worker        8118   --     --     --  9999:00:0 R  00:49:05
8583.hpc05.hpc          basnijholt  q1       dask-worker        8194   --     --     --  9999:00:0 R  01:34:04
8584.hpc05.hpc          basnijholt  q1       dask-worker        8248   --     --     --  9999:00:0 R  01:04:20
8585.hpc05.hpc          basnijholt  q1       dask-worker        8300   --     --     --  9999:00:0 R  00:00:26
8586.hpc05.hpc          basnijholt  q1       dask-worker        8322   --     --     --  9999:00:0 R  01:07:27
8587.hpc05.hpc          basnijholt  q1       dask-worker        8369   --     --     --  9999:00:0 R  01:09:28
8588.hpc05.hpc          basnijholt  q1       dask-worker        8428   --     --     --  9999:00:0 R  00:54:45
8589.hpc05.hpc          basnijholt  q1       dask-worker        8507   --     --     --  9999:00:0 R  01:00:15
8590.hpc05.hpc          basnijholt  q1       dask-worker        8560   --     --     --  9999:00:0 R  00:40:28
8591.hpc05.hpc          basnijholt  q1       dask-worker        8606   --     --     --  9999:00:0 R  00:00:28
8592.hpc05.hpc          basnijholt  q1       dask-worker        8629   --     --     --  9999:00:0 R  00:15:26
8593.hpc05.hpc          basnijholt  q1       dask-worker        8691   --     --     --  9999:00:0 R  01:24:24
8594.hpc05.hpc          basnijholt  q1       dask-worker        8750   --     --     --  9999:00:0 R  02:07:52
8595.hpc05.hpc          basnijholt  q1       dask-worker        8818   --     --     --  9999:00:0 R  00:00:24
8596.hpc05.hpc          basnijholt  q1       dask-worker        8866   --     --     --  9999:00:0 R  00:00:29
8597.hpc05.hpc          basnijholt  q1       dask-worker        8903   --     --     --  9999:00:0 R  00:48:07
8598.hpc05.hpc          basnijholt  q1       dask-worker        8926   --     --     --  9999:00:0 R  00:00:26
8599.hpc05.hpc          basnijholt  q1       dask-worker        8971   --     --     --  9999:00:0 R  00:58:24
8600.hpc05.hpc          basnijholt  q1       dask-worker        1815   --     --     --  9999:00:0 R  00:00:35
8601.hpc05.hpc          basnijholt  q1       dask-worker        1844   --     --     --  9999:00:0 R  00:00:32
8602.hpc05.hpc          basnijholt  q1       dask-worker        1889   --     --     --  9999:00:0 R  00:00:34
8603.hpc05.hpc          basnijholt  q1       dask-worker        1963   --     --     --  9999:00:0 R  00:00:35
8604.hpc05.hpc          basnijholt  q1       dask-worker        2013   --     --     --  9999:00:0 R  00:07:14
8605.hpc05.hpc          basnijholt  q1       dask-worker        2058   --     --     --  9999:00:0 R  00:00:33
8606.hpc05.hpc          basnijholt  q1       dask-worker        2089   --     --     --  9999:00:0 R  00:00:33
8607.hpc05.hpc          basnijholt  q1       dask-worker        2176   --     --     --  9999:00:0 R  00:00:35
8608.hpc05.hpc          basnijholt  q1       dask-worker        2234   --     --     --  9999:00:0 R  00:00:36
8609.hpc05.hpc          basnijholt  q1       dask-worker        2300   --     --     --  9999:00:0 R  00:00:33
8610.hpc05.hpc          basnijholt  q1       dask-worker        2371   --     --     --  9999:00:0 R  00:07:42
8611.hpc05.hpc          basnijholt  q1       dask-worker        2438   --     --     --  9999:00:0 R  00:00:39
8612.hpc05.hpc          basnijholt  q1       dask-worker        2473   --     --     --  9999:00:0 R  00:00:35
8613.hpc05.hpc          basnijholt  q1       dask-worker        2539   --     --     --  9999:00:0 R  00:03:58
8614.hpc05.hpc          basnijholt  q1       dask-worker        2601   --     --     --  9999:00:0 R  00:00:39
8615.hpc05.hpc          basnijholt  q1       dask-worker        2668   --     --     --  9999:00:0 R  00:35:26
8616.hpc05.hpc          basnijholt  q1       dask-worker        2721   --     --     --  9999:00:0 R  00:00:32
8617.hpc05.hpc          basnijholt  q1       dask-worker        2761   --     --     --  9999:00:0 R  00:14:29
8618.hpc05.hpc          basnijholt  q1       dask-worker        2781   --     --     --  9999:00:0 R  00:00:35
8619.hpc05.hpc          basnijholt  q1       dask-worker        2824   --     --     --  9999:00:0 R  00:00:34
8620.hpc05.hpc          basnijholt  q1       dask-worker         310   --     --     --  9999:00:0 R  00:00:38
8621.hpc05.hpc          basnijholt  q1       dask-worker         350   --     --     --  9999:00:0 R  00:00:37
8622.hpc05.hpc          basnijholt  q1       dask-worker         401   --     --     --  9999:00:0 R  00:00:38
8623.hpc05.hpc          basnijholt  q1       dask-worker         473   --     --     --  9999:00:0 R  00:00:37
8624.hpc05.hpc          basnijholt  q1       dask-worker         526   --     --     --  9999:00:0 R  00:00:36
8625.hpc05.hpc          basnijholt  q1       dask-worker         572   --     --     --  9999:00:0 R  00:01:47
8626.hpc05.hpc          basnijholt  q1       dask-worker         607   --     --     --  9999:00:0 R  00:27:28
8627.hpc05.hpc          basnijholt  q1       dask-worker         663   --     --     --  9999:00:0 R  00:00:36
8628.hpc05.hpc          basnijholt  q1       dask-worker         733   --     --     --  9999:00:0 R  00:00:36
8629.hpc05.hpc          basnijholt  q1       dask-worker         815   --     --     --  9999:00:0 R  00:00:36
8630.hpc05.hpc          basnijholt  q1       dask-worker         867   --     --     --  9999:00:0 R  00:00:37
8631.hpc05.hpc          basnijholt  q1       dask-worker         918   --     --     --  9999:00:0 R  00:00:37
8632.hpc05.hpc          basnijholt  q1       dask-worker         942   --     --     --  9999:00:0 R  00:00:36
8633.hpc05.hpc          basnijholt  q1       dask-worker        1001   --     --     --  9999:00:0 R  00:00:36
8634.hpc05.hpc          basnijholt  q1       dask-worker        1184   --     --     --  9999:00:0 R  00:00:39
8635.hpc05.hpc          basnijholt  q1       dask-worker        1065   --     --     --  9999:00:0 R  00:00:36
8636.hpc05.hpc          basnijholt  q1       dask-worker        1132   --     --     --  9999:00:0 R  00:37:28
8637.hpc05.hpc          basnijholt  q1       dask-worker        1220   --     --     --  9999:00:0 R  00:00:36
8638.hpc05.hpc          basnijholt  q1       dask-worker        1248   --     --     --  9999:00:0 R  00:22:40
8639.hpc05.hpc          basnijholt  q1       dask-worker        1290   --     --     --  9999:00:0 R  00:00:39
8640.hpc05.hpc          basnijholt  q1       dask-worker       34496   --     --     --  9999:00:0 R  00:00:34

In the last column you see the time that that core has actually been doing stuff. You see that some cores have been running code for more than an hour and some for only half a minute.

Am I doing something wrong, or is dask not dividing the tasks very well?

Wall time

My understanding is that job schedulers tend to schedule jobs based on their advertised wall times. Allocations of small short-running jobs can squeeze in to the cluster sooner than many long-running jobs.

Dask workers can be fairly flexible here. We can add and remove many single-node jobs frequently, handing data off from about-to-expire workers to workers with a long time to live. It's unclear to me how much value there is here or if this is something we should focus on.

The default setting for drmaa-python seems to be to totally ignore walltime settings. Is this common? Perhaps DRMAA-style clusters are often underutilized so this is not a big issue?

New release?

I think with the fixes in #74 , #78 and #75 a version 2.1 could be released, what do yo think?

Unable to start SGE workers via docker-compose on Windows 8.1

I'm able trouble reproducing the unit tests based on the given instructions, for Windows. I've done the following:

  • Replaced all carriage returns in all *.sh files in the top-level directory of this project.
  • Run docker-compose build --no-cache

When I run ./start-sge.sh I see the output Waiting for SGE slots to become available after an indefinite amount of waiting:

$ ./start-sge.sh
sge_master is up-to-date
slave_two is up-to-date
slave_one is up-to-date
Waiting for SGE slots to become available
# output of docker exec -it sge_master qhost;
HOSTNAME                ARCH         NCPU  LOAD  MEMTOT  MEMUSE  SWAPTO  SWAPUS
-------------------------------------------------------------------------------
global                  -               -     -       -       -       -       -
# and repeats

# docker contains are running
$ docker ps
CONTAINER ID        IMAGE                 COMMAND                  CREATED
       STATUS              PORTS                     NAMES
efa6eebce34a        daskdrmaa_slave-one   "bash /run-slave.sh"     About an hour
 ago   Up About an hour                              slave_one
9fae83c48356        daskdrmaa_slave-two   "bash /run-slave.sh"     About an hour
 ago   Up About an hour                              slave_two
b13940296988        daskdrmaa_master      "bash -x /run-master"   About an hour
ago   Up About an hour    6444-6446/tcp, 8000/tcp   sge_master

# example of output when I try to run unit tests
$ docker exec -it sge_master /bin/bash -c "cd /dask-drmaa; py.test dask_drmaa -sv -k test_adaptive_memory"
============================= test session starts ==============================
platform linux -- Python 3.6.4, pytest-3.4.2, py-1.5.2, pluggy-0.6.0 -- /opt/ana
conda/bin/python
cachedir: .pytest_cache
rootdir: /dask-drmaa, inifile:
collected 23 items

dask_drmaa/tests/test_adaptive.py::test_adaptive_memory

distributed.utils - ERROR - code 17: denied: host "sge_master" is no submit host
Traceback (most recent call last):
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/utils.py", line 623, in log_errors
    yield
  File "/dask-drmaa/dask_drmaa/core.py", line 203, in start_workers
    ids = get_session().runBulkJobs(jt, 1, n, 1)
  File "/opt/anaconda/lib/python3.6/site-packages/drmaa/session.py", line 340, in runBulkJobs
    return list(run_bulk_job(jobTemplate, beginIndex, endIndex, step))
  File "/opt/anaconda/lib/python3.6/site-packages/drmaa/helpers.py", line 286, in run_bulk_job
    c(drmaa_run_bulk_jobs, jids, jt, start, end, incr)
  File "/opt/anaconda/lib/python3.6/site-packages/drmaa/helpers.py", line 303, in c
    return f(*(args + (error_buffer, sizeof(error_buffer))))
  File "/opt/anaconda/lib/python3.6/site-packages/drmaa/errors.py", line 151, in error_check
    raise _ERRORS[code - 1](error_string)
drmaa.errors.DeniedByDrmException: code 17: denied: host "sge_master" is no submit host

Unfortunately I have no familiarity with SGE. Any suggestions as to how I can debug/fix this? Apologies if this is not appropriate for this issue tracker.

CI failure in test_stop_single_worker on Python 3

Ran into an issue in test_stop_single_worker with the Python 3 where the worker failed to clean up its scratch space directory. More details in this build log. Snippet with the test failure also included below. The same failure did not occur on Python 2.

=================================== FAILURES ===================================
___________________________ test_stop_single_worker ____________________________
loop = <tornado.platform.epoll.EPollIOLoop object at 0x7f260a3a22b0>
    def test_stop_single_worker(loop):
        with DRMAACluster(scheduler_port=0) as cluster:
            with Client(cluster, loop=loop) as client:
                cluster.start_workers(2)
                future = client.submit(lambda x: x + 1, 1)
                assert future.result() == 2
                while len(client.ncores()) < 2:
                    sleep(0.1)
    
                a, b = cluster.workers
                local_dir = client.run(lambda dask_worker: dask_worker.local_dir,
                                       workers=[a])[a]
                assert os.path.exists(local_dir)
    
                cluster.stop_workers(a)
                start = time()
                while len(client.ncores()) != 1:
                    sleep(0.2)
                    assert time() < start + 60
>       assert not os.path.exists(local_dir)
E       AssertionError: assert not True
E        +  where True = <function exists at 0x7f262a9ae950>('/dask-drmaa/dask-worker-space/worker-w5gj31no')
E        +    where <function exists at 0x7f262a9ae950> = <module 'posixpath' from '/opt/anaconda/lib/python3.6/posixpath.py'>.exists
E        +      where <module 'posixpath' from '/opt/anaconda/lib/python3.6/posixpath.py'> = os.path
dask_drmaa/tests/test_core.py:117: AssertionError
========== 1 failed, 20 passed, 1 skipped, 1 xfailed in 96.55 seconds ==========

Refresh DRMAACluster docstring

Seems like there is a bit of a mismatch between the arguments to DRMAACluster's constructor and those listed in its docstring. Would be good to refresh the docstring to match the currently accepted arguments.

How to limit number of threads for a DRMAACluster worker?

Hello,

When starting workers on a DRMAACluster, they always use many threads

distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          28
distributed.worker - INFO -                Memory:                    280GB

The actual number of threads differs from one worker to the next.
Is there a way to limit workers to one thread? Using so many threads causes my jobs to exceed memory limits and die.

Thanks!

CLI test failure

I'm trying to run the tests locally (mimicking steps in the Travis-Ci configuration file), but I get this error:

$ docker exec -it sge_master /bin/bash -c "cd /dask-drmaa; py.test dask_drmaa/cli -vs --tb=native"
========================================================== test session starts ==========================================================
platform linux -- Python 3.6.0, pytest-3.0.7, py-1.4.33, pluggy-0.4.0 -- /opt/anaconda/bin/python
cachedir: .cache
rootdir: /dask-drmaa, inifile:
collected 1 items 

dask_drmaa/cli/tests/test_dask_drmaa.py::test_dask_drmaa distributed.utils - ERROR - Timed out trying to connect to 'tcp://127.0.0.1:8786' after 5 s: ConnectionRefusedError: [Errno 111] Connection refused
Traceback (most recent call last):
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/comm/core.py", line 169, in connect
    quiet_exceptions=EnvironmentError)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
tornado.gen.TimeoutError: Timeout

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/utils.py", line 212, in f
    result[0] = yield make_coro()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 518, in _start
    yield self._ensure_connected(timeout=timeout)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 550, in _ensure_connected
    connection_args=self.connection_args)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/comm/core.py", line 178, in connect
    _raise(error)
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/comm/core.py", line 161, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://127.0.0.1:8786' after 5 s: ConnectionRefusedError: [Errno 111] Connection refused


Print from stderr
=================
b'distributed.scheduler - INFO -   Scheduler at:    tcp://172.18.0.2:32890\n'b'distributed.scheduler - INFO -       bokeh at:              0.0.0.0:8787\n'b'distributed.scheduler - INFO - Scheduler closing...\n'b'distributed.scheduler - INFO - Scheduler closing all comms\n'

Print from stdout
=================
FAILED

Judging by the stderr, it seems expected that connecting to tcp://127.0.0.1:8786 fails if the scheduler is actually listening on tcp://172.18.0.2:32890 (i.e. on a different IP and port)...

Pass through dask-worker configuration options

The dask-worker process can be configured in a variety of ways:

  • How many cores should it use?
  • With what mix of threads and processes should it use those cores?
  • How much RAM should it use? What fraction of available RAM should it occupy before spilling to disk?
  • Should it serve HTTP diagnostics on a particular port
  • etc..

We can pass these through fairly easily through the DRMAACluster.__init__ or start_workers methods somewhat trivially.

What is less trivial is if we want to control the number of CPUs or memory used by the DRMAA job and pair this with the dask-worker configuration. I'm somewhat concerned that DRMAA expects us to take up one CPU by default when we're impolitely taking up all of the CPUs on the host. It would be nice to either request a certain host configuration from the job scheduler or else observe what our expected budget is after our job is allocated.

Tracking DRMAA Job IDs in Dask Scheduler

When DRMAA launches a worker it gives us a job id like 34.1. This starts a worker on a random port which registers with the scheduler. Now each worker has two identifiers:

  1. A DRMAA job id like 34.1
  2. A Dask ip:port address like 192.168.0.1:37482

The Dask scheduler knows when it should clean up a worker. It goes ahead and terminates the worker remotely. However, despite the process terminating, the DRMAA job appears to continue to run. This causes some confusion, especially when trying to understand how many workers we have in flight when determining if we should scale down the cluster.

I see two solutions here:

  1. Somehow ensure that DRMAA jobs finish when their process finishes
  2. Maintain a mapping between DRMAA Job IDs and Dask ip:port worker addresses

The second option would be useful generally. We can do it by passing the Job ID as the worker's name/alias

dask-worker scheduler-address:8786 --name 34.1

Ideally we would use a job-scheduler-provided environment variable here

dask-worker scheduler-address:8786 --name $JOBID
dask-worker scheduler-address:8786 --name $drmaa_incr_ph$
...

However, it appears that environment variables can not be used within the args, but only with a batch script. Currently we specify a job template by pointing to the dask-worker process directly

wt = get_session().createJobTemplate()
wt.jobName = 'dask-drmaa'
wt.remoteCommand = 'dask-worker
wt.args = [scheduler_address, '--name', '$JOBID']
wt.outputPath = ...
wt.errorPath = ...

However, as stated above using environment variables with args seems to not work with DRMAA. Instead, it is recommended to use environment variables within scripts

# worker script
dask-worker $1 --name $JOBID $@
wt = get_session().createJobTemplate()
wt.jobName = 'dask-drmaa'
wt.remoteCommand = 'my-worker-script
wt.args = [scheduler_address]
wt.outputPath = ...
wt.errorPath = ...

I tried this out and didn't have much success. I suspect that I'm missing something simple. Additionally it would be good have different output paths for different jobs. Currently they all dump to the same file.

There is an xfailed test in test_core.py that checks for the correct job names as worker names. If anyone more familiar with DRMAA can help to make this test pass I would be grateful.

@pytest.mark.xfail(reason="Can't use job name environment variable as arg")
def test_job_name_as_name(loop):
    with DRMAACluster(scheduler_port=0) as cluster:
        cluster.start_workers(2)
        while len(cluster.scheduler.workers) < 1:
            sleep(0.1)
            names = {cluster.scheduler.worker_info[w]['name']
                     for w in cluster.scheduler.workers}

            assert names == set(cluster.workers)

cc @davidr

Does not work with user install of dask

This fails with a pip install --user installation, because then dask worker does not reside in

worker_bin_path = os.path.join(sys.exec_prefix, 'bin', 'dask-worker')

But in .local/bin

Deployment Policies

Currently deployment is handled by manually scaling the cluster up and down

jobids = cluster.start_workers(10)
cluster.stop_workers(jobids)

This system is straightforward and effective, but there are other options that we could consider, such as adaptively scaling up or down based on load or on memory needs.

There was some preliminary work done a while ago in this direction described here: http://matthewrocklin.com/blog/work/2016/09/22/cluster-deployments

Releasing?

Any thoughts on getting this tagged, on to PyPI, and on to conda-forge? Is this stable enough at this point to do those things or are there other things needed to be done first?

Add start_worker/stop_worker

For compatibility with Distributed's LocalCluster, it would be nice to have start_worker and stop_worker functions for DRMAACluster.

'Client' object has no attribute 'start_workers'

In the README the following is written

from dask_drmaa import DRMAACluster
cluster = DRMAACluster()

from dask.distributed import Client
client = Client(cluster)
client.start_workers(2)

However, this leads to: AttributeError: 'Client' object has no attribute 'start_workers'

What does work is:

from dask_drmaa import DRMAACluster
cluster = DRMAACluster()
cluster.start_workers(10) 

Don't know if this is incorrect in the README or a bug. I use PBS.

test_dont_request_on_many_short_tasks CI failure

Seeing this test failure on the CI.

Test failure snippet
____________________ test_dont_request_on_many_short_tasks _____________________
loop = <tornado.platform.epoll.EPollIOLoop object at 0x7f8ba7153b00>
    def test_dont_request_on_many_short_tasks(loop):
        with SGECluster(scheduler_port=0) as cluster:
            adapt = Adaptive(cluster, interval=50, startup_cost=10)
            with Client(cluster, loop=loop) as client:
                cluster.scheduler.task_duration['slowinc'] = 0.001
                futures = client.map(slowinc, range(1000), delay=0.001)
    
                while not cluster.scheduler.workers:
                    sleep(0.01)
    
                for i in range(20):
                    sleep(0.1)
>                   assert len(cluster.workers) < 2
E                   AssertionError: assert 2 < 2
E                    +  where 2 = len({'15.1': WorkerSpec(job_id='15.1', kwargs={'nativeSpecification': '', 'cpus': 1, 'memory': None, 'memory_fraction': 0...., 'memory': None, 'memory_fraction': 0.5}, stdout='/dask-drmaa/worker.16.1.out', stderr='/dask-drmaa/worker.16.1.err')})
E                    +    where {'15.1': WorkerSpec(job_id='15.1', kwargs={'nativeSpecification': '', 'cpus': 1, 'memory': None, 'memory_fraction': 0...., 'memory': None, 'memory_fraction': 0.5}, stdout='/dask-drmaa/worker.16.1.out', stderr='/dask-drmaa/worker.16.1.err')} = <SGECluster: 2 workers>.workers
dask_drmaa/tests/test_adaptive.py:105: AssertionError
----------------------------- Captured stderr call -----------------------------
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7f8ba715ac88> exception was never retrieved: Traceback (most recent call last):
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/comm/tcp.py", line 305, in connect
    **kwargs)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 183, in connect
    af, addr, stream = yield connector.start()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 82, in start
    self.try_connect(iter(self.primary_addrs))
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 97, in try_connect
    future = self.connect(af, addr)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 224, in _create_stream
    return stream.connect(addr)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/iostream.py", line 1128, in connect
    self._add_io_state(self.io_loop.WRITE)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/iostream.py", line 971, in _add_io_state
    self.fileno(), self._handle_events, self._state)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/ioloop.py", line 728, in add_handler
    self._impl.register(fd, events | self.ERROR)
ValueError: I/O operation on closed epoll object
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7f8ba6a460f0> exception was never retrieved: Traceback (most recent call last):
  File "/dask-drmaa/dask_drmaa/tests/test_adaptive.py", line 105, in test_dont_request_on_many_short_tasks
    assert len(cluster.workers) < 2
AssertionError: assert 2 < 2
 +  where 2 = len({'15.1': WorkerSpec(job_id='15.1', kwargs={'nativeSpecification': '', 'cpus': 1, 'memory': None, 'memory_fraction': 0...., 'memory': None, 'memory_fraction': 0.5}, stdout='/dask-drmaa/worker.16.1.out', stderr='/dask-drmaa/worker.16.1.err')})
 +    where {'15.1': WorkerSpec(job_id='15.1', kwargs={'nativeSpecification': '', 'cpus': 1, 'memory': None, 'memory_fraction': 0...., 'memory': None, 'memory_fraction': 0.5}, stdout='/dask-drmaa/worker.16.1.out', stderr='/dask-drmaa/worker.16.1.err')} = <SGECluster: 2 workers>.workers
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda/lib/python3.6/site-packages/dask_drmaa-0.1.0-py3.6.egg/dask_drmaa/core.py", line 193, in stop_workers
    close_workers=True)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/scheduler.py", line 1951, in retire_workers
    yield [self.close_worker(worker=w) for w in workers]
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 828, in callback
    result_list.append(f.result())
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/scheduler.py", line 564, in close_worker
    yield r.terminate(report=False)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/core.py", line 453, in send_recv_from_rpc
    comm = yield self.live_comm()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/core.py", line 429, in live_comm
    connection_args=self.connection_args)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 307, in wrapper
    yielded = next(result)
  File "/opt/anaconda/lib/python3.6/site-packages/distributed/comm/core.py", line 185, in connect
    quiet_exceptions=EnvironmentError)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 921, in with_timeout
    timeout, timeout_callback)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/ioloop.py", line 505, in add_timeout
    callback, *args, **kwargs)
  File "/opt/anaconda/lib/python3.6/site-packages/tornado/ioloop.py", line 921, in call_at
    heapq.heappush(self._timeouts, timeout)
TypeError: heap argument must be a list

ref: https://travis-ci.org/dask/dask-drmaa/jobs/296660993

Make resource constraints first class citizens

Various schedulers have different ways of specifying resource limits for jobs (e.g. memory, CPU, etc.). It would be nice to explore making these first class citizens in dask-drmaa. As DRMAA doesn't really have a way to specify this itself, it would amount to detecting the DRMAA implementation used and passing along the right flags to the nativeSpecification parameter of the JobTemplate. That said, we have already had some success with this strategy w.r.t. log files as demonstrated by PR ( #56 ). So should be pretty reasonable to extend the strategy to this case as well.

ref: https://www.ibm.com/support/knowledgecenter/en/SSWRJV_10.1.0/lsf_admin/resource_usage_limits_supported.html
ref: https://slurm.schedmd.com/sbatch.html
ref: http://gridscheduler.sourceforge.net/htmlman/htmlman5/queue_conf.html
ref: http://apps.man.poznan.pl/trac/slurm-drmaa#Nativespecification

Give absolute path to worker script

Starting workers fails (at least on slurm) if the template['workingDirectory'] is not the current working directory because:

/var/spool/slurm/d/job1177731/slurm_script: line 2: ./dask-worker-scriptz8gmf_cpsh: No such file or directory

Allow setting ip of scheduler

Our hpc cluster has several subnets, dask-drmaa uses the ip address of the outside network, that is only used to connect to the gateways, for the scheduler, as a result, workers cannot connect.

SGECluster missing docstrings

I would like SGECluster.start_workers to have a docstring. I'm happy to submit a PR for this. Any warnings or advice before I get started?

Here is my current understanding of the meaning of each argument:

  • nativeSpecification: Options native to the job scheduler
  • cpus: number of threads per dask worker
  • memory: number of bytes of RAM per dask worker
  • memory_fraction: fraction of memory used before dask workers begin to cache data to disk

Modify workerTemplate for particular submissions

Currently we create a single job template to create all workers.

class DRMAACluster(object):
    def __init__(self, **kwargs):
        self.local_cluster = LocalCluster(n_workers=0, **kwargs)
        self.session = drmaa.Session()
        self.session.initialize()

        self.worker_template = self.session.createJobTemplate()
        self.worker_template.remoteCommand = os.path.join(sys.exec_prefix, 'bin', 'dask-worker')
        self.worker_template.jobName = 'dask-worker'
        self.worker_template.args = ['%s:%d' % (socket.gethostname(), self.local_cluster.scheduler.port)]
        self.worker_template.outputPath = ':/%s/out' % os.getcwd()
        self.worker_template.errorPath = ':/%s/err' % os.getcwd()
        self.worker_template.workingDirectory = os.getcwd()

        self.workers = set()

This job template is easily accessible and so provides a nice and familiar release valve for expert users of python-drmaa who want to customize their setup.

However as we submit slightly different kinds of worker jobs we'll want to modify this template a bit, by specifying extra native specifications and by adding extra keywords to the dask-worker command.

def start_workers(self, n=1, .. want to add extra arguments here ...):
    and safely create new worker template here for submission

I'm unable to find a way to copy-and-append an existing job template, which leads me to instead consider storing all of the job template attributes (args, native spec, command, error path, etc.) bare, instead of within a job template object. Then we'll create a new job template for each call to start_workers.

class DRMAACluster(object):
    def __init__(self, **kwargs):
        self.local_cluster = LocalCluster(n_workers=0, **kwargs)
        self.session = drmaa.Session()
        self.session.initialize()

        # self.worker_template = self.session.createJobTemplate()
        self.remoteCommand = os.path.join(sys.exec_prefix, 'bin', 'dask-worker')
        self.jobName = 'dask-worker'
        self.args = ['%s:%d' % (socket.gethostname(), self.local_cluster.scheduler.port)]
        self.outputPath = ':/%s/out' % os.getcwd()
        self.errorPath = ':/%s/err' % os.getcwd()
        self.workingDirectory = os.getcwd()

        self.workers = set()

    def start_workers(self, args, ...)
        wt = self.session.createJobTemplate()
        wt.args = self.args + args
        ...

Does anyone see a better way?

cc @davidr

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.