Running through the tutorial examples, I found a problem with the following array example:
%load solutions/Array-03.py
import os
from glob import glob

import h5py
import dask
import dask.array as da

# Collect the per-day weather files and open each one's '/t2m'
# (2-metre temperature) dataset read-only.
filenames = sorted(glob(os.path.join('data', 'weather-big', '*.hdf5')))
dsets = [h5py.File(filename, mode='r')['/t2m'] for filename in filenames]

# Wrap each on-disk dataset in a lazily-evaluated dask array and stack
# them along a new leading (time) axis.
arrays = [da.from_array(dset, chunks=(500, 500)) for dset in dsets]
x = da.stack(arrays, axis=0)

# Downsample: keep every other point in both spatial dimensions.
result = x[:, ::2, ::2]

# BUG FIX: open h5py file handles contain _thread._local objects and cannot
# be pickled, so shipping this graph to a distributed Client fails with
# "TypeError: can't pickle _thread._local objects" (see traceback below).
# Force the local threaded scheduler for the store step — threads share the
# process's open HDF5 handles, so nothing needs to be serialized.
with dask.config.set(scheduler='threads'):
    da.to_hdf5(os.path.join('data', 'myfile.hdf5'), '/output', result)
TypeError Traceback (most recent call last)
~/anaconda2/envs/dask-tutorial/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x)
37 try:
---> 38 result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
39 if len(result) < 1000:
TypeError: can't pickle _thread._local objects
During handling of the above exception, another exception occurred:
TypeError Traceback (most recent call last)
in ()
14 result = x[:, ::2, ::2]
15
---> 16 da.to_hdf5(os.path.join('data', 'myfile.hdf5'), '/output', result)
~/anaconda2/envs/dask-tutorial/lib/python3.6/site-packages/dask/array/core.py in to_hdf5(filename, *args, **kwargs)
3388 if chunks is True else chunks, **kwargs)
3389 for dp, x in data.items()]
-> 3390 store(list(data.values()), dsets)
3391
3392
~/anaconda2/envs/dask-tutorial/lib/python3.6/site-packages/dask/array/core.py in store(sources, targets, lock, regions, compute, return_stored, **kwargs)
949
950 if compute:
--> 951 result.compute()
952 return None
953 else:
~/anaconda2/envs/dask-tutorial/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
133 dask.base.compute
134 """
--> 135 (result,) = compute(self, traverse=False, **kwargs)
136 return result
137
~/anaconda2/envs/dask-tutorial/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
331 postcomputes = [a.dask_postcompute() if is_dask_collection(a)
332 else (None, a) for a in args]
--> 333 results = get(dsk, keys, **kwargs)
334 results_iter = iter(results)
335 return tuple(a if f is None else f(next(results_iter), *a)
~/anaconda2/envs/dask-tutorial/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, **kwargs)
1991 futures = self._graph_to_futures(dsk, set(flatten([keys])),
1992 restrictions, loose_restrictions,
-> 1993 resources=resources)
1994 packed = pack_data(keys, futures)
1995 if sync:
~/anaconda2/envs/dask-tutorial/lib/python3.6/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, resources, retries)
1952
1953 self._send_to_scheduler({'op': 'update-graph',
-> 1954 'tasks': valmap(dumps_task, dsk3),
1955 'dependencies': valmap(list, dependencies),
1956 'keys': list(flatkeys),
~/anaconda2/envs/dask-tutorial/lib/python3.6/site-packages/toolz/dicttoolz.py in valmap(func, d, factory)
82 """
83 rv = factory()
---> 84 rv.update(zip(iterkeys(d), map(func, itervalues(d))))
85 return rv
86
~/anaconda2/envs/dask-tutorial/lib/python3.6/site-packages/distributed/worker.py in dumps_task(task)
701 elif not any(map(_maybe_complex, task[1:])):
702 return {'function': dumps_function(task[0]),
--> 703 'args': pickle.dumps(task[1:])}
704 return to_serialize(task)
705
~/anaconda2/envs/dask-tutorial/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x)
49 except Exception:
50 try:
---> 51 return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
52 except Exception as e:
53 logger.info("Failed to serialize %s. Exception: %s", x, e)
~/anaconda2/envs/dask-tutorial/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol)
879 try:
880 cp = CloudPickler(file, protocol=protocol)
--> 881 cp.dump(obj)
882 return file.getvalue()
883 finally:
~/anaconda2/envs/dask-tutorial/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
266 self.inject_addons()
267 try:
--> 268 return Pickler.dump(self, obj)
269 except RuntimeError as e:
270 if 'recursion' in e.args[0]:
~/anaconda2/envs/dask-tutorial/lib/python3.6/pickle.py in dump(self, obj)
407 if self.proto >= 4:
408 self.framer.start_framing()
--> 409 self.save(obj)
410 self.write(STOP)
411 self.framer.end_framing()
~/anaconda2/envs/dask-tutorial/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
~/anaconda2/envs/dask-tutorial/lib/python3.6/pickle.py in save_tuple(self, obj)
749 write(MARK)
750 for element in obj:
--> 751 save(element)
752
753 if id(obj) in memo:
~/anaconda2/envs/dask-tutorial/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
519
520 # Save the reduce() output and finally memoize the object
--> 521 self.save_reduce(obj=obj, *rv)
522
523 def persistent_id(self, obj):
~/anaconda2/envs/dask-tutorial/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
632
633 if state is not None:
--> 634 save(state)
635 write(BUILD)
636
~/anaconda2/envs/dask-tutorial/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
~/anaconda2/envs/dask-tutorial/lib/python3.6/pickle.py in save_dict(self, obj)
819
820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
822
823 dispatch[dict] = save_dict
~/anaconda2/envs/dask-tutorial/lib/python3.6/pickle.py in _batch_setitems(self, items)
845 for k, v in tmp:
846 save(k)
--> 847 save(v)
848 write(SETITEMS)
849 elif n:
~/anaconda2/envs/dask-tutorial/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
494 reduce = getattr(obj, "reduce_ex", None)
495 if reduce is not None:
--> 496 rv = reduce(self.proto)
497 else:
498 reduce = getattr(obj, "reduce", None)
TypeError: can't pickle _thread._local objects