beacon-biosignals / OndaBatches.jl

Local and distributed batch loading for Onda datasets

License: MIT License
It would be nice to be able to extend batch collection, in case e.g. something else needs to happen when collecting individual batch items for a particular type of Batcher or BatchItem. I think the issue is that we now have a table instead of a single item, so we can't just dispatch off the item type. One option would be to allow users to pass a singleton type that gets passed around and used for dispatch, or something like that.
In my case, I have two needs that the current method doesn't cover:

1. map instead of asyncmap, since my materialize_batch_item is doing a bit more work and it's not IO bound (I think).
2. I'm not materializing to Arrays yet but rather a Vector{Samples} and an array, since I still want the Samples wrapper for a bit longer. [This part I could avoid by refactoring to move more work into this portion, since I do need a raw array in the end, but it would be nice to be able to do that more incrementally without piracy.]

edit: I also realized materialize_batch_item tends to need piracy too, since it comes in as a NamedTuple, not a BatchItem (or such) object, at least with the Legolas v0.4 branch.
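To make the singleton-dispatch idea concrete, here is a purely hypothetical sketch; none of these names (MaterializeStrategy, materialize_item, etc.) exist in OndaBatches. The point is that the user-supplied strategy is passed along explicitly, so custom behavior doesn't require pirating methods on NamedTuple:

```julia
# Hypothetical sketch: a user-supplied singleton gets threaded through
# the Batcher so materialization can dispatch on it, instead of on the
# item's type (which is now a table row / NamedTuple).
abstract type MaterializeStrategy end
struct DefaultStrategy <: MaterializeStrategy end
struct KeepSamplesStrategy <: MaterializeStrategy end

# default behavior: materialize each item straight to a raw array
materialize_item(::DefaultStrategy, item) = float.(item)

# custom behavior: keep the wrapper around a while longer
# (a tuple here stands in for keeping Vector{Samples} alongside the array)
materialize_item(::KeepSamplesStrategy, item) = (item, float.(item))

# the strategy is passed around rather than inferred from the item type,
# so overriding behavior needs no type piracy
materialize_batch(strategy::MaterializeStrategy, batch) =
    map(item -> materialize_item(strategy, item), batch)
```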
@ericphanson noted in beacon-biosignals/OldOndaBatches.jl#18 that there's some apparently extraneous data movement in multi-worker batching: #18 (comment)
so here the "batch manager" (the process with the Batcher) fetches the future from the worker, and puts it on a channel which is a remote channel to somewhere else (the trainer, I guess?). I wonder if that's extra data movement we don't strictly need.
I guess we kinda do need it, because if we just passed the remote channel on to the workers, they wouldn't know what order to do stuff in, and passing it through the batch manager lets us straighten out the order before loading up the remote channel.
This may or may not turn out to be an issue. One way around it would be to remotecall the put! to have it execute on the batch worker.
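The remotecall-the-put! idea can be sketched with plain Distributed primitives. This is a minimal illustration of the pattern, not OndaBatches code; here everything runs on pid 1 so the demo is self-contained, but in a real cluster the second argument to remotecall_wait would be the batch worker's PID:

```julia
using Distributed

# Instead of fetching the batch back to the manager and put!-ing it on
# the remote channel from there, have the put! itself execute on the
# process that produced the batch, skipping one hop of data movement.
rc = RemoteChannel(() -> Channel{Int}(4))

batch = 42  # stands in for a materialized batch
remotecall_wait(put!, myid(), rc, batch)  # the put! runs on the batch worker

result = take!(rc)  # the consumer still reads from the same channel
```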
This is often used in "evaluation" mode rather than training mode: we want to sweep through a whole dataset, generating batches of either a certain size OR for whole recordings.
Similar to Legolas.jl and Onda.jl, a tour of the code with examples would be helpful to get a sense of the use cases and common entrypoints.
Some examples in the README would also be good.
It's important that batches get the same number of samples and that label spans and signal spans are properly aligned, so we should be using AlignedSpans.jl internally wherever we're cross-referencing signals with potentially different sampling rates etc.
We would like to have a batching scheme that will balance batches AND ensure that a whole epoch covers the entire dataset. This is a lot more complicated than doing weighted/pseudorandom batching since you have to keep track of which labeled segments you've already handled. One strategy would be to generate all possible batch items and shuffle, but this could be a big footgun if there are MANY possible batch items. One way to cut down on the number of batches is to use partially overlapping windows of a particular duration, and have an index of label classes to windows.
This is a nice to have since it's very fiddly and requires a lot of validation to make sure it's correct and performant. For the time being the pseudorandom/online batching strategy works well enough for the kinds of models we're training.
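The "generate all possible batch items and shuffle" strategy can be sketched in a few lines. This is a toy illustration under assumed types: `windows` stands in for the index of all candidate (recording, window) items, and the helper name `epoch_batches` is made up:

```julia
using Random

# One shuffle per epoch guarantees the epoch covers the entire dataset
# exactly once; the footgun is that `windows` can be enormous when
# partially overlapping windows are enumerated eagerly.
function epoch_batches(windows, batch_size; rng=Random.default_rng())
    shuffled = shuffle(rng, windows)
    n = lastindex(shuffled)
    return [shuffled[i:min(i + batch_size - 1, n)] for i in 1:batch_size:n]
end

batches = epoch_batches(collect(1:10), 3; rng=MersenneTwister(1))
```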
Currently, RandomBatches is a big chonky struct that includes three separate things. Really, these are basically orthogonal to each other, and we could support more flexible batching specifications by allowing users to "mix and match" different ways of sampling batches with some light refactoring.
We've observed that during long-running multiworker batching jobs, memory use on workers can increase gradually over time. Eventually this leads to the workers being OOM killed, or to performance degradation and plateauing RAM load/increased CPU usage. This has been observed in the following circumstances:

- during multi-worker batching (i.e., within a start_batching call)
- when channel data is loaded via the default path (OndaBatches.get_channel_data); simply specifying one or more channels in batch_channels does not trigger this behavior

stack is present in Base as of 1.9, and the implementation was added to Compat in 4.2 (with a backport to 3.46). This does a better-thought-out, in-place, copyto!-based implementation of what _glue was doing, so we might as well use it.
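For reference, stack does exactly the gluing job described above (this example uses only Base, on Julia 1.9 or newer):

```julia
# stack glues a collection of equal-size arrays along a new trailing
# dimension with an in-place copyto!-based implementation, i.e. the job
# _glue was doing by hand.
xs = [fill(i, 2, 3) for i in 1:4]  # four 2x3 matrices
y = stack(xs)                      # one 2x3x4 Array
```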
(OndaBatches) pkg> st
Project OndaBatches v0.4.4
Status `~/.julia/dev/OndaBatches/Project.toml`
[fbe9abb3] AWS v1.84.1
[1c724243] AWSS3 v0.10.3
[72438786] AlignedSpans v0.2.4
[a93c6f00] DataFrames v1.5.0
[741b9549] Legolas v0.5.9
[e853f5be] Onda v0.15.1
[2913bbd2] StatsBase v0.33.21
[bd369af6] Tables v1.10.1
[bb34ddd2] TimeSpans v0.3.6
[ade2ca70] Dates
[8ba89e20] Distributed
[cf7118a7] UUIDs
(OndaBatches) pkg> test
...
...
Testing Running tests...
ERROR: LoadError: MethodError: no method matching getpass(::IOStream, ::Base.TTY, ::String)
Closest candidates are:
getpass(::Base.TTY, ::IO, ::AbstractString) at util.jl:282
Stacktrace:
[1] getpass(prompt::String)
@ Base ./util.jl:291
[2] macro expansion
@ ~/.julia/packages/Mocking/MRkF3/src/mock.jl:29 [inlined]
[3] _aws_get_role(role::String, ini::IniFile.Inifile)
@ AWS ~/.julia/packages/AWS/v6b1A/src/utilities/credentials.jl:90
[4] dot_aws_config(profile::String)
@ AWS ~/.julia/packages/AWS/v6b1A/src/AWSCredentials.jl:437
[5] (::AWS.var"#9#13"{String})()
@ AWS ~/.julia/packages/AWS/v6b1A/src/AWSCredentials.jl:119
[6] AWS.AWSCredentials(; profile::Nothing, throw_cred_error::Bool)
@ AWS ~/.julia/packages/AWS/v6b1A/src/AWSCredentials.jl:128
[7] AWS.AWSConfig()
@ AWS ~/.julia/packages/AWS/v6b1A/src/AWSConfig.jl:32
[8] #global_aws_config#74
@ ~/.julia/packages/AWS/v6b1A/src/AWS.jl:79 [inlined]
[9] global_aws_config
@ ~/.julia/packages/AWS/v6b1A/src/AWS.jl:77 [inlined]
[10] get_config
@ ~/.julia/packages/AWSS3/tK7Rm/src/s3path.jl:141 [inlined]
[11] read(fp::AWSS3.S3Path{Nothing}; byte_range::Nothing)
@ AWSS3 ~/.julia/packages/AWSS3/tK7Rm/src/s3path.jl:626
[12] read
@ ~/.julia/packages/AWSS3/tK7Rm/src/s3path.jl:625 [inlined]
[13] read_arrow(path::AWSS3.S3Path{Nothing})
@ Legolas ~/.julia/packages/Legolas/sjEyS/src/tables.jl:114
[14] read(io_or_path::AWSS3.S3Path{Nothing}; validate::Bool)
@ Legolas ~/.julia/packages/Legolas/sjEyS/src/tables.jl:160
[15] read(io_or_path::AWSS3.S3Path{Nothing})
@ Legolas ~/.julia/packages/Legolas/sjEyS/src/tables.jl:159
[16] top-level scope
@ ~/.julia/dev/OndaBatches/test/OndaBatchesTests.jl:58
[17] include(fname::String)
@ Base.MainInclude ./client.jl:476
[18] top-level scope
@ ~/.julia/dev/OndaBatches/test/runtests.jl:1
[19] include(fname::String)
@ Base.MainInclude ./client.jl:476
[20] top-level scope
@ none:6
in expression starting at /Users/glenn/.julia/dev/OndaBatches/test/OndaBatchesTests.jl:1
in expression starting at /Users/glenn/.julia/dev/OndaBatches/test/runtests.jl:1
MethodError: no method matching getpass(::IOStream, ::Base.TTY, ::String)
Closest candidates are:
getpass(::Base.TTY, ::IO, ::AbstractString) at util.jl:282
Stacktrace:
[1] getpass(prompt::String)
@ Base ./util.jl:291
[2] macro expansion
@ ~/.julia/packages/Mocking/MRkF3/src/mock.jl:29 [inlined]
[3] _aws_get_role(role::String, ini::IniFile.Inifile)
@ AWS ~/.julia/packages/AWS/v6b1A/src/utilities/credentials.jl:90
[4] dot_aws_config(profile::String)
@ AWS ~/.julia/packages/AWS/v6b1A/src/AWSCredentials.jl:437
[5] (::AWS.var"#9#13"{String})()
@ AWS ~/.julia/packages/AWS/v6b1A/src/AWSCredentials.jl:119
[6] AWS.AWSCredentials(; profile::Nothing, throw_cred_error::Bool)
@ AWS ~/.julia/packages/AWS/v6b1A/src/AWSCredentials.jl:128
[7] AWS.AWSConfig()
@ AWS ~/.julia/packages/AWS/v6b1A/src/AWSConfig.jl:32
[8] #global_aws_config#74
@ ~/.julia/packages/AWS/v6b1A/src/AWS.jl:79 [inlined]
[9] global_aws_config
@ ~/.julia/packages/AWS/v6b1A/src/AWS.jl:77 [inlined]
[10] #s3_list_keys#46
@ ~/.julia/packages/AWSS3/tK7Rm/src/AWSS3.jl:682 [inlined]
[11] s3_list_keys(::SubString{String}, ::String)
@ AWSS3 ~/.julia/packages/AWSS3/tK7Rm/src/AWSS3.jl:682
[12] macro expansion
@ ./task.jl:454 [inlined]
[13] (::Main.OndaBatchesTests.var"#3#5")()
@ Main.OndaBatchesTests ~/.julia/dev/OndaBatches/test/OndaBatchesTests.jl:40
[14] _atexit()
@ Base ./initdefs.jl:372
[15] exit
@ ./initdefs.jl:28 [inlined]
[16] _start()
@ Base ./client.jl:525
We have tables of labels, which could be sparse throughout a recording and could be overlapping. That makes them a bad fit for LabeledSignals. Is there another way to use OndaBatches? What needs to happen here to support that?
This is already supported with some manual effort (you need to manually construct the Samples with each soft label class as a channel), but some guardrails would be nice.
MWEs based off the labeled_signals generated by the test data.
julia> batches = RandomBatches(labeled_signals, nothing, nothing, 1, 3, Second(60))
julia> addprocs(3);
julia> @everywhere begin
using Pkg
Pkg.activate(@__DIR__)
using OndaBatches
end
From worker 2: Activating project at `~/.julia/dev/OndaBatches`
julia> batcher = Batcher(1, [2], batches; start=false);
julia> init_state = MersenneTwister(1)
MersenneTwister(1)
julia> start!(batcher, init_state);
julia> init_state # NOTE: start! mutated init_state
MersenneTwister(1, (0, 2256, 0, 36, 1002, 36))
julia> take!(batcher, init_state)
┌ Warning: mismatch between requested batch state and Batcher state, restarting
│ state = MersenneTwister(1, (0, 2256, 0, 12, 1002, 12))
│ prev_state = MersenneTwister(1)
└ @ OndaBatches ~/.julia/dev/OndaBatches/src/batch_services.jl:547
[ Info: stopping batcher
[ Info: batch channel closed, stopping batching
[ Info: channel close, stopping
┌ Warning: only one extra worker to load batches!
└ @ OndaBatches ~/.julia/dev/OndaBatches/src/batch_services.jl:419
[ Info: starting multi-worker batching, with manager 1 and workers WorkerPool(Channel{Int64}(9223372036854775807), Set([2]), RemoteChannel{Channel{Any}}(1, 1, 5)), at state MersenneTwister(1, (0, 2256, 0, 18, 1002, 18))
(([-3.75 -5.0 … -5.5 -11.75;;; -21.75 -18.0 … 14.5 7.25;;; -3.7155719473958015 -6.1875334940850735 … 12.748912675306201 11.78759429603815], [0x01 0x01;;; 0x01 0x01;;; 0x03 0x03]), MersenneTwister(1, (0, 2256, 0, 24, 1002, 24)))
julia> Unhandled Task ERROR: no process with id 0 exists
Stacktrace:
[1] error(s::String)
@ Base ./error.jl:35
[2] worker_from_id(pg::Distributed.ProcessGroup, i::Int64)
@ Distributed ~/.julia-downloads/julia-1.8.2/share/julia/stdlib/v1.8/Distributed/src/cluster.jl:1097
[3] worker_from_id
@ ~/.julia-downloads/julia-1.8.2/share/julia/stdlib/v1.8/Distributed/src/cluster.jl:1089 [inlined]
[4] #remotecall_fetch#162
@ ~/.julia-downloads/julia-1.8.2/share/julia/stdlib/v1.8/Distributed/src/remotecall.jl:492 [inlined]
[5] remotecall_fetch
@ ~/.julia-downloads/julia-1.8.2/share/julia/stdlib/v1.8/Distributed/src/remotecall.jl:492 [inlined]
[6] call_on_owner
@ ~/.julia-downloads/julia-1.8.2/share/julia/stdlib/v1.8/Distributed/src/remotecall.jl:565 [inlined]
[7] isopen
@ ~/.julia-downloads/julia-1.8.2/share/julia/stdlib/v1.8/Distributed/src/remotecall.jl:769 [inlined]
[8] macro expansion
@ ~/.julia/dev/OndaBatches/src/batch_services.jl:171 [inlined]
[9] (::OndaBatches.var"#44#48"{RemoteChannel{Channel{Any}}, Channel{OndaBatches.BatchJob}})()
@ OndaBatches ./task.jl:484
julia>
Note that the above works if we refresh init_state
julia> take!(batcher, MersenneTwister(1))
(([-4.0 -2.25 … -8.75 -6.75;;; 8.25 4.25 … 4.0 4.75;;; 33.508812077343464 28.44281680881977 … -7.14122224599123 -13.36690317839384], [0x01 0x01;;; 0x01 0x01;;; 0x01 0x01]), MersenneTwister(1, (0, 2256, 0, 6, 1002, 6)))
Same as above:
julia> batcher = Batcher(1, [1], batches; start=false);
julia> init_state = MersenneTwister(1)
MersenneTwister(1)
julia> start!(batcher, init_state);
┌ Warning: only one extra worker to load batches!
└ @ OndaBatches ~/.julia/dev/OndaBatches/src/batch_services.jl:419
[ Info: starting multi-worker batching, with manager 2 and workers WorkerPool(Channel{Int64}(9223372036854775807), Set([2]), RemoteChannel{Channel{Any}}(1, 1, 123)), at state MersenneTwister(1)
julia> init_state
MersenneTwister(1)
julia> take!(batcher, init_state)
┌ Warning: mismatch between requested batch state and Batcher state, restarting
│ state = MersenneTwister(1, (0, 2256, 0, 36, 1002, 36))
│ prev_state = MersenneTwister(1)
└ @ OndaBatches ~/.julia/dev/OndaBatches/src/batch_services.jl:547
[ Info: stopping batcher
[ Info: batch channel closed, stopping batching
┌ Warning: only one extra worker to load batches!
└ @ OndaBatches ~/.julia/dev/OndaBatches/src/batch_services.jl:419
[ Info: starting multi-worker batching, with manager 1 and workers WorkerPool(Channel{Int64}(9223372036854775807), Set([1]), RemoteChannel{Channel{Any}}(1, 1, 14)), at state MersenneTwister(1, (0, 2256, 0, 36, 1002, 36))
(([35.03471426665783 33.386739902198315 … -5.981536582112312 -11.169604025781155;;; -159.75 312.0 … 165.5 -232.75;;; -47.549019515514374 16.17647075653076 … -1.4705880880355835 -99.50980389118195], [0x03 0x02;;; 0x04 0x04;;; 0x01 0x01]), MersenneTwister(1, (0, 2256, 0, 42, 1002, 42)))
julia> Unhandled Task ERROR: no process with id 0 exists
Stacktrace:
[1] error(s::String)
@ Base ./error.jl:35
[2] worker_from_id(pg::Distributed.ProcessGroup, i::Int64)
@ Distributed ~/.julia-downloads/julia-1.8.2/share/julia/stdlib/v1.8/Distributed/src/cluster.jl:1097
[3] worker_from_id
@ ~/.julia-downloads/julia-1.8.2/share/julia/stdlib/v1.8/Distributed/src/cluster.jl:1089 [inlined]
[4] #remotecall_fetch#162
@ ~/.julia-downloads/julia-1.8.2/share/julia/stdlib/v1.8/Distributed/src/remotecall.jl:492 [inlined]
[5] remotecall_fetch
@ ~/.julia-downloads/julia-1.8.2/share/julia/stdlib/v1.8/Distributed/src/remotecall.jl:492 [inlined]
[6] call_on_owner
@ ~/.julia-downloads/julia-1.8.2/share/julia/stdlib/v1.8/Distributed/src/remotecall.jl:565 [inlined]
[7] isopen
@ ~/.julia-downloads/julia-1.8.2/share/julia/stdlib/v1.8/Distributed/src/remotecall.jl:769 [inlined]
[8] macro expansion
@ ~/.julia/dev/OndaBatches/src/batch_services.jl:171 [inlined]
[9] (::OndaBatches.var"#44#48"{RemoteChannel{Channel{Any}}, Channel{OndaBatches.BatchJob}})()
@ OndaBatches ./task.jl:484
julia> batcher = Batcher(2, [3, 4], batches; start=false);
julia> init_state = MersenneTwister(1)
MersenneTwister(1)
julia> start!(batcher, init_state);
julia> init_state # remains unchanged
MersenneTwister(1)
julia> take!(batcher, init_state)
(([-4.0 -2.25 … -8.75 -6.75;;; 8.25 4.25 … 4.0 4.75;;; 33.508812077343464 28.44281680881977 … -7.14122224599123 -13.36690317839384], [0x01 0x01;;; 0x01 0x01;;; 0x01 0x01]), MersenneTwister(1, (0, 2256, 0, 6, 1002, 6)))
Finally, for completeness it also works if we flip the manager and worker around:
julia> batcher = Batcher(2, [1], batches; start=false);
julia> init_state = MersenneTwister(1);
julia> start!(batcher, init_state);
┌ Warning: only one extra worker to load batches!
└ @ OndaBatches ~/.julia/dev/OndaBatches/src/batch_services.jl:419
[ Info: starting multi-worker batching, with manager 2 and workers WorkerPool(Channel{Int64}(9223372036854775807), Set([2]), RemoteChannel{Channel{Any}}(1, 1, 135)), at state MersenneTwister(1)
julia> take!(batcher, init_state)
(([-4.0 -2.25 … -8.75 -6.75;;; 8.25 4.25 … 4.0 4.75;;; 33.508812077343464 28.44281680881977 … -7.14122224599123 -13.36690317839384], [0x01 0x01;;; 0x01 0x01;;; 0x01 0x01]), MersenneTwister(1, (0, 2256, 0, 6, 1002, 6)))
Because when we start! a Batcher we create a new remote channel, the local storage that backs the channel can be created on any worker that calls start!. Instead, we could use the where field of the RemoteChannel to determine where the channel was created originally, and create a new one on the same PID.
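The proposed fix can be sketched with plain Distributed primitives; recreate_channel is a hypothetical helper, not an OndaBatches function:

```julia
using Distributed

# The `where` field of a RemoteChannel records the PID that owns the
# backing Channel, so we can pin the replacement channel to that same
# PID rather than to whichever process happened to call start!.
function recreate_channel(old::RemoteChannel; sz::Integer=32)
    owner = old.where  # PID where the backing Channel actually lives
    return RemoteChannel(() -> Channel{Any}(sz), owner)
end

old_chan = RemoteChannel(() -> Channel{Any}(32))
new_chan = recreate_channel(old_chan)
```

On a single process both channels live on pid 1, but the same construction holds when the original channel was created on a remote worker.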
The way Onda loads samples is to always generate a Float64 array when loading, unless you request encoded samples and then decode yourself later with decode(samples, eltype::Type). We'd like to be able to allow users to specify the eltype of the desired array, probably in materialize_batch. To do this, I think we'd want to:

- thread an eltype argument through materialize_batch, materialize_batch_item, and load_labeled_signal
- request encoded samples in load_labeled_signal, and decode to eltype

This ensures that it can actually be used as a drop-in replacement for Batcher.
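The decode-to-eltype step looks roughly like this. This is a toy stand-in: Onda's real decode uses the resolution and offset stored in the signal metadata, whereas decode_to and its keyword arguments here are made up so the sketch is self-contained:

```julia
# Decoding encoded (integer) samples to a caller-chosen eltype avoids
# Onda's eager-load default of always producing a Float64 array.
decode_to(encoded::AbstractArray, ::Type{T}; resolution=0.25, offset=0.0) where {T} =
    T.(encoded) .* T(resolution) .+ T(offset)

x = decode_to(Int16[0 4; 8 12], Float32)  # Float32, not the usual Float64
```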