
dask's Introduction

Dask

Dask is a flexible parallel computing library for analytics. See documentation for more information.

LICENSE

New BSD. See License File.

dask's People

Contributors

charlesbluca, cowlicks, cpcloud, crusaderky, dependabot[bot], eriknw, farrajota, fjetter, genevievebuckley, hendrikmakait, j-bennet, jacobtomlinson, jakirkham, jcrist, jrbourbeau, jsignell, madsbk, marianotepper, martindurant, milesgranger, mrocklin, pavithraes, pentschev, phofl, qulogic, rjzamora, scharlottej13, shoyer, sinhrks, tomaugspurger

dask's Issues

Memory leak

When I run the large computation at the start of this blog post my process uses up a bunch of memory. All dask.threaded specific diagnostics say that this shouldn't be the case (we seem to be releasing memory appropriately).

I don't really know what's going on here. Using guppy on the python process doesn't report the data. Checking the size of the data in the state['cache'] during execution looks great. Running heavy concurrent HDF5 reads doesn't result in a leak. Turning off HDF5 writing (more often known to cause leaks) doesn't solve the problem. Flushing the file at each write also doesn't do anything.

So in short, I don't know what's going on. Fixing this is probably the largest looming issue.

dask.array.where broken on scalar arguments

In [5]: data = da.from_array(np.array([1, np.nan, 3]), blockshape=(3,))

In [6]: da.where(da.notnull(data), data, 2)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-6-e8628cc9b5ea> in <module>()
----> 1 da.where(da.notnull(data), data, 2)

/Users/shoyer/dev/dask/dask/array/core.pyc in where(condition, x, y)
   1144 @wraps(np.where)
   1145 def where(condition, x, y):
-> 1146     return choose(condition, [y, x])
   1147
   1148

/Users/shoyer/dev/dask/dask/array/core.pyc in choose(a, choices)
   1139 @wraps(np.choose)
   1140 def choose(a, choices):
-> 1141     return elemwise(variadic_choose, a, *choices)
   1142
   1143

/Users/shoyer/dev/dask/dask/array/core.pyc in elemwise(op, *args, **kwargs)
   1067         vals = [np.empty((1,), dtype=a.dtype) if hasattr(a, 'dtype') else a
   1068                 for a in args]
-> 1069         dt = op(*vals).dtype
   1070
   1071     if other:

/Users/shoyer/dev/dask/dask/array/core.pyc in variadic_choose(a, *choices)
   1135
   1136 def variadic_choose(a, *choices):
-> 1137     return np.choose(a, choices)
   1138
   1139 @wraps(np.choose)

/Users/shoyer/miniconda/envs/xray-dev/lib/python2.7/site-packages/numpy/core/fromnumeric.pyc in choose(a, choices, out, mode)
    345     except AttributeError:
    346         return _wrapit(a, 'choose', choices, out=out, mode=mode)
--> 347     return choose(choices, out=out, mode=mode)
    348
    349

TypeError: Cannot cast array data from dtype('float64') to dtype('int64') according to the rule 'safe'

This works with the normal numpy/pandas functions:

In [9]: np.where(pd.notnull(data), data, 2)
Out[9]: array([ 1.,  2.,  3.])
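
The failure comes from routing where through choose, which insists on an integer selector. A minimal sketch of a workaround, assuming elemwise is importable from dask.array.core as the traceback suggests (where_sketch is an illustrative name, not a proposed API):

import numpy as np
import dask.array as da
from dask.array.core import elemwise  # location per the traceback above

# Sketch only: route where() through np.where rather than np.choose, so a
# boolean condition and a scalar y (here 2) follow numpy's broadcasting and
# casting rules instead of choose's integer-selector requirement.
def where_sketch(condition, x, y):
    return elemwise(np.where, condition, x, y)

data = da.from_array(np.array([1, np.nan, 3]), blockshape=(3,))
print(where_sketch(da.notnull(data), data, 2).compute())  # expect [1., 2., 3.]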

Blaze slicing

We want to implement slicing on chunked dask graphs representing arrays. E.g. if I have a 100 by 100 array, split into 10 by 10 blocks and I compute

>>> x[:15, :15]

Then I want the upper-left block completely, half of each of the blocks to the right and below it, and a quarter of the block diagonally to the lower-right. Note that doing this logic correctly depends not only on the dask, but also on the shape metadata in the Array object.

Slicing might become complex (there is a lot of potential book-keeping here) but a partial solution is probably good enough for now.

Example

Given a dask Array object like the following

>>> x = np.ones((20, 20))
>>> dsk = {'x': x}
>>> a = into(Array, x, blockshape=(5, 5), name='y')
>>> a.dask
{'y': array([[ 1.,  1.,  1.,  1.,  1 ...
 ('y', 0, 0): (<function dask.array.ndslice>, 'y', (5, 5), 0, 0),
 ('y', 0, 1): (<function dask.array.ndslice>, 'y', (5, 5), 0, 1),
...
 ('y', 3, 2): (<function dask.array.ndslice>, 'y', (5, 5), 3, 2),
 ('y', 3, 3): (<function dask.array.ndslice>, 'y', (5, 5), 3, 3)}

and a blaze expression like the following

>>> from blaze import symbol, compute
>>> s = symbol('s', '20 * 20 * int')
>>> expr = s[:8, :8]

We'd like to be able to compute a new dask.obj.Array object with the following dask

def sliceit(x, *inds):
    return x[inds]

{('y_1', 0, 0): ('y', 0, 0),
 ('y_1', 0, 1): (sliceit, ('y', 0, 1), slice(None, None), slice(None, 3)),
 ('y_1', 1, 0): (sliceit, ('y', 1, 0), slice(None, 3), slice(None, None)),
 ('y_1', 1, 1): (sliceit, ('y', 1, 1), slice(None, 3), slice(None, 3)),
 ...

Note on code complexity

Dask is likely to be refactored. Things like the Array class are experimental and likely to change shape in the future. We want to avoid rework when that refactor occurs. Because of this it's nice to keep gritty detail work like this independent of the current conventions for as long as possible. E.g. there is likely a std-lib-only function that creates a dictionary from pure-python terms (tuples, dicts, names, slices), and then a dask.obj.Array-aware function on top of it. This is the approach behind array.top and obj.atop.
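
In that spirit, here is a std-lib-only sketch of the 1-d bookkeeping: given a uniform block size and a slice, work out which blocks are touched and the slice to apply within each. The helper name and signature are illustrative, not the eventual dask API.

# Map a global slice onto {block index: local slice}, assuming uniform blocks.
def slice_1d(blocksize, nblocks, s):
    start = s.start or 0
    stop = s.stop if s.stop is not None else blocksize * nblocks
    out = {}
    for i in range(nblocks):
        lo, hi = i * blocksize, (i + 1) * blocksize
        if start < hi and stop > lo:            # the slice touches this block
            out[i] = slice(max(start, lo) - lo, min(stop, hi) - lo)
    return out

# x[:15] on a 100-long axis split into blocks of 10:
print(slice_1d(10, 10, slice(None, 15)))
# {0: slice(0, 10), 1: slice(0, 5)}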

stack followed by transpose fails

import numpy as np
import dask
import dask.array as da

da.set_options(get=dask.get)
x = da.from_array(np.array([1]), blockshape=(1,))
da.stack([x]).transpose().compute()
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-14-2da89db80151> in <module>()
      5 da.set_options(get=dask.get)
      6 x = da.from_array(np.array([1]), blockshape=(1,))
----> 7 da.stack([x]).transpose().compute()

/Users/shoyer/dev/dask/dask/array/core.py in compute(self, **kwargs)
    595     @wraps(compute)
    596     def compute(self, **kwargs):
--> 597         return compute(self, **kwargs)
    598 
    599     __float__ = __int__ = __bool__ = __complex__ = compute

/Users/shoyer/dev/dask/dask/array/core.py in compute(*args, **kwargs)
    430     dsk = merge(*[arg.dask for arg in args])
    431     keys = [arg._keys() for arg in args]
--> 432     results = get(dsk, keys, **kwargs)
    433 
    434     results2 = [rec_concatenate(x) if arg.shape else unpack_singleton(x)

/Users/shoyer/dev/dask/dask/array/core.py in get(dsk, keys, get, **kwargs)
    823     dsk3 = remove_full_slices(dsk2)
    824     dsk4 = inline_functions(dsk3, fast_functions=fast_functions)
--> 825     return get(dsk4, keys, **kwargs)
    826 
    827 

/Users/shoyer/dev/dask/dask/core.pyc in get(d, key, get, concrete, **kwargs)
    111         v = (get(d, k, get=get, concrete=concrete) for k in key)
    112         if concrete:
--> 113             v = list(v)
    114     elif ishashable(key) and key in d:
    115         v = d[key]

/Users/shoyer/dev/dask/dask/core.pyc in <genexpr>((k,))
    109     get = get or _get
    110     if isinstance(key, list):
--> 111         v = (get(d, k, get=get, concrete=concrete) for k in key)
    112         if concrete:
    113             v = list(v)

/Users/shoyer/dev/dask/dask/core.pyc in get(d, key, get, concrete, **kwargs)
    111         v = (get(d, k, get=get, concrete=concrete) for k in key)
    112         if concrete:
--> 113             v = list(v)
    114     elif ishashable(key) and key in d:
    115         v = d[key]

/Users/shoyer/dev/dask/dask/core.pyc in <genexpr>((k,))
    109     get = get or _get
    110     if isinstance(key, list):
--> 111         v = (get(d, k, get=get, concrete=concrete) for k in key)
    112         if concrete:
    113             v = list(v)

/Users/shoyer/dev/dask/dask/core.pyc in get(d, key, get, concrete, **kwargs)
    111         v = (get(d, k, get=get, concrete=concrete) for k in key)
    112         if concrete:
--> 113             v = list(v)
    114     elif ishashable(key) and key in d:
    115         v = d[key]

/Users/shoyer/dev/dask/dask/core.pyc in <genexpr>((k,))
    109     get = get or _get
    110     if isinstance(key, list):
--> 111         v = (get(d, k, get=get, concrete=concrete) for k in key)
    112         if concrete:
    113             v = list(v)

/Users/shoyer/dev/dask/dask/core.pyc in get(d, key, get, concrete, **kwargs)
    122         if get is _get:
    123             # use non-recursive method by default
--> 124             return _get_task(d, v)
    125         func, args = v[0], v[1:]
    126         args2 = [get(d, arg, get=get, concrete=False) for arg in args]

/Users/shoyer/dev/dask/dask/core.pyc in _get_task(d, task, maxdepth)
     59         func, args, results = stack[-1]
     60         if not args:
---> 61             val = func(*results)
     62             if len(stack) == 1:
     63                 return val

/Users/shoyer/miniconda/envs/xray-dev/lib/python2.7/site-packages/toolz/functoolz.pyc in __call__(self, *args, **kwargs)
    214     def __call__(self, *args, **kwargs):
    215         try:
--> 216             return self._partial(*args, **kwargs)
    217         except TypeError:
    218             # If there was a genuine TypeError

/Users/shoyer/miniconda/envs/xray-dev/lib/python2.7/site-packages/numpy/core/fromnumeric.pyc in transpose(a, axes)
    535     except AttributeError:
    536         return _wrapit(a, 'transpose', axes)
--> 537     return transpose(axes)
    538 
    539 

ValueError: axes don't match array

Idea: deferred errors

Sometimes, it's nice to have defensive checks to ensure that the input is as you expect. But with dask, you might not want to evaluate these checks eagerly, because doing so might be very expensive. Instead, we could have these errors raised only when dask.compute() finally runs.

We might allow writing something like:

da.assert_((x > 0).all(), ValueError, 'x must be positive')

The tricky part would be figuring out how to insert an assert statement into the task dependency graph without making a mess of global state.
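
One possible shape for this, sketched with illustrative names (deferred_assert is not a dask API): make the check itself a task, and have downstream tasks depend on the checked key, so nothing global is needed.

def deferred_assert(value, predicate, exc, msg):
    # runs only when the scheduler executes this task
    if not predicate(value):
        raise exc(msg)
    return value

# Downstream tasks depend on 'x-checked' instead of 'x', so the check happens
# exactly once, at compute time.
dsk = {'x': 42,
       'x-checked': (deferred_assert, 'x', lambda v: v > 0, ValueError,
                     'x must be positive'),
       'y': (lambda v: v + 1, 'x-checked')}

import dask
print(dask.get(dsk, 'y'))   # 43; flipping the predicate raises ValueError instead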

inline always inlines constants

The docstring and a shallow review of the code imply that this is only desired as a default when no keys are given. This doesn't seem to be the current behavior.

Here is a failing test.

def test_inline_doesnt_always_collapse_constants():
    d = {'a': 1, 'b': (inc, 'a'), 'c': (inc, 'b'), 'd': (add, 'a', 'c')}
    assert inline(d, ['b', 'c']) == {'a': 1, 'd': (add, 1, (inc, (inc, 1)))}

Another thought: I wonder if we might want to specify inlining constants separately from the keys=None case. We might want to inline both constants and certain keys. Perhaps a keyword argument?
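
As a standalone toy sketch of one reading of the "only the requested keys" behavior (subs and inline_keys are throwaway helpers, not dask's inline implementation):

def subs(task, key, val):
    # substitute val for key inside a task tuple, recursively
    if task == key:
        return val
    if isinstance(task, tuple):
        return tuple(subs(t, key, val) for t in task)
    return task

def inline_keys(dsk, keys):
    dsk = dict(dsk)
    for key in keys:
        val = dsk.pop(key)
        dsk = {k: subs(v, key, val) for k, v in dsk.items()}
    return dsk

def inc(x): return x + 1
def add(x, y): return x + y

d = {'a': 1, 'b': (inc, 'a'), 'c': (inc, 'b'), 'd': (add, 'a', 'c')}
print(inline_keys(d, ['b', 'c']))
# i.e. {'a': 1, 'd': (add, 'a', (inc, (inc, 'a')))} -- the constant 'a' is untouched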

Reduce I/O by fusing repeated getitems on arrays

Consider the following term

    # x[1000:2000][15:20]
    (getitem,
      (getitem, x, (slice(1000, 2000),)),
      (slice(15, 20),))

This is equivalent to the following much faster term

    # x[1015: 1020]
    (getitem, x, (slice(1015, 1020),))

This happens a lot when we use from_array and then slice

>>> d = da.from_array(x, blockshape=(1000,))  # First round of getitem
>>> d[1015:1020]  # Second round of getitem

If we can optimize then we cut down a lot on I/O. This is also pretty typical as people block in a bulk way but pull out thin slices. This ends up being very important when slicing into large volumetric arrays (e.g. @shoyer 's work with xray) and gives us a lot of leeway with choosing bad chunk sizes.

I would break this problem into two sub-problems

  1. A function to operate on terms, enacting the transformation shown in the above example, while also being aware of multiple slices and possibly integer values. This requires attention to detail but is not very complex from a dask perspective (see the sketch after this list). @nevermindewe and @PeterDSteinberg have shown talent here, if we can get either of them to dive into these problems again.
  2. A function to apply such a transformation within nested terms (e.g. what happens if this subterm is within yet another function call). This is more in line with previous work done by @eriknw or @jcrist. In particular, in relation to the recent work on Rewrite Rules (#82), how do we cleanly extend traversing terms and applying general callables? What's the best way to enable these transformations?
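
A sketch of the slice-composition arithmetic from sub-problem 1, restricted to 1-d slices with positive steps (fuse_slices is an illustrative name):

import numpy as np

def fuse_slices(outer, inner, length):
    """Return a single slice equivalent to x[outer][inner] for a 1-d x."""
    o_start, o_stop, o_step = outer.indices(length)
    o_len = max(0, (o_stop - o_start + o_step - 1) // o_step)
    i_start, i_stop, i_step = inner.indices(o_len)
    return slice(o_start + i_start * o_step,
                 o_start + i_stop * o_step,
                 o_step * i_step)

x = np.arange(3000)
s = fuse_slices(slice(1000, 2000), slice(15, 20), len(x))
assert (x[s] == x[1000:2000][15:20]).all()
print(s)  # slice(1015, 1020, 1)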

Optimization to reorder commutative computations

Some function pairs can commute to achieve the same result but with less cost. It would be nice to optimize the graph to commute these functions accordingly.

E.g. if we have (f, (g, x)) and know that this is equivalent-but-slower-than (g, (f, x)) then we might want to make that switch for all such x in the graph.

This is very important when interacting with column-stores like BColz. It's very nice to change from

(getitem, (convert_to_dataframe, my_column_store), 'my-column')

to

(convert_to_dataframe, (getitem, my_column_store, 'my-column'))

This often confers an order-of-magnitude reduction in I/O. cc @eriknw

Slice with list

Desired functionality is something like the following in the 1d case

>>> dask_slice('y', 'x', (100,), [(20, 20, 20, 20, 20)], ([5, 1, 27],))  # x[[5, 1, 27]]
{('y', 0): (np.concatenate, [(getitem, ('x', 0), [5, 1]), 
                             (getitem, ('x', 1), [7])])}

Though I suspect that there are other complications.

One is indexing with a list whose elements don't access blocks consecutively, as in [5, 27, 1]. In this case we'll maybe want to slice each block only once and then reshuffle the results. This could probably be done by sorting the input list [5, 27, 1] -> [1, 5, 27], doing the concatenate, and finally doing another getitem on top of that to restore the original order (in this case (getitem, ..., [1, 2, 0])).

I'm also uncertain if this will be much more complex in n-dimensions when we mix with integers and slices.

CCing @nevermindewe in case he has ideas here.

One motivation for this is to support nd-grouping applications as is done in xray (cc @shoyer)
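
For reference, a small sketch of the sort-then-reorder bookkeeping described above, in plain numpy (the final task shown in the comment is illustrative):

import numpy as np

index = [5, 27, 1]
order = np.argsort(index)                 # [2, 0, 1]
sorted_idx = [index[i] for i in order]    # [1, 5, 27]

# With blocks of size 20, block 0 is sliced with [1, 5] and block 1 with [7]
# (27 - 20 = 7); after concatenating in sorted order we undo the sort:
inverse = np.argsort(order)               # [1, 2, 0]
# the final task would look like (getitem, (np.concatenate, [...]), inverse)
print(sorted_idx, list(inverse))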

dask.array should import its into

This snippet fails:

import dask.array as da
from blaze import Data, into
x = da.random.normal(10, 0.1, size=(10, 10), blockshape=(5, 5))
d = Data(x)  # this line bombs

but, this snippet works.

import dask.array as da
from dask.array.into import into
from blaze import Data, into
x = da.random.normal(10, 0.1, size=(10, 10), blockshape=(5, 5))
d = Data(x)  # this line works

It looks like the only way dask.array's Array type gets registered for discovery is when dask/array/into.py is executed. But it is not imported in dask/array/__init__.py.

Thus, I think we should:

--- __init__.py 2015-02-21 08:25:20.000000000 -0500
+++ __init__-updated.py 2015-02-21 10:43:51.000000000 -0500
@@ -8,3 +8,4 @@
 from .reductions import (sum, mean, std, var, any, all, min, max, vnorm)
 from . import random, linalg
 from .wrap import ones, zeros, empty
+from . import into

Linspace and arange functions

Functions like linspace and arange might be interesting problems to work out for people new to dask graphs. They would be useful to have too.
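
As a flavor of the exercise, a sketch of how a blocked arange could build a graph by hand, assuming uniform blocks; the key name and layout are illustrative only:

import numpy as np

def blocked_arange_graph(name, stop, blocksize):
    # one np.arange task per block of the output
    dsk = {}
    for i, start in enumerate(range(0, stop, blocksize)):
        dsk[(name, i)] = (np.arange, start, min(start + blocksize, stop))
    return dsk

print(blocked_arange_graph('arange-1', 25, 10))
# three tasks, one np.arange call per block: (0, 10), (10, 20), (20, 25)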

Support for __len__

NumPy's __len__ returns the length along axis 0. If the array is a dimensionless scalar, it fails with TypeError: len() of unsized object. Hopefully, this is an easy addition.
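
A minimal sketch of what the method could look like on the Array class, mirroring numpy's behavior (not an actual patch):

def __len__(self):
    # mirror numpy: length along axis 0, TypeError for 0-d arrays
    if not self.shape:
        raise TypeError("len() of unsized object")
    return self.shape[0]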

median/nanmedian

I'm not entirely sure what algorithms for out of core median look like, but if this is relatively straightforward it would be a nice addition to dask.

into -> odo name change

It looks like "into" has become "odo" with some small api changes (link). Into is still available but dask should probably switch to odo. This looks like it will be pretty easy, grep -r into . | wc -l shows only 39 instances of into.

Rules for automatic reblocking

xref #58, #60

Automatic reblocking in dask operations will be a super useful follow-on to manual reblock functionality. Here are my initial thoughts. For now, I'm going to simplify things by assuming that each array has a fixed blockshape (in practice, we could use the mean of the blockdims along each axis).

  1. For operations where one argument has a strictly larger blockshape (e.g., (100, 100) and (1000, 1000)), we probably want to reblock to the larger blockshape to minimize growth of the scheduling graph.
  2. For operations with differently shaped blockshapes (e.g., (100, 10000) and (10000, 100)) but comparable chunk sizes let's consider using the geometric mean of the blockshape along each dimension. This has the nice property of conserving the size of each chunk, and I suspect it would also minimize total amount of concat/slicing work necessary for the reblocking itself. For this example, this yields a blockshape of (1000, 1000).
  3. When given arrays with block shapes (10000, 1) and (1, 10000), we probably don't want chunks of shape (100, 100) -- that's too small. So shapes calculated from the geometric mean should probably be scaled up to ensure some minimum size (e.g., one million elements, or the largest chunk size currently in the dask).

"Uncut" dimensions deserve special consideration, because they are quite likely to arise in practice, both from array broadcasting (they'll have length 1) and from coercing fresh numpy arrays. But I think they are covered reasonably well by these rules.

For example, consider multiplying arrays with shapes/blockshapes (100, 1, 1) and (1, 1000, 1000). With the geometric mean, we get about 10000 chunks of shape (10, 32, 32), which after the scaling adjustment leaves us with about 100 chunks of size (47, 147, 147).
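
A sketch of rules 2 and 3 as code, with a one-million-element floor standing in for whatever minimum we settle on (the function name and exact rounding are illustrative):

from functools import reduce
from operator import mul

def auto_blockshape(shape1, shape2, min_elements=1000000):
    # rule 2: geometric mean along each axis conserves chunk size
    gm = [int(round((a * b) ** 0.5)) for a, b in zip(shape1, shape2)]
    size = reduce(mul, gm, 1)
    if size < min_elements:
        # rule 3: scale every axis up uniformly to reach the minimum chunk size
        scale = (min_elements / float(size)) ** (1.0 / len(gm))
        gm = [max(1, int(round(d * scale))) for d in gm]
    return tuple(gm)

print(auto_blockshape((100, 10000), (10000, 100)))    # (1000, 1000)
print(auto_blockshape((10000, 1), (1, 10000)))        # (1000, 1000), scaled up from (100, 100)
print(auto_blockshape((100, 1, 1), (1, 1000, 1000)))  # (46, 147, 147), in line with the figures above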

Detach from mrocklin/dask

First I'd like to thank @mrocklin for the discussion on dask.array yesterday; I feel like the readme does not emphasize this part of the project. But that's my point of view.

Also it seems that this is the main repository, though it still appears as a fork of mrocklin/dask, which decreases its visibility and biases the statistics. May I suggest asking GitHub to "detach" this repository as described in their help section?

Stable parallel computation of moments

There probably exist parallel algorithms to compute moments like variance, skewness, etc.

Our current solution for variance is numerically unstable. It would be nice to replace it with something stable. If that something happened to also include general moments, that would be really nice. This task seems intellectually challenging but somewhat simple from an implementation point of view. It might be good for an ambitious new contributor.
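
For reference, the standard pairwise-combination approach keeps (count, mean, M2) per block and merges them stably; this is a sketch of the combine step, not a proposed dask implementation:

import numpy as np

def block_stats(x):
    # per-block count, mean, and sum of squared deviations (M2)
    return x.size, x.mean(), ((x - x.mean()) ** 2).sum()

def combine(a, b):
    na, ma, m2a = a
    nb, mb, m2b = b
    n = na + nb
    delta = mb - ma
    mean = ma + delta * nb / n
    m2 = m2a + m2b + delta ** 2 * na * nb / n
    return n, mean, m2

x = np.random.RandomState(0).randn(1000)
n, mean, m2 = combine(block_stats(x[:400]), block_stats(x[400:]))
assert np.allclose(m2 / n, x.var())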

Dask.array dtype tracking to match numpy

Dask.arrays don't currently track dtype metadata. The dtype property actually has to compute a little bit of the array. For some computations this can be expensive. Perhaps we should add a new field to the dask.array.core.Array class and track dtype information through all operations. This will require us to replicate the numpy dtype coercion/promotion rules precisely which might get tricky.

At first glance this doesn't seem trivial.
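
One cheap route, sketched below, is to reuse numpy's own promotion rules by applying the operation to throwaway one-element arrays (the same probe elemwise already uses internally, as visible in the traceback earlier); a stored dtype field would then just cache this result per operation. infer_dtype is an illustrative name:

import numpy as np

def infer_dtype(op, *args):
    # apply op to zero-cost 1-element probes so numpy decides the result dtype
    probes = [np.empty((1,), dtype=a.dtype) if hasattr(a, 'dtype') else a
              for a in args]
    return op(*probes).dtype

print(infer_dtype(np.add, np.empty(0, dtype='i4'), np.empty(0, dtype='f8')))  # float64
print(infer_dtype(np.sin, np.empty(0, dtype='i8')))                           # float64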

Slice with None / newaxis

Desired functionality

>>> dask_slice('y', 'x', (100,), [(20, 20, 20, 20, 20)], (slice(10, 35)))   # Current behavior
{('y', 0): (getitem, ('x', 0), (slice(10, 20),)),
 ('y', 1): (getitem, ('x', 1), (slice( 0, 15),))}

>>> dask_slice('y', 'x', (100,), [(20, 20, 20, 20, 20)], (slice(10, 35), None))   # Adding None gives extra output dimensions
{('y', 0, 0): (getitem, ('x', 0), (slice(10, 20),)),
 ('y', 1, 0): (getitem, ('x', 1), (slice( 0, 15),))}

My hope is that this is actually pretty simple. It's probably just a matter of inserting a [0] appropriately into the calls to itertools.product in dask_slice.

Reblock

We would like to change the block structure of a dask array. This is particularly important when two dask arrays with differing block structures need to interact. One way to do this is to make the following function

def reblock(x, blockdims):
    ...

>>> x.blockdims
((10, 10, 10, 10, 10),)

>>> y = reblock(x, ((15, 15, 15, 5),))
>>> y.blockdims
((15, 15, 15, 5),)
>>> (x.compute() == y.compute()).all()
True

This looks a bit like a simpler version of slicing. I think it makes sense to copy @nevermindewe's approach of making a _slice_1d function for each dimension of blockdims and then slicing and concatenating as necessary.

A very simple common case is the promotion of numpy arrays to interact with dask arrays. A clean solution to this would be to define Array.__array_wrap__ to call da.from_array(x, blockshape=x.shape) and then lean on reblocking to define the block structure after the fact in the elemwise call. This came up in conversation with @shoyer.
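
A 1-d sketch of the idea, eagerly concatenating for clarity (real code would slice lazily across the old blocks); the helper name is illustrative:

import numpy as np

def reblock_1d(blocks, new_dims):
    # split according to the new block boundaries
    flat = np.concatenate(blocks)
    out, start = [], 0
    for d in new_dims:
        out.append(flat[start:start + d])
        start += d
    return out

old = [np.arange(i * 10, (i + 1) * 10) for i in range(5)]   # blockdims (10,) * 5
new = reblock_1d(old, (15, 15, 15, 5))
print([len(b) for b in new])   # [15, 15, 15, 5]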

dask.array.concatenate

Really two issues here

  1. We should implement a concatenate function that concatenates many dask.Array objects into a single one. This is relatively straightforward.
  2. We need to think about how users should specify collections of arrays as a single dask.Array

For example given a directory of netCDF files, each with several datasets, we might want something magical like the following

into(da.Array, '/data/myfile.*.h5::/data/path/to/X', blockshape=(1, 1000, 1000))

Or maybe this is too magical. Maybe we need to have a more verbose function that allows users to specify their intent more precisely. One approach with concatenate would be

uris = [fn + '::/data/path/to/X' for fn in sorted(glob('./data/myfile.*.h5'))]
arrays = [into(Array, uri, blockshape=(1000, 1000)) for uri in uris]
arr = da.stack(arrays, axis=0)

I'd value @shoyer 's opinion here.

Naive use of da.reblock with blockshape fails

A bit of follow-on work for reblocking. The following lines of reblock code seem likely to be typed by users who assume they can provide a blockshape to reblock without a keyword. Currently this yields an uninformative error.

In [1]: import dask.array as da

In [2]: x = da.ones((4, 4), blockshape=(1, 4))

In [3]: y = x.reblock((4, 1))
     28     return [tuple(zip((const,) * (1 + len(bds)),
     29                       list(accumulate(add, (0,) + bds))))
---> 30               for bds in blockdims ]
     31 
     32 

TypeError: object of type 'int' has no len()

Probably we should do one of the following

  1. Inspect the first input and decide if we think it's a blockshape (tuple of ints) or a blockdims (tuple of tuples of ints) argument
  2. Raise an informative error.

cc @PeterDSteinberg

Batch or online learning algorithm

We should start to develop some basic machine learning capability in dask.array. In principle any algorithm in scikit-learn with a partial_fit method should be amenable to blocked computation. It might be interesting to find the simplest such algorithm and then consider how we might build a dask graph on a tall-and-skinny dask.array.
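
A sketch of the blocked-training loop with scikit-learn's SGDClassifier, using plain numpy arrays as stand-ins for the chunks of a tall-and-skinny dask.array:

import numpy as np
from sklearn.linear_model import SGDClassifier

# ten blocks of (features, labels), standing in for dask.array chunks
rng = np.random.RandomState(0)
blocks = [(rng.randn(100, 5), rng.randint(0, 2, 100)) for _ in range(10)]

clf = SGDClassifier()
for X_block, y_block in blocks:
    # each partial_fit call sees one block at a time
    clf.partial_fit(X_block, y_block, classes=[0, 1])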

Task Fusion

We want an optimization pass that fuses linear sequences of tasks. Note the disappearance of 'w' and 'z'

>>> d = {'v': (inc, 'y'),
...      'u': (inc, 'w'),
...      'w': (inc, 'x'), 
...      'x': (inc, 'y'), 
...      'y': (inc, 'z'), 
...      'z': (add, 'a', 'b'), 
...      'a': (inc, 'c'), 'b': (inc, 'd'),
...      'c': 1, 'd': 2}
>>> fuse(d)
{'u': (inc, (inc, 'x')),
 'x': (inc, 'y'),
 'v': (inc, 'y'),
 'y': ((inc, (add, 'a', 'b'))), 
 'a': (inc, 'c'), 'b': (inc, 'd'),
 'c': 1, 'd': 2}

We only want to do this when the task has one real dependency. We don't want to destroy potential parallelism.

@eriknw this seems up your alley. From my perspective this just became relatively high priority. I might take a crack in the next few days so do let me know if you take it on.
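
For the record, a toy sketch of the core idea: fuse a task into its only dependent, shown here for a single step with flat tasks and constant keys left alone. A real fuse() would iterate this over whole chains and handle nested tasks.

def dependents(dsk):
    deps = {k: set() for k in dsk}
    for k, v in dsk.items():
        if isinstance(v, tuple):
            for arg in v[1:]:
                if arg in dsk:
                    deps[arg].add(k)
    return deps

def subs(task, key, val):
    if task == key:
        return val
    if isinstance(task, tuple):
        return tuple(subs(t, key, val) for t in task)
    return task

def fuse_once(dsk):
    deps = dependents(dsk)
    for key, ds in deps.items():
        if len(ds) == 1 and isinstance(dsk[key], tuple):   # one dependent, real task
            dep = next(iter(ds))
            out = dict(dsk)
            out[dep] = subs(out[dep], key, out.pop(key))
            return out
    return dsk

def inc(x): return x + 1
d = {'u': (inc, 'w'), 'w': (inc, 'x'), 'x': 1}
print(fuse_once(d))   # {'u': (inc, (inc, 'x')), 'x': 1}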

CyToolz support

I tried to install CyToolz as a dependency, but it appears that you use Toolz. Do you find Toolz better than CyToolz in your case or would CyToolz work just as well? If it is the former, could you share your insight with us? If it is the latter, could we have CyToolz support, as well? Thanks.

Merge sorted sequences of sorted numpy arrays

In the near future we may want to implement an external sort. One missing piece in our ecosystem is an out-of-core merge sorted operation for sequences of numpy arrays. E.g. given

  1. A few iterators of sorted numpy arrays in sorted order (e.g. a list of iterables of numpy ndarrays)
  2. An on-disk store supporting numpy setitem syntax (e.g. hdf5, bcolz)

We want to walk through each iterator, pull off a numpy array from each, and then perform a merge sorted operation (see toolz api for example) on these blocks, building up an intermediate block which we periodically push off on to the external store. In principle this should be almost O(N) and probably I/O bound.

It is safe to assume that having one block from each of the iterators still fits in memory.

We will also need the original index of the values.

One approach might be to just use cytoolz.merge_sorted. I assume that this is slower than a straight C/Cython solution operating on binary arrays but it's worth checking out.
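
A slow but simple reference version using heapq.merge, merging element by element into fixed-size output blocks written to any setitem-capable store; the real thing would work on whole blocks in C/Cython as suggested above.

import heapq
import numpy as np

def merge_sorted_blocks(iterators, store, blocksize):
    # flatten each iterator of sorted blocks into a sorted stream of values
    streams = [(x for block in it for x in block) for it in iterators]
    merged = heapq.merge(*streams)
    buf, offset = [], 0
    for value in merged:
        buf.append(value)
        if len(buf) == blocksize:
            store[offset:offset + blocksize] = np.array(buf)
            offset += blocksize
            buf = []
    if buf:
        store[offset:offset + len(buf)] = np.array(buf)

a = [np.array([1, 4, 7]), np.array([9, 12])]
b = [np.array([2, 3, 10])]
out = np.empty(8, dtype=int)
merge_sorted_blocks([iter(a), iter(b)], out, blocksize=3)
print(out)   # [ 1  2  3  4  7  9 10 12]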

This seems like the kind of thing that might interest @eriknw @cpcloud @nevermindewe

Reblocking along some subset of axes

For the user-facing reblock API in xray, I would like to be able to reblock along only some subset of axes, e.g., ds.reblock(blockshape={'time': 100}) should only modify blockdims along the time dimension, even if there are also latitude and longitude dimensions.

This looks a little tricky to do with the current reblock API.

Two options:

  1. Expose blockdims_from_blockshape in the public API. That way, I can calculate blockdims myself, filling in ones I don't want to change.
  2. Support providing only some axes to blockdims and blockshape in reblock. For example, to indicate that only axis 0 should be reblocked but others should be left alone (in a 3D array), we could write blockshape=(100, None, None) or blockshape={0: 100}.
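
A sketch of the normalization step for option 2, accepting either a dict or a tuple with None entries and leaving unspecified axes at their existing blockdims (turning the resulting ints into blockdims would be a separate step, e.g. via blockdims_from_blockshape):

def normalize_blockshape(blockshape, blockdims):
    ndim = len(blockdims)
    if isinstance(blockshape, dict):
        blockshape = tuple(blockshape.get(i) for i in range(ndim))
    return tuple(bs if bs is not None else bd
                 for bs, bd in zip(blockshape, blockdims))

current = ((10, 10, 10), (25, 25), (25, 25))              # blockdims of a 3-d array
print(normalize_blockshape({0: 100}, current))            # (100, (25, 25), (25, 25))
print(normalize_blockshape((100, None, None), current))   # same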

Thoughts?

CC @PeterDSteinberg

dask should not assume indexing an input array creates a numpy.ndarray

Consider:

import dask.array as da
import dask
import numpy as np

class Wrapper(object):
    def __init__(self, array):
        self.array = array
        self.shape = array.shape
        self.dtype = array.dtype

    def __getitem__(self, key):
        return Wrapper(self.array[key])

    def __array__(self, dtype=None):
        return np.array(self.array, dtype=dtype)

x = da.from_array(Wrapper(np.arange(5)), blockshape=(5,))
with dask.set_options(get=dask.get):
    (x + 1).compute()
TypeError: unsupported operand type(s) for +: 'Wrapper' and 'int'

Many wrapper classes create numpy arrays when indexed (it's often the simplest thing to do), but this is by no means always the case (e.g., consider wrapping an alternate implementation of dask arrays).

Thus, for the purposes of arithmetic, at least, input should be coerced to numpy arrays using np.asarray. It is probably a good idea to do this on all arrays as soon as the computation gets rolling, though this might have some unfortunate complications for fused indexing (#103).

Optimization to remove aliases

Sometimes we create dasks with aliases

{'a': 1, 'b': 'a', 'c': (inc, 'b')}

In these cases it would remove some work from the schedulers if we could short-circuit the unnecessary alias

{'a': 1, 'c': (inc, 'a')}
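
A toy sketch of the optimization for string keys and flat tasks (tuple keys and nested tasks would need the same treatment):

def remove_aliases(dsk):
    # an alias is a key whose value is just another key
    aliases = {k: v for k, v in dsk.items() if isinstance(v, str) and v in dsk}
    def resolve(k):
        while k in aliases:
            k = aliases[k]
        return k
    def rewrite(task):
        if isinstance(task, tuple):
            return tuple(rewrite(t) for t in task)
        return resolve(task) if isinstance(task, str) else task
    return {k: rewrite(v) for k, v in dsk.items() if k not in aliases}

def inc(x): return x + 1
print(remove_aliases({'a': 1, 'b': 'a', 'c': (inc, 'b')}))
# {'a': 1, 'c': (inc, 'a')}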

Global or context managed threadpools

Sometimes, it's difficult to avoid doing a bit of blocking computation (e.g., to calculate dtypes #64).

With the current implementation of dask, it takes about 100 ms to start up a threadpool to start executing tasks, unless I manually specify the basic get method. I suspect we could eliminate most of this delay if we only created the threadpool once.

Two possible solutions:

  1. Create a global threadpool the first time the relevant compute method is used. This is a little unclean, but would be highly practical. I think numexpr does something like this. (See the sketch after this list.)

  2. Create a context manager to mark blocks of computation that should use the same pool, e.g.,

    with dask.pooled:
        assert x.dtype == float
        assert (x > 0).all()
        x.store(dest)
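
A minimal sketch of option 1, with illustrative names rather than dask's actual internals:

from multiprocessing.pool import ThreadPool

_pool = None  # module-level, created lazily

def get_pool(nthreads=4):
    # pay the pool startup cost once, then reuse it on every compute() call
    global _pool
    if _pool is None:
        _pool = ThreadPool(nthreads)
    return _pool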

batched consumption of dask.bag

Migrated from mrocklin fork. Originally posted by @phrrngtn

How do you suggest consuming a dask.bag in chunks of N? My particular use-case is writing/merging cooked output to the database (which I want to be done by a single writer to avoid unnecessary contention at the database). I want something that iterates over the bag (ideally gathering back whatever results are available for whatever partition) and returns a block/batch that in turn is iterable to get at the items themselves. E.g.

for batch in bag.batch(N):
    e.execute(q, batch)

where e is a sqlalchemy engine, q is a sqlalchemy insert query.

I have hacked together something similar with itertools.islice on top of ipython parallel.
http://ipython.org/ipython-doc/dev/parallel/asyncresult.html#map-results-are-iterable

how to incorporate hierarchical caching into dask?

Migrated from mrocklin fork. Originally posted by @phrrngtn

[another feature request coming out of sidebar conversation with mrocklin]

apologies for the very half-baked suggestion. I have not worked with dask enough to identify clearly what it is that I think I want!

Use case: I want complicated dasks on immutable/append-only data to re-use previously computed results if they are available.

Hierarchical means that dask sub-trees can be cached (not just the whole thing). I'm OK with cache busting if the scheduler/optimizer picks a different tree. Assume that cache configuration has to be tied to the scheduler (i.e. the thing that runs the dask, rather than the dask itself)?

Add `numpy.where` equivalent

If I'm just missing this, I'm sorry I didn't catch it and am sorry to trouble you over it. If not, my proposal is below. Please let me know your thoughts. Also, I don't know all the details of your system. So, if there is something I have missed, please let me know.

I think it would be really cool to have a numpy.where-type function in dask, especially as the arrays themselves cannot be set. (Though with a numpy.where approach, maybe setting in certain contexts would be acceptable as well.) I'm thinking this could just behave as a simple branch inserted into the graph. Additionally, if the condition is trivial, like a single bool, this could result in removal of the unused portion of the graph.

Create multiple output arrays with map_blocks

If a user provides a map function that produces multiple outputs, e.g.

def f(block):
    return block - 1, block + 1

Then they should be able to specify that somehow in the call to map_blocks

a, b = x.map_blocks(f, blockshape=[(10, 10), (10, 10)])

and receive two dask Array objects that share a common dask graph.
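
One way this could be wired up, sketched with hand-written graph keys ('f-1', 'out0', 'out1' are illustrative): run f once per block and give each output Array a thin layer of getitem tasks over the shared result.

import numpy as np
import dask
from operator import getitem

def f(block):
    return block - 1, block + 1

# one shared computation per block, two cheap getitem layers on top
dsk = {('x', 0): np.arange(5),
       ('f-1', 0): (f, ('x', 0)),
       ('out0', 0): (getitem, ('f-1', 0), 0),
       ('out1', 0): (getitem, ('f-1', 0), 1)}

print(dask.get(dsk, [('out0', 0), ('out1', 0)]))
# roughly: (array([-1, 0, 1, 2, 3]), array([1, 2, 3, 4, 5]))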

Remove dask.array.into, dask.bag.into, and dask.array.blaze

Remove into modules once they have been moved to Odo and remove the blaze module once it has been added to Blaze.

  • array.into in odo
  • array.into out of dask
  • bag.into in odo
  • bag.into out of dask
  • dask.array.blaze in blaze
  • dask.array.blaze out of dask

Cull unnecessary tasks

Consider the following dask

In [2]: def inc(x): return x + 1

In [3]: def add(x, y): return x + y

In [4]: d = {'x': 1, 'y': (inc, 'x'), 'z': (inc, 'x'), 'out': (add, 'y', 10)}

If we go ahead and get out the out key

>>> from dask.threaded import get
>>> get(d, 'out')

We end up doing excess work computing 'z'.

It would be nice to have a function that takes a dask and a key or list of keys and removes all tasks not necessary to compute those keys.

>>> cull(d, ['out'])
{'x': 1, 'y': (inc, 'x'), 'out': (add, 'y', 10)}
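
A toy sketch of the traversal (flat tasks and hashable keys only; not dask's implementation): walk the graph from the requested keys and keep only what is reachable.

def cull(dsk, keys):
    out, stack = {}, list(keys)
    while stack:
        key = stack.pop()
        if key in out:
            continue
        task = dsk[key]
        out[key] = task
        if isinstance(task, tuple):
            stack.extend(arg for arg in task[1:] if arg in dsk)
    return out

def inc(x): return x + 1
def add(x, y): return x + y
d = {'x': 1, 'y': (inc, 'x'), 'z': (inc, 'x'), 'out': (add, 'y', 10)}
print(cull(d, ['out']))
# {'out': (add, 'y', 10), 'y': (inc, 'x'), 'x': 1}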

dtype promotion rules are not quite right for element-wise ufuncs

Many ufuncs actually always convert their arguments to float; e.g., consider:

In [13]: values = np.random.randint(10, size=(4, 6))

In [14]: data = da.from_array(values, blockshape=(2, 2))

In [15]: data.dtype
Out[15]: dtype('int64')

In [16]: da.sin(data).dtype
Out[16]: dtype('int64')

In [17]: da.sin(data).compute()
Out[17]:
array([[ 0.14112001,  0.14112001,  0.14112001,  0.90929743,  0.41211849,
         0.        ],
       [ 0.6569866 ,  0.14112001, -0.2794155 , -0.2794155 ,  0.14112001,
         0.90929743],
       [ 0.        ,  0.90929743,  0.84147098, -0.7568025 ,  0.98935825,
         0.90929743],
       [-0.95892427,  0.90929743,  0.14112001,  0.14112001,  0.        ,
         0.        ]])

slices with stop=0 give incorrect result

In [4]: import numpy as np

In [5]: import dask.array as da

In [6]: da.ones(10, blockshape=(10,))[:0].compute()
Out[6]: array([ 1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.])

In [7]: np.ones(10)[:0]
Out[7]: array([], dtype=float64)

KeyError when running stack/concatenate

Below is how I ran into the error with the resulting traceback. I am using Dask version 0.2.6, Python 2.7.6, and NumPy 1.7.0. For what it's worth, I also tried calling c = dask.array.stack([a, b], axis=0), but it made no difference. Also, I tried c.compute(), but this had the same result. Lastly, I had this result with dask.array.concatenate as well, but with a slightly different error ( KeyError: ('concatenate-1', 0, 0) ); otherwise the traceback was the same. Repetition in either case results in the number in the key name increasing (e.g. concatenate-2). Calling numpy.array(a) or numpy.array(b) behaved correctly.

>>> import dask
>>> import dask.array
>>> import numpy
>>> 
>>> a = dask.array.empty(shape=(10,10), blockshape=(10,10), dtype=float)
>>> b = dask.array.empty(shape=(10,10), blockshape=(10,10), dtype=float)
>>> c = dask.array.stack([a, b])
>>> 
>>> numpy.array(c)

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-1-646ff5d0cb10> in <module>()
      7 c = dask.array.stack([a, b])
      8 
----> 9 numpy.array(c)

/xopt_ilastik/local/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/dask-0.2.6-py2.7.egg/dask/array/core.pyc in __array__(self, dtype, **kwargs)
    402 
    403     def __array__(self, dtype=None, **kwargs):
--> 404         x = self.compute()
    405         if dtype and x.dtype != dtype:
    406             x = x.astype(dtype)

/xopt_ilastik/local/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/dask-0.2.6-py2.7.egg/dask/array/core.pyc in compute(self, **kwargs)
    437 
    438     def compute(self, **kwargs):
--> 439         result = get(self.dask, self._keys(), **kwargs)
    440         if self.shape:
    441             result = rec_concatenate(result)

/xopt_ilastik/local/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/dask-0.2.6-py2.7.egg/dask/array/core.pyc in get(dsk, keys, get, **kwargs)
    629     dsk3 = inline_functions(dsk2, fast_functions=fast_functions)
    630     dsk4 = dsk3
--> 631     return get(dsk4, keys, **kwargs)
    632 
    633 

/xopt_ilastik/local/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/dask-0.2.6-py2.7.egg/dask/threaded.pyc in get(dsk, result, nthreads, cache, debug_counts, **kwargs)
     46         results = get_async(pool.apply_async, nthreads, dsk, result,
     47                             cache=cache, debug_counts=debug_counts,
---> 48                             queue=queue, **kwargs)
     49     finally:
     50         pool.close()

/xopt_ilastik/local/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/dask-0.2.6-py2.7.egg/dask/async.pyc in get_async(apply_async, num_workers, dsk, result, cache, debug_counts, queue, **kwargs)
    440             raise type(res)(" Execption in remote Process\n\n"
    441                 + str(res) + "\n\nTraceback:\n" + tb)
--> 442         finish_task(dsk, key, res, state, results)
    443         while state['ready'] and len(state['running']) < num_workers:
    444             fire_task()

/xopt_ilastik/local/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/dask-0.2.6-py2.7.egg/dask/async.pyc in finish_task(dsk, key, result, state, results)
    257 
    258     for dep in state['dependents'][key]:
--> 259         s = state['waiting'][dep]
    260         s.remove(key)
    261         if not s:

KeyError: ('stack-1', 0, 0, 0)

Improve Chest

Sometimes we need a cache that spills to disk.

>>> from chest import Chest
>>> cache = Chest()
>>> dsk = ...
>>> dask.threaded.get(dsk, keys, cache=cache)

My current solution to this is Chest, an object that satisfies the MutableMapping interface and spills large contents to disk as pickle files. When using this in production it worked, but not well. We need something better.

One solution would be to improve chest. A first step there would be to build a dask-like workflow to test chest and see what's falling down. Is it concurrency, the lack of an LRU mechanism, the O(n) things we do on each spill to disk?

Another solution would be to look around and find something better, possibly shove or something else in the ecosystem (or something new altogether.)

On disk caching will probably be necessary.

Synchronize keys between two dasks

Given two dasks

d = {'a': 1, 'b': (add, 'a', 10), 'c': (mul, 'b', 5)}
e = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}

We may want to merge them in a way that is blind to renamings of variables.

>>> merge_sync(d, e)
{'a': 1, 'b': (add, 'a', 10), 'c': (mul, 'b', 5), 'z': (mul, 'b', 2)}

I think that @eriknw may have brought up this idea before. I now have a driving use case for it and find it very valuable.

I'm not sure of the best way to design this. It might be best to have a merge function like above or perhaps just a synchronize names function that is orthogonal to merging.

consolidate testing utilities

The same eq helper is redefined and used for testing dask arrays in a bunch of separate modules. We should put it in its own module of testing utilities.

$ grep -ri "def eq" .
./array/tests/test_creation.py:def eq(a, b):
./array/tests/test_ghost.py:def eq(a, b):
./array/tests/test_array_core.py:def eq(a, b):
./array/tests/test_into_array.py:def eq(a, b):
./array/tests/test_percentiles.py:def eq(a, b):
./array/tests/test_chunk.py:def eq(a, b):
./array/tests/test_reductions.py:def eq(a, b):
./array/tests/test_blaze.py:def eq(a, b):
./frame/tests/test_frame.py:def eq(a, b):
