Giter Club home page Giter Club logo

dagger.jl's Introduction

Dagger.jl


A framework for out-of-core and parallel computing

Documentation Build Status
Build Status

At the core of Dagger.jl is a scheduler heavily inspired by Dask. It can run computations represented as directed-acyclic-graphs (DAGs) efficiently on many Julia worker processes and threads, as well as GPUs via DaggerGPU.jl.

The DTable has been moved out of this repository. You can now find it here.

Installation

Dagger.jl can be installed using the Julia package manager. Enter the Pkg REPL mode by typing "]" in the Julia REPL and then run:

pkg> add Dagger

Or, equivalently, install Dagger via the Pkg API:

julia> import Pkg; Pkg.add("Dagger")

Usage

Once installed, the Dagger package can be loaded with using Dagger, or if you want to use Dagger for distributed computing, it can be loaded as:

using Distributed; addprocs() # Add one Julia worker per CPU core
using Dagger

You can run the following example to see how Dagger exposes easy parallelism:

# This runs first:
a = Dagger.@spawn rand(100, 100)

# These run in parallel:
b = Dagger.@spawn sum(a)
c = Dagger.@spawn prod(a)

# Finally, this runs:
wait(Dagger.@spawn println("b: ", b, ", c: ", c))

Use Cases

Dagger can support a variety of use cases that benefit from easy, automatic parallelism, such as:

This isn't an exhaustive list of the use cases that Dagger supports. There are more examples in the docs, and more use cases examples are welcome (just file an issue or PR).

Contributing Guide

PRs Welcome GitHub issues GitHub contributors

Contributions are encouraged.

There are several ways to contribute to our project:

Reporting Bugs: If you find a bug, please open an issue and describe the problem. Make sure to include steps to reproduce the issue and any error messages you receive regarding that issue.

Fixing Bugs: If you'd like to fix a bug, please create a pull request with your changes. Make sure to include a description of the problem and how your changes will address it.

Additional examples and documentation improvements are also very welcome.

Resources

List of recommended Dagger.jl resources:

Help and Discussion

For help and discussion, we suggest asking in the following places:

Julia Discourse and on the Julia Slack in the #dagger channel.

Acknowledgements

We thank DARPA, Intel, and the NIH for supporting this work at MIT.

dagger.jl's People

Contributors

andreasnoack avatar bicycle1885 avatar christopher-dg avatar dependabot[bot] avatar dilumaluthge avatar drchainsaw avatar fda-tome avatar femtocleaner[bot] avatar github-actions[bot] avatar jameswrigley avatar jeffbezanson avatar jpsamaroo avatar krynju avatar maleadt avatar mattwigway avatar nucklass avatar pranavtbhat avatar pszufe avatar rabab53 avatar ranjanan avatar rohitvarkey avatar scls19fr avatar scottpjones avatar shashi avatar staticfloat avatar tanmaykm avatar teresy avatar vchuravy avatar viralbshah avatar visr avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

dagger.jl's Issues

sort hangs on Julia 0.6

julia> using Dagger

julia> X = Distribute(BlockPartition(3), rand(1:10, 10))
Dagger.Distribute{Int64,1}(10,)

julia> @time gather(sort(X))
^CERROR: InterruptException:
Stacktrace:
 [1] process_events(::Bool) at ./libuv.jl:83
 [2] wait() at ./event.jl:171
 [3] wait(::Condition) at ./event.jl:27
 [4] wait(::Channel{Any}) at ./channels.jl:348
 [5] take_buffered at ./channels.jl:307 [inlined]
 [6] take!(::Channel{Any}) at ./channels.jl:305
 [7] take_ref(::Base.Distributed.RRID, ::Int64) at ./distributed/remotecall.jl:532
 [8] call_on_owner(::Function, ::RemoteChannel{Channel{Any}}, ::Int64, ::Vararg{Int64,N} where N) at ./distributed/remotecall.jl:438
 [9] compute(::Dagger.Context, ::Dagger.Thunk) at /home/shashi/.julia/v0.6/Dagger/src/basics/compute.jl:211
 [10] mapchunk_eager(::Dagger.##296#301{Dagger.#submedian,Array{UnitRange,2},DataType}, ::Dagger.Context, ::Dagger.Cat, ::Type{T} where T, ::Symbol) at /home/shashi/.julia/v0.6/Dagger/src/array/sort.jl:46
 [11] broadcast1(::Dagger.Context, ::Dagger.#submedian, ::Dagger.Cat, ::Array{UnitRange,2}, ::Type{T} where T) at /home/shashi/.julia/v0.6/Dagger/src/array/sort.jl:54
 [12] select(::Dagger.Context, ::Dagger.Cat, ::Array{Int64,1}, ::Base.Order.ForwardOrdering) at /home/shashi/.julia/v0.6/Dagger/src/array/sort.jl:87
 [13] compute(::Dagger.Context, ::Dagger.Sort) at /home/shashi/.julia/v0.6/Dagger/src/array/sort.jl:35
 [14] gather(::Dagger.Sort) at /home/shashi/.julia/v0.6/Dagger/src/basics/compute.jl:57

[RFC] Refactor scheduler to make it easy to modify by end-user code

The Dagger scheduler is (internally) a bit of a mess, with limited documentation and a lot of inter-dependent states which can break easily when modified. In order to support dynamic user-defined load balancing, dynamic DAG modification, GPU scheduling, and other features which touch core parts of the scheduler machinery, we should consider exposing more of the scheduler's decisions to the user while at the same time providing a semi-internal API which can be used to modify the scheduler's internal state without having to understand how everything fits together. In my opinion, the following things need to change in order for Dagger to support this:

  • Callbacks at scheduler decision points
  • Scheduler API to safely manipulate an active DAG (see below)
  • Add field to Thunk that users can modify and query at runtime (for arbitrary data, such as location information (GPU device, coordinate in user-specified grid, etc.))

Examples of scheduler API functions might include:

  • append_thunk - Appends a new Thunk to the DAG with provided inputs
  • replace_thunk - Replace an existing Thunk with a new one, only valid if the Thunk has not executed yet
  • delete_thunk - Deletes an existing Thunk from the DAG (including all dependent children), only valid if the Thunk has not executed yet
  • get_root/get_children/get_parents/etc. - Queries the DAG
  • copy_dag - Copy a DAG (potentially a subset of the whole graph) and make it ready for execution
  • spawn_scheduler - Spawn a new scheduler, potentially fully independent of the current one, and have it execute a given DAG

Of course, one can argue that you can always replace the entire scheduler via the plugin mechanism, however that's a very crude way to achieve the above goals. I think the above items would allow Dagger to evolve into more roles than it can currently fulfil, and also make it easy to refactor much of the core logic into a simpler, more readable form.

Redistribute doesn't change the layout from CutDimension{1} to CutDimension{2}.

 m = round(Int, rand(10,10))
dm = compute(Context(), distribute(m, cutdim(1)))

ComputeFramework.DistMemory(Any,Pair[2=>RemoteRef{Channel{Any}}(2,1,17),3=>RemoteRef{Channel{Any}}(3,1,18)],ComputeFramework.CutDimension{1}())

 dm = compute(Context(), redistribute(dm, cutdim(2)))

ComputeFramework.DistMemory(Any,Pair[2=>RemoteRef{Channel{Any}}(2,1,25),3=>RemoteRef{Channel{Any}}(3,1,26)],ComputeFramework.CutDimension{1}())

The layout changes from CutDimension{2} to CutDimension{1} but doesn't change form CutDimension{1} to CutDimension{2}.

Serialization should be done on a separate thread

Consider the following sequence of events:

T+1: Worker 1 finished a task (Task 1), and notifies the scheduler about it
T+2: The scheduler replies with a new task for Worker 1 (which, let's say, only depends on local data to keep this example simple)
T+3: Worker 1 starts working on the new task (Task 2)
T+4: Worker 2 which was at work all this while finishes and notified the scheduler
T+5: The scheduler replies with a new task (Task 3) for Worker 2. This task depends on the output of Task 1 which is still with Worker 1.
T+6: Worker 2 tries to fetch the data required to start working on Task 3, but blocks because Worker 2 is still busy with Task 2 (typically in a compute intensive for loop) and cannot serve Worker 2 the data immediately.
....worker 2 waits....
....and waits...
T+7: Worker 1 finishes Task 2 (and notifies the scheduler).
...worker 2 is still waiting...
T+8: Worker 1 reads Worker 2's request and responds to it with the requested data.
T+9: Finally the wait is over! Worker 2 can start on Task 3.
T+10: Worker 1 reads Task 4 from the scheduler, and Task 4 better not depend on Task 2, because then Worker 1 would have to wait for Worker 2 to finish Task 3........... God help these vengeful quibbling workers.

It works this way because:

  1. We don't want the scheduler process doing serialize/deserialize while it should really be scheduling things as fast as possible.
  2. It would double the number of times we do serialize/deserialize

The workers really shouldn't be waiting on each other. This happens all the time in matrix multiplication, and sometimes when it doesn't, it's much faster! There are a few solutions to this:

  1. Have half the workers (or maybe just 2 workers) just reading and relaying intermediate data. So every worker sends its output to these processes after its done and reads from these processes when it needs some other worker's data before starting a new task.
  2. Create a special kind of task that says "Wait for someone to take the data that you have computed for Task 1 and then start working on this Task 2"
  3. Do a hybrid of 1 and 2. i.e. have a deadline for the wait in 2 and fall back to 1..

This problem arises because we just use the shared memory scheduler from Dask (which doesn't have this problem, because well you don't need to communicate in a shared-memory set up). I believe the multiprocessing scheduler in Dask was just sending results to the scheduler at the time. So I figured this might not be great for embarassingly parallel workloads.

A great way to solve this issue is to hook into distributed scheduler and implement its client interface. This, I beleive will not only solve this issue, but make sure other things (like data locality in HDFS) work great in a cluster set-up. (@mrocklin had got me bootstrapped with this process. here are the simple send_msg and recv_msg functions https://gist.github.com/shashi/e8f37c5f61bab4219555cd3c4fef1dc4) see dask/distributed#586 for a discussion on this.

Allow users to prevent moving of data between nodes

Sometimes the user knows that their data is just too huge or complicated to transfer over a network, and they want to be able to ensure that computations on that data only occurs locally relative to the data. Additionally, in certain industries like healthcare and finance, data storage and transmission is legally restricted, however computing on such data locally (relative to where the data is stored) and then only transferring summarized information is legally acceptable. We should provide a mechanism by which users can annotate a thunk argument as being non-moveable, such as providing a wrapper type that is specially handled by the scheduler.

error showing error during unit tests

Scheduler options: threads: Error During Test at /home/expandingman/.julia/dev/Dagger/test/scheduler.jl:36
  Got exception outside of a @test
  MethodError: no method matching show_backtrace(::IOContext{Base.GenericIOBuffer{Array{UInt8,1}}}, ::Nothing)
  Closest candidates are:
    show_backtrace(::IO, !Matched::Array{Any,1}) at errorshow.jl:637
    show_backtrace(::IO, !Matched::Array{T,1} where T) at errorshow.jl:608
    show_backtrace(::IO, !Matched::JuliaInterpreter.Frame) at /home/expandingman/.julia/packages/JuliaInterpreter/dEBFI/src/utils.jl:545
  Stacktrace:
   [1] showerror(::IOContext{Base.GenericIOBuffer{Array{UInt8,1}}}, ::AssertionError, ::Nothing; backtrace::Bool) at ./errorshow.jl:79
   [2] showerror(::IOContext{Base.GenericIOBuffer{Array{UInt8,1}}}, ::AssertionError, ::Nothing) at ./errorshow.jl:74
   [3] showerror(::IOContext{Base.GenericIOBuffer{Array{UInt8,1}}}, ::TaskFailedException) at ./task.jl:76
   [4] (::Base.var"#649#650"{TaskFailedException})(::IOContext{Base.GenericIOBuffer{Array{UInt8,1}}}) at ./errorshow.jl:76
   [5] with_output_color(::Function, ::Symbol, ::IOContext{Base.GenericIOBuffer{Array{UInt8,1}}}; bold::Bool) at ./util.jl:385
   [6] with_output_color(::Function, ::Symbol, ::IOContext{Base.GenericIOBuffer{Array{UInt8,1}}}) at ./util.jl:383
   [7] showerror(::IOContext{Base.GenericIOBuffer{Array{UInt8,1}}}, ::TaskFailedException, ::Array{Any,1}; backtrace::Bool) at ./errorshow.jl:75
   [8] showerror(::IOContext{Base.GenericIOBuffer{Array{UInt8,1}}}, ::CapturedException) at ./task.jl:25
   [9] showerror(::IOContext{Base.GenericIOBuffer{Array{UInt8,1}}}, ::RemoteException) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.4/Distributed/src/process_messages.jl:57
   [10] (::Base.var"#649#650"{RemoteException})(::IOContext{Base.GenericIOBuffer{Array{UInt8,1}}}) at ./errorshow.jl:76
   [11] with_output_color(::Function, ::Symbol, ::Base.GenericIOBuffer{Array{UInt8,1}}; bold::Bool) at ./util.jl:385
   [12] with_output_color at ./util.jl:383 [inlined]
   [13] showerror(::Base.GenericIOBuffer{Array{UInt8,1}}, ::RemoteException, ::Array{Union{Ptr{Nothing}, Base.InterpreterIP},1}; backtrace::Bool) at ./errorshow.jl:75
   [14] show_exception_stack(::Base.GenericIOBuffer{Array{UInt8,1}}, ::Array{Any,1}) at ./errorshow.jl:705
   [15] sprint(::Function, ::Array{Any,1}; context::Nothing, sizehint::Int64) at ./strings/io.jl:105
   [16] sprint(::Function, ::Array{Any,1}) at ./strings/io.jl:101
   [17] Test.Error(::Any, ::Any, ::Any, ::Any, ::Any) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.4/Test/src/Test.jl:158
   [18] do_test(::Test.ExecutionResult, ::Any) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.4/Test/src/Test.jl:518
   [19] top-level scope at /home/expandingman/.julia/dev/Dagger/test/scheduler.jl:42
   [20] top-level scope at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.4/Test/src/Test.jl:1113
   [21] top-level scope at /home/expandingman/.julia/dev/Dagger/test/scheduler.jl:37
   [22] top-level scope at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.4/Test/src/Test.jl:1113
   [23] top-level scope at /home/expandingman/.julia/dev/Dagger/test/scheduler.jl:18
   [24] include(::String) at ./client.jl:439
   [25] top-level scope at /home/expandingman/.julia/dev/Dagger/test/runtests.jl:11
   [26] include(::String) at ./client.jl:439
   [27] top-level scope at none:6
   [28] eval(::Module, ::Any) at ./boot.jl:331
   [29] exec_options(::Base.JLOptions) at ./client.jl:264
   [30] _start() at ./client.jl:484

Not really sure what to make of this, but I get this error when doing unit tests on my local machine on master.

Re-evaluation of evaluated nodes

I think if a node has been computed, and one tries to compute again, should we recompute, or make it a noop since it was already done and is stored in the node?

Get rid of partitions

As development has progressed it is no longer clear why Partition is needed as an abstraction. One can just create different DomainSplit structures and pass it to a single distribute generic function to do the distribution. The only use however seemed to be dispatching on base function like rand zeros etc. We can switch to dispatching on Chunks(10, 10) or something like that.

How do I modify distributed data?

This seems to be general problem. I can't write over the remote references. Is there some other way to modify distributed data?

addprocs(2)
a = [1,2]
da = compute(Context(), distribute(a))
fetch(da.refs[1][2])            # gives [1]
put!(da.refs[1][2], [5])        # gets stuck, no response

Filter

I was trying to write an example for filter. The following code causes a stackoverflow.

addprocs(3)

using ComputeFramework

numbers = [1, 2, 3 ,4, 5, 6, 7, 8, 9, 10]

parallel_numbers = distribute(numbers)
greaterthan_3 = filter(x -> x > 3, parallel_numbers)

@show compute(Context(), greaterthan_3)

no speed increase

I tested the following code in two situations:
(1) Run from Juno
When i run the code in Juno, the code should be run with “addprocs()” before “using Dagger” first. There is no speed increase for the first run. Then, delete the code “addprocs()”, run the code again. There is speed increase.
(2) Run from the command
When I run the code with/without “addprocs()” from the command. There is no speed increase.

Run on windows 7, 64 Bit, with 4 CPU cores, Julia0.61, Dagger v0.5.1

using Dagger     # comment this code after the first run
@show workers()

@everywhere function f1(x)
    sleep(1)
    rand()+x
end

vs = [rand(Int) for i in 1:10];
cks = Any[]
for v in vs
    push!(cks, delayed(f1)(v))
end

@everywhere combine(x...) = nothing
c = delayed(combine)(cks...)
f1(1)

## runnint time with Dagger
@time compute(c)

Determine work assignment and data movement based on runtime-collected metrics

While round-robin work assigment is fine when first launching work without any prior knowledge, it is less efficient when individual work items are widely varying in duration, and when data being moved varies in size. We have the necessary infrastructure already built-in to Dagger to support monitoring work and data movement latencies, we just need to tell the scheduler how to use this information for its benefit.

I believe that we could use a simple runtime-derived cost model, plus a numerical optimizer, to allow the scheduler to make better decisions. We can also add information about processor hierarchies to further refine the model to capture latencies due to memory transfer between levels of the processor hierarchy (e.g. NUMA domains, CPU-GPU transfers, disk-backed access latency, etc.).

Issues with reduce

I'm using the following example for reduce

addprocs(2)
using ComputeFramework

numbers = [1, 2, 3 ,4, 5, 6, 7, 8, 9, 10]
parallel_numbers = distribute(numbers)
sum_of_numbers = reduce(+, 0.0, parallel_numbers)
println(gather(Context(), sum_of_numbers))

The first problem is that reduce returns a single element array, unlike filter and map. The second problem is that +,- and other arithmetic operators don't have methods that can work with the required types.

MethodError: `+` has no method matching +(::Float64, ::ComputeFramework.Distribute{Array{Int64,1},ComputeFramework.CutDimension{1}})

However push! works just fine:

sum_of_numbers = reduce(push!, 0.0, parallel_numbers)

Documentation/Examples?

Hi, is there someplace with more documentation/examples/tutorials? Or is one required to read the source code?

Distribute to WorkerPools

Could we get a convenience function to make Dagger tasking limited to only a prescribed WorkerPool? In it's simplest form, this is just a vector of process ID's.

deserialization bug

this is a "small" reproducable example.

using ComputeFramework

x = ComputeFramework.SubChunk{Array{Array{Float64},1}}(Array{Array{Float64},1},ComputeFramework.ArrayDomain{1}((1:3,)),ComputeFramework.Bytes(24.0),ComputeFramework.ArrayDomain{1}((1:3,)),ComputeFramework.Chunk{ComputeFramework.DistMem,Array{Array{Float64},1}}(Array{Array{Float64},1},ComputeFramework.ArrayDomain{1}((1:10,)),ComputeFramework.Bytes(80.0),ComputeFramework.DistMem(RemoteChannel{Channel{Any}}(1,1,10))))

julia> remotecall_fetch(x->x, 2, x)
fatal error on 2: ERROR: TypeError: Array: in parameter, expected Type{T}, got ComputeFramework.SubChunk{Array{Array{Float64,N},1}}
 in deserialize_datatype(::SerializationState{TCPSocket}) at ./serialize.jl:728
 in handle_deserialize(::SerializationState{TCPSocket}, ::Int32) at ./serialize.jl:498
 [inlined code] from ./int.jl:419
 in deserialize(::SerializationState{TCPSocket}, ::Type{SimpleVector}) at ./serialize.jl:515
 in handle_deserialize(::SerializationState{TCPSocket}, ::Int32) at ./serialize.jl:508
 [inlined code] from ./int.jl:419
 in deserialize_datatype(::SerializationState{TCPSocket}) at ./serialize.jl:727
 in handle_deserialize(::SerializationState{TCPSocket}, ::Int32) at ./serialize.jl:498
 in deserialize(::SerializationState{TCPSocket}, ::Type{ComputeFramework.SubChunk{Array{Array{Float64,N},1}}}) at ./serialize.jl:778
 in deserialize_datatype(::SerializationState{TCPSocket}) at ./serialize.jl:733
 in handle_deserialize(::SerializationState{TCPSocket}, ::Int32) at ./serialize.jl:498
 [inlined code] from ./int.jl:419
 in ntuple(::Base.Serializer.##1#2{SerializationState{TCPSocket}}, ::Int64) at ./tuple.jl:44
 in handle_deserialize(::SerializationState{TCPSocket}, ::Int32) at ./serialize.jl:511
 in deserialize(::SerializationState{TCPSocket}, ::Type{Base.CallMsg{:call_fetch}}) at ./serialize.jl:778
 in deserialize_datatype(::SerializationState{TCPSocket}) at ./serialize.jl:733
 in handle_deserialize(::SerializationState{TCPSocket}, ::Int32) at ./serialize.jl:498
 [inlined code] from ./int.jl:419
 in message_handler_loop(::TCPSocket, ::TCPSocket) at ./multi.jl:965
 in process_tcp_streams(::TCPSocket, ::TCPSocket) at ./multi.jl:954
 in (::Base.##247#248{TCPSocket,TCPSocket})() at ./task.jl:59
Worker 2 terminated.
ERROR: ProcessExitedException()

serialization actually fails when done explicitly

julia> io = IOBuffer()
IOBuffer(data=UInt8[...], readable=true, writable=true, seekable=true, append=false, size=0, maxsize=Inf, ptr=1, mark=-1)

julia> serialize(io, x)
ERROR: ArgumentError: IntSet elements cannot be negative
 in push!(::IntSet, ::Int64) at ./intset.jl:62
 [inlined code] from ./multi.jl:626
 in send_add_client(::RemoteChannel{Channel{Any}}, ::Int64) at ./multi.jl:638
 in serialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::RemoteChannel{Channel{Any}}, ::Bool) at ./multi.jl:657
 in serialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::RemoteChannel{Channel{Any}}) at ./multi.jl:653
 in serialize_any(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Any) at ./serialize.jl:453
 in serialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::ComputeFramework.DistMem) at ./serialize.jl:436
 in serialize_any(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Any) at ./serialize.jl:453
 in serialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::ComputeFramework.Chunk{ComputeFramework.DistMem,Array{Array{Float64,N},1}}) at ./serialize.jl:436
 in serialize_any(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Any) at ./serialize.jl:453
 [inlined code] from ./serialize.jl:436
 in serialize(::Base.AbstractIOBuffer{Array{UInt8,1}}, ::ComputeFramework.SubChunk{Array{Array{Float64,N},1}}) at ./serialize.jl:461
 in eval(::Module, ::Any) at ./boot.jl:267

But there's stuff in the IOBuffer....

julia> seekstart(io)
IOBuffer(data=UInt8[...], readable=true, writable=true, seekable=true, append=false, size=545, maxsize=Inf, ptr=1, mark=-1)

julia> deserialize(io)
ERROR: TypeError: Array: in parameter, expected Type{T}, got ComputeFramework.SubChunk{Array{Array{Float64,N},1}}
 in deserialize_datatype(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}) at ./serialize.jl:728
 in handle_deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Int32) at ./serialize.jl:498
 [inlined code] from ./iobuffer.jl:92
 in deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}) at ./serialize.jl:468
 [inlined code] from ./boot.jl:331
 in deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Type{SimpleVector}) at ./serialize.jl:515
 in handle_deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Int32) at ./serialize.jl:508
 [inlined code] from ./iobuffer.jl:92
 in deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}) at ./serialize.jl:468
 in deserialize_datatype(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}) at ./serialize.jl:727
 in handle_deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Int32) at ./serialize.jl:498
 in deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Type{ComputeFramework.SubChunk{Array{Array{Float64,N},1}}}) at ./serialize.jl:778
 in deserialize_datatype(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}) at ./serialize.jl:733
 in handle_deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Int32) at ./serialize.jl:498
 [inlined code] from ./iobuffer.jl:92
 in deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}) at ./serialize.jl:468
 [inlined code] from ./array.jl:195
 in deserialize(::Base.AbstractIOBuffer{Array{UInt8,1}}) at ./serialize.jl:465
 in eval(::Module, ::Any) at ./boot.jl:267

julia> deserialize(io)
1

julia> deserialize(io)
ComputeFramework.ArrayDomain{1}((1:3,))

julia> deserialize(io)
ComputeFramework.Bytes(24.0)

julia> deserialize(io)
ComputeFramework.ArrayDomain{1}((1:3,))

julia> deserialize(io)
ERROR: KeyError: 1 not found
 [inlined code] from ./dict.jl:312
 in handle_deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Int32) at ./serialize.jl:494
 [inlined code] from ./iobuffer.jl:92
 in deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}) at ./serialize.jl:468
 [inlined code] from ./boot.jl:331
 in deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Type{SimpleVector}) at ./serialize.jl:515
 in handle_deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Int32) at ./serialize.jl:508
 [inlined code] from ./iobuffer.jl:92
 in deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}) at ./serialize.jl:468
 in deserialize_datatype(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}) at ./serialize.jl:727
 in handle_deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Int32) at ./serialize.jl:498
 [inlined code] from ./iobuffer.jl:92
 in deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}) at ./serialize.jl:468
 [inlined code] from ./boot.jl:331
 in deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Type{SimpleVector}) at ./serialize.jl:515
 in handle_deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Int32) at ./serialize.jl:508
 [inlined code] from ./iobuffer.jl:92
 in deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}) at ./serialize.jl:468
 in deserialize_datatype(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}) at ./serialize.jl:727
 in handle_deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Int32) at ./serialize.jl:498
 [inlined code] from ./iobuffer.jl:92
 in deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}) at ./serialize.jl:468
 [inlined code] from ./boot.jl:331
 in deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Type{SimpleVector}) at ./serialize.jl:515
 in handle_deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Int32) at ./serialize.jl:508
 [inlined code] from ./iobuffer.jl:92
 in deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}) at ./serialize.jl:468
 in deserialize_datatype(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}) at ./serialize.jl:727
 in handle_deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Int32) at ./serialize.jl:498
 [inlined code] from ./iobuffer.jl:92
 in deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}) at ./serialize.jl:468
 [inlined code] from ./array.jl:195
 in deserialize(::Base.AbstractIOBuffer{Array{UInt8,1}}) at ./serialize.jl:465
 in eval(::Module, ::Any) at ./boot.jl:267

julia> deserialize(io)
1

julia> deserialize(io)
ERROR: KeyError: 1 not found
 [inlined code] from ./dict.jl:312
 in handle_deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Int32) at ./serialize.jl:494
 [inlined code] from ./iobuffer.jl:92
 in deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}) at ./serialize.jl:468
 [inlined code] from ./boot.jl:331
 in deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Type{SimpleVector}) at ./serialize.jl:515
 in handle_deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Int32) at ./serialize.jl:508
 [inlined code] from ./iobuffer.jl:92
 in deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}) at ./serialize.jl:468
 in deserialize_datatype(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}) at ./serialize.jl:727
 in handle_deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Int32) at ./serialize.jl:498
 [inlined code] from ./iobuffer.jl:92
 in deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}) at ./serialize.jl:468
 [inlined code] from ./boot.jl:331
 in deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Type{SimpleVector}) at ./serialize.jl:515
 in handle_deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Int32) at ./serialize.jl:508
 [inlined code] from ./iobuffer.jl:92
 in deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}) at ./serialize.jl:468
 in deserialize_datatype(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}) at ./serialize.jl:727
 in handle_deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Int32) at ./serialize.jl:498
 [inlined code] from ./iobuffer.jl:92
 in deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}) at ./serialize.jl:468
 [inlined code] from ./array.jl:195
 in deserialize(::Base.AbstractIOBuffer{Array{UInt8,1}}) at ./serialize.jl:465
 in eval(::Module, ::Any) at ./boot.jl:267

julia> deserialize(io)
1

julia> deserialize(io)
ComputeFramework.ArrayDomain{1}((1:10,))

julia> deserialize(io)
ComputeFramework.Bytes(80.0)

julia> deserialize(io)
ERROR: EOFError: read end of file
 [inlined code] from ./iobuffer.jl:88
 in deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Type{ComputeFramework.DistMem}) at ./serialize.jl:776
 in deserialize_datatype(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}) at ./serialize.jl:733
 in handle_deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}, ::Int32) at ./serialize.jl:498
 [inlined code] from ./iobuffer.jl:92
 in deserialize(::SerializationState{Base.AbstractIOBuffer{Array{UInt8,1}}}) at ./serialize.jl:468
 [inlined code] from ./array.jl:195
 in deserialize(::Base.AbstractIOBuffer{Array{UInt8,1}}) at ./serialize.jl:465
 in eval(::Module, ::Any) at ./boot.jl:267

julia> 

Detect network structure for improved scheduling and parallelism

Modern clusters (especially supercomputers) usually have more than one networking device between a given pair of nodes, and specialized network fabrics (e.g. Infiniband, NVLINK, ROCmRDMA, etc.) which can provide optimized transfers between processors when utilized. Our current processor infrastructure only represents (combined) memory and compute domains, but does not model the connections between and within domains. If we were able to query the network/memory connections between processors, and then transfer data between processors using only a specifically-selected connection, we could enable the scheduler to make much more efficient decisions about where to place work to optimize data transfer latencies and decrease network bottlenecking.

gather should do a tree-reduce

right now all data is first moved to master and then concatenated at once. Although this involves allocations, this is not fast at all when there are 1000s of chunks.

stage functions should not rely on the domain being a DomainBranch

a = compute(Distribute(BlockPartition(10, 1), spzeros(10, 10)))
da = compute(a*a)
compute(map(x->1, da))

This throws an error:

ERROR: type DenseDomain has no field children
 in stage at /home/pranav/.julia/v0.5/ComputeFramework/src/map-reduce.jl:13
 [inlined code] from /home/pranav/.julia/v0.5/ComputeFramework/src/compute.jl:50
 in compute at /home/pranav/.julia/v0.5/ComputeFramework/src/compute.jl:51
 in eval at ./boot.jl:265

Other operations on da still work.

Get rid of operation-level DAG

Since we now have things like syntactic loop fusion in the language, and in general it is possible to have the benefits of the high level API by syntactic means such as https://github.com/MikeInnes/DataFlow.jl, it will simplify implementation to just use:

immutable DaggerArray{T,N} <: AbstractArray{T,N}
    dag::AbstractChunk{<:AbstractArray{T,N}}
end

As the array type. Operations will simply return the next dag wrapped in an DaggerArray object.

This along with #45 should simplify this package drastically.

Use after free issue

Digger.jl's inaugural issue:

julia> let waterbuffalo = Distribute(BlockPartition(100,100),rand((100,100)))
               (sin(waterbuffalo)).'
           end
ERROR (unhandled task failure): KeyError: key Dagger.MemToken(1,110) not found

array constructors should have automatic mode

Currently, a user has to specify the block partition sizes. Ideally, there should be an automatic choice, perhaps by distributing on the last dimension, or some other well defined method.

MethodError with save function while trying out the example code

I am using the dagger version 0.2.1 and julia version 0.6.0. I was trying to do the example code from the README.md file:

addprocs()
using Dagger
A = rand(Blocks(4000, 4000), 30000, 30000) # 7.2GB of data
saved_A = compute(save(A, "A"))

result = compute(save(saved_A+saved_A', "ApAt"))

which is under the section "A note on keeping memory use in check". In particular, i received the following error:

ERROR: MethodError: no method matching save(::String, ::Dagger.AllocateArray{Float64,2})
Closest candidates are:
save(::Any, ::IO, ::Dagger.Chunk, ::Any) at /home/jrb/.julia/v0.6/Dagger/src/file-io.jl:55
save(::Any, ::IO, ::Dagger.DArray, ::AbstractString, ::AbstractArray) at /home/jrb/.julia/v0.6/Dagger/src/file-io.jl:72
save(::Any, ::IO, ::Dagger.DArray, ::Any) at /home/jrb/.julia/v0.6/Dagger/src/file-io.jl:84

when trying to execute the line

saved_A = compute(save(A, "A"))

Please let me know if I could provide you with any other details or if I just made a silly error.

mappart infinte loop

This produces an infinite loop:

gather(Context(), map( (x,y)->x, distribute(zeros(10)), distribute(zeros(10,10))))

The dimensions on the inputs don't match, but I need this to load the adjacency matrix and dist vector onto nodes.

The infinite recursion is in this function in compute-nodes.jl:

function compute(ctx, x::MapPartNode)
   compute(ctx, MapPartNode(x.f, map(inp -> compute(ctx, inp), x.input)))
end

Usually the recursion is terminated by this function in dist-memory.jl:

 function compute{N, T<:DistMemory}(ctx, node::MapPartNode{NTuple{N, T}})
    refsets = zip(map(x -> map(y->y[2], refs(x)), node.input)...) |> collect
    pids = map(x->x[1], refs(node.input[1]))
    pid_chunks = zip(pids, map(tuplize, refsets)) |> collect

    let f = node.f
        futures = Pair[pid => @spawnat pid f(map(fetch, rs)...)
                        for (pid, rs) in pid_chunks]
        DistMemory(futures, node.input[1].layout)
    end
end

DArray chunk reference counting issue

julia> addprocs(2)
2-element Array{Int64,1}:
 2
 3

julia> using Base.Test

julia> using Dagger

julia> D2 = remotecall_fetch(2, compute(Distribute(Blocks(10, 20), rand(40,40)))) do D
          D2 = D
       end
Dagger.DArray{Any,2,Base.#cat}(40, 40)

julia> @test size(collect(D2)) == (40,40)
Test Passed

julia> gc()

julia> @test size(collect(D2)) == (40,40)
Error During Test
  Test threw an exception of type KeyError
  Expression: size(collect(D2)) == (40, 40)
  KeyError: key 1 not found
  Stacktrace:
   [1] #571 at ./asyncmap.jl:178 [inlined]
   [2] foreach(::Base.##571#573, ::Array{Any,1}) at ./abstractarray.jl:1733
   [3] maptwice(::Function, ::Channel{Any}, ::Array{Any,1}, ::Array{Union{Dagger.Chunk, Dagger.Thunk},2}, ::Vararg{Array{Union{Dagger.Chunk, Dagger.Thunk},2},N} where N) at ./asyncmap.jl:178
   [4] wrap_n_exec_twice(::Channel{Any}, ::Array{Any,1}, ::Int64, ::Function, ::Array{Union{Dagger.Chunk, Dagger.Thunk},2}, ::Vararg{Array{Union{Dagger.Chunk, Dagger.Thunk},2},N} where N) at ./asyncmap.jl:154
   [5] #async_usemap#556(::Int64, ::Void, ::Function, ::Base.#collect, ::Array{Union{Dagger.Chunk, Dagger.Thunk},2}, ::Vararg{Array{Union{Dagger.Chunk, Dagger.Thunk},2},N} where N) at ./asyncmap.jl:103
   [6] (::Base.#kw##async_usemap)(::Array{Any,1}, ::Base.#async_usemap, ::Function, ::Array{Union{Dagger.Chunk, Dagger.Thunk},2}, ::Vararg{Array{Union{Dagger.Chunk, Dagger.Thunk},2},N} where N) at ./<missing>:0
   [7] asyncmap(::Function, ::Array{Union{Dagger.Chunk, Dagger.Thunk},2}) at ./asyncmap.jl:81
   [8] #collect#85(::Bool, ::Function, ::Dagger.Context, ::Dagger.DArray{Any,2,Base.#cat}) at /home/tan/.julia/v0.6/Dagger/src/array/darray.jl:177
   [9] collect(::Dagger.DArray{Any,2,Base.#cat}) at /home/tan/.julia/v0.6/Dagger/src/array/darray.jl:74
   [10] eval(::Module, ::Any) at ./boot.jl:235
   [11] eval_user_input(::Any, ::Base.REPL.REPLBackend) at ./REPL.jl:66
   [12] macro expansion at ./REPL.jl:97 [inlined]
   [13] (::Base.REPL.##1#2{Base.REPL.REPLBackend})() at ./event.jl:73
ERROR: There was an error during testing

Make a new release?

Would it be possible to make a new release? Looks like there have been 32 commits to master since the last release.

cc: @jpsamaroo

Tensor operations

Implement notation similar to TensorOperations.jl and make it work with Dagger arrays.

Feature Request: macro to make `delayed` less painful

In the doc we have

using Dagger

add1(value) = value + 1
add2(value) = value + 2
combine(a...) = sum(a)

p = delayed(add1)(4)
q = delayed(add2)(p)
r = delayed(add1)(3)
s = delayed(combine)(p, q, r)

@assert collect(s) == 16

I was thinking if a we can make a macro called @delayed and it will be like

@delayed begin
  p = add1(4)
  q = add2(p)
  r = add1(3)
  s = combine(p, q, r)
end

getindex bounds error

julia> xs = distribute(rand(10,10), Blocks(2,2))
Dagger.DArray{Any,2,Base.#cat}(10, 10)

julia> compute(xs[1:5, 3:6])
ERROR: BoundsError: attempt to access 2×2 Array{Float64,2} at index [1:2, 0:1]

Blocks(5,5) works fine.

race condition in chunk ref counting?

seen often while running JuliaDB.jl tests:

ERROR (unhandled task failure): KeyError: key 2175 not found
getindex at ./dict.jl:474 [inlined]
datastore_delete(::Int64) at /home/tan/.julia/v0.6/MemPool/src/datastore.jl:123
pooldelete at /home/tan/.julia/v0.6/MemPool/src/datastore.jl:147 [inlined]
free!(::MemPool.DRef) at /home/tan/.julia/v0.6/Dagger/src/chunks.jl:110
(::Dagger.##79#80)() at ./task.jl:335
Stacktrace:
 [1] sync_end() at ./task.jl:287
 [2] macro expansion at ./task.jl:303 [inlined]
 [3] free_chunks(::Tuple{Dagger.Chunk{Array{Float64,1},MemPool.DRef},Dagger.ArrayDomain{1}}) at /home/tan/.julia/v0.6/Dagger/src/array/darray.jl:123
 [4] macro expansion at /home/tan/.julia/v0.6/Dagger/src/array/darray.jl:129 [inlined]
 [5] macro expansion at ./task.jl:302 [inlined]
 [6] free_chunks(::Array{Union{Dagger.Chunk, Dagger.Thunk},1}) at /home/tan/.julia/v0.6/Dagger/src/array/darray.jl:123
 [7] (::Dagger.##81#82{Dagger.DArray{Float64,1,Base.#cat}})() at ./event.jl:73
ERROR (unhandled task failure): KeyError: key 2177 not found
getindex at ./dict.jl:474 [inlined]
datastore_delete(::Int64) at /home/tan/.julia/v0.6/MemPool/src/datastore.jl:123
pooldelete at /home/tan/.julia/v0.6/MemPool/src/datastore.jl:147 [inlined]
free!(::MemPool.DRef) at /home/tan/.julia/v0.6/Dagger/src/chunks.jl:110
(::Dagger.##79#80)() at ./task.jl:335

deprecation warnings

Would be nice to fix all of these. Hopefully soon we can make 0.5 the minimum requirement.

julia> using ComputeFramework
INFO: Recompiling stale cache file /Users/viral/.julia/lib/v0.5/Compat.ji for module Compat.
WARNING: `@unix_only` is deprecated, use `@static if is_unix()` instead
 in depwarn(::String, ::Symbol) at ./deprecated.jl:64
 in @unix_only(::Any) at ./deprecated.jl:494
 in include_from_node1(::String) at ./loading.jl:426
 in macro expansion; at ./none:2 [inlined]
 in anonymous at ./<missing>:?
 in eval(::Module, ::Any) at ./boot.jl:225
 in process_options(::Base.JLOptions) at ./client.jl:243
 in _start() at ./client.jl:322
while loading /Users/viral/.julia/v0.5/Compat/src/Compat.jl, in expression starting on line 108

WARNING: deprecated syntax "Base.(:*)".
Use "Base.:*" instead.

WARNING: deprecated syntax "Base.(:+)".
Use "Base.:+" instead.

WARNING: deprecated syntax "Base.(:-)".
Use "Base.:-" instead.

WARNING: deprecated syntax "Base.(:/)".
Use "Base.:/" instead.

WARNING: deprecated syntax "Base.(:/)".
Use "Base.:/" instead.

WARNING: deprecated syntax "Base.(:*)".
Use "Base.:*" instead.
WARNING: Method definition free!(ComputeFramework.Cat) in module ComputeFramework at /Users/viral/.julia/v0.5/ComputeFramework/src/part.jl:211 overwritten at /Users/viral/.julia/v0.5/ComputeFramework/src/compute.jl:112.

Dagger with arrays containg missing values

I try to use Dagger on an array of the type Array{Union{Missing, Int64},2} but I get an error (see below). I tried to adapt this example https://github.com/JuliaParallel/Dagger.jl/blob/master/test/array.jl#L155 to find out how Dagger is working. Is there a problem with my code or is this a bug?
I use julia 1.0.1 and Dagger 0.7.1.

The code

using Dagger
using Missings
x = allowmissing(rand(1:10, 10, 5))
@show reduce(+, x, dims=1)  # works
X = Distribute(Blocks(3,3), x)
collect(reduce(+, X, dims=1)) # fails

The full error message is:

ERROR: ArgumentError: type does not have a definite number of fields
fieldcount(::Any) at ./reflection.jl:599
fixedlength(::Type, ::IdDict{Any,Any}) at /home/abarth/.julia/packages/MemPool/Z2LCh/src/io.jl:155
fixedlength(::Type) at /home/abarth/.julia/packages/MemPool/Z2LCh/src/io.jl:145
approx_size(::Type, ::Int64, ::Array{Union{Missing, Int64},2}) at /home/abarth/.julia/packages/MemPool/Z2LCh/src/MemPool.jl:78
approx_size at /home/abarth/.julia/packages/MemPool/Z2LCh/src/MemPool.jl:74 [inlined]
(::getfield(MemPool, Symbol("#kw##poolset")))(::NamedTuple{(:destroyonevict,),Tuple{Bool}}, ::typeof(MemPool.poolset), ::Array{Union{Missing, Int64},2}, ::Int64) at ./none:0 (repeats 2 times)
#tochunk#40(::Bool, ::Bool, ::Function, ::Array{Union{Missing, Int64},2}) at /home/abarth/.julia/packages/Dagger/yLPgg/src/chunks.jl:84
(::getfield(Dagger, Symbol("#kw##tochunk")))(::NamedTuple{(:persist, :cache),Tuple{Bool,Bool}}, ::typeof(Dagger.tochunk), ::Array{Union{Missing, Int64},2}) at ./none:0
do_task(::Context, ::OSProc, ::Int64, ::Function, ::Tuple{Array{Union{Missing, Int64},2}}, ::Bool, ::Bool, ::Bool) at /home/abarth/.julia/packages/Dagger/yLPgg/src/scheduler.jl:254
#143 at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:339 [inlined]
run_work_thunk(::getfield(Distributed, Symbol("##143#144")){typeof(Dagger.Sch.do_task),Tuple{Context,OSProc,Int64,typeof(identity),Tuple{Array{Union{Missing, Int64},2}},Bool,Bool,Bool},Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}}, ::Bool) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.0/Distributed/src/process_messages.jl:56
#remotecall_fetch#148(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Distributed.LocalProcess, ::Context, ::Vararg{Any,N} where N) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:364
remotecall_fetch(::Function, ::Distributed.LocalProcess, ::Context, ::Vararg{Any,N} where N) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:364
#remotecall_fetch#152(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Int64, ::Context, ::Vararg{Any,N} where N) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:406
remotecall_fetch at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.0/Distributed/src/remotecall.jl:406 [inlined]
macro expansion at /home/abarth/.julia/packages/Dagger/yLPgg/src/scheduler.jl:269 [inlined]
(::getfield(Dagger.Sch, Symbol("##13#14")){Context,OSProc,Int64,typeof(identity),Tuple{Array{Union{Missing, Int64},2}},Channel{Any},Bool,Bool,Bool})() at ./task.jl:259

Design Discussion: Parallel IR

Me and @shashi have discussed how we might use a graph-program-format as a kind of parallel IR.

The idea is to have an hourglass design; the DAG is a small core abstraction, above it you build parallel programs, libraries, algorithms, below it you build custom optimisation passes, integrate with other backends and hardware and so on. If things are loosely coupled enough then the problem statement and its implementation become orthogonal; everything is very plug-and-play, and you start building a parallel computing ecosystem rather than just a bunch of apps on top of a single framework.

From 10,000 feet, the building blocks are:

  1. Graph data structures and functions on them – they need to be as natural to work with as trees.
  2. A design for how to specify programs as graphs. (What protocol do operations in the graph satisfy, if any? Are they black boxes? Do they provide a cost model, hardware affinity? etc.)
  3. Splitting packages like ComputeFramework into modular pieces which communicate via the above.
  4. ???
  5. Profit

Flow.jl is a start on (1). Shashi has had (3) in mind while designing CF. (2) and (4) involve a lot of tough design decisions that we haven't figured out yet. For example, how do we get optimisation passes or schedulers to cooperate without explicitly knowing about each other? (5) will take a while.

Of course, it's absolutely key that the DAG is expressive enough to represent a useful subset of parallel programs. Dynamic sizing is enough to express recursive / irregular problems. What about other abstractions, like distributed channels – how do they fit in? Do they need to?

To reason about this I think it may be useful to evaluate the dimensions along which parallel programs may vary. Off the top of my head:

  • "Small model, big data" vs. "Large model, small data"
  • Evaluate-once vs. evaluate-repeatedly
  • Regular vs. irregular

e.g. BigLinalg is 001, NNs are 111. What other dimensions exist, and can we come up with practical examples across all of them?

Any obvious holes in this plan?

Create array interface

Would be nice to have all the usual array metadata methods, such as size etc. Also would be nice to have basic concatenation. setindex! is perhaps best to leave out for now.

Examples not working

Thank you for the presentation during JuliaCon 2016.

I've been trying to check out your package but I ran into problems. Please verify your examples after the package name change i.e. logreg.jl:

WARNING: Module ComputeFramework not defined on process 1
ERROR: LoadError: UndefVarError: ComputeFramework not defined
 in deserialize at serialize.jl:504
 in handle_deserialize at serialize.jl:477
 in deserialize_datatype at serialize.jl:640
 in handle_deserialize at serialize.jl:467
 in deserialize at serialize.jl:437
 in anonymous at serialize.jl:480
 in ntuple at ./tuple.jl:32
 in deserialize_tuple at serialize.jl:480
 in handle_deserialize at serialize.jl:458
 in load at /home/user/.julia/v0.4/Dagger/src/basics/file-io.jl:157
 in load at /home/user/.julia/v0.4/Dagger/src/basics/file-io.jl:130
 in include at ./boot.jl:261
 in include_from_node1 at ./loading.jl:320
while loading /home/user/julia/parallel/logreg.jl, in expression starting on line 40

Domain abstract type, View and Cat are redundant

With #45 and #46 we should end up with a single array type which is like:

immutable DaggerArray{T,N} <: AbstractArray{T,N}
    dag::AbstractChunk{<:AbstractArray{T,N}}
end

But we can simplify things even further if we have this instead:

immutable DaggerArray{T,N} <: AbstractArray{T,N}
    size::NTuple{N,Int}
    chunks::Associative{ArrayDomain, Any}
end

where chunks is an efficient mapping from sub-index-spaces to Chunks and/or Thunks.

To keep things simple we can stop storing domain in Chunks. Since the subdomains info has been moved to DaggerArray we can stop tracking it in Cat as well, which makes Cat pretty much an extraneous wrapper equivalent to Thunk(_hvcat, size(chunks), chunks...). By extension View also is Thunk(getindex, chunk ,idxs...).

code around Cat, View and Sub were too array-specific anyway and moving this stuff to DaggerArray will dump less baggage on non-array implementations on Dagger.

Finally it doesn't make sense for Domain to be an abstract type and ArrayDomain to be its subtype. Just drop that relationship and remove Domain. It seems "IndexSpace" is the standard term for this thing according to http://dl.acm.org/citation.cfm?id=898874 so maybe we can use that.

The only thing we will be giving up is having a View on a Cat be able to turn into a Cat of Views outside of a direct getindex (getindex should be handled efficiently by the newly proposed Array type) which doesn't seem to happen that often in the wild.

Scheduler not dispatching to workers

This may be the same issue as #71 .

I have 3 workers and I'm using Julia 0.6.2.

This simple graph has 3 leaf thunks (x2) and 1 master thunk (x3). It works properly.

julia> x2 = [delayed(_ -> (sleep(2); println(i)))(nothing) for i in 1:3]
3-element Array{Dagger.Thunk,1}:
 *29*
 *30*
 *31*

julia> x3 = delayed((x...) -> println("done: ", x))(x2...)
*32*

julia> @time compute(x3)
	From worker 3:	2
	From worker 4:	3
	From worker 2:	1
	From worker 2:	done: (nothing, nothing, nothing)
  2.320594 seconds (21.23 k allocations: 1.159 MiB)
Dagger.Chunk{Void,MemPool.DRef}(Void, Dagger.UnitDomain(), MemPool.DRef(2, 39, 0x0000000000000000), false)

This case is broken. The only difference is that the new leaf thunk (x1) was added for which x2 thunks depend on.

julia> x1 = delayed(x -> (sleep(2); println("root")))(1)
*33*

julia> x2 = [delayed(_ -> (sleep(2); println(i)))(x1) for i in 1:3]
3-element Array{Dagger.Thunk,1}:
 *34*
 *35*
 *36*

julia> x3 = delayed((x...) -> println("done: ", x))(x2...)
*37*

julia> @time compute(x3)
	From worker 2:	root
	From worker 2:	1
	From worker 2:	2
	From worker 2:	3
	From worker 2:	done: (nothing, nothing, nothing)
  8.153534 seconds (31.29 k allocations: 1.678 MiB)
Dagger.Chunk{Void,MemPool.DRef}(Void, Dagger.UnitDomain(), MemPool.DRef(2, 44, 0x0000000000000000), false)

Distribute scales linearly with proc count.

Ideally, the time taken to compute a dist node should not depend on the number of processes. I ran the following script for different proc counts:

num_procs = parse(Int, ARGS[1])
addprocs(num_procs)
using ComputeFramework
println("Number of procs = ", num_procs)
compute(Context(), distribute(BitArray(2^10)))
@time compute(Context(), distribute(BitArray(2^33)))

The output times seem to scale linearly with proc count:

Number of procs = 1
  2.239719 seconds (751 allocations: 2.000 GB, 0.51% gc time)
Number of procs = 2
  3.073824 seconds (770 allocations: 2.000 GB, 1.44% gc time)
Number of procs = 3
  4.186631 seconds (3.72 k allocations: 2.000 GB, 2.97% gc time)
Number of procs = 4
  5.485914 seconds (4.28 k allocations: 2.000 GB, 3.83% gc time)
Number of procs = 5
  6.766762 seconds (4.97 k allocations: 2.000 GB, 3.04% gc time)
Number of procs = 6
  7.805935 seconds (5.67 k allocations: 2.000 GB, 3.68% gc time)
Number of procs = 7
  9.502370 seconds (6.38 k allocations: 2.000 GB, 2.98% gc time)
Number of procs = 8
 10.497571 seconds (7.40 k allocations: 2.000 GB, 2.76% gc time)
Number of procs = 9
 11.757150 seconds (8.45 k allocations: 2.001 GB, 2.45% gc time)
Number of procs = 10
 13.046635 seconds (9.52 k allocations: 2.001 GB, 2.24% gc time)

I'm not sure why this is hapenning.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.