_________________________ test_stop_workers_politely __________________________
self = <Client: not connected>
futures = [<Future: status: cancelled, type: int, key: int-5c8a950061aa331153f4a172bbcbfd1b>, <Future: status: cancelled, type: ...9541ea58b401f115b751e79eabbff>, <Future: status: cancelled, type: int, key: int-ce9a05dd6ec76c6a6d171b0c055f3127>, ...]
errors = 'raise', direct = False, local_worker = None
@gen.coroutine
def _gather(self, futures, errors='raise', direct=None, local_worker=None):
futures2, keys = unpack_remotedata(futures, byte_keys=True)
keys = [tokey(key) for key in keys]
bad_data = dict()
if direct is None:
try:
w = get_worker()
except Exception:
direct = False
else:
if w.scheduler.address == self.scheduler.address:
direct = True
@gen.coroutine
def wait(k):
""" Want to stop the All(...) early if we find an error """
st = self.futures[k]
yield st.event.wait()
if st.status != 'finished':
raise AllExit()
while True:
logger.debug("Waiting on futures to clear before gather")
with ignoring(AllExit):
yield All([wait(key) for key in keys if key in self.futures])
failed = ('error', 'cancelled')
exceptions = set()
bad_keys = set()
for key in keys:
if (key not in self.futures or
self.futures[key].status in failed):
exceptions.add(key)
if errors == 'raise':
try:
st = self.futures[key]
> exception = st.exception
E AttributeError: exception
/opt/anaconda/lib/python3.6/site-packages/distributed/client.py:1233: AttributeError
During handling of the above exception, another exception occurred:
loop = <tornado.platform.epoll.EPollIOLoop object at 0x7f3505aedf28>
def test_stop_workers_politely(loop):
with DRMAACluster(scheduler_port=0) as cluster:
with Client(cluster, loop=loop) as client:
cluster.start_workers(2)
while len(client.ncores()) < 2:
sleep(0.1)
futures = client.scatter(list(range(10)))
a, b = cluster.workers
cluster.stop_workers(a)
> data = client.gather(futures)
dask_drmaa/tests/test_core.py:131:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/opt/anaconda/lib/python3.6/site-packages/distributed/client.py:1364: in gather
asynchronous=asynchronous)
/opt/anaconda/lib/python3.6/site-packages/distributed/client.py:547: in sync
return sync(self.loop, func, *args, **kwargs)
/opt/anaconda/lib/python3.6/site-packages/distributed/utils.py:270: in sync
six.reraise(*error[0])
/opt/anaconda/lib/python3.6/site-packages/six.py:693: in reraise
raise value
/opt/anaconda/lib/python3.6/site-packages/distributed/utils.py:258: in f
result[0] = yield make_coro()
/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py:1055: in run
value = future.result()
/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py:238: in result
raise_exc_info(self._exc_info)
<string>:4: in raise_exc_info
???
/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py:1063: in run
yielded = self.gen.throw(*exc_info)
/opt/anaconda/lib/python3.6/site-packages/distributed/client.py:1238: in _gather
None)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tp = <class 'concurrent.futures._base.CancelledError'>, value = None, tb = None
def reraise(tp, value, tb=None):
try:
if value is None:
value = tp()
if value.__traceback__ is not tb:
raise value.with_traceback(tb)
> raise value
E concurrent.futures._base.CancelledError: int-7ec5d3339274cee5cb507a4e4d28e791
/opt/anaconda/lib/python3.6/site-packages/six.py:693: CancelledError
----------------------------- Captured stderr call -----------------------------
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7f3505aef9b0> exception was never retrieved: Traceback (most recent call last):
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/comm/tcp.py", line 305, in connect
**kwargs)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
yielded = self.gen.send(value)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 183, in connect
af, addr, stream = yield connector.start()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 82, in start
self.try_connect(iter(self.primary_addrs))
File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 97, in try_connect
future = self.connect(af, addr)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 224, in _create_stream
return stream.connect(addr)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/iostream.py", line 1128, in connect
self._add_io_state(self.io_loop.WRITE)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/iostream.py", line 971, in _add_io_state
self.fileno(), self._handle_events, self._state)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/ioloop.py", line 728, in add_handler
self._impl.register(fd, events | self.ERROR)
ValueError: I/O operation on closed epoll object
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7f350c075da0> exception was never retrieved: Traceback (most recent call last):
File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 1233, in _gather
exception = st.exception
AttributeError: exception
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/dask-drmaa/dask_drmaa/tests/test_core.py", line 131, in test_stop_workers_politely
data = client.gather(futures)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 1364, in gather
asynchronous=asynchronous)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 547, in sync
return sync(self.loop, func, *args, **kwargs)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/utils.py", line 270, in sync
six.reraise(*error[0])
File "/opt/anaconda/lib/python3.6/site-packages/six.py", line 693, in reraise
raise value
File "/opt/anaconda/lib/python3.6/site-packages/distributed/utils.py", line 258, in f
result[0] = yield make_coro()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 1238, in _gather
None)
File "/opt/anaconda/lib/python3.6/site-packages/six.py", line 693, in reraise
raise value
concurrent.futures._base.CancelledError: int-7ec5d3339274cee5cb507a4e4d28e791
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/dask_drmaa-0.1.0-py3.6.egg/dask_drmaa/core.py", line 193, in stop_workers
close_workers=True)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/scheduler.py", line 1951, in retire_workers
yield [self.close_worker(worker=w) for w in workers]
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 828, in callback
result_list.append(f.result())
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/scheduler.py", line 564, in close_worker
yield r.terminate(report=False)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/core.py", line 453, in send_recv_from_rpc
comm = yield self.live_comm()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/core.py", line 429, in live_comm
connection_args=self.connection_args)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 307, in wrapper
yielded = next(result)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/comm/core.py", line 185, in connect
quiet_exceptions=EnvironmentError)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 921, in with_timeout
timeout, timeout_callback)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/ioloop.py", line 505, in add_timeout
callback, *args, **kwargs)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/ioloop.py", line 921, in call_at
heapq.heappush(self._timeouts, timeout)
TypeError: heap argument must be a list
--------------------------- Captured stderr teardown ---------------------------
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7f350c09f080> exception was never retrieved: Traceback (most recent call last):
File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 1233, in _gather
exception = st.exception
AttributeError: exception
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/runner.py", line 196, in __init__
self.result = func()
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/runner.py", line 182, in <lambda>
return CallInfo(lambda: ihook(item=item, **kwds), when=when)
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 745, in __call__
return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 339, in _hookexec
return self._inner_hookexec(hook, methods, kwargs)
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 334, in <lambda>
_MultiCall(methods, kwargs, hook.spec_opts).execute()
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 613, in execute
return _wrapped_call(hook_impl.function(*args), self.execute)
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 254, in _wrapped_call
return call_outcome.get_result()
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 279, in get_result
raise ex[1].with_traceback(ex[2])
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 265, in __init__
self.result = func()
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 614, in execute
res = hook_impl.function(*args)
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/runner.py", line 112, in pytest_runtest_call
item.runtest()
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/python.py", line 1169, in runtest
self.ihook.pytest_pyfunc_call(pyfuncitem=self)
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 745, in __call__
return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 339, in _hookexec
return self._inner_hookexec(hook, methods, kwargs)
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 334, in <lambda>
_MultiCall(methods, kwargs, hook.spec_opts).execute()
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 613, in execute
return _wrapped_call(hook_impl.function(*args), self.execute)
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 254, in _wrapped_call
return call_outcome.get_result()
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 279, in get_result
raise ex[1].with_traceback(ex[2])
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 265, in __init__
self.result = func()
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/vendored_packages/pluggy.py", line 614, in execute
res = hook_impl.function(*args)
File "/opt/anaconda/lib/python3.6/site-packages/_pytest/python.py", line 143, in pytest_pyfunc_call
testfunction(**testargs)
File "/dask-drmaa/dask_drmaa/tests/test_core.py", line 131, in test_stop_workers_politely
data = client.gather(futures)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 1364, in gather
asynchronous=asynchronous)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 547, in sync
return sync(self.loop, func, *args, **kwargs)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/utils.py", line 270, in sync
six.reraise(*error[0])
File "/opt/anaconda/lib/python3.6/site-packages/six.py", line 693, in reraise
raise value
File "/opt/anaconda/lib/python3.6/site-packages/distributed/utils.py", line 258, in f
result[0] = yield make_coro()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/client.py", line 1238, in _gather
None)
File "/opt/anaconda/lib/python3.6/site-packages/six.py", line 693, in reraise
raise value
concurrent.futures._base.CancelledError: int-7ec5d3339274cee5cb507a4e4d28e791
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/opt/anaconda/lib/python3.6/site-packages/distributed/comm/tcp.py", line 305, in connect
**kwargs)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/opt/anaconda/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
yielded = self.gen.send(value)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 183, in connect
af, addr, stream = yield connector.start()
File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 82, in start
self.try_connect(iter(self.primary_addrs))
File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 97, in try_connect
future = self.connect(af, addr)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/tcpclient.py", line 224, in _create_stream
return stream.connect(addr)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/iostream.py", line 1128, in connect
self._add_io_state(self.io_loop.WRITE)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/iostream.py", line 971, in _add_io_state
self.fileno(), self._handle_events, self._state)
File "/opt/anaconda/lib/python3.6/site-packages/tornado/ioloop.py", line 728, in add_handler
self._impl.register(fd, events | self.ERROR)
ValueError: I/O operation on closed epoll object