Comments (10)

nevi-me commented on August 14, 2024

... and not declare a ConvertedType::None that IMO should be Option<ConvertedType>)

I agree with this, that's why I made LogicalType an Option<LogicalType>, and avoided creating a struct that differs from what's in the format.

The main place where the basics.rs new-types were useful was with the Display impl. I didn't have a sense of this until I was able to implement the logicaltype parsing work. Now that I understand the codebase more, I think we could replace the new-types with parquet-format going forward.

We have a similar performance issue with the writer: we either build it on the existing low-level constructs like write_batch(), or bypass them, which would take much longer.

I like your thinking around the reader, and agree that we can remove some of the allocation steps in the process.

I've created https://issues.apache.org/jira/browse/ARROW-12121 so I can better track the changes that I make to the writer going forward.

from arrow2.

jorgecarleitao commented on August 14, 2024

Heads up: I was able to re-write all decoding functionality unsafe-free and without jeopardizing performance.

I was able to correctly read both Spark 3- and pyarrow 3-generated, snappy-compressed, dictionary-encoded parquet files with nulls into native Rust (e.g. Vec<Option<i32>>) using https://github.com/jorgecarleitao/parquet2.

The design is quite simple:

There is a page iterator that returns compressed, encoded pages, and there is a function that decompresses, decodes, and de-serializes them into an in-memory format (page_to_array). The page iterator is IO-bound, as there are no large CPU-intensive ops on it, while page_to_array is CPU-intensive. This enables us to decouple these two parts to maximize throughput. The whole code has about 5k LOC atm, and it only depends on a bitpacking crate to encode and decode bitpacks, and the usual compression suspects.
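A minimal sketch of that split, using only the standard library. `CompressedPage`, `page_to_array` and `read_column` here are hypothetical stand-ins, not parquet2's actual types or signatures, and the "decoding" is a placeholder for the real decompress/decode work:

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a compressed, encoded page.
pub struct CompressedPage {
    pub bytes: Vec<u8>,
}

/// Hypothetical stand-in for the CPU-heavy step.
/// It only widens each byte; a real implementation would decompress and decode.
pub fn page_to_array(page: &CompressedPage) -> Vec<i32> {
    page.bytes.iter().map(|b| i32::from(*b)).collect()
}

/// Drives the (IO-bound) page iterator on its own thread and runs the
/// (CPU-bound) decoding on the calling thread, connected by a channel.
pub fn read_column<I>(pages: I) -> Vec<Vec<i32>>
where
    I: Iterator<Item = CompressedPage> + Send + 'static,
{
    let (sender, receiver) = mpsc::channel();
    let producer = thread::spawn(move || {
        for page in pages {
            // Stop producing if the consumer went away.
            if sender.send(page).is_err() {
                break;
            }
        }
        // `sender` is dropped here, which ends `receiver.iter()` below.
    });
    let arrays: Vec<Vec<i32>> = receiver.iter().map(|page| page_to_array(&page)).collect();
    producer.join().expect("page producer panicked");
    arrays
}
```

Because the two halves only share an iterator of pages, a consumer could just as well fan the pages out to a thread pool instead of decoding on one thread.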

I also understood the RLE-bitpacking hybrid encoder. I wrote a PR to parquet-format with a more detailed explanation of them.

I am now writing the logic to map parquet's representation directly into arrow2, which is fairly simple for non-nested types.

nevi-me commented on August 14, 2024

I have no idea how to handle rep and def levels. @nevi-me, can we assume that given a Vec<Page> and some metadata, we can reconstruct any arrow type into a RecordBatch? I.e. are the rep and def levels "independent" between row groups? Or must the rep and def levels be read across the whole column to make sense?

Hey @jorgecarleitao, I missed this comment and question.

I've been thinking about this in the context of a writer, and how we could avoid materialising a whole RecordBatch into Vec<T> before writing to parquet.

Definitions are mostly confined to a single row. With primitives, it's super easy (as there are no reps). With lists, we could probably find a way of doing this efficiently. I'll illustrate below.

If we have:

  • Arrow batch of 65'536 records (because we deem it a good unit of work for a processing engine)
  • That we want to read as 4096 records per page (because we don't want to create very large pages),
  • That contains an int32 (a), string (b), struct of int32 (c._1), and a list of int32 (d)

We have:

 a
 b
 c._1
 d

When we write a page, we only need 4096 values from the batch, so we could benefit from slicing each array (this is where the nested slicing was an issue).

From these 4096 values, if we are interested in writing say row 10, we'd do the following:

row n, where n has no relationship with n-1
 a.    : definition will be 1 or 0 depending on nullability rules, unit is Option<i32>
 b.    : definition will be 1 or 0 again, a unit is Option<Vec<u8>> or Option<Vec<&str>> 
 c._1 : definition will be 0, 1 or 2 depending on nullability rules, unit is Option<i32>
 d.   : def and rep values may be the child value length, or 1 value, depending on nullability

With column d, if we have an Option<vec![Some(1), None, Some(3)]> with arrow::Field("d", List<Box<Field("item", Int32, true)>>, true) as the schema, there are 2 levels that can be null, so the definition would be [2, 1, 2].

If the list slot is null, slot: Option<Vec<i32>> = None, then the list's definition will be [0].

If the list has a slot, but that slot has 0 values (Some(vec![])), the list's def will be [1]. This looks similar to Option<vec![Some(1), None, Some(3)]> from the first example, but the repetition is what will tell us that "there's no value for this 1 slot".

This potentially leaves us with:

definitions:
 a.    : [u16; 1]
 b.    : [u16; 1]
 c._1 : [u16; 1]
 d.    : [u16; n], where n is a function of list parent nullability (1) and list child length (n)

So, from the above example, I can make the following observations:

  1. If an array is a non-nested type (primitive, binary), we could reasonably go from Iter<T> and Iter<Option<T>> to a Parquet column, reading enough values to fill our 4096 values (without worrying about slicing?)
  2. If an array is a struct that only ends up having a primitive or binary (e.g. struct<struct<struct<...<int32>>>), we could reuse the above to an extent, only needing to know what level of the struct soup becomes null. Recall the logical nullness issue we had a few months ago, where a null struct meant that its children automatically become null, even if they have physical values?
  3. If an array contains a list, we can still construct definitions just from looking at the list's value hierarchy for 1 row.
  4. Repetitions only become relevant if we encounter a list
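To illustrate observation 1, here is a rough sketch of going from an iterator of optional primitives to one page's worth of definition levels and values. `next_page` is a hypothetical helper name, and the sketch assumes a single nullable level (definition 1 for a value, 0 for a null) with no repetition:

```rust
/// Pulls up to `page_size` slots from an iterator of optional primitives and
/// splits them into definition levels (1 = present, 0 = null) and the
/// non-null values, which is the shape a flat Parquet column page stores.
pub fn next_page<T>(
    iter: &mut impl Iterator<Item = Option<T>>,
    page_size: usize,
) -> (Vec<i16>, Vec<T>) {
    let mut defs = Vec::with_capacity(page_size);
    let mut values = Vec::new();
    for item in iter.by_ref().take(page_size) {
        match item {
            Some(value) => {
                defs.push(1);
                values.push(value);
            }
            None => defs.push(0),
        }
    }
    (defs, values)
}
```

Since the helper only advances the iterator, calling it repeatedly yields consecutive pages without slicing the source array; in the example above `page_size` would be 4096.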

We can deal with repetition separately, but I'll briefly say that the unit for computing a repetition is the uppermost parent in the tree structure that is a list, i.e. d in our example above.

If I have a struct<list<struct<list<list<int32>>>>>, I'll only need repetition once I encounter the first list. The other 2 lists' repetition values will be subject to the first list.

So my final observation here is that if I have a non-null fixedlist hierarchy (using the above), and:

  • the first list has 3 values for row n
  • the second has 4 values
  • the third has 2 values

The number of values in the fixedlist's definition and repetition will be 3 * 4 * 2 = 24, and I'll need [T; 24] values for just that row n
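As a sketch of that count, the function below enumerates the repetition levels for one row of such a hierarchy. It assumes every list is non-null and has exactly the stated length, that only the three lists contribute repetition levels, and the usual Dremel convention that a level names the outermost list that starts a new element at that value (0 marks the start of the row). `repetition_levels` is a hypothetical name:

```rust
/// Repetition levels for one row of list<list<list<T>>> where the lists have
/// `outer`, `middle` and `inner` elements respectively (all non-null).
pub fn repetition_levels(outer: usize, middle: usize, inner: usize) -> Vec<i16> {
    let mut reps = Vec::with_capacity(outer * middle * inner);
    for i in 0..outer {
        for j in 0..middle {
            for k in 0..inner {
                let rep = if k > 0 {
                    3 // next element of the innermost list
                } else if j > 0 {
                    2 // next element of the middle list
                } else if i > 0 {
                    1 // next element of the outermost list
                } else {
                    0 // first value of the row
                };
                reps.push(rep);
            }
        }
    }
    reps
}
```

For lengths 3, 4 and 2 this produces 24 levels, one per leaf value, matching the [T; 24] above.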

nevi-me commented on August 14, 2024

I was able to correctly read both Spark 3- and pyarrow 3-generated, snappy-compressed, dictionary-encoded parquet files with nulls into native Rust (e.g. Vec<Option<i32>>) using https://github.com/jorgecarleitao/parquet2.

This is fantastic. From my comment above, Vec<Option<T>> is good enough for us to reconstruct primitives.

I think if you have Iter<i16> for definitions, Option<Iter<i16>> = None for repetition, and Iter<T> for values, we should be able to reconstruct arbitrary structs. If you point me in the right direction, I'd love to help out.
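A rough sketch of that reconstruction for a leaf nested only in structs (no lists, so no repetition). `reconstruct` and `max_def` are hypothetical names; the assumption is that a slot holds a value exactly when its definition level equals the maximum level of the column:

```rust
/// Rebuilds optional values from definition levels and the non-null values.
/// A slot is `Some` only when its definition level reaches `max_def`;
/// any lower level means some ancestor (or the leaf itself) is null.
pub fn reconstruct<T>(
    defs: impl Iterator<Item = i16>,
    mut values: impl Iterator<Item = T>,
    max_def: i16,
) -> Vec<Option<T>> {
    defs.map(|def| if def == max_def { values.next() } else { None })
        .collect()
}
```

Rebuilding the validity of each intermediate struct would additionally need the per-level thresholds, which this sketch leaves out.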

Lists are a separate beast best handled carefully 😀

jorgecarleitao commented on August 14, 2024

FYI, I am working on this. However, given the requirements that I outlined for this project (notably the safety and performance part), this may require significant work on the parquet side of things.

nevi-me commented on August 14, 2024

How much from the parquet crate would you be able to reuse?

jorgecarleitao commented on August 14, 2024

I went through all the metadata and schema code and it looks great. I would only make minor changes to it (mostly around using parquet-format structs instead of re-declaring our own, and not declare a ConvertedType::None that IMO should be Option<ConvertedType>). Tests are also great. The main challenges atm are around

  • BitWriter
  • encoding
  • ByteBufferPtr
  • DataType

They seem to be quite tightly coupled, which requires an "all or nothing" :/

Some ideas so far:

  • refactor the encoders so that they can rely on an arbitrary buffer (instead of only Vec<u8>), so that we can write the encoding directly to the buffers, thereby skipping a bunch of steps e.g. in booleans.
  • use a third-party for encoding/decoding, so that we do not take that maintenance hit (AFAI understand encoding is pretty generic), or create a third-party dependency specifically for this, so others can rely on these encoders.
  • replace ChunkReader, Length and others by simply Read + Seek (I have successfully done this and can read footers)
  • replace dyn by generics whenever possible, especially on basics.rs
  • Replace ColumnReaderImpl<DataType> by physicaltype-specific implementations, as different types have different constraints:
    • booleans are bitmaps and have their own logic
    • int32, int64, int96, float, double have a compile-time bit-width and thus most serialization can be done on the stack
    • ByteArray and Fixed have a variable bit-width and will continue to be handled on the heap
  • Make the code less "stateful", so that we can more easily parallelize it.
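To make the `Read + Seek` idea from the list above concrete, here is a small sketch that locates the footer with nothing but those two traits. It relies on the Parquet file layout, where the last 8 bytes are a 4-byte little-endian metadata length followed by the `PAR1` magic. `read_metadata_len` is a hypothetical helper, not a function from the parquet crate:

```rust
use std::io::{self, Read, Seek, SeekFrom};

/// Returns the length in bytes of the serialized file metadata by reading
/// the last 8 bytes of the file: 4-byte little-endian length + "PAR1".
pub fn read_metadata_len<R: Read + Seek>(reader: &mut R) -> io::Result<u32> {
    reader.seek(SeekFrom::End(-8))?;
    let mut footer = [0u8; 8];
    reader.read_exact(&mut footer)?;
    if footer[4..] != b"PAR1"[..] {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "file does not end with the PAR1 magic",
        ));
    }
    Ok(u32::from_le_bytes([footer[0], footer[1], footer[2], footer[3]]))
}
```

The same function works on a `std::fs::File` or an in-memory `std::io::Cursor`, which is the kind of flexibility the generic bound is meant to buy.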

I am also evaluating the allocation path: it seems to me that we need up to 4 allocations:

  1. when we read the buffer from the file (to a Vec)
  2. when we decompress (to a Vec)
  3. when we decode (to a Vec)
  4. when we convert to arrow buffers

We could try to at least remove one of these by decoding directly to arrow buffer, at least when there are no rep or def levels.

Your input in all of this would be highly appreciated, as always!

jorgecarleitao commented on August 14, 2024

Ongoing experimental work is now available here: https://github.com/jorgecarleitao/parquet2

This handles the parallelism problem by delegating the behavior to downstream dependencies (i.e. that crate's responsibility is to be a library to read parquet, and specific implementations decide how to actually read it). This allows e.g. DataFusion, Ballista, Polars to decide how they want to parallelize. The unit of work of this crate is a page, which is the smallest unit of work of parquet (AFAI understood).

There are two missing pieces of the puzzle now:

  1. decoding (RLE, etc.)
  2. deserialization (to arrow, to Vec, etc.)

I will start working on deserialization there. The direction is to have this crate optionally depend on parquet2 and implement deserialization logic to convert an iterator of "Pages" into a RecordBatch (AFAI understand the logical equivalent of a row group).

There is also minor work to be done on the Statistics, since statistics are type-dependent (min and max require interpreting bytes).

I have no idea how to handle rep and def levels. @nevi-me, can we assume that given a Vec<Page> and some metadata, we can reconstruct any arrow type into a RecordBatch? I.e. are the rep and def levels "independent" between row groups? Or must the rep and def levels be read across the whole column to make sense?

I am thinking about how a ListArray is reconstructed from a parquet group (generally speaking).

jorgecarleitao commented on August 14, 2024

Wow, thank you so much @nevi-me for that detailed explanation and for the offer!

I think that you are on the right track. There are some optimizations that I found while going through the reader which may enhance your thoughts about writing.

The design I used for reading the rep and def levels is the following: instead of decoding them to Vec<i16>, we keep the encoded runs, and deal with them at the in-memory representation level. This idea came about for three reasons:

  1. a bitpacked-encoded run of bit_width=1 has the same byte representation as Arrow bitmaps (LSB), and thus we can just copy the bytes directly to the bitmaps without converting each item to i16 and then setting bits on a bitmap based on the i16.

  2. for RLE encoding, we can use things like buffer.extend_trusted_len_iter(repeat(value).take(repetition)), which does not require materializing the vector

  3. for an optional field, if a single RLE run covers the whole bitmap (its repetition equals the bitmap's len) and marks every slot as valid, we can set the whole validity to None without materializing the defs to a Vec.

For the more complex cases we can always materialize them like you said, but keeping this flexibility to read to arrow seems powerful.
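The first two points can be sketched with plain `Vec`s. The helpers below are hypothetical and only illustrate the layouts: `bitpack_width_1` packs levels LSB-first the way a bit_width=1 bit-packed run does, `is_valid` reads a bit the way an Arrow validity bitmap is read, and `extend_with_run` uses `Vec::extend` as a stand-in for the `extend_trusted_len_iter` call mentioned above:

```rust
use std::iter::repeat;

/// Packs 0/1 levels LSB-first, as a bit-packed run with bit_width = 1 does.
pub fn bitpack_width_1(levels: &[u8]) -> Vec<u8> {
    let mut bytes = vec![0u8; (levels.len() + 7) / 8];
    for (i, level) in levels.iter().enumerate() {
        if *level == 1 {
            bytes[i / 8] |= 1 << (i % 8);
        }
    }
    bytes
}

/// Reads slot `i` of an Arrow-style validity bitmap (also LSB-first).
pub fn is_valid(bitmap: &[u8], i: usize) -> bool {
    (bitmap[i / 8] & (1 << (i % 8))) != 0
}

/// Appends an RLE run without first materializing it as its own vector.
pub fn extend_with_run(validity: &mut Vec<bool>, value: bool, run_len: usize) {
    validity.extend(repeat(value).take(run_len));
}
```

Because both layouts agree bit for bit, the packed bytes of such a run can be appended to a bitmap buffer as-is, as long as the write position is byte-aligned.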

The mechanism I designed for this is a small struct to store an individual RLE/bitpacked run, and have the hybrid decoder be an iterator over this struct.
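A sketch of what such a struct and iterator could look like. `HybridRun` and `HybridDecoder` are hypothetical names, not necessarily parquet2's. The sketch follows the hybrid layout of the Parquet format (a ULEB128 header whose lowest bit selects the run kind), ignores any length prefix in front of the encoded levels, and simply stops iterating on truncated input:

```rust
/// One run of the RLE/bit-packed hybrid encoding, left undecoded.
#[derive(Debug, PartialEq)]
pub enum HybridRun<'a> {
    /// Groups of 8 values, still bit-packed.
    Bitpacked(&'a [u8]),
    /// `value` (little-endian bytes) repeated `len` times.
    Rle { value: &'a [u8], len: usize },
}

pub struct HybridDecoder<'a> {
    data: &'a [u8],
    bit_width: usize,
}

impl<'a> HybridDecoder<'a> {
    pub fn new(data: &'a [u8], bit_width: usize) -> Self {
        Self { data, bit_width }
    }
}

/// Reads a ULEB128 integer, returning the value and the bytes consumed.
fn read_uleb128(data: &[u8]) -> Option<(u64, usize)> {
    let mut value = 0u64;
    for (i, byte) in data.iter().enumerate().take(10) {
        value |= u64::from(*byte & 0x7f) << (7 * i);
        if (*byte & 0x80) == 0 {
            return Some((value, i + 1));
        }
    }
    None
}

impl<'a> Iterator for HybridDecoder<'a> {
    type Item = HybridRun<'a>;

    fn next(&mut self) -> Option<Self::Item> {
        let data: &'a [u8] = self.data;
        let (header, consumed) = read_uleb128(data)?;
        let data = &data[consumed..];
        let run_len = (header >> 1) as usize;
        let (run, used) = if (header & 1) == 1 {
            // Bit-packed: `run_len` groups of 8 values, `bit_width` bytes per group.
            let bytes = run_len * self.bit_width;
            (HybridRun::Bitpacked(data.get(..bytes)?), bytes)
        } else {
            // RLE: one value stored in ceil(bit_width / 8) bytes, repeated `run_len` times.
            let bytes = (self.bit_width + 7) / 8;
            (HybridRun::Rle { value: data.get(..bytes)?, len: run_len }, bytes)
        };
        self.data = &data[used..];
        Some(run)
    }
}
```

A consumer can then match on each run and choose the cheapest path, for example copying a `Bitpacked` slice straight into a bitmap when `bit_width` is 1.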

The corresponding use of this struct at the consumers is here. Atm it materializes it because I have not written the rle_decode as an iterator adapter for native but it is fairly simple to not materialize at that point.

IMO this design fits well into writing because the def levels can be wholly calculated by traversing the validity bitmap. I.e. I think that for writing non-nested types, RLE-bitpacking encoding is basically RleEncoder::new(array.validity()) that implements an iterator over bytes (so that an array can span multiple pages), or io::Write.

The blocker is that we do not have the encoder for hybrid RLE/bitpacking yet. It should not be difficult, though, as we can always start with a poor encoding (e.g. always use bitpacking), and gradually improve it. I was hoping to have apache/parquet-format#170 reviewed before just to make sure that the ideas hold, so that we can then supplement that with how to perform encoding (and push the corresponding implementation to parquet2).
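As a sketch of the "always use bitpacking" starting point for non-nested, nullable columns: the whole validity is written as a single bit-packed run with bit_width = 1. `encode_bitpacked_only` is a hypothetical name (it is not the `RleEncoder` mentioned above), and the sketch assumes the reader learns the true number of values from elsewhere (e.g. the page header), since the last group is zero-padded to 8 values:

```rust
/// Encodes validity as one bit-packed run of the RLE/bit-packed hybrid:
/// a ULEB128 header `(groups << 1) | 1`, then one byte per group of 8 values,
/// packed LSB-first.
pub fn encode_bitpacked_only(validity: &[bool]) -> Vec<u8> {
    let groups = (validity.len() + 7) / 8;
    let mut out = Vec::with_capacity(groups + 2);

    // ULEB128-encode the header.
    let mut header = ((groups as u64) << 1) | 1;
    loop {
        let byte = (header & 0x7f) as u8;
        header >>= 7;
        if header == 0 {
            out.push(byte);
            break;
        }
        out.push(byte | 0x80);
    }

    // Pack the definition levels (1 = valid, 0 = null), zero-padding the tail.
    let start = out.len();
    out.resize(start + groups, 0);
    for (i, is_valid) in validity.iter().enumerate() {
        if *is_valid {
            out[start + i / 8] |= 1 << (i % 8);
        }
    }
    out
}
```

This never produces RLE runs, so long stretches of equal levels are stored less compactly than they could be; it is only a baseline that a smarter encoder could later replace.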

jorgecarleitao commented on August 14, 2024

Just to complement my previous comment on "without jeopardizing performance", for which I did not really give evidence.

Without any parallelism on my VM: read a parquet with a single int64 column with N entries with nulls (density of about 40%) generated from pyarrow:

parquet+arrow (master):

read_decompressed_pages_100000                                                                             
                        time:   [1.4687 ms 1.4733 ms 1.4778 ms]

parquet2, dee9dcd8 + arrow2:

read_decompressed_pages_100000                                                                            
                        time:   [656.07 us 660.57 us 665.08 us]

so, things look fast, even without unsafe.
