spartan-array / spartan
Distributed Numpy
Home Page: http://spartan-array.github.io/spartan/
License: Other
When I compute the dot product of a matrix and a vector, I found that the fetch in the dot mapper is slow even when it is a local fetch. The cause is the np.all call in TileExtent.get:
if self.mask is None or np.all(self.mask[subslice]):
    if self.mask is not None:
        self.mask = None
    #util.log_info('%s %s %s', self.data, self.mask, subslice)
    return self.data[subslice]
Suppose the tile has shape (N, N). Multiplying it by a vector of shape (N,) takes O(N^2) time, but the local fetch also spends O(N^2) checking the mask array.
In my test, the dot mapper took 0.4 seconds, of which 0.12 seconds went to the local fetch.
If we assume that a tile cannot be masked again once all of its elements have been initialized, we can optimize it this way:
if self.mask is None or np.all(self.mask[subslice]):
    # means all elements of this tile are initialized.
    if self.mask is not None and shape(subslice) == self.shape:
        self.mask = None
    #util.log_info('%s %s %s', self.data, self.mask, subslice)
    return self.data[subslice]
So once we know all the elements are initialized, we won't call np.all to check mask array anymore.
This will improve the dot performance from 0.4 seconds to 0.28 seconds in my test.
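To make the effect of the proposed change concrete, here is a minimal sketch with plain NumPy. MaskedTile, its mask_checks counter, and the ValueError on an uninitialized region are all hypothetical stand-ins, not the real TileExtent code; the sketch only shows that the mask is scanned once and then dropped after a full-tile read.

```python
import numpy as np


class MaskedTile:
    """Hypothetical stand-in for a tile with data and an 'initialized' mask."""

    def __init__(self, data, mask=None):
        self.data = data
        self.mask = mask        # None means every element is known to be initialized
        self.mask_checks = 0    # counts full mask scans, for illustration only

    def get(self, subslice):
        if self.mask is not None:
            self.mask_checks += 1
            region = self.mask[subslice]
            if not np.all(region):
                # Assumption: the sketch simply rejects partially initialized reads.
                raise ValueError("region is not fully initialized")
            # Drop the mask only when the checked region covers the whole tile.
            if region.shape == self.mask.shape:
                self.mask = None
        return self.data[subslice]


tile = MaskedTile(np.ones((4, 4)), mask=np.ones((4, 4), dtype=bool))
full = (slice(None), slice(None))
tile.get(full)   # scans the mask once, then discards it
tile.get(full)   # no scan: the mask is already None
print(tile.mask_checks)
```

The second call skips np.all entirely, which is the saving the proposal is after. It is only safe under the stated assumption that a fully initialized tile is never masked again.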
I'm not sure if this optimization is necessary for following reasons:
Use something better than cPickle for serializing important messages like updates and fetches.
Matrix multiply should have its own expression type.
Right now, it forces the execution of its two inputs in order to determine their shape information, and then uses the shuffle operator. This makes it impossible to optimize through, which we'll want to do in the future.
We should pull it out into a separate file and make it an expression node.
This depends on #18; we should have an optimization which fuses dot product with a reduction; this will allow us to write applications like k-means in a natural way:
closest = argmin(dot(pts, clusters) - sum(clusters**2, axis=1))
All assert methods should have the following format:
def eq(a, b, msg='', *args):
    assert a == b, 'Test (a == b) failed. a: %s, b: %s. %s' % (a, b, msg % args)
This avoids evaluating message arguments in the common case where the assert passes. (Theoretically, Python won't evaluate the right half of the assert statement unless the left half fails).
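A small sketch of the lazy-message behavior, assuming asserts are enabled (Python is not run with -O). The Costly class is hypothetical and exists only to count how often it is formatted; eq is the helper proposed above.

```python
class Costly:
    """Hypothetical object that counts how often it is turned into a string."""

    def __init__(self):
        self.formatted = 0

    def __str__(self):
        self.formatted += 1
        return "costly"


def eq(a, b, msg='', *args):
    # The message expression is evaluated only when the comparison fails.
    assert a == b, 'Test (a == b) failed. a: %s, b: %s. %s' % (a, b, msg % args)


c = Costly()
eq(1, 1, 'context: %s', c)      # passes: the message is never built
formatted_after_pass = c.formatted

try:
    eq(1, 2, 'context: %s', c)  # fails: the message is built now
except AssertionError as exc:
    failure = str(exc)
```

Note that the arguments themselves are still evaluated at the call site; only the string formatting is deferred.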
When we have a crash now, it always shows up in the evaluation; this isn't always very useful for debugging.
What would be better is a traceback showing where the expression that failed was created. We'll need to capture the traceback: (traceback.format_stack()) when an expression is first created, and keep track of it whenever an expression is copied (for optimizations).
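One possible shape for this, as a sketch only: the Expr class below is a hypothetical stand-in, not Spartan's real expression base class. It captures traceback.format_stack() at creation, carries it through copies, and appends it to the error raised at evaluation time.

```python
import traceback


class Expr:
    """Hypothetical expression node that remembers where it was created."""

    def __init__(self, op, created_at=None):
        self.op = op
        # Reuse the original stack when copying; otherwise capture it now.
        if created_at is None:
            created_at = traceback.format_stack()
        self.created_at = created_at

    def copy(self):
        # Copies made by optimization passes keep the original creation site.
        return Expr(self.op, created_at=self.created_at)

    def evaluate(self):
        try:
            raise ValueError("evaluation failed for %s" % self.op)  # stand-in failure
        except ValueError as exc:
            raise RuntimeError(
                "%s\nExpression created at:\n%s" % (exc, "".join(self.created_at))
            ) from exc


def build_expression():
    return Expr("log")


expr = build_expression().copy()
try:
    expr.evaluate()
except RuntimeError as exc:
    report = str(exc)
```

The report then names build_expression, the place where the failing expression was constructed, rather than only the evaluation frames.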
The following will cause an error:
import spartan as S
S.initialize()
x = ones((100,))
y = log(x) + log(x)
y.optimized().optimized().glom()
The second optimization pass ends up building an incorrect LocalExpr.
Workers should send a heartbeat to the master, or vice-versa. Too many missing heartbeats indicates a worker failure.
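A minimal sketch of the bookkeeping on the receiving side, with timestamps passed in explicitly so it stays deterministic. HeartbeatMonitor, interval, and max_missed are hypothetical names, not part of Spartan.

```python
class HeartbeatMonitor:
    """Hypothetical tracker that flags workers after too many missed heartbeats."""

    def __init__(self, interval, max_missed):
        self.interval = interval      # expected seconds between heartbeats
        self.max_missed = max_missed  # missed beats tolerated before declaring failure
        self.last_seen = {}

    def beat(self, worker_id, now):
        self.last_seen[worker_id] = now

    def failed_workers(self, now):
        limit = self.interval * self.max_missed
        return sorted(w for w, t in self.last_seen.items() if now - t > limit)


monitor = HeartbeatMonitor(interval=1.0, max_missed=3)
monitor.beat("w0", now=0.0)
monitor.beat("w1", now=0.0)
monitor.beat("w0", now=4.0)                # w1 has gone silent
failed = monitor.failed_workers(now=4.5)
```

The same structure works in the other direction if the master sends the heartbeats and each worker tracks the master.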
Transpose should not make a copy of the input array. Instead, it should wrap the input with a converter which swaps rows and columns for fetching, like Broadcast:
class Transpose(DistArray):
    def __init__(self, base, dim_shuffle):
        self.base = base
        self.dim_shuffle = dim_shuffle
    def fetch(self, ex):
        ex_t = extent.create(ex.ul[self.dim_shuffle], ex.lr[self.dim_shuffle], ...)
        return self.base.fetch(ex_t)
Cluster mode breaks the IPython and Python REPLs because of FileWatchdog, which runs os._exit(1) if anything is observed on stdin or stderr (IPython uses stdin for input).
Ideally, Spartan would exit when the interactive mode ends, i.e. when the IPython or Python REPL exits.
Another option is to implement signals (e.g. ctrl-C and ctrl-D), but IPython uses those too, so we need a way for IPython to forward requests.
Also, 5 tests are skipped when running all tests in cluster mode.
When I define a matrix like this:
a=np.array([[0,1,1,0],
[1,0,0,1],
[1,0,0,1],
[1,1,0,0]],dtype=float)
Then I define a function:
def PageRank(mat):
    diff_sum=Inf
    point_num=mat.shape[0]
    pr=sp.ones((point_num,1),float)
    pr=pr/point_num
    for i in range(n_point):
        cal=0
        for j in range(n_point):
            cal+=mat[j,i]
        for j in range(n_point):
            mat[j,i]/=cal
    for cal_times in range(10):
        mat.glom()
        print (expr.dot(mat,pr)).glom()
        new_pr=(expr.dot(mat,pr))*DI+pr*(1-DI)
        diff=sp.sub(new_pr, pr)
        diff_sum=sp.sum(diff)
        print diff_sum.glom()
        #debug(diff_sum)
        pr=new_pr
        #debug(pr)
    return pr
When I call PageRank(a), I can't call glom() on the result (res.glom() raises an error).
The errors:
Traceback (most recent call last):
File "/home/stmatengss/workspace/testPy/pageRankMy.py", line 90, in <module>
res=PageRank(buff)
File "/home/stmatengss/workspace/testPy/pageRankMy.py", line 41, in PageRank
print (expr.dot(mat,pr)).glom()
File "/home/stmatengss/spartan/spartan/expr/operator/base.py", line 502, in glom
return glom(self)
File "/home/stmatengss/spartan/spartan/expr/operator/base.py", line 657, in glom
value = evaluate(value)
File "/home/stmatengss/spartan/spartan/expr/operator/base.py", line 685, in evaluate
return node.evaluate()
File "/home/stmatengss/spartan/spartan/expr/operator/base.py", line 299, in evaluate
value = self._evaluate(ctx, deps)
File "/home/stmatengss/spartan/spartan/expr/operator/map.py", line 323, in _evaluate
sparse=(arrays[0].sparse and arrays[1].sparse))
We should try to organize our various Cython code a bit.
@MaggieQi, can you migrate sparse_multiply and sparse_update to array/sparse.pyx?
In numpy.ma.MaskedArray, a mask value of True means the element is masked (hidden). We therefore get a different result from NumPy for the following example:
a = expr.arange((100,))
a = eager(a)
a = a[a > 50]
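For comparison, this is how the two conventions behave in plain NumPy. The sketch only illustrates the NumPy side; it makes no claim about how Spartan stores its masks.

```python
import numpy as np
import numpy.ma as ma

a = np.arange(100)

# Boolean indexing keeps the elements where the condition is True.
selected = a[a > 50]

# In numpy.ma the same boolean array used as a mask hides those elements.
masked = ma.masked_array(a, mask=(a > 50))
kept = masked.compressed()   # the values that are NOT masked
```

Here selected holds 51 through 99, while kept holds 0 through 50, so treating a selection condition directly as a numpy.ma mask inverts the result.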
In order for us to do slice+map fusion, we'll need broadcast objects to support mapping over tiles.
@fegin, I think we talked about this previously. I'll be working on getting the optimization going, if you can look at getting mapping over a broadcast object working?
The SSH watchdog doesn't always work for some reason.
Since we are sending worker heartbeats anyway, we can use them to determine if the master is down, and terminate the worker.
We should generalize the array writing expression to support writing an arbitrary slice of the target array. Copying a numpy array should become a special case of this.
e.g. we should support these 3 operations at least:
# "update" x with data
x = write(x, slice, data)
# copy a numpy array to a distarray
x = to_distarray(numpy_array)
# "update" x with a distarray
x = write(x, slice, some_distarray[slice])
I'm starting this issue to track our documentation progress. We're much better at having code documentation than we were in the past, but we still need to do quite a bit of work on things like the tutorial: https://github.com/spartan-array/spartan/wiki/Tutorial.
The current implementation of arange can be confusing to users of NumPy. From the docs (spartan/expr/builtins.py), spartan.arange is equivalent to np.arange(np.prod(shape)).reshape(shape). This gives an unfamiliar interface for users expecting the standard np.arange([start], stop[, step], dtype=None) interface.
As a temporary fix, I'm adding spartan.arange1d(start, stop, step) to prevent breaking tests.
After talking with @fegin, we think the existing spartan.arange should be renamed to something more suitable. Then arange1d would be renamed to the familiar arange.
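The difference between the two interfaces is easy to see in plain NumPy. arange_shaped below is a hypothetical name for the shape-based behavior described in the docs; it is not a proposal for the final name.

```python
import numpy as np


def arange_shaped(shape):
    """Hypothetical helper: the shape-based behavior described for spartan.arange."""
    return np.arange(np.prod(shape)).reshape(shape)


grid = arange_shaped((2, 3))   # fills a (2, 3) array with 0..5 in row-major order
line = np.arange(2, 10, 3)     # standard start/stop/step interface: 2, 5, 8
```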
When we have arrays with more than a few hundred tiles, I've noticed that our performance drops significantly; this is almost certainly due to the various extent operations needed to compute tiles. We can move the extent code to Cython which would give us a big speedup.
Also, the vast majority of arrays have tiles that are all the same shape; we can leverage this to avoid scanning a tile list, and instead use the tile shape to find the target tile, e.g.
def pos_to_tile(pos, tile_shape):
    tx = pos[0] // tile_shape[0]
    ty = pos[1] // tile_shape[1]
    ...
    num_tiles_x = array.shape[0] // tile_shape[0]
    return ty * num_tiles_x + tx
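A runnable version of the same idea, as a sketch under a few assumptions: the array shape is passed in explicitly (array_shape is a hypothetical parameter), tiles are numbered in the order used above, and a partial tile at the edge counts as a tile, which is why the tile count uses ceiling division.

```python
def pos_to_tile(pos, array_shape, tile_shape):
    """Map a 2-D position to a flat tile index without scanning a tile list."""
    tx = pos[0] // tile_shape[0]
    ty = pos[1] // tile_shape[1]
    # Ceiling division so a partial tile at the edge is still counted.
    num_tiles_x = -(-array_shape[0] // tile_shape[0])
    return ty * num_tiles_x + tx


# A 10x10 array split into 4x4 tiles has 3 tiles along each axis.
index = pos_to_tile((7, 4), array_shape=(10, 10), tile_shape=(4, 4))
```

The lookup costs a few integer operations regardless of how many tiles the array has, which is the point of using the uniform tile shape.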
What exactly is the difference between the two? Is it just that we send the map request to a subset of the workers?
It needs some commenting to make it clear what's going on.
We'd like to be able to support many common machine learning operations. Fortunately, scikit-learn has implemented many of these (for single machine operations). If we can implement most of the common scikit-learn operations, we'll be in a good place.
The full list of algorithms is here: http://scikit-learn.org/stable/user_guide.html
Let's update this as we add features:
When I tried adding optimization to some applications, I found that something is wrong with map and shuffle fusion. An example follows:
In [5]: x = expr.ndarray((10,), dtype=np.float, reduce_fn=np.add)
In [6]: y = expr.ones((10,10))
In [7]: expr.shuffle(y, lambda array, ex: [(extent.index_for_reduction(ex, axis=1), array.fetch(ex).sum(axis=1))], target=x).optimized().force()
Out[7]: DistArrayImpl(id=75153680, shape=(10,), dtype=float64)
In [8]: x.glom()
Out[8]:
array([ 6.92288859e-310, 6.92288859e-310, 6.92288859e-310,
6.92288859e-310, 6.92288859e-310, 6.92288859e-310,
6.92288859e-310, 6.92288859e-310, 6.92288859e-310,
6.92288859e-310])
In [9]: expr.shuffle(y, lambda array, ex: [(extent.index_for_reduction(ex, axis=1), array.fetch(ex).sum(axis=1))], target=x).force()
Out[9]: DistArrayImpl(id=75154064, shape=(10,), dtype=float64)
In [10]: x.glom()
Out[10]: array([ 10., 10., 10., 10., 10., 10., 10., 10., 10., 10.])
I think that when we set a target for a shuffle, we cannot fuse the map that creates the target with this shuffle, since the target will be used later. Moreover, can we add this shuffle expr as a dependency of any expr that uses the target? Then we would not need to force it each time we use this kind of shuffle.
Currently eager discards the expression graph, which makes re-computation impossible on failure. It should keep the expression graph somehow.
We have a number of benchmarks for performance testing.
We need to combine these into a performance regression test so we can ensure changes aren't hurting performance. Ideally, we'd run a smaller version of each benchmark and print out something like:
linear-regression,1.2,0.5,2.5
k-means,10,10,1.0
...
The baseline is the performance as measured when we first write the test. If the factor between the current time and the baseline becomes too large, we should print something loud or throw an exception.
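A sketch of what one such check could look like. It assumes the columns in the sample output are name, current time, baseline, and factor, which the example lines suggest but do not state; check_regression, max_factor, and the injectable timer are hypothetical.

```python
import time


def check_regression(name, fn, baseline_seconds, max_factor=2.0,
                     timer=time.perf_counter):
    """Time fn once and compare against a stored baseline (hypothetical helper)."""
    start = timer()
    fn()
    elapsed = timer() - start
    factor = elapsed / baseline_seconds
    # Assumed column order: name, current, baseline, factor.
    line = "%s,%.3g,%.3g,%.3g" % (name, elapsed, baseline_seconds, factor)
    if factor > max_factor:
        raise RuntimeError("PERFORMANCE REGRESSION: " + line)
    return line


# A fake timer keeps the example deterministic: the "benchmark" takes 1.2 s.
ticks = iter([0.0, 1.2])
line = check_regression("linear-regression", lambda: None, baseline_seconds=0.5,
                        max_factor=3.0, timer=lambda: next(ticks))
```

In a real run the default timer would be used and the baselines would be loaded from wherever they were recorded when each test was first written.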
We currently convert both inputs of _dot_mapper to sparse matrices before multiplying.
We should special-case dense*dense multiplication to avoid the conversion.
Add support for a checkpoint expression or operation, which saves array state to disk.
There's an existing library called traits (http://docs.enthought.com/traits/traits_user_manual/index.html) which supports almost everything we're currently using node_type for. It's also implemented in Cython, so it should be faster. It also supports type checking, which is nice. Translation is pretty straightforward. Instead of:
class Expr(object):
    _members = [..]
we use
from traits.api import HasTraits, Int, Float, String, Trait, Tuple
class Expr(HasTraits):
    expr_id = Int()
    stack_trace = Trait(ExprTrace)
Who's up for converting all of the node_type classes?
Changing any of the FLAGS configuration options in one test affects those options for all subsequent tests.
A proposed solution is for ClusterTest in tests/test_common to save the current state of FLAGS before running a test and then restore the previous state after the test has completed.
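The save and restore can live in setUp and tearDown. In this sketch FLAGS is a plain dict standing in for the real configuration object, whose API is not shown here, and ExampleTest is hypothetical.

```python
import copy
import unittest

# Hypothetical stand-in for Spartan's FLAGS object.
FLAGS = {"optimization": True, "opt_expression_cache": True}


class ClusterTest(unittest.TestCase):
    def setUp(self):
        # Snapshot the configuration before each test.
        self._saved_flags = copy.deepcopy(FLAGS)

    def tearDown(self):
        # Restore the snapshot so later tests see the original values.
        FLAGS.clear()
        FLAGS.update(self._saved_flags)


class ExampleTest(ClusterTest):
    def test_disable_optimization(self):
        FLAGS["optimization"] = False
        self.assertFalse(FLAGS["optimization"])


result = unittest.TestResult()
unittest.defaultTestLoader.loadTestsFromTestCase(ExampleTest).run(result)
```

After the run, FLAGS["optimization"] is True again even though the test changed it. Subclasses that define their own setUp or tearDown would need to call the parent versions.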
We currently track references to whole arrays or expressions at a time. Expressions are tracked using an explicit reference counting strategy (in expr/base.py); when an array is deleted, we add all of its tiles to a cleanup list to be destroyed whenever a new array is created later.
Certain operations like Cholesky factorization can end up creating new arrays with mostly the same content. If we can somehow allow arrays to share tiles, then we can support these types of operations efficiently and still retain the ability to do things like checkpoint nodes.
http://en.wikipedia.org/wiki/Cholesky_decomposition#The_Cholesky_algorithm
Can we add support for running operations on GPU arrays?
Right now we rely on local numpy primitives to implement tile-level operations; if we wanted to use GPU arrays, we would likely need to implement the operations ourselves so we could generate CUDA code as necessary. This is easy for simple operations like add or multiply, but much harder if the user wants to do:
def f(data):
    # some python code
    return new_data
Y = map(f, X)
Here we would need to analyze arbitrary Python code. I'm not sure how to handle this case without some serious amount of work. It might be possible to capture the internal operations using a lazy expression:
input = lazy_expr()
result = f(input)
Now the result will be the cumulative effect of any internal user operations. But this relies on the user not requiring things like control flow, or trying to print a result.
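A toy version of the lazy-expression idea, to show both what it can capture and where it stops working. LazyExpr is hypothetical and supports only addition and multiplication; the user function f is an example.

```python
class LazyExpr:
    """Hypothetical tracer that records arithmetic instead of performing it."""

    def __init__(self, op, args=()):
        self.op = op
        self.args = args

    def __add__(self, other):
        return LazyExpr("add", (self, other))

    def __mul__(self, other):
        return LazyExpr("multiply", (self, other))

    def __bool__(self):
        # Control flow needs a concrete value, which a trace cannot provide.
        raise TypeError("control flow on a lazy value cannot be traced")

    def __repr__(self):
        if not self.args:
            return self.op
        return "%s(%s)" % (self.op, ", ".join(repr(a) for a in self.args))


def f(data):
    return data * 2 + data


trace = f(LazyExpr("input"))
print(repr(trace))
```

The recorded tree is what a code generator would consume. A user function containing something like `if data: ...` hits __bool__ and fails, which is the control-flow limitation mentioned above.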
The README uses "Sparrow", but "Spartan" is used everywhere else. I'm just wondering how I should refer to this project.
We have a bunch of local operations that register data or query with the master:
Since these aren't actually RPCs, it's confusing that they're in blob_ctx. At the very least, we should have something like master.get() to return a reference to the master itself; then these operations can just talk to the master directly.
The corresponding Messages should be pulled out of core.pyx as well.
We currently define the reduction/accum functions on the arrays themselves, but this is kind of non-intuitive: what I really want is for the reduction to be attached to the operation, so I could say:
x = zeros()
scatter(src, target=x, map_fn=..., accum_fn=np.add)
We're currently using the not_idempotent decorator to mark certain functions as not being fusable. This works, but it's a little tricky to check all of the situations.
(For example, if caching is disabled, we still need a fallback to avoid evaluating non-idempotent operations more than once).
Should we instead just make "expressions must be deterministic" be the rule, and handle non-deterministic operations by forcing them immediately?
Distarray references are getting cleared from the evaluation cache, but a reference is getting held by register_blob which prevents it from getting deleted. I "fixed" this by disabling register_blob, but it should be fixed correctly soon.
You can see the leak by running many iterations of the linear regression benchmark
Do we have an API for the logical operations "or" and "and"?
TypeError: unsupported operand type(s) for &: 'MapExpr' and 'MapExpr'
sp.scan giving incorrect behavior.
In [0]: sp.scan(sp.ones((2, 5)), scan_fn=np.cumsum, 1).glom()
Out[0]: array([[1., 2., 2. , 3., 3.],
[1., 2., 2. , 3., 3.]])
In [1]: np.cumsum(np.ones((2, 5)), 1)
Out[1]: array([[1., 2., 3., 4., 5.],
[1., 2., 3., 4., 5.]])
When we create a new distarray, we currently fetch one tile from the array in order to figure out the dtype of the array. This is expensive when tiles are big, so we should instead have an operation like:
Worker:
    tile_op(fn, blob_id): return fn(tile, blob_id)
(This is sort of a single element map). This will speed up creating new arrays.
We're using a default timeout of 60 seconds for RPC requests, which is annoyingly long for some operations and too short for others.
We should make the RPC timeout configurable.
Hey Russell,
I'm reading the k-means code in our examples, and I have some questions about it. I'm not sure whether I'm misunderstanding the code or whether there are problems in it.
for i in range(10):
    _ = expr.shuffle(pts,
                     _find_cluster_mapper,
                     kw={'d_pts' : pts,
                         'old_centers' : centers,
                         'new_centers' : new_centers,
                         'new_counts' : new_counts })
    _.force()
    new_centers = expr.eager(new_centers / new_counts)
First, in the for loop we always use the old centers in the shuffle kernel; we never swap the old centers with the new centers.
def _find_closest(pts, centers):
    idxs = np.zeros(pts.shape[0])
    for i in range(pts.shape[0]):
        min_dist = 1e9
        min_idx = 0
        p = pts[i]
        for j in range(len(centers.shape)):
            c = centers[j]
            dist = np.dot(p, c)
            if dist < min_dist:
                min_dist = dist
                min_idx = j
        idxs[i] = min_idx
    return idxs
Second, in _find_closest, the inner loop is for j in range(len(centers.shape)).
Here I think you want to compute the distance between the point and each cluster center, so we should probably use for j in range(centers.shape[0]), since centers is an array of shape (n_clusters, n_features). len(centers.shape) will always return 2.
new_centers.update(extent.from_shape(new_centers.shape), l_centers)
new_counts.update(extent.from_shape(new_counts.shape), l_counts)
Third, we update new_centers and new_counts with l_centers and l_counts. l_centers and l_counts are arrays of shape (n_clusters, n_features) and (n_clusters, 1), and new_centers and new_counts don't have a reduce function, so each tile overwrites the result of the previous tile. Maybe we should use a combiner/reducer here?
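The second point can be checked directly in NumPy. The array sizes below are arbitrary examples.

```python
import numpy as np

centers = np.zeros((5, 3))           # 5 clusters, 3 features

n_dims = len(centers.shape)          # number of dimensions, not number of clusters
n_clusters = centers.shape[0]        # number of rows, i.e. clusters

visited_wrong = list(range(n_dims))      # only the first two centers are visited
visited_right = list(range(n_clusters))  # every center is visited
```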
There are two optimization issues.
@rjpower I found the reason why the second issue happens. The difference between the matrix produced with optimization and the matrix produced without optimization is very small (< 10e-14). If the matrices contain only integers, the test passes. But I have no clue which part affects the precision.
I will fix the order issue first.
Hi Russell,
The following two lines cause some tests to fail.
In spartan/expr/base.py:
+182 if not FLAGS.optimization or not FLAGS.opt_expression_cache:
+183 return None
The reason why some tests fail is that some random expr should not be evaluated twice since we will compare it to the local numpy computation result ... Could we check if the expr is idempotent when doing caching? If it is not idempotent, we should force it to be cached?
I want to write Spartan code in this way:
x = expr.randn(10, 3)
x_mean = x.mean(axis=0)
x -= x_mean
x.force()
This is OK in numpy, but Spartan reports:
Traceback (most recent call last):
File "test.py", line 17, in <module>
x.force()
File "/home/yisheng/workplace/spartan/spartan/expr/base.py", line 332, in force
return self.evaluate()
File "/home/yisheng/workplace/spartan/spartan/expr/base.py", line 233, in evaluate
value = self._evaluate(ctx, deps)
File "/home/yisheng/workplace/spartan/spartan/expr/map.py", line 79, in _evaluate
kw = { 'children' : children, 'op' : op })
File "/home/yisheng/workplace/spartan/spartan/array/distarray.py", line 135, in map_to_array
results = self.foreach_tile(mapper_fn=mapper_fn, kw=kw)
File "/home/yisheng/workplace/spartan/spartan/array/distarray.py", line 219, in foreach_tile
kw=kw)
File "/home/yisheng/workplace/spartan/spartan/blob_ctx.py", line 191, in map
kw)
File "/home/yisheng/workplace/spartan/spartan/blob_ctx.py", line 179, in partial_map
futures = self._send_all('run_kernel', req, targets=targets)
File "/home/yisheng/workplace/spartan/spartan/blob_ctx.py", line 75, in _send_all
return futures.wait()
File "/home/yisheng/workplace/spartan/spartan/rpc/common.py", line 253, in wait
results.append(f.wait())
File "/home/yisheng/workplace/spartan/spartan/rpc/common.py", line 227, in wait
raise RemoteException(self.result.py_exc)
spartan.rpc.common.RemoteException: RemoteException:
Traceback (most recent call last):
:: File "/home/yisheng/workplace/spartan/spartan/worker.py", line 136, in _run_kernel
:: map_result = req.mapper_fn(blob_id, blob, **req.kw)
:: File "/home/yisheng/workplace/spartan/spartan/array/distarray.py", line 85, in _tile_mapper
:: return user_fn(ex, **kw)
:: File "/home/yisheng/workplace/spartan/spartan/expr/map.py", line 20, in tile_mapper
:: lv = gv.fetch(ex)
:: File "/home/yisheng/workplace/spartan/spartan/array/distarray.py", line 639, in fetch
:: fetched = self.base.fetch(ex)
:: File "/home/yisheng/workplace/spartan/spartan/array/distarray.py", line 231, in fetch
:: assert np.all(region.lr <= self.shape), (region, self.shape)
:: AssertionError: (extent(9:10), (3,))
::
Instead, I need to write it in this way to make spartan run:
x = expr.randn(10, 3)
x_mean = x.mean(axis=0)
x -= expr.reshape(x_mean, (1, x_mean.shape[0]))
x.force()
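For reference, this is the NumPy behavior the report compares against: a (10, 3) array minus a (3,) vector broadcasts along the rows, and gives the same result as the explicit reshape to (1, 3). The input values are an arbitrary example.

```python
import numpy as np

x = np.arange(30, dtype=float).reshape(10, 3)
x_mean = x.mean(axis=0)                               # shape (3,)

implicit = x - x_mean                                 # NumPy broadcasts (3,) over rows
explicit = x - x_mean.reshape(1, x_mean.shape[0])     # the workaround, in NumPy terms
```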
Determine if load-balancing is worthwhile, and how to do it while retaining tile alignment.
Since we're capturing the expression graphs, we're in a good position to support automatic differentiation as well. This would require expressions to track a bit more information (the actual arithmetic operation being performed).
When I run this code:
A = expr.ones((3, 3)).force()
B = expr.zeros((3, 3))
res = A - B
print res.glom()
The result is:
[[-1. -1. -1.]
[-1. -1. -1.]
[-1. -1. -1.]]
If I run it in this way:
A = expr.ones((3, 3))
B = expr.zeros((3, 3))
res = A - B
print res.glom()
The result is correct.
If I run it in this way:
A = expr.ones((3, 3)).force()
B = expr.zeros((3, 3)).force()
res = A - B
print res.glom()
It will report:
TypeError: unsupported operand type(s) for -: 'DistArrayImpl' and 'DistArrayImpl'
The reason is that DistArray doesn't overload the sub method; only Expr does. So if we subtract an Expr object from a DistArray object, Python calls the sub operation of the Expr object, which produces a result with the opposite sign.
def __sub__(self, other):
    #Now the self is the Expression(B), other is DistArray(A)
    return _map(self, other, fn=np.subtract)
Why don't we also overload these operators in DistArray?
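In plain Python, when the left operand defines no subtraction, the right operand's reflected method is called and has to keep the operand order. The sketch below uses hypothetical Lazy and Concrete classes in place of Expr and DistArray to show a reflected method that preserves the sign.

```python
import numpy as np


def _unwrap(obj):
    """Return the wrapped array for the toy classes below, or obj itself."""
    return obj.value if hasattr(obj, "value") else obj


class Concrete:
    """Hypothetical array type that defines no arithmetic operators."""

    def __init__(self, value):
        self.value = np.asarray(value)


class Lazy:
    """Hypothetical expression type that defines both subtraction methods."""

    def __init__(self, value):
        self.value = np.asarray(value)

    def __sub__(self, other):
        # self - other
        return Lazy(np.subtract(self.value, _unwrap(other)))

    def __rsub__(self, other):
        # Reflected case: other - self, so the operands are swapped here.
        return Lazy(np.subtract(_unwrap(other), self.value))


# Concrete has no __sub__, so Python falls back to Lazy.__rsub__.
res = Concrete(np.ones((3, 3))) - Lazy(np.zeros((3, 3)))
```

Overloading the operators on the concrete array type as well, as suggested above, would avoid relying on the reflected path at all.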
We're seeing performance issues with scaling to larger numbers of workers. The performance bottleneck seems to be concentrated in the RPC system.
Things to try: