locustdb's People

Contributors

cactter, cswinter, dbxnicolas, ddfisher, dependabot[bot], dirkjonker, gitter-badger, virattara, zhzy0077, zoidyzoidzoid


locustdb's Issues

Refactor to make it easy to add new functions

I envision a world where adding a new function to LocustDB looks something like this:

// subtraction.rs
struct Subtract;

impl<T: Integer, U: Integer> Op2<T, U, i64> for Subtract {
    fn op(x: T, y: U) -> i64 { x.to_i64() - y.to_i64() }

    // optional properties used by query engine/optimiser
    fn is_commutative() -> bool { false }
    fn is_order_preserving() -> bool { true }
    // ...

    fn display_name() -> &'static str { "-" }
    fn display_infix() -> bool { true }
}
fn init_function_registry() {
    // ...
    register_function(
        Box::new(Subtract),
        specialize_int_int_i64!(Subtract),
    );
    // ...
}

There are a number of obstacles that have to be removed to make that happen.

Generating specializations

Look at the factory methods defined under impl VecOperator { and despair. We should have some macros that can automatically generate all that crap.

E.g. specialize_int_int_i64! would return something like a dictionary that maps (EncodingType, EncodingType) to a function that can be called to create a new BoxedOperator implementing subtraction for the corresponding types, including implementations for both vector and scalar inputs. We will need a small number of actual VecOperator implementations to make this possible, similar to but slightly more general than e.g. VecConstBoolOperator.
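
As a rough, self-contained sketch of the kind of dispatch table such a macro could expand to (all types here are simplified stand-ins, not LocustDB's actual ones):

use std::collections::HashMap;
use std::marker::PhantomData;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum EncodingType { U8, U16, U32, I64 }

trait VecOperator {}
type BoxedOperator = Box<dyn VecOperator>;

// One generic implementation covers all integer input type combinations.
struct BinaryIntOp<Op, T, U>(PhantomData<(Op, T, U)>);
impl<Op: 'static, T: 'static, U: 'static> VecOperator for BinaryIntOp<Op, T, U> {}

struct Subtract;

type Factory = Box<dyn Fn() -> BoxedOperator>;

macro_rules! specialize_int_int_i64 {
    ($op:ty) => {{
        let mut table: HashMap<(EncodingType, EncodingType), Factory> = HashMap::new();
        table.insert(
            (EncodingType::U8, EncodingType::U8),
            Box::new(|| -> BoxedOperator { Box::new(BinaryIntOp::<$op, u8, u8>(PhantomData)) }),
        );
        table.insert(
            (EncodingType::U8, EncodingType::I64),
            Box::new(|| -> BoxedOperator { Box::new(BinaryIntOp::<$op, u8, i64>(PhantomData)) }),
        );
        // ...one entry for every remaining integer type combination...
        table
    }};
}

fn main() {
    let table = specialize_int_int_i64!(Subtract);
    // The query planner would look up the factory for the concrete input encodings at plan time.
    let factory = &table[&(EncodingType::U8, EncodingType::I64)];
    let _subtract_u8_i64: BoxedOperator = factory();
}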

As an added bonus, this will also make it much easier to add support for new types and have them be supported by all existing functions.

Query planner

The create_query_plan function currently contains a large (mostly copy-pasted) case for each individual function. It should be possible to replace this with a single piece of code that works for any function by using a table that maps each function to its properties and factory methods. This table is populated by the register_function calls in the example above.

Parser

It should not be necessary to modify the parser; at most we should have to add the function name to a single list of valid names. Also see #4.

Open the floodgates

Add ALL the functions.

Expand cases where intermediary results can be streamed between operators

There are a lot of cases where intermediary results could be streamed between operators, but aren't. One example is the inability to stream from operators that produce their full output at once. There is probably some low-hanging fruit, but a proper solution is more involved and will need to revisit the overall design, probably moving the graph partitioning from QueryExecutor to QueryPlanner to allow the insertion of additional operators that aid streaming data.

Tweak RocksDB options

Performance might benefit from ReadOptions.readahead and ReadOptions.iterate_upper_bound once those are exposed by rust-rocksdb

CLI/REPL improvements

All bin/repl/main.rs currently does is look for some magic strings in the first command line argument to decide whether it should load the NYC taxi trips data set, and otherwise it assumes the argument refers to a single file. Just setting up proper argument parsing (e.g. with a crate like clap) would already be an improvement, and once we have that we can add more features (see the sketch after this list):

  • Pass in multiple files
  • Flags for various options (partition size, thread count, unzip files, ...)
  • --help flag and usage info
  • Some lightweight way to pass in names/types of columns which can't always be determined automatically
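
As a sketch of what that could look like (this is not code from the repo; it assumes the clap crate's v2 API and hypothetical flag names):

use clap::{App, Arg};

fn main() {
    let matches = App::new("LocustDB")
        .version("0.1.0")
        .about("In-memory analytics REPL")
        .arg(Arg::with_name("load")
            .long("load")
            .help("Data files to load")
            .takes_value(true)
            .multiple(true))
        .arg(Arg::with_name("partition-size")
            .long("partition-size")
            .help("Number of rows per partition")
            .takes_value(true))
        .arg(Arg::with_name("threads")
            .long("threads")
            .help("Number of worker threads")
            .takes_value(true))
        .get_matches();

    // Collect all files passed via --load (zero or more).
    let files: Vec<&str> = matches.values_of("load").map_or(vec![], |v| v.collect());
    // Fall back to a default when --threads is not given.
    let threads: usize = matches.value_of("threads")
        .map(|t| t.parse().expect("--threads must be an integer"))
        .unwrap_or(4);
    println!("would load {:?} using {} threads", files, threads);
}

clap would also generate the --help output and usage info for free, which covers one of the bullets above.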

`to_year` function is broken

locustdb> select passenger_count, to_year(pickup_datetime) from trips where (passenger_count = 9) and (to_year(pickup_datetime) = 2014);
Not implemented: SQLFunction { id: "to_year", args: [SQLIdentifier("pickup_datetime")] }

The parser sucks

The parser solves a fairly well-defined problem and has a simple and clean interface to the rest of LocustDB, so this is quite easy to work on without knowing much about the rest of the code.

I hacked together the parser within a few hours during the original hack week, and it has mostly remained unchanged since. It has serious issues:

  • no usable error messages
  • no operator precedence
  • not built in a principled way and not super easy to extend with new operators or functions
  • various quirks where queries that seem like they should parse don't
  • it uses an outdated version of nom, and I think there might be good reasons not to use nom at all, since parser performance is irrelevant here

Even if we ultimately decide to build our own parser, it would probably not be a bad idea in the short term to just leverage some other existing SQL parser. All we would need to do is add an additional pass that rejects parts of SQL not supported by LocustDB (most of them) and maps the rest into the AST format expected by the query engine.

Panics! Panics everywhere!

$ find . -name '*.rs' | xargs grep panic | wc -l
62

Make this number go down by replacing panics with Results and bubbling them up to the top level.
The biggest offenders are the cast_* methods on AnyVec and the factory methods on impl VecOperator.
There are plenty of unwraps as well.
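
A minimal, self-contained sketch of the pattern (the types here are simplified stand-ins, not LocustDB's actual AnyVec):

#[derive(Debug)]
enum QueryError {
    TypeMismatch { expected: &'static str, actual: &'static str },
}

enum Values {
    Integer(Vec<i64>),
    Str(Vec<String>),
}

impl Values {
    fn type_name(&self) -> &'static str {
        match self {
            Values::Integer(_) => "Integer",
            Values::Str(_) => "Str",
        }
    }

    // Instead of `panic!("expected integer column")`, return a descriptive error.
    fn cast_ref_i64(&self) -> Result<&[i64], QueryError> {
        match self {
            Values::Integer(xs) => Ok(xs.as_slice()),
            other => Err(QueryError::TypeMismatch {
                expected: "Integer",
                actual: other.type_name(),
            }),
        }
    }
}

fn sum_column(col: &Values) -> Result<i64, QueryError> {
    // `?` bubbles the error up towards the toplevel instead of killing the thread.
    Ok(col.cast_ref_i64()?.iter().sum())
}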

Add support for mixed type columns

Columns that contain a mix of strings, ints, and null values.

  • when loading from CSV
  • in select clause
  • in group by clause
  • in order by clause

No such subcommand: `+nightly`

I'm seeing the following error when following your README install notes.

$ RUSTFLAGS="-Ccodegen-units=1" CARGO_INCREMENTAL=0 cargo +nightly run --release --bin repl -- test_data/nyc-taxi.csv.gz
error: no such subcommand: `+nightly`

I'm on Ubuntu 16 with the following packages installed:

$ sudo apt install curl git cargo make
$ curl https://sh.rustup.rs -sSf | sh

Parallelize merge

The final pass for merging query results is done by a single thread. Parallelizing it would give a large speedup for queries with high-cardinality group bys.

Ordering by grouping column performs unnecessary work

Note this portion of the query plan which combines passenger_count with itself:

casted_1     = column_0 as I64                               TypeConversionOperator<u8, i64>
bitpacked_2  = casted_1 + (casted_1 << $shift)               ParameterizedVecVecIntegerOperator<BitShiftLeftAdd>

This causes a roughly 3x slowdown (on this query). When creating the grouping key, each unique expression should just be used once.

Full query plan:

locustdb> :explain SELECT passenger_count, count(0) FROM trips ORDER BY passenger_count LIMIT 100;

Query plan in 22412 batches
-- Stage 0 (streaming) --
column_0     = "passenger_count".0                           ReadColumnData
casted_1     = column_0 as I64                               TypeConversionOperator<u8, i64>
bitpacked_2  = casted_1 + (casted_1 << $shift)               ParameterizedVecVecIntegerOperator<BitShiftLeftAdd>
constant_3   = Constant<Integer>                             Constant
count_4[bitpacked_2] += 1                                    VecCount<i64>

-- Stage 1 --
nonzero_indices_5 = nonzero_indices(count_4)                 NonzeroIndices<u32, i64>

-- Stage 2 --
             = count_4[count_4 > 0]                          NonzeroCompact<u32>

-- Stage 3 --
casted_6     = count_4 as I64                                TypeConversionOperator<u32, i64>

-- Stage 4 --
unpacked_7   = (nonzero_indices_5 >> $shift) & $mask         BitUnpackOperator
casted_8     = unpacked_7 as U8                              TypeConversionOperator<i64, u8>
casted_9     = casted_8 as I64                               TypeConversionOperator<u8, i64>

-- Stage 5 --
unpacked_10  = (nonzero_indices_5 >> $shift) & $mask         BitUnpackOperator
casted_11    = unpacked_10 as U8                             TypeConversionOperator<i64, u8>
casted_12    = casted_11 as I64                              TypeConversionOperator<u8, i64>

Not implemented: Failed to pack group by columns into 64 bit value

locustdb> select count(1) from default ;
Not implemented: Failed to pack group by columns into 64 bit value
locustdb> select Location from default limit 10 offset 9999  ;

Scanned 65.5 thousand rows in 575μs (0.11 billion rows/s)!

Location
-------------------------------------------
"(40.683246921488006, -73.96177039490063)"
"(40.85617232035731, -73.90313819268951)"
"(40.60049182458343, -73.96232553044085)"
"(40.79027285680634, -73.94162987401177)"
"(40.833094237517365, -73.9091777076508)"
"(40.64359070987828, -74.01084624332543)"
"(40.73571318781656, -73.98697373014004)"
"(40.652461128591106, -73.9533333272863)"
"(40.636365702226996, -74.07724500466884)"
"(40.8470602597306, -73.9177429027627)"



Streaming merge

Add support for streaming subresults between merge operators. This would give significant speedups for queries that perform high-cardinality group bys.

Audit TODOs

There are 63 TODOs in the code. They should all be actioned, deleted, or turned into issues.

Add some tags to the repo

I was having a really hard time finding this project after reading the blog post a few days ago.

I suggest adding some tags so that GitHub indexes the repo on the explore pages. Also consider adding it to some other places like here.

Great work btw. Very interesting read and loved how you linked to the code from the blog post. 🍻

Allow arbitrary expressions in ORDER BY clause

Currently, the ORDER BY expression is constrained to be just a column name, and it has to be identical to at least one SELECT projection.
Queries like this should also work:
SELECT name FROM default ORDER BY y/10

Basic functions/operators

Blocked on #6 and partially #32.

  • count
  • sum
  • min
  • max
  • avg
  • LIKE
  • +, -, *, /, %
  • String length
  • AND
  • OR
  • NOT
  • =, >, <, >=, <=, <>
  • IS NULL
  • IS NOT NULL

Add support for entropy coding

LocustDB should have support for entropy coding data using Huffman or ANS. This seems to be the leading implementation, but there don't actually seem to be any Rust bindings for it yet.

Once there are bindings, actually integrating with LocustDB requires changes in a number of places, but there are good examples for how to do so in previous commits:
Adding hex encoding: 3455b20
Stripping lz4 compression for data cached in memory: fc83844

Synthetic table data

The taxi ride dataset used in tests and benchmarks is starting to outlive its usefulness. Some issues:

  • the dataset is unwieldy and takes up a bunch of disk space
  • loading even a subset of 100M records takes a very long time (minutes), which makes benchmarking cumbersome
  • the columns in the dataset only cover a small subset of real-world data distributions, which means that many important missing features cannot currently be tested or benchmarked easily

I think a good solution is to create a new module for synthetically generating column data, which will solve the load problem and allow us to easily create and test on a variety of different data distributions. Benchmarking on real data sets will always remain important, but I think synthetic benchmarks/tests are a better fit for most development.

Rough Milestones

MVP

Set up a new method on the database that takes in a set of column generators and uses them to create a new table. I can provide some more details for how to do that if requested. At the end of this, we should have a single integration test case that runs on a generated column.
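
A rough sketch of what that interface could look like (all names here are hypothetical, not existing LocustDB APIs):

/// Produces the values of one synthetic column, deterministically, so that
/// tests and benchmarks are reproducible.
trait ColumnGenerator {
    fn name(&self) -> &str;
    /// Generate values for rows [offset, offset + len).
    fn generate(&self, offset: u64, len: usize) -> Vec<i64>;
}

/// Example: integer column with a wide value range but few distinct values.
struct LowCardinalityInt {
    column: String,
    distinct_values: Vec<i64>,
}

impl ColumnGenerator for LowCardinalityInt {
    fn name(&self) -> &str { &self.column }
    fn generate(&self, offset: u64, len: usize) -> Vec<i64> {
        (0..len as u64)
            .map(|i| {
                // Cheap deterministic hash of the row number picks one of the values.
                let h = (offset + i).wrapping_mul(6364136223846793005).rotate_left(17);
                self.distinct_values[(h % self.distinct_values.len() as u64) as usize]
            })
            .collect()
    }
}

// Hypothetical entry point on the database:
// fn create_synthetic_table(&self, name: &str, rows: u64, generators: Vec<Box<dyn ColumnGenerator>>);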

Research

There may be existing libraries for data generation that we can leverage. We should figure out which ones they are and whether they would work for us.

Migrate

Add additional generators to cover all the data distributions found in existing benchmarks that currently use the taxi ride dataset. Incrementally replace all of those benchmarks with equivalent ones running on synthetic data.

Flesh out

Add additional generators and test cases/benchmarks that can lay the foundation for new feature work. Some examples of distributions that would be useful:

  • integer column with a wide range of values, but a comparatively small number of unique values
  • sparse columns that are mostly null
  • string column that is not dictionary compressible
  • delta compressible timestamp column (poisson distribution, maybe with additional bursts/variable rate)
  • timestamp column with very regular interval that is delta-delta encodable
  • columns containing mixed integer and string data

build failure

Hi, I'm getting a compiler error when trying to build:

repo is at c594299
rustc 1.30.0-nightly (33b923fd4 2018-08-18)

    Checking futures-executor v0.2.1
    Checking backtrace v0.3.5
    Checking csv v1.0.0
    Checking failure v0.1.1
error[E0599]: no method named `to_bytes` found for type `u64` in the current scope
   --> src/engine/query_plan.rs:543:49
    |
543 |         hasher.input(&discriminant_value(&plan).to_bytes());
    |                                                 ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `usize` in the current scope
   --> src/engine/query_plan.rs:547:37
    |
547 |                 hasher.input(&index.to_bytes());
    |                                     ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `usize` in the current scope
   --> src/engine/query_plan.rs:551:40
    |
551 |                 hasher.input(&buffer.0.to_bytes());
    |                                        ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `u64` in the current scope
   --> src/engine/query_plan.rs:561:54
    |
561 |                 hasher.input(&discriminant_value(&t).to_bytes());
    |                                                      ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `u64` in the current scope
   --> src/engine/query_plan.rs:576:65
    |
576 |                 hasher.input(&discriminant_value(&initial_type).to_bytes());
    |                                                                 ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `u64` in the current scope
   --> src/engine/query_plan.rs:577:64
    |
577 |                 hasher.input(&discriminant_value(&target_type).to_bytes());
    |                                                                ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `u64` in the current scope
   --> src/engine/query_plan.rs:583:54
    |
583 |                 hasher.input(&discriminant_value(&t).to_bytes());
    |                                                      ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `usize` in the current scope
   --> src/engine/query_plan.rs:594:43
    |
594 |                 hasher.input(&total_bytes.to_bytes());
    |                                           ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `u64` in the current scope
   --> src/engine/query_plan.rs:601:54
    |
601 |                 hasher.input(&discriminant_value(&t).to_bytes());
    |                                                      ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `u64` in the current scope
   --> src/engine/query_plan.rs:609:54
    |
609 |                 hasher.input(&discriminant_value(&t).to_bytes());
    |                                                      ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `u64` in the current scope
   --> src/engine/query_plan.rs:615:54
    |
615 |                 hasher.input(&discriminant_value(&t).to_bytes());
    |                                                      ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `u64` in the current scope
   --> src/engine/query_plan.rs:621:55
    |
621 |                 hasher.input(&discriminant_value(&t1).to_bytes());
    |                                                       ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `u64` in the current scope
   --> src/engine/query_plan.rs:622:55
    |
622 |                 hasher.input(&discriminant_value(&t2).to_bytes());
    |                                                       ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `u64` in the current scope
   --> src/engine/query_plan.rs:630:59
    |
630 |                 hasher.input(&discriminant_value(&data_t).to_bytes());
    |                                                           ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `u64` in the current scope
   --> src/engine/query_plan.rs:631:61
    |
631 |                 hasher.input(&discriminant_value(&select_t).to_bytes());
    |                                                             ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `u64` in the current scope
   --> src/engine/query_plan.rs:645:53
    |
645 |                 hasher.input(&(shift_amount as u64).to_bytes());
    |                                                     ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `u8` in the current scope
   --> src/engine/query_plan.rs:651:37
    |
651 |                 hasher.input(&shift.to_bytes());
    |                                     ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `u8` in the current scope
   --> src/engine/query_plan.rs:652:37
    |
652 |                 hasher.input(&width.to_bytes());
    |                                     ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `u64` in the current scope
   --> src/engine/query_plan.rs:660:62
    |
660 |                 hasher.input(&discriminant_value(&left_type).to_bytes());
    |                                                              ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `u64` in the current scope
   --> src/engine/query_plan.rs:668:62
    |
668 |                 hasher.input(&discriminant_value(&left_type).to_bytes());
    |                                                              ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `u64` in the current scope
   --> src/engine/query_plan.rs:676:62
    |
676 |                 hasher.input(&discriminant_value(&left_type).to_bytes());
    |                                                              ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `u64` in the current scope
   --> src/engine/query_plan.rs:691:62
    |
691 |                 hasher.input(&discriminant_value(&left_type).to_bytes());
    |                                                              ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `u64` in the current scope
   --> src/engine/query_plan.rs:722:54
    |
722 |                 hasher.input(&discriminant_value(&t).to_bytes());
    |                                                      ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `usize` in the current scope
   --> src/engine/query_plan.rs:723:33
    |
723 |                 hasher.input(&n.to_bytes());
    |                                 ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `u64` in the current scope
   --> src/engine/query_plan.rs:732:54
    |
732 |                 hasher.input(&discriminant_value(&t).to_bytes());
    |                                                      ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `u64` in the current scope
   --> src/engine/query_plan.rs:740:54
    |
740 |                 hasher.input(&discriminant_value(&t).to_bytes());
    |                                                      ^^^^^^^^
error[E0599]: no method named `to_bytes` found for type `u64` in the current scope
   --> src/engine/query_plan.rs:746:64
    |
746 |                     RawVal::Int(i) => hasher.input(&(i as u64).to_bytes()),
    |                                                                ^^^^^^^^
error: aborting due to 27 previous errors
For more information about this error, try `rustc --explain E0599`.
error: Could not compile `locustdb`.

Asking for usage documentation and more detailed instructions

Hi, Thanks a lot for the work!

I am a bit confused by the following part:

You can pass the magic strings nyc100m or nyc to load the first 5 files (100m records) or full 1.46 billion taxi rides dataset which you will need to download first (for the full dataset, you will need about 120GB of disk space and 60GB of RAM).

So does this mean the full dataset will require 60GB of RAM to query? What is the typical hardware requirement if, say, I want to process 1TB of data -- 500GB of RAM?

Can't sort by multiple columns

Currently, the query engine and planner are missing support for sorting on multiple columns.

<Braindump>
Supporting multiple columns in order by clauses requires changes in almost all parts of the query engine and parser, but most of these changes are relatively small and don't require fundamentally new logic.

On a high level, I am aware of two different approaches that can be used for sorting data by multiple keys. Both start out by first creating a vector of indices 0..len-1, then sorting this vector according to the values at those indices in the columns we want to sort by, and finally shuffling all result columns by reading from them in the order defined by this vector of indices.

Multiple stable sorts

Perform a sequence of stable sorts, starting with the least significant sort column. I think this is the first approach we should implement, because it is simple, works very generally, and the query engine already has all of the basic building blocks we need to implement this.

To illustrate why this approach works, imagine we want to order by c1, c2:

c1
[B B B A A A]
c2
[2 4 0 5 4 3]
indices
[0 1 2 3 4 5]

indices after sort by c2
[2 0 5 1 4 3]

indices after stable sort by c1
[5 4 3 2 0 1]

c1 shuffled
[A A A B B B]

c2 shuffled
[3 4 5 0 2 4]
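
A small self-contained sketch of this approach (plain slices stand in for the query engine's column types):

fn sort_indices_by_c2_then_c1(c1: &[&str], c2: &[i64]) -> Vec<usize> {
    let mut indices: Vec<usize> = (0..c1.len()).collect();
    // Least significant sort column first...
    indices.sort_by_key(|&i| c2[i]);
    // ...then a stable sort by the most significant column preserves the c2 order
    // within equal c1 groups (slice::sort_by_key is stable).
    indices.sort_by_key(|&i| c1[i]);
    indices
}

fn main() {
    let c1 = ["B", "B", "B", "A", "A", "A"];
    let c2 = [2, 4, 0, 5, 4, 3];
    let indices = sort_indices_by_c2_then_c1(&c1, &c2);
    assert_eq!(indices, vec![5, 4, 3, 2, 0, 1]);
    // Shuffle result columns by reading them in index order.
    let c1_shuffled: Vec<&str> = indices.iter().map(|&i| c1[i]).collect();
    let c2_shuffled: Vec<i64> = indices.iter().map(|&i| c2[i]).collect();
    assert_eq!(c1_shuffled, vec!["A", "A", "A", "B", "B", "B"]);
    assert_eq!(c2_shuffled, vec![3, 4, 5, 0, 2, 4]);
}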

Reduce to sort by single column

Pack the values we want to sort by into a single new column, and sort by that. There already exists some functionality in the query engine to do just that (used for grouping), but it's not fully generalized yet (#9) and it will also need to support inverting the sort order for values (taking the bitwise complement should work fine for integers, but I don't know that this is easy to do for strings/unicode). I would expect this approach to yield better performance in most cases (because sorting is very expensive compared to packing values), but it might break down in some (reverse sorted unicode column, other custom sort orders, wide string columns, dictionary encoded string values where the dictionary encoding does not preserve sort order).

</Braindump>

Full support for null values

  • Figure out representation for nullable columns
  • Option to treat empty entries in CSV as null
  • Select columns with null values
  • comparison operators
  • Arithmetic expressions/other functions
  • count/sum/...
  • null strings
  • Grouping by columns containing null values
  • Sorting by columns containing null values
  • Missing columns should be treated as null

`select total_amount/100, count(0) from trips limit 5000;` panics

Running select total_amount/100, count(0) from trips limit 5000; on the taxi data set makes multiple worker threads panic with out-of-bounds indexing errors:

locustdb> select total_amount/100, count(0) from trips limit 5000;
thread '<unnamed>' panicked at 'index out of bounds: the len is 527 but the index is 18446744073709551612', libcore/slice/mod.rs:2052:14
thread '<unnamed>' panicked at 'index out of bounds: the len is 401 but the index is 18446744073709551586', libcore/slice/mod.rs:2052:14
thread '<unnamed>' panicked at 'index out of bounds: the len is 1648 but the index is 18446744073709551611', libcore/slice/mod.rs:2052:14
thread '<unnamed>' panicked at 'index out of bounds: the len is 578 but the index is 18446744073709551613', libcore/slice/mod.rs:2052:14
thread '<unnamed>' panicked at 'index out of bounds: the len is 451 but the index is 18446744073709551611', libcore/slice/mod.rs:2052:14
thread '<unnamed>' panicked at 'index out of bounds: the len is 411 but the index is 18446744073709551609', libcore/slice/mod.rs:2052:14
thread '<unnamed>' panicked at 'index out of bounds: the len is 501 but the index is 18446744073709551613', libcore/slice/mod.rs:2052:14
thread '<unnamed>' panicked at 'index out of bounds: the len is 253 but the index is 18446744073709551611', libcore/slice/mod.rs:2052:14
note: Run with `RUST_BACKTRACE=1` for a backtrace.
thread '<unnamed>' panicked at 'index out of bounds: the len is 337 but the index is 18446744073709551613', libcore/slice/mod.rs:2052:14
thread '<unnamed>' panicked at 'index out of bounds: the len is 2048 but the index is 18446744073709551610', libcore/slice/mod.rs:2052:14
thread '<unnamed>' panicked at 'index out of bounds: the len is 500 but the index is 18446744073709551613', libcore/slice/mod.rs:2052:14

Repl flag to specify schema

Add a way to specify datatypes/transformations when loading data from CSV.
It might look something like this:

locustdb --load data.csv --schema 'version:string,first_name:string,ts:date,amount:int,middle_name:string|null'
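
A minimal sketch of parsing such a schema string (the ColType/ColSpec names are illustrative, not existing LocustDB types):

#[derive(Debug, PartialEq)]
enum ColType { String, Date, Int }

#[derive(Debug)]
struct ColSpec { name: String, typ: ColType, nullable: bool }

fn parse_schema(schema: &str) -> Result<Vec<ColSpec>, String> {
    schema
        .split(',')
        .map(|col| {
            let (name, type_spec) = col
                .split_once(':')
                .ok_or_else(|| format!("missing `:` in column spec `{}`", col))?;
            // A trailing `|null` marks the column as nullable.
            let nullable = type_spec.ends_with("|null");
            let base = type_spec.trim_end_matches("|null");
            let typ = match base {
                "string" => ColType::String,
                "date" => ColType::Date,
                "int" => ColType::Int,
                other => return Err(format!("unknown type `{}`", other)),
            };
            Ok(ColSpec { name: name.to_string(), typ, nullable })
        })
        .collect()
}

fn main() {
    let specs = parse_schema("version:string,ts:date,amount:int,middle_name:string|null").unwrap();
    assert_eq!(specs.len(), 4);
    assert!(specs[3].nullable && specs[3].typ == ColType::String);
}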

Feature may not be used on the stable release channel

I'm seeing the following issue on the current master branch:

...
   Compiling chrono v0.4.0
   Compiling failure v0.1.1
   Compiling serde_derive_internals v0.21.0
   Compiling serde_derive v1.0.33
   Compiling locustdb v0.1.0 (file:///home/mark/LocustDB)
error[E0554]: #![feature] may not be used on the stable release channel
 --> src/lib.rs:1:1
  |
1 | #![feature(fn_traits, integer_atomics, refcell_replace_swap, specialization, trait_alias, core_intrinsics, box_patterns)]
  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

error: aborting due to previous error

error: Could not compile `locustdb`.

To learn more, run the command again with --verbose.

These are the commands I ran on Ubuntu 16 before reaching this issue.

$ sudo apt install \
    cargo \
    curl \
    git \
    make
$ curl -sSf https://sh.rustup.rs \
    | RUSTUP_INIT_SKIP_PATH_CHECK=yes sh

$ git clone https://github.com/cswinter/LocustDB.git
$ cd LocustDB/

$ RUSTFLAGS="-Ccodegen-units=1" \
    CARGO_INCREMENTAL=0 \
    cargo run \
        --release \
        --bin repl \
        -- \
        test_data/nyc-taxi.csv.gz

Perform merging of select queries by constructing and executing query plan

There is one remaining step during query execution that is performed "manually" rather than by constructing and executing a query plan:

// TODO(clemens): make this work for differently aliased columns (need to send through query planner)

Mostly this just requires moving the Data::append_all function into a VecOperator and a corresponding QueryPlan variant and then constructing a query plan in the same fashion as all the other cases for merging batches.

+nightly subcommand not working?

My rustc version is 1.26.0 and my cargo version is 0.26.0.

My rustc already uses nightly (I temporarily set the default toolchain to nightly to play with this package), but I still can't seem to execute cargo test with the +nightly flag. Am I missing something?

Thanks

Add support for persistence and running queries on data stored on disk

Braindump

This is less difficult than it sounds; I think we can leverage RocksDB to do most of the heavy lifting.

This work can be split up into a number of incremental and independent steps. I think there will be three big components: RocksDB schema design, interactions with the query engine, and serialization.

Disk Subsystem

I'm pretty sure we can just make RocksDB work for us; I don't think we will (ever) gain any notable performance improvements by rolling our own persistent database. LocustDB places very modest demands on the database layer: there is no support for updates, and all reads load fairly large values (megabytes), which should make it fairly easy to design a schema that gives us minimal read amplification.

At least for now, I think we can keep the metadata in memory and then load the data for individual columns on demand as required by running queries. The metadata consists of information about which tables exist and, for each table, a list of partition metadata which records the names of all the columns in that partition as well as some summary information about the values contained in each column (right now just min and max).

So the interface for the disk subsystem can be fairly simple, and I think it maps relatively straightforwardly to a RocksDB schema:

fn load_column(table_name: &str, partition_id: usize, column_name: &str) -> ColumnData;
// Called after restart to restore all metadata
fn load_metadata() -> Vec<Metadata>;
// persist new data to disk
fn store_column(data: ColumnData, metadata: Metadata);

We probably want to have two separate column families for storing metadata and columns to make the initial load_metadata() call fast, and we can use RocksDB transactions to keep them in sync.
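
As a hypothetical illustration of the key layout (not an actual LocustDB schema), keys in both column families could be built like this, so that all partitions of a table are adjacent and iterate in order:

// Key in the "columns" column family: one entry per (table, partition, column).
fn column_key(table: &str, partition_id: u64, column: &str) -> Vec<u8> {
    let mut key = Vec::new();
    key.extend_from_slice(table.as_bytes());
    key.push(0); // separator; assumes table names contain no NUL bytes
    key.extend_from_slice(&partition_id.to_be_bytes()); // big-endian keeps partitions sorted
    key.push(0);
    key.extend_from_slice(column.as_bytes());
    key
}

// Key in the "metadata" column family: one entry per (table, partition).
fn metadata_key(table: &str, partition_id: u64) -> Vec<u8> {
    let mut key = Vec::new();
    key.extend_from_slice(table.as_bytes());
    key.push(0);
    key.extend_from_slice(&partition_id.to_be_bytes());
    key
}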

At some point we will want to add a compaction mechanism that merges several partitions into one. This can be implemented by computing the merged partition and then running an atomic transaction that inserts the new partitions and deletes the old ones.

Efficiently supporting very high ingestion rates might require some additional cleverness and tuning of various RocksDB parameters, but I think even a simple design will get us very far.

Once we are storing more data on disk than we have memory, we need the ability to evict data from memory. I think this should be relatively straightforward, but it still requires figuring out some questions:

  • How do we accurately estimate memory usage/free memory?
  • Which part of the code is responsible for managing loading/eviction of memory-cached data, and how exactly does that tie in with the query engine, scheduler, and disk subsystem?

Scheduling

At the moment the query scheduling in LocustDB is very simple. At start-up, we spawn one worker thread for each core. All worker threads wait on a single task queue. When they pick up a new task, they will run it until completion.

Once we have queries that actually read from disk, we will want the ability to free up threads to work on different queries while data is loaded from disk (but the initial implementation can just block). I'm not super clear on how all of this would work; we can probably leverage the new async/futures stuff. How can we fit it in with the existing query engine and task scheduling? Maybe we just need to completely redesign that part to work well with futures.

Serialization

The initial implementation might just convert everything to a single common data format, and then perform the same processing/compression that is done during ingestion. But that leaves a lot of performance on the table: if we can persist the compressed data to disk in the internal format, we will gain large improvements in cold query speed, disk usage, and restart time.

The interface to RocksDB is just bytes, and we should require all of our in-memory compression schemes to be convertible to/restorable from that format with minimal overhead. The serialized column will probably look something like this:

Version                       # Allows us to make backwards compatible changes to the format
Compression algorithm         # Tells us how the data can be decoded
One or more data sections     # The actual data
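
A rough sketch of that layout, hand-rolled for illustration (in practice a serialization library might own this):

struct SerializedColumn {
    version: u16,           // allows backwards compatible format changes
    codec: u8,              // identifies the compression/encoding scheme
    sections: Vec<Vec<u8>>, // the actual data sections
}

fn serialize(col: &SerializedColumn) -> Vec<u8> {
    let mut out = Vec::new();
    out.extend_from_slice(&col.version.to_le_bytes());
    out.push(col.codec);
    out.extend_from_slice(&(col.sections.len() as u32).to_le_bytes());
    for section in &col.sections {
        // Length-prefix each section so the decoder can slice it without parsing it.
        out.extend_from_slice(&(section.len() as u64).to_le_bytes());
        out.extend_from_slice(section);
    }
    out
}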

We can probably use some serialization library to actually figure out the layout (which serialization library to use is an open question).

I think this is relatively straightforward in theory, but it will still be a bit tricky to get right. We will need some kind of framework that ensures all data is routed to the right compression algorithm and that makes it easy to add new compression algorithms or modify existing ones, all while maintaining backwards compatibility.

Query engine performs redundant computations

E.g. column "trip_id" is lz4 decoded twice:

locustdb> :explain SELECT trip_id FROM trips WHERE trip_id = 1337;

Query plan in 1465 batches
-- Stage 0 --
column_0     = "trip_id".0                                   ReadColumnData

-- Stage 1 --
column_5     = "trip_id".0                                   ReadColumnData

-- Stage 2 (streaming) --
decoded_1    = lz4_decode(column_0)                          LZ4Decode<u32>
constant_2   = 1337                                          Constant
encoded_3    = encode(constant_2; Codec { ops: [ToI64(U32)], column_name: "trip_id", encoding_type: U32, decoded_type: Integer, is_summation_preserving: true, is_order_preserving: true, is_positive_integer: true, is_fixed_width: true }) EncodeIntConstant
equals_4     = decoded_1 == encoded_3                        VecConstBoolOperator<u32, i64, EqualsInt<u32>>
decoded_6    = lz4_decode(column_5)                          LZ4Decode<u32>
filtered_7   = decoded_6[equals_4]                           Filter<u32>
casted_8     = filtered_7 as I64                             TypeConversionOperator<u32, i64>

We need some kind of common subexpression elimination pass, probably by computing signatures for the query plans passed to QueryPlan::prepare and returning the previous result if a query plan is identical to one seen before.
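
A minimal sketch of that idea (the QueryPlan and buffer types here are simplified stand-ins): compute a signature for each subplan and reuse the output buffer of an identical subplan that was already prepared.

use std::collections::HashMap;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

#[derive(Hash, PartialEq, Eq, Clone)]
enum QueryPlan {
    ReadColumn(String),
    Lz4Decode(Box<QueryPlan>),
    Equals(Box<QueryPlan>, Box<QueryPlan>),
    Constant(i64),
}

#[derive(Clone, Copy)]
struct BufferRef(usize);

struct Planner {
    // signature of a prepared subplan -> buffer holding its result
    cache: HashMap<u64, BufferRef>,
    next_buffer: usize,
}

impl Planner {
    fn prepare(&mut self, plan: &QueryPlan) -> BufferRef {
        let mut hasher = DefaultHasher::new();
        plan.hash(&mut hasher);
        let signature = hasher.finish();
        if let Some(&buffer) = self.cache.get(&signature) {
            return buffer; // identical subplan already scheduled; reuse its output
        }
        // ...emit the operator(s) for `plan` into the executor here...
        let buffer = BufferRef(self.next_buffer);
        self.next_buffer += 1;
        self.cache.insert(signature, buffer);
        buffer
    }
}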

Grouping by columns with large negative values may return incorrect results

The current group by implementation may create rows of byte slices from the group by expressions for grouping. If any of these contain negative integers, this will throw off the sort order (because interpreted as bytes, negative integers compare larger than positive integers). A consistent sort order of the group by columns is required for cross-partition merging to work correctly.
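
A two-line illustration of the underlying problem:

fn main() {
    let (a, b): (i64, i64) = (-1, 1);
    assert!(a < b);                             // numeric order
    assert!(a.to_be_bytes() > b.to_be_bytes()); // byte-wise order disagrees: -1 encodes as 0xFF...FF
}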

Support grouping by many large columns and string columns

Currently, LocustDB requires that all columns in a group by can be bit packed into a single 64-bit integer. This means queries with many or high-cardinality columns in the group by are not currently supported. Grouping by string columns that are not dictionary encoded is also not supported.

To support this, we will need a new datatype in the query engine that supports arbitrary length byte slices. We could reuse the existing Vec and pass around the length of the byte slices as a separate scalar value, but creating a new datatype that puts those two together might be better.

Once we have this datatype, we will need a new operator that takes a column and writes its values into this byte vector (at some stride and offset), and use it instead of the bit packing operator when generating query plans for these kinds of columns. Overall this will be very similar to how the current grouping works, but writing values into byte slices rather than bit packing them into a single integer.
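
A simplified sketch of such an operator (plain Vec<u8> rows stand in for the new byte-slice datatype): each group-by column writes its values into every row's slice at its own offset, with the stride equal to the total packed width.

fn pack_column_i64(rows: &mut [Vec<u8>], values: &[i64], offset: usize) {
    for (row, &v) in rows.iter_mut().zip(values) {
        // Big-endian so that byte-wise comparison of rows matches numeric order
        // (for non-negative values; see the issue about negative values above).
        row[offset..offset + 8].copy_from_slice(&v.to_be_bytes());
    }
}

fn main() {
    let stride = 16; // two i64 group-by columns per row
    let c1 = [3i64, 1, 3];
    let c2 = [7i64, 7, 2];
    let mut rows = vec![vec![0u8; stride]; c1.len()];
    pack_column_i64(&mut rows, &c1, 0);
    pack_column_i64(&mut rows, &c2, 8);
    assert_ne!(rows[0], rows[1]); // the rows can now serve as HashMap keys for grouping
}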

One additional detail is that we probably want to change the hash function used for HashMap groupings to e.g. SeaHash instead of FNV (the current default) when grouping by long byte slices to get good performance.

Completing #8 would be useful before starting work on this.

naming tables

I could be misunderstanding the help/documentation: is it possible to load multiple CSVs that each represent a different table, and then name those tables individually? --table seemed to only be allowed once, to rename the default table. Thanks!
