
delta-kernel-rs's Introduction

delta-kernel-rs

Delta-kernel-rs is an experimental Delta implementation focused on interoperability with a wide range of query engines. It currently only supports reads.

The Delta Kernel project is a Rust and C library for building Delta connectors that can read (and soon, write) Delta tables without needing to understand the Delta protocol details. This is the Rust/C equivalent of Java Delta Kernel.

Crates

Delta-kernel-rs is split into a few different crates:

  • kernel: The actual core kernel crate
  • acceptance: Acceptance tests that validate correctness via the Delta Acceptance Tests
  • derive-macros: A crate for our derive-macros to live in
  • ffi: Functionality that enables delta-kernel-rs to be used from C or C++. See the ffi directory for more information.

Building

By default we build only the kernel and acceptance crates, which will also build derive-macros as a dependency.

To get started, install Rust via rustup, clone the repository, and then run:

cargo test

This will build the kernel, run all unit tests, fetch the Delta Acceptance Tests data and run the acceptance tests against it.

Since delta-kernel-rs is a library, you will generally consume it by adding it as a dependency in your Cargo.toml. For example:

delta_kernel = "0.0.1"

Documentation

Examples

There are some example programs showing how delta-kernel-rs can be used to interact with delta tables. They live in the kernel/examples directory.

Development

delta-kernel-rs is still under heavy development but follows conventions adopted by most Rust projects.

Concepts

There are a few key concepts that will help in understanding kernel:

  1. The Engine trait encapsulates all the functionality an engine or connector needs to provide to the Delta Kernel in order to read the Delta table.
  2. The DefaultEngine is our default implementation of the above trait. It lives in engine/default, and provides a reference implementation for all Engine functionality. DefaultEngine uses arrow as its in-memory data format.
  3. A Scan is the entrypoint for reading data from a table.

Design Principles

Some design principles which should be considered:

  • async should live only in the Engine implementation. The core kernel does not use async at all. We do not wish to impose the need for an entire async runtime on an engine or connector. The DefaultEngine does use async quite heavily. It doesn't depend on a particular runtime however, and implementations could provide an "executor" based on tokio, smol, async-std, or whatever might be needed. Currently only a tokio based executor is provided.
  • Minimal Table API. The kernel intentionally exposes the concept of immutable versions of tables through the snapshot API. This encourages users to think about the Delta table state more accurately.
  • Prefer builder style APIs over object oriented ones.
  • "Simple" set of default-features enabled to provide the basic functionality with the least necessary amount of dependencies possible. Putting more complex optimizations or APIs behind feature flags
  • API conventions to make it clear which operations involve I/O, e.g. fetch or retrieve type verbiage in method signatures.

Tips

  • When developing, rust-analyzer is your friend. rustup component add rust-analyzer
  • If using emacs, both eglot and lsp-mode provide excellent integration with rust-analyzer. rustic is a nice mode as well.
  • When also developing in VS Code, it's sometimes convenient to configure rust-analyzer in .vscode/settings.json.
{
  "editor.formatOnSave": true,
  "rust-analyzer.cargo.features": ["default-engine", "acceptance"]
}
  • The crate's documentation can be easily reviewed with: cargo doc --open


delta-kernel-rs's Issues

Checkpoint actions don't need to be tracked in the log replay hashmap

Currently, we don't distinguish between actions that came from deltas vs. checkpoint, which bloats the hash table used to track and deduplicate versions in file_stream.rs.

We need the hash table to track actions from deltas that we've already seen, since those may have older versions in older deltas and/or the checkpoint... but checkpoint actions are already the "oldest", so we don't need to track them.
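A minimal sketch of the idea, using a hypothetical FileKey type and a standalone function rather than the actual file_stream.rs types: actions coming from commit (delta) files are recorded in the seen-set, while checkpoint actions are only checked against it.

use std::collections::HashSet;

// Illustrative key type only; the real kernel uses its own action structs.
#[derive(Hash, PartialEq, Eq, Clone)]
struct FileKey {
    path: String,
    dv_unique_id: Option<String>,
}

/// Returns true if this action should survive reconciliation, updating the
/// seen-set only for actions that came from commit (delta) files.
fn process_action(seen: &mut HashSet<FileKey>, key: FileKey, is_from_checkpoint: bool) -> bool {
    if seen.contains(&key) {
        // A newer delta already added or removed this file.
        return false;
    }
    if !is_from_checkpoint {
        // Checkpoint actions are already the "oldest", so only delta actions
        // need to be remembered for deduplication against older versions.
        seen.insert(key);
    }
    true
}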

Updates for the actions module

  • move to parsing via whatever we define in #63
  • better parsing for deletion vectors
  • types.rs -> actions.rs (a bit confusing as we'll have actions::actions)

Handle nested columns

We need to handle nested columns in various locations, data skipping being an obvious one.

We should come up with a robust and simple design that is capable of handling column mapping and other special scenarios as well.

FFI is not prod-safe until `c_unwind` RFC stabilizes and we can use it

For FFI to really be prod-ready, we need the rust c_unwind RFC to stabilize (see rust-lang/rust#115285 and rust-lang/rust#116088), and then we must update FFI code to take advantage of it. Otherwise, exceptions thrown from C++ code, or panic! originating in rust code, have undefined behavior when unwinding across an FFI boundary.

Context

C++ and Rust take completely different approaches to exception handling.

Rust generally prefers methods to return an error Result when something expected and recoverable goes wrong, and to panic! if the problem is unexpected or unrecoverable. In turn, panic! may be implemented by stack unwinding, if the runtime is configured with panic=unwind.

Meanwhile, C++ relies heavily on exceptions, and on stack unwinding to run destructors as uncaught exceptions propagate. Exceptions can be thrown from just about anywhere (including the standard library) unless extreme care is taken to avoid them.

Because kernel-rs relies heavily on callbacks, and often on double-trampoline callbacks, we must expect cases where an exception thrown from C++ might propagate uncaught through rust code and back into C++ where it is caught and handled. Or for a rust panic to propagate uncaught through C++ code and back to rust code protected by a catch_unwind block.

Unfortunately, any kind of stack unwinding across an FFI boundary (whether C++ -> Rust or Rust -> C++) is undefined behavior today in Rust. The c_unwind RFC proposes to address this by allowing exceptions thrown by extern "c-unwind" fn to safely propagate -- uncaught -- through rust code and back into C++, and for panics in extern "rust-unwind" fn to safely propagate -- also uncaught -- through C++ code and back into rust. Attempting to catch a foreign exception would still produce undefined behavior, but we don't need that capability.
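Until that lands, one common mitigation for the Rust-panic direction (sketched below; this is not the crate's actual FFI surface) is to wrap the body of each extern "C" entry point in std::panic::catch_unwind and convert any panic into an error code instead of letting it unwind across the boundary.

use std::panic::{catch_unwind, AssertUnwindSafe};

// Hypothetical FFI entry point; the real kernel FFI surface differs.
#[no_mangle]
pub extern "C" fn kernel_do_work(input: i32) -> i32 {
    let result = catch_unwind(AssertUnwindSafe(|| {
        // Call into regular Rust code that might panic.
        input + 1
    }));
    match result {
        Ok(value) => value,
        // Convert the panic into an error code rather than unwinding into C/C++.
        Err(_) => -1,
    }
}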

Delta Log support

Make sure we can parse the full delta log into Arrow:

  • Support partition values
  • Support reading statistics
  • Handle parsing errors gracefully
  • Support collecting add actions efficiently
  • Support collecting table-level metadata efficiently

Snapshot P&M is not optional

Snapshot::read_metadata currently returns Option<(Metadata, Protocol)>, but those are not optional.
A table whose log replay lacks either or both is ill-formed and should always produce an error.

See Delta spec for Action reconciliation:

A given snapshot of a Delta table consists of:

  • A single protocol action
  • A single metaData action

P&M query should leverage predicate pushdown

We don't currently leverage predicate pushdown when retrieving protocol and metadata for a snapshot. The vast majority of rows in a log replay will be file actions, not P or M, so there's a big memory advantage to letting the engine's scan filter by protocol IS NOT NULL OR metaData IS NOT NULL so it can skip those unwanted rows.

Handle null columns and column selection for nested fields when reading parquet files

When asking the engine to read data from parquet, we potentially pass a superset of the fields available in the file, specifically also for nested fields. An example where this comes up is reading protocol actions, where there may or may not be a readerFeatures array.

The currently used parquet reader does not automatically add null arrays for missing fields, nor is it able to push down column selection for nested struct fields.

A related question concerns our expectations: do we always want to get a null array back, or can the engine choose to just ignore nullable columns?

Public release goals

Goals:

  • Support reading delta tables
    • Including deletion vectors, column mapping
    • Support single-threaded, multi-process, and multi-threaded use cases
  • #46
  • We have easy instructions to pull the C library in as a dependency.
  • We have documentation for the C API
  • DuckDB prototype
  • Documentation for architecture vision

Low priority:

  • Expression API and file skipping support
  • A C++ library wrapping C FFI with read support as described above

See Initial Release Milestone for issues

Return to a pure Sync API

TL;DR: return the API to being synchronous. Make the default Rust TableClient use async in its implementation by embedding a reference to a runtime/executor.

Why not async?

  • Async API brings complexity to the FFI layer
  • Async call stacks can be more difficult to debug
  • Offering both a sync and async API will be too much maintenance burden. In prototyping a parallel API, we found there was no way around writing duplicate code. See: #25

How can TableClient default impl still use async?

Async is most valuable at the IO level to bring IO concurrency, so our TableClient methods should be designed to allow for concurrent IO operations in an async runtime.

For example, consider the FileSystemClient.

pub trait FileSystemClient: Send + Sync {
    /// List the paths in the same directory that are lexicographically greater or equal to
    /// (UTF-8 sorting) the given `path`. The result should also be sorted by the file name.
    async fn list_from(&self, path: &Url) -> DeltaResult<BoxStream<'_, DeltaResult<FileMeta>>>;

    /// Read data specified by the start and end offset from the file.
    async fn read_files(&self, files: Vec<FileSlice>) -> DeltaResult<Vec<Bytes>>;
}

The list_from method could be a synchronous iterator, but under the hood the implementation could use asynchronous calls to buffer the next page. Similarly, read_files could also be implemented with async calls to read files concurrently.

Implementing correctly

If we want to run async functions inside of sync helpers, we need to submit them to some sort of executor. Thus, a table client that wants to run async tasks needs to hold a reference to an executor itself.

The main danger to be avoided is calling tokio::Runtime::block_on() from within one of the helpers. Users might wrap the kernel high-level APIs in a tokio runtime, and tokio will panic if block_on() is called while inside an async runtime.

We may wish to make it so the user could configure the executor used. For example, they might want to pass down their own tokio runtime.
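A minimal sketch of that shape, assuming a hypothetical TaskExecutor trait; the names here are illustrative, not the kernel's actual API:

use std::future::Future;
use tokio::runtime::Runtime;

/// Hypothetical executor abstraction: the table client owns one and submits
/// async work to it from its synchronous methods.
pub trait TaskExecutor: Send + Sync {
    fn block_on<T: Send>(&self, task: impl Future<Output = T> + Send) -> T;
}

/// A tokio-backed executor holding its own runtime.
pub struct TokioExecutor {
    runtime: Runtime,
}

impl TokioExecutor {
    pub fn new() -> Self {
        Self {
            runtime: Runtime::new().expect("failed to create tokio runtime"),
        }
    }
}

impl TaskExecutor for TokioExecutor {
    fn block_on<T: Send>(&self, task: impl Future<Output = T> + Send) -> T {
        // Runs the future on this executor's own runtime. This must not be
        // invoked from inside another async runtime, or tokio will panic.
        self.runtime.block_on(task)
    }
}

A table client could then hold an Arc<dyn TaskExecutor> (or a generic parameter), and users who already have a runtime could supply their own implementation instead of the tokio-backed default.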

Extract expression evaluator into client

Right now we hard-code the expression evaluation in

pub(crate) fn construct_metadata_filters(
    &self,
    stats: RecordBatch,
) -> Result<BooleanArray, ArrowError> {
    match self {
        // col < value
        Expression::LessThan(left, right) => {
            match (left.as_ref(), right.as_ref()) {
                (Expression::Column(name), Expression::Literal(l)) => {
                    // column_min < value
                    lt(
                        stats
                            .column_by_name("minValues")
                            .unwrap()
                            .as_any()
                            .downcast_ref::<StructArray>()
                            .unwrap()
                            .column_by_name(name)
                            .unwrap()
                            .as_any()
                            .downcast_ref::<Int32Array>()
                            .unwrap(),
                        &PrimitiveArray::<Int32Type>::new_scalar(*l),
                    )
                }
                _ => todo!(),
            }
        }
        _ => todo!(),
    }
}

We do define an expression evaluator trait, so we want to move this code to use that instead:

pub trait ExpressionEvaluator {
    /// Evaluate the expression on the given ColumnarBatch data.
    ///
    /// The result contains one value for each row of the input.
    /// The data type of the output is the same as the output type of the
    /// expression this evaluator is using.
    fn evaluate(&self, batch: &RecordBatch) -> DeltaResult<RecordBatch>;
}

Then the default client needs to implement the evaluator. Ideally, we should implement it for all expressions we support representing.

Settle ColumnBatch API and finish PR #41

Main things that need to happen:

  • move to Results rather than panic
  • fill in remaining types
  • move our action parsing to use this
  • finish the pr
  • write an implementation with arrow under the hood

Still quite a lot of open discussion to be had here about exactly what the API should look like. This issue can serve as a place to centralize those discussions.

Explore providing a ColumnarBatch like trait

This issue is to capture the work of determining how feasible it would be to remove our arrow::RecordBatch dependency and replace it with something akin to the ColumnarBatch interface implemented in the JVM kernel.

The goal for this ticket is to provide an interface that would allow default-client to back ColumnarBatch with an arrow::RecordBatch implementation in some form.
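A rough sketch of what such an interface could look like; the trait names, methods, and the placeholder DataType enum below are assumptions for illustration, not a settled design:

use std::sync::Arc;

// Placeholder type set; a real design would mirror the Delta/kernel schema types.
pub enum DataType {
    Integer,
    Long,
    String,
    Boolean,
}

/// A single column of data, independent of the underlying in-memory format.
pub trait ColumnVector {
    fn data_type(&self) -> &DataType;
    fn len(&self) -> usize;
    fn is_null(&self, row: usize) -> bool;
}

/// A batch of rows, akin to the JVM kernel's ColumnarBatch. The default client
/// could back this with an arrow RecordBatch; other connectors plug in their own.
pub trait ColumnarBatch {
    fn num_rows(&self) -> usize;
    fn num_columns(&self) -> usize;
    fn column(&self, index: usize) -> Arc<dyn ColumnVector>;
}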

Add logging or tracing

@nicklan proposed adding logging via the log crate.

It seems like a good idea to add some sort of observability capability.

Based on my experience in Rust, it's far more useful to have tracing: the tracing crate has logging capabilities but also the ability to emit spans, and it does a good job of both.

Dependency / performance impact

It was asked in the meeting what the performance impact would be. The answer is that adding these dependencies on their own does not change the output binary. Libraries are expected to use the macros to add logs and spans, but not to instantiate any subscriber for them. Without a subscriber, these logs and spans are no-ops at compile time and will be optimized away. However, if an application using kernel registers a subscriber, it will generate code to record the events.
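For illustration, library-side usage might look like the sketch below (the function and fields are made up); the application, not the kernel, would install a subscriber, e.g. via tracing_subscriber::fmt::init().

use tracing::{debug, instrument};

// Hypothetical kernel-internal function instrumented with a span.
#[instrument(skip(commit_files))]
fn replay_log(table_root: &str, commit_files: &[String]) {
    debug!(num_commits = commit_files.len(), "starting log replay");
    // Actual log replay work would go here.
}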

Make kernel implementation generic over ColumnarBatch

Ideally, these should monomorphize over ColumnarBatch, since there should only be one implementation per connector.

Once complete, arrow crates should be optional dependencies under a feature flag. That feature flag may be required for the default client.

Validate type coercion rules in default client kernels.

We need to make sure that type coercion is handled as we expect, specifically in our default client implementation.

I haven't validated it yet, but IIRC it will error for any mismatched types, e.g. even f32 vs. f64. We may then want to wrap our desired behaviour around it using arrow's cast kernel, as sketched below.
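A minimal sketch of that wrapping, assuming we use arrow's cast compute kernel directly (the helper name is illustrative):

use std::sync::Arc;

use arrow::array::{ArrayRef, Float32Array};
use arrow::compute::cast;
use arrow::datatypes::DataType;
use arrow::error::ArrowError;

/// Coerce a column to the expected type before comparison, e.g. f32 -> f64,
/// instead of letting the comparison kernel error out on mismatched types.
fn coerce(column: &ArrayRef, expected: &DataType) -> Result<ArrayRef, ArrowError> {
    if column.data_type() == expected {
        Ok(column.clone())
    } else {
        cast(column, expected)
    }
}

fn main() -> Result<(), ArrowError> {
    let col: ArrayRef = Arc::new(Float32Array::from(vec![1.0_f32, 2.0, 3.0]));
    let coerced = coerce(&col, &DataType::Float64)?;
    assert_eq!(coerced.data_type(), &DataType::Float64);
    Ok(())
}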

make Scan::execute return an iterator

Main things that should happen:

  1. scan should return a lazy iterator
  2. Internally in the file_replay_iter we should represent each "item" as EngineData + a selection vector (for which rows to process)
  3. Only when getting an item out of the scan should we inspect the EngineData to get the needed items out of selected rows (i.e. the add file paths and deletion vectors), read the underlying data, and return the data plus dv mask.

Also relevant:
#109 (comment)

List of connectors to target

  • Polars
  • DuckDB (included in #42 and #8)
  • Pandas
  • delta-rs (blocked on write support) -> python deltalake
  • datafusion
  • one of the go implementations?
  • Clickhouse
  • Pytorch
  • influx

Support adding back in partitionColumns

We need to add the partition column, with the partition value specified in the Add action, when we query a partitioned table. These columns are omitted from the actual data files.
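A rough sketch of the idea using arrow directly; in practice this would likely go through the kernel's expression/engine abstractions, and the helper below is illustrative only:

use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Hypothetical helper: append a constant partition column (taken from the Add
/// action's partitionValues) to a batch read from a data file.
fn add_partition_column(
    batch: &RecordBatch,
    name: &str,
    value: &str,
) -> Result<RecordBatch, ArrowError> {
    // Extend the schema with the partition field (string-typed for simplicity).
    let mut fields: Vec<Field> = batch
        .schema()
        .fields()
        .iter()
        .map(|f| f.as_ref().clone())
        .collect();
    fields.push(Field::new(name, DataType::Utf8, true));

    // Materialize the partition value as a column repeated for every row.
    let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
    columns.push(Arc::new(StringArray::from(vec![value; batch.num_rows()])));

    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
}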

Kernel should allow engine to provide its own data skipping predicate implementation

Today, data skipping is extracted by kernel from the predicate the engine provides. But kernel's data skipping implementation is intentionally basic, and an engine may conceivably want to support more (and more sophisticated) expression types.

I don't think it would be possible to hook that in with today's code structure, and we should explore the possibility of allowing it.

Must apply log replay dedup before file skipping

Today's kernel applies data skipping predicates before deduplicating adds and removes. This is appealing from a memory efficiency perspective (smaller hash table), but it poses two correctness hazards:

First hazard

File stats are optional, and there's no guarantee file skipping will work for all file actions that refer to a given file. For example, the following sequence of commits would produce an incorrect result that includes file F:

  • v0 - add F without stats (Delta spec says stats are optional)
  • v1 - re-add F, this time with stats (nothing in Delta spec forbids such duplicate add actions)
  • v2 - remove F, with stats

Because log replay goes in reverse order and eagerly applies skipping predicates, v2 and v1 would both be filtered out and never make it to the hash table; v0 would then survive data skipping because it has no stats to filter it out, and was not previously "seen" according to the hash table.

The Delta spec for Action Reconciliation says:

File actions (add or remove) reference logical files, and a log can contain any number of references to a single file.

and the spec for Add File and Remove File says (emphasis mine):

The dataChange flag on either an add or a remove can be set to false to indicate that an action when combined with other actions in the same atomic version only rearranges existing data or adds new statistics [to an existing file].

Second hazard

A table replace operation could produce a table whose older files have a stats schema which is completely incompatible with the current schema. Such files are guaranteed not to survive log replay, because all existing files must be dropped before making an incompatible schema change, but they would still break the json parser during log replay, i.e. if we asked for a json numeric but the old stats contained a json object instead. We could work around this situation by configuring the parser to silently produce null (= no stats) on error, but then we might fail to detect legitimately invalid stats. Maybe that's ok, though -- missing stats just means poor(er) file skipping, which is likely preferable to a query error.

Note: The second situation cannot arise when column mapping is enabled, because the new columns are guaranteed to have different physical names than the old ones. With no overlap between old stats schema and new read schema, the old/bad stats would be invisible and harmless.

File skipping should not read removes from checkpoint

A checkpoint read for file skipping doesn't need to return any Remove actions -- those tombstones exist only for the benefit of VACUUM.

Optimizing this would require the ability to pass separate schemas for commit .json vs. checkpoint .parquet... with the latter "padding" the removes column with a literal null column for schema compatibility.

Alternatively, we could leave the schema alone, but pass different predicate pushdowns (add IS NOT NULL OR remove IS NOT NULL for commit .json vs. just add IS NOT NULL for checkpoint .parquet)

cache tables, snapshots, and files

Lifted from a comment in the code:

We should cache known tables, snapshots, and files, so repeated reads can reuse them instead of going back to cloud storage each time.

Sort out formatting issues with errors

We have a lot of locations where we "double format". This is largely because we want more specific information than the errors allow for. We should either:

a.) Not provide that specific information or
b.) Define new errors that allow us to provide it without double formatting (preferred); see the sketch below
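For example, option (b) could look roughly like the sketch below, assuming we use thiserror; the variant and its fields are hypothetical:

use thiserror::Error;

#[derive(Debug, Error)]
pub enum Error {
    // Before: something like a generic variant holding
    // format!("invalid log path {}: {}", path, err), which formats the
    // underlying error into a string ("double formatting").
    // After: a dedicated variant that carries the specific context directly,
    // while the underlying error stays attached as the source.
    #[error("invalid log path {path}")]
    InvalidLogPath {
        path: String,
        source: std::io::Error,
    },
}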

The json and parquet handler API should include kernel-specific state

Today, the file handler API only passes the file name to the engine, and produces an iterator of data batches. Because each file will likely produce multiple data batches (exact details depending on the engine implementation), kernel needs a way to associate or map each batch back to the file that produced it in order to e.g. apply deletion vectors.

Most likely, this will mean kernel passes some opaque per-file state to the engine, beyond the FileMeta we pass today. Engine would be expected to maintain the association and the resulting iterator should return (opaque state, data batch) pairs.

At first, this could be a simple integral value (some kind of row index into the corresponding metadata batch) because the engine is not allowed to reorder files. But in a distributed execution setting, the engine may do the metadata reads on a coordinator node, while processing the actual files in a completely different kernel instance running on a worker node. In that case, we would need a reliable serde story for the kernel's opaque state so that worker nodes don't have to redo log replay (which would be too expensive even if we could guarantee perfect repeatability). The JVM kernel solves this by defining a Row concept which follows the kernel schema, and which the engine is free to serialize however it wants (if it even wants to). Opacity is preserved because the engine only receives a generic Row whose schema is only provided at runtime.
