dask / dask-drmaa
Deploy Dask on DRMAA clusters
License: BSD 3-Clause "New" or "Revised" License
I am having an issue with submitting tasks on SGE where it appears that the cleanup_closed_workers callback is failing to execute correctly due to an authentication issue.
Job submission appears to work, as does job deletion at exit, just the polling from the callback fails.
In [1]: from dask_drmaa import DRMAACluster, Adaptive
In [2]: cluster = DRMAACluster()
In [3]: cluster.start_workers(1)
In [4]: tornado.application - ERROR - Exception in callback <bound method DRMAACluster.cleanup_closed_workers of <DRMAACluster: 1 workers>>
Traceback (most recent call last):
File "/home/ac1mpt/.conda/envs/condagpu/lib/python3.6/site-packages/tornado/ioloop.py", line 1026, in _run
return self.callback()
File "/home/ac1mpt/repos/dask-drmaa/dask_drmaa/core.py", line 251, in cleanup_closed_workers
if get_session().jobStatus(jid) in ('closed', 'done'):
File "/home/ac1mpt/.conda/envs/condagpu/lib/python3.6/site-packages/drmaa/session.py", line 520, in jobStatus
c(drmaa_job_ps, jobId, byref(status))
File "/home/ac1mpt/.conda/envs/condagpu/lib/python3.6/site-packages/drmaa/helpers.py", line 303, in c
return f(*(args + (error_buffer, sizeof(error_buffer))))
File "/home/ac1mpt/.conda/envs/condagpu/lib/python3.6/site-packages/drmaa/errors.py", line 151, in error_check
raise _ERRORS[code - 1](error_string)
drmaa.errors.DrmCommunicationException: code 2: MUNGE authentication failed: Invalid credential format
What happened:
Using dask-drmaa with recent versions of dask.distributed fails with the following exception:
~/miniconda3/lib/python3.6/site-packages/dask_drmaa/__init__.py in <module>
----> 1 from .core import DRMAACluster, get_session
2 from .sge import SGECluster
3 from .adaptive import Adaptive
4
5 from ._version import get_versions
~/miniconda3/lib/python3.6/site-packages/dask_drmaa/core.py in <module>
14 from distributed import LocalCluster
15 from distributed.deploy import Cluster
---> 16 from distributed.utils import log_errors, ignoring
17 from distributed.utils import PeriodicCallback
18
ImportError: cannot import name 'ignoring'
What you expected to happen:
No exception.
Minimal Complete Verifiable Example:
Install dask.distributed 2.19.0 and dask-drmaa; then run the following code:
import dask_drmaa
Anything else we need to know?:
distributed.utils.ignoring has been removed from dask.distributed as of 5/20/2020; dask-drmaa should be updated to use contextlib.suppress instead.
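For illustration, a minimal sketch of the suggested replacement (not the actual dask-drmaa patch); contextlib.suppress behaves the same way the removed helper did:

from contextlib import suppress

d = {}
with suppress(KeyError):      # previously: with ignoring(KeyError):
    value = d["missing"]      # the KeyError is swallowed instead of propagating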
Environment:
On cluster.close(), this error is thrown on current master:
Traceback (most recent call last):
File "/home/smmanoet/.local/lib/python3.6/site-packages/tornado/gen.py", line 326, in wrapper
yielded = next(result)
File "/home/smmanoet/.local/lib/python3.6/site-packages/dask_drmaa/core.py", line 285, in stop_workers
v['name']: k for k, v in self.scheduler.worker_info.items()
File "/home/smmanoet/.local/lib/python3.6/site-packages/dask_drmaa/core.py", line 285, in <dictcomp>
v['name']: k for k, v in self.scheduler.worker_info.items()
KeyError: 'name'
I looked at the entries of worker_info; they only have the keys: cpu, memory, time, read_bytes, write_bytes, num_fds, executing, in_memory, ready, in_flight.
Any thoughts on getting this tagged, on to PyPI, and on to conda-forge? Is this stable enough at this point to do those things or are there other things needed to be done first?
In the README, the following is written:
from dask_drmaa import DRMAACluster
cluster = DRMAACluster()
from dask.distributed import Client
client = Client(cluster)
client.start_workers(2)
However, this leads to: AttributeError: 'Client' object has no attribute 'start_workers'
What does work is:
from dask_drmaa import DRMAACluster
cluster = DRMAACluster()
cluster.start_workers(10)
I don't know if this is incorrect in the README or a bug. I use PBS.
Do we want to launch the dask-scheduler process on the cluster, or do we want to keep it local?
Keeping the scheduler local simplifies things, but makes it harder for multiple users to share the same scheduler.
Launching the scheduler on the cluster is quite doable, but we'll need to learn on which machine the scheduler launched. I know how to do this through the SGE interface, but it's not clear to me that it is exposed through DRMAA. See this stackoverflow question. Alternatively we could pass the location of the scheduler back through some other means. This could be through a file on NFS (do we want to assume the presence of a shared filesystem?) or by setting up a tiny TCP Server to which the scheduler connects with its information.
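As a minimal sketch of the shared-filesystem hand-off (the path and timing here are assumptions, not part of dask-drmaa):

import json
import os
import socket
import time

SHARED_PATH = "/nfs/home/user/.dask-scheduler.json"   # assumed NFS-mounted path

def publish_address(port):
    # runs inside the cluster job, right after the scheduler starts
    with open(SHARED_PATH, "w") as f:
        json.dump({"address": "tcp://%s:%d" % (socket.gethostname(), port)}, f)

def wait_for_scheduler(timeout=60):
    # runs on the client machine, polling until the scheduler job has started
    deadline = time.time() + timeout
    while time.time() < deadline:
        if os.path.exists(SHARED_PATH):
            with open(SHARED_PATH) as f:
                return json.load(f)["address"]
        time.sleep(1)
    raise TimeoutError("scheduler address file never appeared")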
The current implementation just launches the scheduler on the user's machine. Barring suggestions to the contrary my intention is to move forward with this approach until there is an obvious issue.
This fails with a pip install --user installation, because then dask-worker does not reside in
worker_bin_path = os.path.join(sys.exec_prefix, 'bin', 'dask-worker')
but in .local/bin.
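A hypothetical alternative (not dask-drmaa's current code) would be to resolve the executable from PATH first and only fall back to sys.exec_prefix:

import os
import shutil
import sys

# shutil.which finds ~/.local/bin/dask-worker after a `pip install --user`
worker_bin_path = (shutil.which("dask-worker")
                   or os.path.join(sys.exec_prefix, "bin", "dask-worker"))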
In responses to both #3 and #4 it seems like there are a number of useful options that are not explicitly handled by DRMAA. How should we handle this? Some thoughts:
- dask-sge alongside dask-drmaa
Starting workers fails (at least on SLURM) if template['workingDirectory'] is not the current working directory, because:
/var/spool/slurm/d/job1177731/slurm_script: line 2: ./dask-worker-scriptz8gmf_cpsh: No such file or directory
I work on a server with a JupyterHub and have access to a PBS cluster; both machines have the same Python environments.
Right now I do the following (manual work):
dask-drmaa 10
ssh -NL 39408:localhost:39408 cluster
import distributed
client = distributed.Client('localhost:39408')
It would be great if this could somehow be automated.
The dask-worker process can be configured in a variety of ways. We can pass these options through the DRMAACluster.__init__ or start_workers methods fairly easily.
What is less trivial is if we want to control the number of CPUs or memory used by the DRMAA job and pair this with the dask-worker configuration. I'm somewhat concerned that DRMAA expects us to take up one CPU by default when we're impolitely taking up all of the CPUs on the host. It would be nice to either request a certain host configuration from the job scheduler or else observe what our expected budget is after our job is allocated.
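A sketch of what pairing the two might look like; the keyword names mirror the WorkerSpec fields visible in a test failure later on this page, but both they and the SGE-style flags are assumptions:

from dask_drmaa import SGECluster

cluster = SGECluster()
cluster.start_workers(
    2,
    cpus=4,                                        # threads the worker should start (assumed kwarg)
    memory=8e9,                                    # bytes the worker may use (assumed kwarg)
    nativeSpecification='-pe smp 4 -l h_vmem=8G',  # matching SGE resource request (assumption)
)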
Seems like there is a bit of a mismatch between the arguments to DRMAACluster's constructor and those listed in its docstring. Would be good to refresh the docstring to match the currently accepted arguments.
Currently our _retire_workers implementation deviates from Distributed's _retire_workers implementation. Would be nice if we could bring them back into correspondence and/or eliminate our copy of _retire_workers entirely in favor of the Adaptive base class implementation.
ref: #63 (review)
Currently we assume that the dask-worker executable is in the same absolute location on all slaves as it is on the client machine. I'm not sure what the standard assumption is. It seems that there are a few options:
- dask-worker and trust PATH on the slaves to find the right location
- dask-worker and convey the local user environment to jobs

I am running a calculation that has the following structure:
from dask import delayed   # implied import

r = []
for i in range(5):
    syst = delayed(make_2d_scatter)(i)
    for j in range(3000):
        r.append(delayed(transmission)(syst, j))
res = client.compute(r)
make_2d_scatter is a fast function, so most time is spent in transmission. I have started 250 workers on a PBS cluster with dask-drmaa 250.
Now I check the output of qstat -ea | grep dask-worker and it gives me the following output (I truncated it):
8568.hpc05.hpc basnijholt q1 dask-worker 21262 -- -- -- 9999:00:0 R 00:55:23
8569.hpc05.hpc basnijholt q1 dask-worker 21292 -- -- -- 9999:00:0 R 00:56:16
8570.hpc05.hpc basnijholt q1 dask-worker 21334 -- -- -- 9999:00:0 R 00:47:05
8571.hpc05.hpc basnijholt q1 dask-worker 21403 -- -- -- 9999:00:0 R 01:49:45
8572.hpc05.hpc basnijholt q1 dask-worker 21461 -- -- -- 9999:00:0 R 01:06:57
8573.hpc05.hpc basnijholt q1 dask-worker 21518 -- -- -- 9999:00:0 R 01:58:19
8574.hpc05.hpc basnijholt q1 dask-worker 21544 -- -- -- 9999:00:0 R 01:44:40
8575.hpc05.hpc basnijholt q1 dask-worker 21593 -- -- -- 9999:00:0 R 01:56:53
8576.hpc05.hpc basnijholt q1 dask-worker 21655 -- -- -- 9999:00:0 R 01:28:54
8577.hpc05.hpc basnijholt q1 dask-worker 21704 -- -- -- 9999:00:0 R 01:13:07
8578.hpc05.hpc basnijholt q1 dask-worker 21721 -- -- -- 9999:00:0 R 00:53:04
8579.hpc05.hpc basnijholt q1 dask-worker 21751 -- -- -- 9999:00:0 R 01:17:29
8580.hpc05.hpc basnijholt q1 dask-worker 8048 -- -- -- 9999:00:0 R 01:29:38
8581.hpc05.hpc basnijholt q1 dask-worker 8079 -- -- -- 9999:00:0 R 00:00:25
8582.hpc05.hpc basnijholt q1 dask-worker 8118 -- -- -- 9999:00:0 R 00:49:05
8583.hpc05.hpc basnijholt q1 dask-worker 8194 -- -- -- 9999:00:0 R 01:34:04
8584.hpc05.hpc basnijholt q1 dask-worker 8248 -- -- -- 9999:00:0 R 01:04:20
8585.hpc05.hpc basnijholt q1 dask-worker 8300 -- -- -- 9999:00:0 R 00:00:26
8586.hpc05.hpc basnijholt q1 dask-worker 8322 -- -- -- 9999:00:0 R 01:07:27
8587.hpc05.hpc basnijholt q1 dask-worker 8369 -- -- -- 9999:00:0 R 01:09:28
8588.hpc05.hpc basnijholt q1 dask-worker 8428 -- -- -- 9999:00:0 R 00:54:45
8589.hpc05.hpc basnijholt q1 dask-worker 8507 -- -- -- 9999:00:0 R 01:00:15
8590.hpc05.hpc basnijholt q1 dask-worker 8560 -- -- -- 9999:00:0 R 00:40:28
8591.hpc05.hpc basnijholt q1 dask-worker 8606 -- -- -- 9999:00:0 R 00:00:28
8592.hpc05.hpc basnijholt q1 dask-worker 8629 -- -- -- 9999:00:0 R 00:15:26
8593.hpc05.hpc basnijholt q1 dask-worker 8691 -- -- -- 9999:00:0 R 01:24:24
8594.hpc05.hpc basnijholt q1 dask-worker 8750 -- -- -- 9999:00:0 R 02:07:52
8595.hpc05.hpc basnijholt q1 dask-worker 8818 -- -- -- 9999:00:0 R 00:00:24
8596.hpc05.hpc basnijholt q1 dask-worker 8866 -- -- -- 9999:00:0 R 00:00:29
8597.hpc05.hpc basnijholt q1 dask-worker 8903 -- -- -- 9999:00:0 R 00:48:07
8598.hpc05.hpc basnijholt q1 dask-worker 8926 -- -- -- 9999:00:0 R 00:00:26
8599.hpc05.hpc basnijholt q1 dask-worker 8971 -- -- -- 9999:00:0 R 00:58:24
8600.hpc05.hpc basnijholt q1 dask-worker 1815 -- -- -- 9999:00:0 R 00:00:35
8601.hpc05.hpc basnijholt q1 dask-worker 1844 -- -- -- 9999:00:0 R 00:00:32
8602.hpc05.hpc basnijholt q1 dask-worker 1889 -- -- -- 9999:00:0 R 00:00:34
8603.hpc05.hpc basnijholt q1 dask-worker 1963 -- -- -- 9999:00:0 R 00:00:35
8604.hpc05.hpc basnijholt q1 dask-worker 2013 -- -- -- 9999:00:0 R 00:07:14
8605.hpc05.hpc basnijholt q1 dask-worker 2058 -- -- -- 9999:00:0 R 00:00:33
8606.hpc05.hpc basnijholt q1 dask-worker 2089 -- -- -- 9999:00:0 R 00:00:33
8607.hpc05.hpc basnijholt q1 dask-worker 2176 -- -- -- 9999:00:0 R 00:00:35
8608.hpc05.hpc basnijholt q1 dask-worker 2234 -- -- -- 9999:00:0 R 00:00:36
8609.hpc05.hpc basnijholt q1 dask-worker 2300 -- -- -- 9999:00:0 R 00:00:33
8610.hpc05.hpc basnijholt q1 dask-worker 2371 -- -- -- 9999:00:0 R 00:07:42
8611.hpc05.hpc basnijholt q1 dask-worker 2438 -- -- -- 9999:00:0 R 00:00:39
8612.hpc05.hpc basnijholt q1 dask-worker 2473 -- -- -- 9999:00:0 R 00:00:35
8613.hpc05.hpc basnijholt q1 dask-worker 2539 -- -- -- 9999:00:0 R 00:03:58
8614.hpc05.hpc basnijholt q1 dask-worker 2601 -- -- -- 9999:00:0 R 00:00:39
8615.hpc05.hpc basnijholt q1 dask-worker 2668 -- -- -- 9999:00:0 R 00:35:26
8616.hpc05.hpc basnijholt q1 dask-worker 2721 -- -- -- 9999:00:0 R 00:00:32
8617.hpc05.hpc basnijholt q1 dask-worker 2761 -- -- -- 9999:00:0 R 00:14:29
8618.hpc05.hpc basnijholt q1 dask-worker 2781 -- -- -- 9999:00:0 R 00:00:35
8619.hpc05.hpc basnijholt q1 dask-worker 2824 -- -- -- 9999:00:0 R 00:00:34
8620.hpc05.hpc basnijholt q1 dask-worker 310 -- -- -- 9999:00:0 R 00:00:38
8621.hpc05.hpc basnijholt q1 dask-worker 350 -- -- -- 9999:00:0 R 00:00:37
8622.hpc05.hpc basnijholt q1 dask-worker 401 -- -- -- 9999:00:0 R 00:00:38
8623.hpc05.hpc basnijholt q1 dask-worker 473 -- -- -- 9999:00:0 R 00:00:37
8624.hpc05.hpc basnijholt q1 dask-worker 526 -- -- -- 9999:00:0 R 00:00:36
8625.hpc05.hpc basnijholt q1 dask-worker 572 -- -- -- 9999:00:0 R 00:01:47
8626.hpc05.hpc basnijholt q1 dask-worker 607 -- -- -- 9999:00:0 R 00:27:28
8627.hpc05.hpc basnijholt q1 dask-worker 663 -- -- -- 9999:00:0 R 00:00:36
8628.hpc05.hpc basnijholt q1 dask-worker 733 -- -- -- 9999:00:0 R 00:00:36
8629.hpc05.hpc basnijholt q1 dask-worker 815 -- -- -- 9999:00:0 R 00:00:36
8630.hpc05.hpc basnijholt q1 dask-worker 867 -- -- -- 9999:00:0 R 00:00:37
8631.hpc05.hpc basnijholt q1 dask-worker 918 -- -- -- 9999:00:0 R 00:00:37
8632.hpc05.hpc basnijholt q1 dask-worker 942 -- -- -- 9999:00:0 R 00:00:36
8633.hpc05.hpc basnijholt q1 dask-worker 1001 -- -- -- 9999:00:0 R 00:00:36
8634.hpc05.hpc basnijholt q1 dask-worker 1184 -- -- -- 9999:00:0 R 00:00:39
8635.hpc05.hpc basnijholt q1 dask-worker 1065 -- -- -- 9999:00:0 R 00:00:36
8636.hpc05.hpc basnijholt q1 dask-worker 1132 -- -- -- 9999:00:0 R 00:37:28
8637.hpc05.hpc basnijholt q1 dask-worker 1220 -- -- -- 9999:00:0 R 00:00:36
8638.hpc05.hpc basnijholt q1 dask-worker 1248 -- -- -- 9999:00:0 R 00:22:40
8639.hpc05.hpc basnijholt q1 dask-worker 1290 -- -- -- 9999:00:0 R 00:00:39
8640.hpc05.hpc basnijholt q1 dask-worker 34496 -- -- -- 9999:00:0 R 00:00:34
In the last column you can see the time that each core has actually been doing work; some cores have been running code for more than an hour and some for only half a minute. Am I doing something wrong, or is dask not dividing the tasks very well?
Seeing this test failure on the CI.
____________________ test_dont_request_on_many_short_tasks _____________________
loop = <tornado.platform.epoll.EPollIOLoop object at 0x7f8ba7153b00>
    def test_dont_request_on_many_short_tasks(loop):
        with SGECluster(scheduler_port=0) as cluster:
            adapt = Adaptive(cluster, interval=50, startup_cost=10)
            with Client(cluster, loop=loop) as client:
                cluster.scheduler.task_duration['slowinc'] = 0.001
                futures = client.map(slowinc, range(1000), delay=0.001)
                while not cluster.scheduler.workers:
                    sleep(0.01)
                for i in range(20):
                    sleep(0.1)
>                   assert len(cluster.workers) < 2
E                   AssertionError: assert 2 < 2
E + where 2 = len({'15.1': WorkerSpec(job_id='15.1', kwargs={'nativeSpecification': '', 'cpus': 1, 'memory': None, 'memory_fraction': 0...., 'memory': None, 'memory_fraction': 0.5}, stdout='/dask-drmaa/worker.16.1.out', stderr='/dask-drmaa/worker.16.1.err')})
E + where {'15.1': WorkerSpec(job_id='15.1', kwargs={'nativeSpecification': '', 'cpus': 1, 'memory': None, 'memory_fraction': 0...., 'memory': None, 'memory_fraction': 0.5}, stdout='/dask-drmaa/worker.16.1.out', stderr='/dask-drmaa/worker.16.1.err')} = <SGECluster: 2 workers>.workers
dask_drmaa/tests/test_adaptive.py:105: AssertionError
----------------------------- Captured stderr call -----------------------------
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7f8ba715ac88> exception was never retrieved: Traceback (most recent call last):
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/comm/tcp.py", line 305, in connect
**kwargs)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
yielded = self.gen.send(value)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 183, in connect
af, addr, stream = yield connector.start()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 82, in start
self.try_connect(iter(self.primary_addrs))
File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 97, in try_connect
future = self.connect(af, addr)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 224, in _create_stream
return stream.connect(addr)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/iostream.py", line 1128, in connect
self._add_io_state(self.io_loop.WRITE)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/iostream.py", line 971, in _add_io_state
self.fileno(), self._handle_events, self._state)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/ioloop.py", line 728, in add_handler
self._impl.register(fd, events | self.ERROR)
ValueError: I/O operation on closed epoll object
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7f8ba6a460f0> exception was never retrieved: Traceback (most recent call last):
File "/dask-drmaa/dask_drmaa/tests/test_adaptive.py", line 105, in test_dont_request_on_many_short_tasks
assert len(cluster.workers) < 2
AssertionError: assert 2 < 2
+ where 2 = len({'15.1': WorkerSpec(job_id='15.1', kwargs={'nativeSpecification': '', 'cpus': 1, 'memory': None, 'memory_fraction': 0...., 'memory': None, 'memory_fraction': 0.5}, stdout='/dask-drmaa/worker.16.1.out', stderr='/dask-drmaa/worker.16.1.err')})
+ where {'15.1': WorkerSpec(job_id='15.1', kwargs={'nativeSpecification': '', 'cpus': 1, 'memory': None, 'memory_fraction': 0...., 'memory': None, 'memory_fraction': 0.5}, stdout='/dask-drmaa/worker.16.1.out', stderr='/dask-drmaa/worker.16.1.err')} = <SGECluster: 2 workers>.workers
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/dask_drmaa-0.1.0-py3.6.egg/dask_drmaa/core.py", line 193, in stop_workers
close_workers=True)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/scheduler.py", line 1951, in retire_workers
yield [self.close_worker(worker=w) for w in workers]
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 828, in callback
result_list.append(f.result())
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/scheduler.py", line 564, in close_worker
yield r.terminate(report=False)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/core.py", line 453, in send_recv_from_rpc
comm = yield self.live_comm()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/core.py", line 429, in live_comm
connection_args=self.connection_args)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 307, in wrapper
yielded = next(result)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/comm/core.py", line 185, in connect
quiet_exceptions=EnvironmentError)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 921, in with_timeout
timeout, timeout_callback)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/ioloop.py", line 505, in add_timeout
callback, *args, **kwargs)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/ioloop.py", line 921, in call_at
heapq.heappush(self._timeouts, timeout)
TypeError: heap argument must be a list
When the load on the workers is not evenly distributed, no work stealing is performed.
One way to provoke this is to submit the jobs first and start the workers afterwards: the worker that starts first will get all jobs assigned and all other workers will be idle. However, this also happens if the workers are started first and the execution times of the jobs differ greatly.
In the web interface under /stealing, the idle and saturated workers are correctly displayed.
I wrote a little MWE:
import time

import dask.multiprocessing
from dask.distributed import Client
from dask_drmaa import SGECluster


def execute(sleep_time):
    import time
    time.sleep(sleep_time)
    return sleep_time


if __name__ == "__main__":
    times = [60, 60, 60, 60, 1, 1, 1, 1]
    lazy_results = [dask.delayed(execute)(sleep_time) for sleep_time in times]

    cluster = SGECluster()
    client = Client(cluster)
    cluster.start_workers(2)

    results = dask.compute(*lazy_results, get=client.get)
    print(results)

    cluster.close()
I don't know if this is really a bug in dask-drmaa or if it is in dask.distributed, but I submitted it here because I can't really reproduce this with LocalCluster.
I use a fully updated version of Anaconda with Python 3.6 on Linux. The packages are installed in the following versions:
dask 0.16.0 py36h73d177f_0
dask-core 0.16.0 py36ha827fd6_0
dask-drmaa 0.1.0 <pip>
distributed 1.20.2 py36_0
Happy holidays and thanks for your help. Please let me know if you need any further information.
I would like SGECluster.start_workers to have a docstring. I'm happy to submit a PR for this. Any warnings or advice before I get started?
Here is my current understanding of the meaning of each argument:
There hasn't been any activity here for a couple of years. Is this project still used? Has it been replaced with something like dask-jobqueue?
Should we archive it?
Hi, I'm using SLURM 17.02.3.
In my environment, I noticed that the stdout and stderr filenames are wrong; the $JOB_ID placeholder is left unsubstituted (e.g. worker.$JOB_ID.err/out):
https://github.com/dask/dask-drmaa/blob/master/dask_drmaa/core.py#L42-L43
'outputPath': ':' + worker_out_path_template % dict(jid='$JOB_ID.$drmaa_incr_ph$', kind='out'),
'errorPath': ':' + worker_out_path_template % dict(jid='$JOB_ID.$drmaa_incr_ph$', kind='err'),
Instead of $JOB_ID, SLURM uses the %j placeholder; please see https://slurm.schedmd.com/sbatch.html.
jid='$JOB_ID.$drmaa_incr_ph$' should be jid='%j.$drmaa_incr_ph$' on SLURM.
I confirmed that this works when I hard-code it for SLURM, but I don't know how to determine whether SLURM or another grid engine is being used.
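One possible approach, sketched under the assumption that the DRM name reported by the DRMAA session contains "slurm" on SLURM sites (the template string below is illustrative, not the one in core.py):

import os
import drmaa

worker_out_path_template = os.path.join(os.getcwd(), 'worker.%(jid)s.%(kind)s')  # illustrative

s = drmaa.Session()
s.initialize()
placeholder = '%j' if 'slurm' in s.drmsInfo.lower() else '$JOB_ID'
output_path = ':' + worker_out_path_template % dict(jid=placeholder + '.$drmaa_incr_ph$',
                                                    kind='out')
s.exit()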
It appears that test_adaptive's test_adaptive_memory times out. This can be seen in this CI build, though it can also be reproduced locally in the Docker image, so this issue is not restricted to CI.
Hello,
When starting workers on a DRMAACluster, they always use many threads
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 28
distributed.worker - INFO - Memory: 280GB
The actual number of threads differs from one worker to the next.
Is there a way to limit workers to one thread? Using so many threads causes my jobs to exceed memory limits and die.
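(One possible workaround, sketched on the assumption that the cluster's job template is exposed as worker_template, as in the constructor shown later on this page: append dask-worker's own --nthreads flag to the worker arguments.)

from dask_drmaa import DRMAACluster

cluster = DRMAACluster()
cluster.worker_template.args += ['--nthreads', '1']   # assumed attribute name
cluster.start_workers(4)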
Thanks!
I'm having trouble reproducing the unit tests based on the given instructions, for Windows. I've done the following:
- *.sh files in the top-level directory of this project
- docker-compose build --no-cache

When I run ./start-sge.sh I see the output Waiting for SGE slots to become available, followed by an indefinite amount of waiting:
$ ./start-sge.sh
sge_master is up-to-date
slave_two is up-to-date
slave_one is up-to-date
Waiting for SGE slots to become available
# output of docker exec -it sge_master qhost;
HOSTNAME ARCH NCPU LOAD MEMTOT MEMUSE SWAPTO SWAPUS
-------------------------------------------------------------------------------
global - - - - - - -
# and repeats
# docker containers are running
$ docker ps
CONTAINER ID   IMAGE                 COMMAND                 CREATED             STATUS             PORTS                     NAMES
efa6eebce34a   daskdrmaa_slave-one   "bash /run-slave.sh"    About an hour ago   Up About an hour                             slave_one
9fae83c48356   daskdrmaa_slave-two   "bash /run-slave.sh"    About an hour ago   Up About an hour                             slave_two
b13940296988   daskdrmaa_master      "bash -x /run-master"   About an hour ago   Up About an hour   6444-6446/tcp, 8000/tcp   sge_master
# example of output when I try to run unit tests
$ docker exec -it sge_master /bin/bash -c "cd /dask-drmaa; py.test dask_drmaa -sv -k test_adaptive_memory"
============================= test session starts ==============================
platform linux -- Python 3.6.4, pytest-3.4.2, py-1.5.2, pluggy-0.6.0 -- /opt/anaconda/bin/python
cachedir: .pytest_cache
rootdir: /dask-drmaa, inifile:
collected 23 items
dask_drmaa/tests/test_adaptive.py::test_adaptive_memory
distributed.utils - ERROR - code 17: denied: host "sge_master" is no submit host
Traceback (most recent call last):
File "/opt/anaconda/lib/python3.6/site-packages/distributed/utils.py", line 623, in log_errors
yield
File "/dask-drmaa/dask_drmaa/core.py", line 203, in start_workers
ids = get_session().runBulkJobs(jt, 1, n, 1)
File "/opt/anaconda/lib/python3.6/site-packages/drmaa/session.py", line 340, in runBulkJobs
return list(run_bulk_job(jobTemplate, beginIndex, endIndex, step))
File "/opt/anaconda/lib/python3.6/site-packages/drmaa/helpers.py", line 286, in run_bulk_job
c(drmaa_run_bulk_jobs, jids, jt, start, end, incr)
File "/opt/anaconda/lib/python3.6/site-packages/drmaa/helpers.py", line 303, in c
return f(*(args + (error_buffer, sizeof(error_buffer))))
File "/opt/anaconda/lib/python3.6/site-packages/drmaa/errors.py", line 151, in error_check
raise _ERRORS[code - 1](error_string)
drmaa.errors.DeniedByDrmException: code 17: denied: host "sge_master" is no submit host
Unfortunately I have no familiarity with SGE. Any suggestions as to how I can debug/fix this? Apologies if this is not appropriate for this issue tracker.
Seeing this test failure on the CI.
_________________________ test_stop_workers_politely __________________________
self = <Client: not connected>
futures = [<Future: status: cancelled, type: int, key: int-5c8a950061aa331153f4a172bbcbfd1b>, <Future: status: cancelled, type: ...9541ea58b401f115b751e79eabbff>, <Future: status: cancelled, type: int, key: int-ce9a05dd6ec76c6a6d171b0c055f3127>, ...]
errors = 'raise', direct = False, local_worker = None
    @gen.coroutine
    def _gather(self, futures, errors='raise', direct=None, local_worker=None):
        futures2, keys = unpack_remotedata(futures, byte_keys=True)
        keys = [tokey(key) for key in keys]
        bad_data = dict()
        if direct is None:
            try:
                w = get_worker()
            except Exception:
                direct = False
            else:
                if w.scheduler.address == self.scheduler.address:
                    direct = True
        @gen.coroutine
        def wait(k):
            """ Want to stop the All(...) early if we find an error """
            st = self.futures[k]
            yield st.event.wait()
            if st.status != 'finished':
                raise AllExit()
        while True:
            logger.debug("Waiting on futures to clear before gather")
            with ignoring(AllExit):
                yield All([wait(key) for key in keys if key in self.futures])
            failed = ('error', 'cancelled')
            exceptions = set()
            bad_keys = set()
            for key in keys:
                if (key not in self.futures or
                        self.futures[key].status in failed):
                    exceptions.add(key)
                    if errors == 'raise':
                        try:
                            st = self.futures[key]
>                           exception = st.exception
E                           AttributeError: exception
/opt/anaconda/lib/python3.6/site-packages/distributed/client.py:1233: AttributeError
During handling of the above exception, another exception occurred:
loop = <tornado.platform.epoll.EPollIOLoop object at 0x7f3505aedf28>
    def test_stop_workers_politely(loop):
        with DRMAACluster(scheduler_port=0) as cluster:
            with Client(cluster, loop=loop) as client:
                cluster.start_workers(2)
                while len(client.ncores()) < 2:
                    sleep(0.1)
                futures = client.scatter(list(range(10)))
                a, b = cluster.workers
                cluster.stop_workers(a)
>               data = client.gather(futures)
dask_drmaa/tests/test_core.py:131:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/opt/anaconda/lib/python3.6/site-packages/distributed/client.py:1364: in gather
asynchronous=asynchronous)
/opt/anaconda/lib/python3.6/site-packages/distributed/client.py:547: in sync
return sync(self.loop, func, *args, **kwargs)
/opt/anaconda/lib/python3.6/site-packages/distributed/utils.py:270: in sync
six.reraise(*error[0])
/opt/anaconda/lib/python3.6/site-packages/six.py:693: in reraise
raise value
/opt/anaconda/lib/python3.6/site-packages/distributed/utils.py:258: in f
result[0] = yield make_coro()
/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py:1055: in run
value = future.result()
/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py:238: in result
raise_exc_info(self._exc_info)
<string>:4: in raise_exc_info
???
/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py:1063: in run
yielded = self.gen.throw(*exc_info)
/opt/anaconda/lib/python3.6/site-packages/distributed/client.py:1238: in _gather
None)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tp = <class 'concurrent.futures._base.CancelledError'>, value = None, tb = None
def reraise(tp, value, tb=None):
try:
if value is None:
value = tp()
if value.__traceback__ is not tb:
raise value.with_traceback(tb)
> raise value
E concurrent.futures._base.CancelledError: int-7ec5d3339274cee5cb507a4e4d28e791
/opt/anaconda/lib/python3.6/site-packages/six.py:693: CancelledError
----------------------------- Captured stderr call -----------------------------
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7f3505aef9b0> exception was never retrieved: Traceback (most recent call last):
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/comm/tcp.py", line 305, in connect
**kwargs)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
yielded = self.gen.send(value)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 183, in connect
af, addr, stream = yield connector.start()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 82, in start
self.try_connect(iter(self.primary_addrs))
File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 97, in try_connect
future = self.connect(af, addr)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 224, in _create_stream
return stream.connect(addr)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/iostream.py", line 1128, in connect
self._add_io_state(self.io_loop.WRITE)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/iostream.py", line 971, in _add_io_state
self.fileno(), self._handle_events, self._state)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/ioloop.py", line 728, in add_handler
self._impl.register(fd, events | self.ERROR)
ValueError: I/O operation on closed epoll object
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7f350c075da0> exception was never retrieved: Traceback (most recent call last):
File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 1233, in _gather
exception = st.exception
AttributeError: exception
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/dask-drmaa/dask_drmaa/tests/test_core.py", line 131, in test_stop_workers_politely
data = client.gather(futures)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 1364, in gather
asynchronous=asynchronous)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 547, in sync
return sync(self.loop, func, *args, **kwargs)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/utils.py", line 270, in sync
six.reraise(*error[0])
File "/opt/anaconda/lib/python3.6/site-packages/six.py", line 693, in reraise
raise value
File "/opt/anaconda/lib/python3.6/site-packages/distributed/utils.py", line 258, in f
result[0] = yield make_coro()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 1238, in _gather
None)
File "/opt/anaconda/lib/python3.6/site-packages/six.py", line 693, in reraise
raise value
concurrent.futures._base.CancelledError: int-7ec5d3339274cee5cb507a4e4d28e791
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/dask_drmaa-0.1.0-py3.6.egg/dask_drmaa/core.py", line 193, in stop_workers
close_workers=True)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/scheduler.py", line 1951, in retire_workers
yield [self.close_worker(worker=w) for w in workers]
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 828, in callback
result_list.append(f.result())
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/scheduler.py", line 564, in close_worker
yield r.terminate(report=False)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/core.py", line 453, in send_recv_from_rpc
comm = yield self.live_comm()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/core.py", line 429, in live_comm
connection_args=self.connection_args)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 307, in wrapper
yielded = next(result)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/comm/core.py", line 185, in connect
quiet_exceptions=EnvironmentError)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 921, in with_timeout
timeout, timeout_callback)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/ioloop.py", line 505, in add_timeout
callback, *args, **kwargs)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/ioloop.py", line 921, in call_at
heapq.heappush(self._timeouts, timeout)
TypeError: heap argument must be a list
--------------------------- Captured stderr teardown ---------------------------
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7f350c09f080> exception was never retrieved: Traceback (most recent call last):
File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 1233, in _gather
exception = st.exception
AttributeError: exception
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/runner.py", line 196, in __init__
self.result = func()
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/runner.py", line 182, in <lambda>
return CallInfo(lambda: ihook(item=item, **kwds), when=when)
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 745, in __call__
return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 339, in _hookexec
return self._inner_hookexec(hook, methods, kwargs)
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 334, in <lambda>
_MultiCall(methods, kwargs, hook.spec_opts).execute()
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 613, in execute
return _wrapped_call(hook_impl.function(*args), self.execute)
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 254, in _wrapped_call
return call_outcome.get_result()
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 279, in get_result
raise ex[1].with_traceback(ex[2])
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 265, in __init__
self.result = func()
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 614, in execute
res = hook_impl.function(*args)
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/runner.py", line 112, in pytest_runtest_call
item.runtest()
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/python.py", line 1169, in runtest
self.ihook.pytest_pyfunc_call(pyfuncitem=self)
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 745, in __call__
return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 339, in _hookexec
return self._inner_hookexec(hook, methods, kwargs)
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 334, in <lambda>
_MultiCall(methods, kwargs, hook.spec_opts).execute()
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 613, in execute
return _wrapped_call(hook_impl.function(*args), self.execute)
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 254, in _wrapped_call
return call_outcome.get_result()
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 279, in get_result
raise ex[1].with_traceback(ex[2])
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 265, in __init__
self.result = func()
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 614, in execute
res = hook_impl.function(*args)
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/python.py", line 143, in pytest_pyfunc_call
testfunction(**testargs)
File "/dask-drmaa/dask_drmaa/tests/test_core.py", line 131, in test_stop_workers_politely
data = client.gather(futures)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 1364, in gather
asynchronous=asynchronous)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 547, in sync
return sync(self.loop, func, *args, **kwargs)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/utils.py", line 270, in sync
six.reraise(*error[0])
File "/opt/anaconda/lib/python3.6/site-packages/six.py", line 693, in reraise
raise value
File "/opt/anaconda/lib/python3.6/site-packages/distributed/utils.py", line 258, in f
result[0] = yield make_coro()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 1238, in _gather
None)
File "/opt/anaconda/lib/python3.6/site-packages/six.py", line 693, in reraise
raise value
concurrent.futures._base.CancelledError: int-7ec5d3339274cee5cb507a4e4d28e791
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/comm/tcp.py", line 305, in connect
**kwargs)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
yielded = self.gen.send(value)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 183, in connect
af, addr, stream = yield connector.start()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 82, in start
self.try_connect(iter(self.primary_addrs))
File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 97, in try_connect
future = self.connect(af, addr)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 224, in _create_stream
return stream.connect(addr)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/iostream.py", line 1128, in connect
self._add_io_state(self.io_loop.WRITE)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/iostream.py", line 971, in _add_io_state
self.fileno(), self._handle_events, self._state)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/ioloop.py", line 728, in add_handler
self._impl.register(fd, events | self.ERROR)
ValueError: I/O operation on closed epoll object
Various schedulers have different ways of specifying resource limits for jobs (e.g. memory, CPU, etc.). It would be nice to explore making these first-class citizens in dask-drmaa. As DRMAA doesn't really have a way to specify this itself, it would amount to detecting the DRMAA implementation used and passing along the right flags to the nativeSpecification parameter of the JobTemplate. That said, we have already had some success with this strategy w.r.t. log files, as demonstrated by PR ( #56 ), so it should be pretty reasonable to extend the strategy to this case as well.
ref: https://www.ibm.com/support/knowledgecenter/en/SSWRJV_10.1.0/lsf_admin/resource_usage_limits_supported.html
ref: https://slurm.schedmd.com/sbatch.html
ref: http://gridscheduler.sourceforge.net/htmlman/htmlman5/queue_conf.html
ref: http://apps.man.poznan.pl/trac/slurm-drmaa#Nativespecification
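A rough sketch of the detect-and-translate idea; the flag syntax for each scheduler is an assumption drawn from the references above:

def native_spec(drms_info, cpus, memory_bytes):
    # Map a generic (cpus, memory) request onto scheduler-specific flags
    # suitable for the JobTemplate's nativeSpecification field.
    mem_mb = int(memory_bytes / 1e6)
    drm = drms_info.lower()
    if 'slurm' in drm:
        return '--cpus-per-task=%d --mem=%dM' % (cpus, mem_mb)
    if 'grid engine' in drm or 'sge' in drm:
        return '-pe smp %d -l h_vmem=%dM' % (cpus, mem_mb)
    if 'lsf' in drm:
        return '-n %d -M %d' % (cpus, mem_mb)
    return ''  # unknown DRM: add nothing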
For compatibility with Distributed's LocalCluster, it would be nice to have start_worker and stop_worker functions for DRMAACluster.
Currently deployment is handled by manually scaling the cluster up and down:
jobids = cluster.start_workers(10)
cluster.stop_workers(jobids)
This system is straightforward and effective, but there are other options that we could consider, such as adaptively scaling up or down based on load or on memory needs.
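For reference, the Adaptive class already imported at the top of this page can drive that scaling; a minimal sketch:

from dask_drmaa import DRMAACluster, Adaptive

cluster = DRMAACluster()
adapt = Adaptive(cluster)   # grows and shrinks the worker pool with scheduler load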
There was some preliminary work done a while ago in this direction described here: http://matthewrocklin.com/blog/work/2016/09/22/cluster-deployments
Related to #76
Currently we create a single job template to create all workers.
class DRMAACluster(object):
    def __init__(self, **kwargs):
        self.local_cluster = LocalCluster(n_workers=0, **kwargs)

        self.session = drmaa.Session()
        self.session.initialize()

        self.worker_template = self.session.createJobTemplate()
        self.worker_template.remoteCommand = os.path.join(sys.exec_prefix, 'bin', 'dask-worker')
        self.worker_template.jobName = 'dask-worker'
        self.worker_template.args = ['%s:%d' % (socket.gethostname(), self.local_cluster.scheduler.port)]
        self.worker_template.outputPath = ':/%s/out' % os.getcwd()
        self.worker_template.errorPath = ':/%s/err' % os.getcwd()
        self.worker_template.workingDirectory = os.getcwd()

        self.workers = set()
This job template is easily accessible and so provides a nice and familiar release valve for expert users of python-drmaa who want to customize their setup.
However, as we submit slightly different kinds of worker jobs we'll want to modify this template a bit, by specifying extra native specifications and by adding extra keywords to the dask-worker command.

def start_workers(self, n=1, ...):   # want to add extra arguments here
    ...                              # and safely create a new worker template here for submission

I'm unable to find a way to copy-and-append an existing job template, which leads me to instead consider storing all of the job template attributes (args, native spec, command, error path, etc.) bare, instead of within a job template object. Then we'll create a new job template for each call to start_workers.
class DRMAACluster(object):
    def __init__(self, **kwargs):
        self.local_cluster = LocalCluster(n_workers=0, **kwargs)

        self.session = drmaa.Session()
        self.session.initialize()

        # self.worker_template = self.session.createJobTemplate()
        self.remoteCommand = os.path.join(sys.exec_prefix, 'bin', 'dask-worker')
        self.jobName = 'dask-worker'
        self.args = ['%s:%d' % (socket.gethostname(), self.local_cluster.scheduler.port)]
        self.outputPath = ':/%s/out' % os.getcwd()
        self.errorPath = ':/%s/err' % os.getcwd()
        self.workingDirectory = os.getcwd()

        self.workers = set()

    def start_workers(self, args, ...):
        wt = self.session.createJobTemplate()
        wt.args = self.args + args
        ...
Does anyone see a better way?
cc @davidr
Ran into an issue in test_stop_single_worker with Python 3 where the worker failed to clean up its scratch space directory. More details in this build log; a snippet with the test failure is also included below. The same failure did not occur on Python 2.
=================================== FAILURES ===================================
___________________________ test_stop_single_worker ____________________________
loop = <tornado.platform.epoll.EPollIOLoop object at 0x7f260a3a22b0>
    def test_stop_single_worker(loop):
        with DRMAACluster(scheduler_port=0) as cluster:
            with Client(cluster, loop=loop) as client:
                cluster.start_workers(2)
                future = client.submit(lambda x: x + 1, 1)
                assert future.result() == 2
                while len(client.ncores()) < 2:
                    sleep(0.1)
                a, b = cluster.workers
                local_dir = client.run(lambda dask_worker: dask_worker.local_dir,
                                       workers=[a])[a]
                assert os.path.exists(local_dir)
                cluster.stop_workers(a)
                start = time()
                while len(client.ncores()) != 1:
                    sleep(0.2)
                    assert time() < start + 60
>               assert not os.path.exists(local_dir)
E AssertionError: assert not True
E + where True = <function exists at 0x7f262a9ae950>('/dask-drmaa/dask-worker-space/worker-w5gj31no')
E + where <function exists at 0x7f262a9ae950> = <module 'posixpath' from '/opt/anaconda/lib/python3.6/posixpath.py'>.exists
E + where <module 'posixpath' from '/opt/anaconda/lib/python3.6/posixpath.py'> = os.path
dask_drmaa/tests/test_core.py:117: AssertionError
========== 1 failed, 20 passed, 1 skipped, 1 xfailed in 96.55 seconds ==========
Noticed that test_stop_single_worker has started failing on CI. Seems to be a consistent failure. However it wasn't failing on the same code a month ago, so something else has changed (possibly in Distributed) related to how Dask worker directories are managed.
=================================== FAILURES ===================================
___________________________ test_stop_single_worker ____________________________
loop = <tornado.platform.asyncio.AsyncIOLoop object at 0x7f270bb33978>
    def test_stop_single_worker(loop):
        with DRMAACluster(scheduler_port=0) as cluster:
            with Client(cluster, loop=loop) as client:
                cluster.start_workers(2)
                future = client.submit(lambda x: x + 1, 1)
                assert future.result() == 2
                while len(client.ncores()) < 2:
                    sleep(0.1)
                a, b = cluster.workers
                local_dir = client.run(lambda dask_worker: dask_worker.local_dir,
                                       workers=[a])[a]
                assert os.path.exists(local_dir)
                cluster.stop_workers(a)
                start = time()
                while len(client.ncores()) != 1:
                    sleep(0.2)
                    assert time() < start + 60
>               assert not os.path.exists(local_dir)
E AssertionError: assert not True
E + where True = <function exists at 0x7f27547ba950>('/dask-drmaa/dask-worker-space/worker-c7271lb9')
E + where <function exists at 0x7f27547ba950> = <module 'posixpath' from '/opt/anaconda/lib/python3.6/posixpath.py'>.exists
E + where <module 'posixpath' from '/opt/anaconda/lib/python3.6/posixpath.py'> = os.path
dask_drmaa/tests/test_core.py:128: AssertionError
=============== 1 failed, 23 passed, 1 xpassed in 120.47 seconds ===============
ref: https://travis-ci.org/dask/dask-drmaa/jobs/396654145#L2188-L2219
ref: https://travis-ci.org/dask/dask-drmaa/builds/385798967
Our HPC cluster has several subnets. dask-drmaa uses the IP address of the outside network (which is only used to connect to the gateways) for the scheduler; as a result, workers cannot connect.
My understanding is that job schedulers tend to schedule jobs based on their advertised wall times. Allocations of small short-running jobs can squeeze into the cluster sooner than many long-running jobs.
Dask workers can be fairly flexible here. We can add and remove many single-node jobs frequently, handing data off from about-to-expire workers to workers with a long time to live. It's unclear to me how much value there is here or if this is something we should focus on.
The default setting for drmaa-python seems to be to totally ignore walltime settings. Is this common? Perhaps DRMAA-style clusters are often underutilized so this is not a big issue?
I'm trying to run the tests locally (mimicking the steps in the Travis CI configuration file), but I get this error:
$ docker exec -it sge_master /bin/bash -c "cd /dask-drmaa; py.test dask_drmaa/cli -vs --tb=native"
========================================================== test session starts ==========================================================
platform linux -- Python 3.6.0, pytest-3.0.7, py-1.4.33, pluggy-0.4.0 -- /opt/anaconda/bin/python
cachedir: .cache
rootdir: /dask-drmaa, inifile:
collected 1 items
dask_drmaa/cli/tests/test_dask_drmaa.py::test_dask_drmaa distributed.utils - ERROR - Timed out trying to connect to 'tcp://127.0.0.1:8786' after 5 s: ConnectionRefusedError: [Errno 111] Connection refused
Traceback (most recent call last):
File "/opt/anaconda/lib/python3.6/site-packages/distributed/comm/core.py", line 169, in connect
quiet_exceptions=EnvironmentError)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
tornado.gen.TimeoutError: Timeout
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/anaconda/lib/python3.6/site-packages/distributed/utils.py", line 212, in f
result[0] = yield make_coro()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 518, in _start
yield self._ensure_connected(timeout=timeout)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 550, in _ensure_connected
connection_args=self.connection_args)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/comm/core.py", line 178, in connect
_raise(error)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/comm/core.py", line 161, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://127.0.0.1:8786' after 5 s: ConnectionRefusedError: [Errno 111] Connection refused
Print from stderr
=================
b'distributed.scheduler - INFO - Scheduler at: tcp://172.18.0.2:32890\n'b'distributed.scheduler - INFO - bokeh at: 0.0.0.0:8787\n'b'distributed.scheduler - INFO - Scheduler closing...\n'b'distributed.scheduler - INFO - Scheduler closing all comms\n'
Print from stdout
=================
FAILED
Judging by the stderr, it seems expected that connecting to tcp://127.0.0.1:8786 fails if the scheduler is actually listening on tcp://172.18.0.2:32890 (i.e. on a different IP and port)...
How would one go about using this with, say, Amazon ECS?
In cases where there are lots of workers, stop_workers is slow. This appears to be caused by closing each worker in a for-loop.
As the common case where multiple workers are submitted to be stopped is just stopping all of the workers, it would be good to special-case this and pass JOB_IDS_SESSION_ALL to DRMAA. This would bypass the need for the loop.
There are a few options:
- make the worker_ids parameter optional and the default value equal to all workers (easy, straightforward)
- close (easy, hacky without 1 or 2)
Thoughts?
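A sketch of the JOB_IDS_SESSION_ALL path using plain drmaa-python calls (not current dask-drmaa code):

import drmaa

def stop_all_workers(session):
    # Terminate every job in this DRMAA session with a single call,
    # instead of looping over individual job ids.
    session.control(drmaa.Session.JOB_IDS_SESSION_ALL,
                    drmaa.JobControlAction.TERMINATE)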
Currently when we create workers in SGE we add the memory used as a dask resource that can be tracked and used within constraints. We might consider doing the same for cores so that tasks that required many cores could be appropriately assigned to larger nodes. Alternatively, we could also allow for arbitrary resources to be passed, so allow people to handle this in their own way.
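On the consumer side, distributed already lets tasks declare resource constraints, so an advertised "cores" resource could be used like this (function and data names are placeholders):

from dask.distributed import Client

client = Client(cluster)                        # assumes a running DRMAACluster
future = client.submit(train_model, data,       # train_model/data are placeholders
                       resources={'cores': 8})  # run only where >= 8 "cores" are advertised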
When DRMAA launches a worker it gives us a job ID like 34.1. This starts a worker on a random port which registers with the scheduler. Now each worker has two identifiers:
- the DRMAA job ID, e.g. 34.1
- the worker address, e.g. 192.168.0.1:37482
The Dask scheduler knows when it should clean up a worker. It goes ahead and terminates the worker remotely. However, despite the process terminating, the DRMAA job appears to continue to run. This causes some confusion, especially when trying to understand how many workers we have in flight when determining if we should scale down the cluster.
I see two solutions here:
The second option would be useful generally. We can do it by passing the Job ID as the worker's name/alias:
dask-worker scheduler-address:8786 --name 34.1
Ideally we would use a job-scheduler-provided environment variable here:
dask-worker scheduler-address:8786 --name $JOBID
dask-worker scheduler-address:8786 --name $drmaa_incr_ph$
...
However, it appears that environment variables cannot be used within the args, but only within a batch script. Currently we specify a job template by pointing to the dask-worker process directly:
wt = get_session().createJobTemplate()
wt.jobName = 'dask-drmaa'
wt.remoteCommand = 'dask-worker'
wt.args = [scheduler_address, '--name', '$JOBID']
wt.outputPath = ...
wt.errorPath = ...
However, as stated above, using environment variables with args seems to not work with DRMAA. Instead, it is recommended to use environment variables within scripts:
# worker script
dask-worker $1 --name $JOBID $@
wt = get_session().createJobTemplate()
wt.jobName = 'dask-drmaa'
wt.remoteCommand = 'my-worker-script'
wt.args = [scheduler_address]
wt.outputPath = ...
wt.errorPath = ...
I tried this out and didn't have much success. I suspect that I'm missing something simple. Additionally it would be good have different output paths for different jobs. Currently they all dump to the same file.
There is an xfailed test in test_core.py that checks for the correct job names as worker names. If anyone more familiar with DRMAA can help to make this test pass I would be grateful.
@pytest.mark.xfail(reason="Can't use job name environment variable as arg")
def test_job_name_as_name(loop):
    with DRMAACluster(scheduler_port=0) as cluster:
        cluster.start_workers(2)
        while len(cluster.scheduler.workers) < 1:
            sleep(0.1)
        names = {cluster.scheduler.worker_info[w]['name']
                 for w in cluster.scheduler.workers}
        assert names == set(cluster.workers)
cc @davidr
Using SGE and wanting to initialize dask fully from python.
My main node is dual-homed, with 1G and 10G interfaces. The 10G is the one that my SGE cluster uses.
from dask_drmaa import DRMAACluster
from dask.distributed import Client
In [9]: cluster = DRMAACluster(hostname='master-10g')
INFO:dask_drmaa.core:Start local scheduler at master-10g
In [10]: cluster.scheduler_address
Out[10]: 'tcp://10.22.150.194:37386'  # this is the master-1g IP, not the one I want
Meanwhile, the workers are spinning trying to connect to the 1G IP:
tail worker.23523.1.err
distributed.worker - INFO - Trying to connect to scheduler: tcp://10.22.150.194:37386
Can this be extended to allow one to specify the scheduler interface / hostname / IP to give to the workers?
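A hedged sketch of what that might look like, assuming DRMAACluster forwards extra keyword arguments to the underlying LocalCluster (as in the constructor shown earlier on this page); the address below is made up to stand in for the 10G interface:

from dask_drmaa import DRMAACluster

cluster = DRMAACluster(ip='10.22.160.194')   # assumed kwarg, made-up 10G address
print(cluster.scheduler_address)             # should now report the 10G address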