Comments (29)
Okay, prefacing with the fact that the flusight repo is very small (only 29,600,000 rows), here are some initial benchmarks on the following task:
- create a handle to the partitioned dataset --> `open_dataset()`
- filter on a particular model --> `filter(model == "Delphi-Stat")`
- group by either submission date or target --> `group_by(submission_date)` or `group_by(target)`
- summarize the mean of the value [I realize this is meaningless; just for illustration purposes] --> `summarize(meanvalue = mean(value, na.rm=T))`
- collect the result set --> `collect()`
We compare three partitioning options:
- partitioned on target (labelled `targ` in the benchmark table)
- partitioned on submission date (labelled `sdate` in the benchmark table)
- partitioned on target and submission date (labelled `both` in the benchmark table)
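The three partitioned copies compared below were presumably written along these lines (a sketch using `arrow::write_dataset()`; the directory names match those used in the benchmarks, but the write code itself is an assumption, with `flusight` standing in for the combined table of all submissions):

```r
library(arrow)
library(dplyr)

# `flusight` is assumed to be the combined data.frame/data.table of all
# submissions, with columns including model, target, submission_date, value

# one partition level: target
write_dataset(flusight, "flusight1_targ/", partitioning = "target")

# one partition level: submission date
write_dataset(flusight, "flusight1_date/", partitioning = "submission_date")

# two partition levels: submission date, then target
write_dataset(flusight, "flusight1_date_targ/",
              partitioning = c("submission_date", "target"))
```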
When the grouping variable is `target`:
cmds1 = alist(
targ = open_dataset("flusight1_targ/") %>%
filter(model=="Delphi-Stat") %>%
group_by(target) %>%
summarize(meanvalue = mean(value, na.rm=T)) %>%
collect(),
sdate = open_dataset("flusight1_date/") %>%
filter(model=="Delphi-Stat") %>%
group_by(target) %>%
summarize(meanvalue = mean(value, na.rm=T)) %>%
collect(),
both = open_dataset("flusight1_date_targ/") %>%
filter(model=="Delphi-Stat") %>%
group_by(target) %>%
summarize(meanvalue = mean(value, na.rm=T)) %>%
collect()
)
microbenchmark(list=cmds1, times=10)
Unit: milliseconds
expr min lq mean median uq max neval
targ 64.7840 82.7043 198.1586 91.2229 100.0729 674.7837 10
sdate 133.0033 136.7588 139.9850 140.2119 142.3588 147.1574 10
both 1155.5496 1199.8241 1261.3381 1216.4838 1283.8325 1569.8637 10
When the grouping variable is `submission_date`:
cmds2 = alist(
targ = open_dataset("flusight1_targ/") %>%
filter(model=="Delphi-Stat") %>%
group_by(submission_date) %>%
summarize(meanvalue = mean(value, na.rm=T)) %>%
collect(),
sdate = open_dataset("flusight1_date/") %>%
filter(model=="Delphi-Stat") %>%
group_by(submission_date) %>%
summarize(meanvalue = mean(value, na.rm=T)) %>%
collect(),
both = open_dataset("flusight1_date_targ/") %>%
filter(model=="Delphi-Stat") %>%
group_by(submission_date) %>%
summarize(meanvalue = mean(value, na.rm=T)) %>%
collect()
)
microbenchmark(list=cmds2, times=10)
Unit: milliseconds
expr min lq mean median uq max neval
targ 72.2872 83.7387 222.6143 88.8084 103.2500 1037.7222 10
sdate 126.4035 135.8261 178.6519 139.8237 147.7848 535.2068 10
both 1191.7232 1204.7832 1233.7476 1237.5590 1251.3735 1290.6913 10
So, in this admittedly very limited example, it's clear that the doubly partitioned dataset (partitioned on both target and submission date) causes some slowdown, and that the smaller of the two single partitions (i.e. target) is faster even when the grouping variable is mismatched (i.e. when grouping by submission_date, the target partition was still faster).
from hubverse-cloud.
Actually, I've been thinking about this a bit more, and perhaps there is a way that we can "append" to a single dataset, without reloading the prior data, appending the new, and rewriting the updated dataset.
from hubverse-cloud.
Capturing some notes from Luke Mullany about common use cases:
I think it is in fact hard to say what the most common use cases might be. However, I am thinking perhaps the submission date is first, and then the targets, and then the modeling team last.
I would suspect that getting an entire set of data for a particular submission date (i.e. all models and all targets) would be a priority for many analyses. The next filter would probably be on the specific type of target.
I guess I suggest the modeling team last because, although modeling team-specific analyses are certainly important, in the context of a "hub" the whole purpose is to leverage/compare/contrast multiple models and their forecasts.
Note all the above depends critically on all model submissions within a given "round" having identical "submission_dates". If not, replace the above with some logic that replaces "submission_date" with something that does identify the "round".
from hubverse-cloud.
Nice!
Would be interesting to also try a query that collects forecasts for, say, the last n rounds for a given output type.
I believe that might be quite a common query for getting data to pass on to ensemble methods.
from hubverse-cloud.
One thought on this is that for the scenario hub examples, there are few rounds and each one has a lot of data. For a typical forecast hub (at least so far), there are a lot of rounds, and relative to the entire project each round has relatively little data.
I agree that it probably makes sense to standardize storage and partitioning structure for all hubs in the cloud.
I don't have a clear sense of the trade-offs for having multiple datasets for a hub (one per round) vs. trying to partition on round. @annakrystalli was saying something about how she thought that "appending" a round to an existing dataset could be an expensive procedure computationally. (The entire partition would have to be rebuilt?)
From the end user perspective, I think we pretty definitely want to keep the ability to issue one "query" or "collect statement" to gather data from multiple rounds at once.
from hubverse-cloud.
Yes @nickreich - it's that issue of appending (i.e. one can't really append, but rather needs to load-append-re-partition) that led me to the comments above re separate datasets per round.
Re the trade-offs, it would be good to do some testing with an actual S3 bucket and an actual large repo. If there were substantial trade-offs (i.e. with a single round-partitioned dataset being optimal), then one might rethink our consensus that one single approach is best. That is, we might consider a different, more optimized storage structure for "archived/no-longer-active" hubs vs "currently active" hubs.
from hubverse-cloud.
@annakrystalli , here is the repo I've been working with: https://github.com/cdcepi/FluSight-forecasts
Below is a screenshot of five seasons - for purposes of playing around/testing, I just read in all datasets from these five seasons.
from hubverse-cloud.
Thanks @elray1 !
Ideally nothing is hardcoded and any re-partitioning is configurable by admins. I was just referring to the examples we had been discussing above, with target being one example given.
from hubverse-cloud.
@lmullany, @bsweger and I actually have a call set up for tomorrow! I was going to suggest that you join us if you can!
from hubverse-cloud.
Linking to notes from the chat that @lmullany, @annakrystalli, and I had this morning (Anna and Luke, please correct anything I got wrong): https://docs.google.com/document/d/1NgEZ1i_mXukoOMrgPWFKWm8xZKdFs6G1sWxeOIfvmT4/
Our small working group is coming at this problem from different angles, which is very useful (imo)
A high-level question that emerged is: what are we hoping to gain by re-organizing/munging model-output data in the cloud? What would make the additional dev and maintenance of moving parts worthwhile?
from hubverse-cloud.
Having said that, if you know you are only interested in a single output type, perhaps you could just connect to the output type sub-directory (rather than the whole model-output dir) and set the required data type for the output type ID column.
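A sketch of that idea, assuming a hive-style layout with an output-type level in the path (the path and schema below are illustrative assumptions, not the actual hub layout):

```r
library(arrow)
library(dplyr)

# Connect only to the quantile sub-directory rather than the whole
# model-output dataset, and pin the type of the output_type_id column,
# which for quantile forecasts is numeric (the quantile level)
quantiles <- open_dataset(
  "model-output/output_type=quantile/",   # hypothetical path
  schema = schema(                         # assumed columns
    model_id       = string(),
    round_id       = string(),
    output_type_id = float64(),  # e.g. 0.025, 0.5, 0.975
    value          = float64()
  )
)
```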
from hubverse-cloud.
Depending on where we land with this, we might need a related task for ensuring that `hubUtils` will work with the new organization.
from hubverse-cloud.
For many repos, partitioning on more than round may be overkill, and lead to a huge number of small files.
For example, on my local machine, I just now cloned the flusight 1 repo, read in all 4,026 submissions (i.e. across numerous flu seasons and models), combined them into a single data.table, and created a hive-partitioned dataset using arrow (with submission date and target).
This creates >3,000 files, many of which are quite small, to the point where the total amount of metadata starts to dominate relative to the actual data of interest! Granted, this is true partly because the submission dates in flusight are not sufficiently specific to round (see my other comment).
If I recreate the partition only on submission_date or only on target, obviously the number of files and the total size of the combined partitioned dataset will be much smaller.
from hubverse-cloud.
One level of partition indeed sounds about right. And definitely much easier to implement in a generalisable fashion than trying to add further partitions. Would be interesting to benchmark different queries on the two hubs (one standard partitioned and the one you repartitioned on submission date).
from hubverse-cloud.
yeah, that might be common; probably a good implementation is via identifying the last n round indicator, and then joining.. something like this:
get_last_n_rounds <- function(dataset, target_type, n = 5) {
  inner_join(
    open_dataset(dataset) %>% filter(target == target_type),
    open_dataset(dataset) %>% distinct(submission_date) %>% arrange(desc(submission_date)) %>% slice_head(n = n),
    by = "submission_date"
  )
}
cmds4 = alist(
  targ  = get_last_n_rounds("flusight1_targ/", target_type = "Season onset") %>% collect(),
  sdate = get_last_n_rounds("flusight1_date/", target_type = "Season onset") %>% collect(),
  both  = get_last_n_rounds("flusight1_date_targ/", target_type = "Season onset") %>% collect()
)
microbenchmark(list=cmds4, times=10)
Unit: milliseconds
expr min lq mean median uq max neval
targ 469.5469 491.2216 528.6626 498.8077 536.0588 746.3325 10
sdate 490.5871 501.6316 543.4063 515.6881 554.7503 728.3711 10
both 1859.2467 1893.0724 2014.3022 1980.3282 2027.8139 2365.7026 10
but you all may have different ideas.. arrow, as far as I know, does not support a number of alternative ways you might consider doing this (e.g. via `n()`, `row_number()`, grouped slicing like `slice_head(n=<n>, by=..)`, etc.).
Again note that I'm using "submission_date" as a stand in for the concept of "round", but this is for illustration only, and I really mean the latter (or whatever the current hubverse parlance is for a single "round")
from hubverse-cloud.
Actually, I think that partitioning by "round" (above "submission date"), might be the best choice, but only for retrospective storage
If the storage is being accumulated over time, then I think it makes sense to have a separate (possibly partitioned) dataset for each round. That is, the data are accumulated for a round, and that round constitutes a dataset, which could be partitioned by target. Then the next round is accumulated sometime later, and that round constitutes a dataset, which again could be partitioned by target, etc.
And it seems we wouldn't want to have a different approach to storage partitioning for retrospective and prospective collection of data, right? Meaning that generating a single hive-partitioned multi-file dataset for an entire repo, partitioned by round (as in the above examples in the benchmarking threads), makes little sense.
from hubverse-cloud.
I'm not super familiar with the ins and outs of arrow-style datasets, but if we did have separate datasets per round, would that kind of set-up be compatible with a single "collect-style" argument to load data from multiple rounds? If not, what would the right strategy be to support single-argument-style queries on the data? E.g. would we need to build a user-facing function that wraps a few collect calls for different rounds?
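One possible shape for such a wrapper, assuming one arrow dataset directory per round (the helper name and directory layout here are hypothetical; arrow can union a list of datasets, so this stays a single query/collect):

```r
library(arrow)
library(dplyr)

# Hypothetical layout: one arrow dataset directory per round, e.g.
#   hub-data/2022-10-12/, hub-data/2022-10-19/, ...
collect_rounds <- function(root, round_ids, ...) {
  # open each per-round dataset and combine them into a UnionDataset
  # (requires the per-round schemas to match), so one dplyr pipeline
  # and one collect() span all requested rounds
  ds <- open_dataset(lapply(file.path(root, round_ids), open_dataset))
  ds %>% filter(...) %>% collect()
}

# e.g. all "Season onset" forecasts from the last two rounds
# collect_rounds("hub-data", c("2022-10-12", "2022-10-19"),
#                target == "Season onset")
```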
from hubverse-cloud.
I'll try to unpick a little bit the limitations I think we are facing and how I think it's best to proceed.
While `arrow::write_dataset()` is a great tool for partitioning a dataset, especially for complex partitions, the inability to use it to append to an already partitioned dataset makes it costly to use for regular updates, as it would effectively require reading and re-writing the entire hub dataset to S3 every time a new file is added (or perhaps on a less frequent schedule).
Having said that, @bsweger & Matt pointed out that we could do away with `write_dataset()` and re-organise the data ourselves. How easy this is, and how much investment we'll need to make to develop the code to do it, will depend on how complex we want the repartitioning to be.
Given that each file submitted to a hub by a given team relates to a single round and includes the `round_id` as the first element of the file name, it would be relatively straightforward to simply place it in a round ID folder on S3 instead of a team folder, without any change to the file itself (actually, we would need to add the `model_id` to the file). This would be pretty easy to do.
Re-partitioning across targets, however, would require that individual submission files be read, split across multiple files according to target, and then most likely appended to existing files on S3 to avoid splitting up into too many tiny files. Ensuring this is accomplished robustly will obviously require more work. More complex partitioning == more work, etc.
My suggestion moving forward would be to:
- Engage the community to develop a realistic list of common hub data queries.
- Perform detailed benchmarking of that list of queries over a number of example & differently partitioned hubs. This should also include optimising queries.
This should then give us a better idea of whether it's worth the effort to develop additional tools to support more complex regular repartitioning.
It might also be good to get a feeling for the actual cost and limits (e.g. do GitHub Actions have a limit on how much data a free runner can process?) of re-partitioning the hub using `write_dataset()` as it grows.
Also +1 on the suggestion to actually start benchmarking over S3 too.
@lmullany could you point me to the repo you have been using for your benchmarking experiments so far?
from hubverse-cloud.
Let's say we have an existing single dataset containing the entire hub's data collected thus far, partitioned by round (here round is a string date, but it could be anything, e.g. "R1", "R2", etc., or "1", "2", "3", etc.). [Note: this is just some data from the Scenario Modeling Hub, not the FluSight repo mentioned throughout the benchmarking in this thread.]
It might look something like this, with, say, 10 completed rounds (in this case labelled by date strings from 2021-05-02 to 2022-07-19):
If we run `open_dataset("active_hub") %>% glimpse()`, we see that there are 9 million rows across 10 parquet files:
Now, let's say we have some new data (`new_data`) from the next round, say "2022-10-19", and we want to add/append this to the same arrow multi-file dataset.
We can simply do:
# write the new data to the same dataset
write_dataset(new_data, "active_hub", partitioning = "round")
and now our dataset looks like this:
and similarly, if we again run `open_dataset("active_hub") %>% glimpse()`, we get:
The single dataset has been properly appended, without re-reading or re-partitioning the existing data.
from hubverse-cloud.
Right, but most hubs are organised under model IDs and we would want to repartition them across round IDs.
from hubverse-cloud.
Sure, but within a round we only have to row-bind all the submissions, and then append that single new round to the arrow dataset. I guess what I'm saying is that the process seems trivial regardless of whether it is an active or closed/complete hub.
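To make that concrete, the per-round update could look something like this (a sketch; the paths, file format, and round column are assumptions):

```r
library(arrow)
library(dplyr)

# Hypothetical: all submission files for the round just closed
round_files <- list.files("submissions/2022-10-19/", full.names = TRUE)

# row-bind the round's submissions into one table, tagging the round id...
new_round <- round_files %>%
  lapply(read_csv_arrow) %>%
  bind_rows() %>%
  mutate(round = "2022-10-19")

# ...and append it to the existing dataset: write_dataset() only adds
# the new round=2022-10-19 partition, leaving prior partitions untouched
write_dataset(new_round, "active_hub", partitioning = "round")
```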
Perhaps I'm missing something here. Could we schedule a conf call to discuss further, sometime this week?
from hubverse-cloud.
It seems like this may have already been settled -- but responding to Anna's comment above alluding to the possibility of partitioning by target, I wanted to note that although every hub has a concept of a "target", we don't impose any requirement that model outputs record a field called `target` (e.g., if the hub only collects forecasts for one target, dropping that into the model output files would be superfluous). So if we're aiming to build a "one solution for all hubs" kind of answer here, we likely don't want to hard-code `target` as a part of that.
from hubverse-cloud.
A few comments:
- In general, I think we will “just know” the last n round id names (e.g. if I know today’s date, it’ll be today’s date minus 7*range(0, n-1)) — couldn’t tell if you were computing this or if it was a determinant of performance here.
- My most common queries are:
- Building an ensemble: query speed is not critical: For some specified set of rounds, all models, all predictions
- Interactive vis: query speed is nice – this is the main one where I sit around impatiently waiting for query results to return: For one round, one “task” (location/date), all models, enough model outputs to plot point prediction and intervals. For example,
- E.g., 5 quantile levels
- Or all pmf bin probabilities
- I’m on board with avoiding premature optimization and just switching to parquet for now. It actually sounds like, for the vis use case, duckdb might be the fastest answer?
from hubverse-cloud.
Given the discussion this morning, I agree that simply mirroring the structure, but writing to parquet, might be pretty high on the list of candidate approaches.
If we are going to index a db, or partition for some optimization, I think round-driven queries (both of @elray1's examples) are very common (whether that's for an analysis/viz of a single round, or a set of recent rounds, etc.).
from hubverse-cloud.
Just adding the link to the initial benchmarking here too: https://partition-benchmarking.netlify.app/
from hubverse-cloud.
To add on to @elray1 's comments above about common queries. I think a common secondary analysis of forecast data involves pulling down all forecasts of one target, for all models, for perhaps all or a subset of rounds. E.g. for an analysis of accuracy of forecasts of deaths, I just want "k step ahead incident hospitalizations" (for all possible values of k) from the past 20 weeks. Because this is a secondary analysis and not some interactive web app, "instantaneous" speed is not critical.
from hubverse-cloud.
Another comment here is that another partition we could consider would be by output_type. One advantage is that it might move us more towards a zoltar-style "one-table-per-output-type" kind of model, which could have advantages later and help us get around some of the downstream data-type challenges we have when output_type_id can take both character and numeric values (e.g., here).
from hubverse-cloud.
Just to note that, unfortunately, I don't think splitting across output types would solve the data type issue if accessing the data as an arrow dataset, as the dataset as a whole uses a single schema across all files. If there are data type differences across files in the same column, you need to explicitly define a single data type to be used for those columns, or opening the dataset fails.
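For illustration, pinning a single type for `output_type_id` when opening the dataset might look like this (a sketch; the other columns are assumptions about the hub schema, and unifying on string is just one possible choice):

```r
library(arrow)

# If some files store output_type_id as numeric (quantile levels) and
# others as character (pmf bin names), opening the dataset fails unless
# a single type is imposed on that column via an explicit schema
ds <- open_dataset(
  "model-output/",             # hypothetical dataset root
  schema = schema(             # assumed columns
    model_id       = string(),
    round_id       = string(),
    output_type    = string(),
    output_type_id = string(), # unify on string; numeric levels become text
    value          = float64()
  )
)
```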
from hubverse-cloud.
Closing this one because we've verified that writing the model-output files as parquet (with added columns for round_id, team, and model) works with the existing `hubData` tools.
At this point, we need more real-world usage patterns before deciding to re-organize the data. We had talked about potentially using Zoltar query history as a data point (and thanks @matthewcornell for volunteering to help with that!), but let's wait until we get some hubverse-centric feedback about this.
from hubverse-cloud.