Comments (10)
> ... and not declare a ConvertedType::None that IMO should be Option)

I agree with this; that's why I made `LogicalType` an `Option<LogicalType>`, and avoided creating a struct different from what's in the format.
The main place where the `basics.rs` new-types were useful was in the `Display` impl. I didn't have a sense of this until I was able to implement the logical-type parsing work. Now that I understand the codebase better, I think we could replace the new-types with `parquet-format` going forward.
We have a similar performance issue with the writer: either we create a writer using the existing low-level constructs like `write_batch()`, or we bypass them but take much longer in the process.
I like your thinking around the reader, and agree that we can remove some of the allocation steps in the process.
I've created https://issues.apache.org/jira/browse/ARROW-12121 so I can better track the changes that I make to the writer going forward.
from arrow2.
Heads up: I was able to re-write all decoding functionality `unsafe`-free and without jeopardizing performance. I was able to correctly read both a Spark 3- and a pyarrow 3-generated snappy-compressed, dictionary-encoded parquet file with nulls into native Rust (e.g. `Vec<Option<i32>>`) using https://github.com/jorgecarleitao/parquet2.
The design is quite simple:
There is a page iterator that returns compressed, encoded pages, and there is a function that decompresses, decodes, and deserializes them into an in-memory format (`page_to_array`). The page iterator is IO-bound, as there are no large CPU-intensive ops on it, while `page_to_array` is CPU-intensive. This enables us to decouple these two parts to maximize throughput. The whole code is about 5k LOC atm, and it only depends on a bitpacking crate to encode and decode bitpacks, plus the usual compression suspects.
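As a minimal sketch of that decoupling (using hypothetical stand-in types, not parquet2's actual API), the IO-bound producer and the CPU-bound consumer can communicate over a channel:

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for a compressed, encoded page; the name and shape
// are illustrative only.
struct CompressedPage {
    values: Vec<i32>, // pretend these are still compressed + encoded
}

// CPU-intensive step: decompress + decode + deserialize (simulated here).
fn page_to_array(page: CompressedPage) -> Vec<Option<i32>> {
    page.values.into_iter().map(Some).collect()
}

fn read_pages(pages: Vec<CompressedPage>) -> Vec<Vec<Option<i32>>> {
    let (tx, rx) = mpsc::channel();
    // IO-bound producer: only hands pages over, no heavy CPU work.
    let producer = thread::spawn(move || {
        for page in pages {
            tx.send(page).unwrap();
        }
    });
    // CPU-bound consumer: runs page_to_array on each page as it arrives.
    let arrays: Vec<_> = rx.into_iter().map(page_to_array).collect();
    producer.join().unwrap();
    arrays
}
```

A downstream crate can replace the single consumer with a thread pool without changing the producer, which is the point of keeping the two sides decoupled.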
I also understood the RLE/bitpacking hybrid encoder. I wrote a PR to parquet-format with a more detailed explanation of it.
I am now writing the logic to map parquet's representation directly into arrow2, which is fairly simple for non-nested types.
from arrow2.
I have no idea how to handle rep and def levels. @nevi-me, can we assume that given a Vec and some metadata, we can reconstruct any arrow type into a RecordBatch? I.e. are the rep and def levels "independent" between row groups? Or must the rep and def levels be read across the whole column to make sense?
Hey @jorgecarleitao, I missed this comment and question.
I've been thinking about this in the context of a writer, and how we could avoid materialising a whole `RecordBatch` into `Vec<T>` before writing to parquet.
Definitions are mostly confined to a single row. With primitives, it's super easy (as there are no reps). With lists, we could probably find a way of doing this efficiently. I'll illustrate below.
If we have:
- an Arrow batch of 65'536 records (because we deem it a good unit of work for a processing engine),
- that we want to write as 4096 records per page (because we don't want to create very large pages),
- that contains an int32 (`a`), a string (`b`), a struct of int32 (`c._1`), and a list of int32 (`d`),

then we have four columns: `a`, `b`, `c._1`, `d`.
when we write a page, we only need 4096 values from the batch, so we could benefit from slicing each array (this is where the nested slicing was an issue).
From these 4096 values, if we are interested in writing say row 10, we'd do the following:
For row n, where n has no relationship with n-1:
- `a`: definition will be 1 or 0 depending on nullability rules; the unit is `Option<i32>`
- `b`: definition will be 1 or 0 again; a unit is `Option<Vec<u8>>` or `Option<Vec<&str>>`
- `c._1`: definition will be 0, 1 or 2 depending on nullability rules; the unit is `Option<i32>`
- `d`: def and rep values may be the child value length, or 1 value, depending on nullability
With column `d`, if we have an `Option<vec![Some(1), None, Some(3)]>` and `arrow::Field("d", List<Box<Field("item", Int32, true)>>, true)` as the schema, there are 2 levels that can be null, so the definition would be `[2, 1, 2]`.
If the list slot is null, `slot: Option<Vec<i32>> = None`, then the list's definition will be `[0]`.
If the list has a slot, but that slot has 0 values, `Some(vec![])`, the list's def will be `[1]`. This looks similar to `Option<vec![Some(1), None, Some(3)]>` from the first example, but the repetition is what will tell us that "there's no value for this 1 slot".
This potentially leaves us with the definitions:
- `a`: `[u16; 1]`
- `b`: `[u16; 1]`
- `c._1`: `[u16; 1]`
- `d`: `[u16; n]`, where n is a function of the list parent's nullability (1) and the list child's length (n)
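The three `d` cases above (`[2, 1, 2]`, `[0]`, and `[1]`) can be sketched as a small function. This is an illustration of the rules described here, not arrow2's implementation:

```rust
// Definition levels for one row of a nullable list of nullable int32,
// following the rules above: null list -> [0], empty list -> [1],
// otherwise one level per child (2 = valid child, 1 = null child).
fn list_def_levels(row: &Option<Vec<Option<i32>>>) -> Vec<i16> {
    match row {
        // Null list slot: a single level-0 definition.
        None => vec![0],
        // Present but empty list: a single level-1 definition.
        Some(items) if items.is_empty() => vec![1],
        // One definition per child value.
        Some(items) => items
            .iter()
            .map(|v| if v.is_some() { 2 } else { 1 })
            .collect(),
    }
}
```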
So, from the above example, I can make the following observations:
- If an array is a non-nested type (primitive, binary), we could reasonably go from `Iter<T>` and `Iter<Option<T>>` to a Parquet column, reading enough values to fill our 4096 values (without worrying about slicing?).
- If an array is a struct that only ends up having a primitive or binary (e.g. `struct<struct<struct<...<int32>>>`), we could reuse the above to an extent, only needing to know which level of the struct soup becomes null. Recall the logical nullness issue we had a few months ago, where a null struct meant that its children automatically become null, even if they have physical values.
- If an array contains a list, we can still construct definitions just from looking at the list's value hierarchy for 1 row.
- Repetitions only become relevant if we encounter a list.
We can deal with repetition separately, but I'll briefly say that the unit for computing a repetition is the upper-most parent in the tree structure that is a list, i.e. `d` in our example above.
If I have a `struct<list<struct<list<list<int32>>>>>`, I'll only need repetition once I encounter the first list. The other 2 lists' repetition values will be subject to the first list.
So my final observation here is that if I have a non-null fixed-list hierarchy (using the above), and:
- the first list has 3 values for row n,
- the second has 4 values,
- the third has 2 values,

then the number of values in the fixed-list's definition and repetition will be 3 * 4 * 2 = 24, and I'll need `[T; 24]` values for just that row n.
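The arithmetic above is just a product over the per-level list lengths; as a trivial sketch (the function name is illustrative):

```rust
// For one non-null row of a nested fixed-list hierarchy, the number of
// def/rep values is the product of the list lengths at each nesting level.
fn nested_value_count(lengths_per_level: &[usize]) -> usize {
    lengths_per_level.iter().product()
}
```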
from arrow2.
> I was able to correctly read both a spark 3 and pyarrow3 -generated snappy-compressed, dictionary-encoded parquet with nulls into native rust (e.g. Vec<Option>) using https://github.com/jorgecarleitao/parquet2.

This is fantastic. From my comment above, `Vec<Option<T>>` is good enough for us to reconstruct primitives.
I think if you have `Iter<i16>` for definitions, `Option<Iter<i16>> = None` for repetition, and `Iter<T>` for values, we should be able to reconstruct arbitrary structs. If you point me in the right direction, I'd love to help out.
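For the flat case (max definition level of 1, no repetitions), that reconstruction can be sketched as follows; the function name and signature are illustrative, not an existing API:

```rust
// Rebuild a nullable primitive column from an iterator of definition levels
// and an iterator of the non-null values, assuming max_def_level = 1 and no
// repetition levels.
fn reconstruct_primitive<T>(
    defs: impl Iterator<Item = i16>,
    mut values: impl Iterator<Item = T>,
) -> Vec<Option<T>> {
    // def == 1 means "value present": pull the next non-null value;
    // def == 0 means a null slot.
    defs.map(|def| if def == 1 { values.next() } else { None })
        .collect()
}
```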
Lists are a separate beast best handled carefully 😀
from arrow2.
FYI, I am working on this. However, given the requirements that I outlined for this project (notably the safety and performance parts), this may require significant work on the parquet side of things.
from arrow2.
How much from the parquet crate would you be able to reuse?
from arrow2.
I went through all the metadata and schema code and it looks great. I would only make minor changes to it (mostly around using `parquet-format` structs instead of re-declaring our own, and not declaring a `ConvertedType::None` that IMO should be `Option<ConvertedType>`). Tests are also great. The main challenges atm are around:
- `BitWriter`
- encoding
- `ByteBufferPtr`
- `DataType`

They seem to be quite tightly coupled, which requires an "all or nothing" :/
Some ideas so far:
- refactor the encoders so that they can rely on an arbitrary buffer (instead of only `Vec<u8>`), so that we can write the encoding directly to the buffers, thereby skipping a bunch of steps, e.g. in booleans.
- use a third party for encoding/decoding, so that we do not take that maintenance hit (AFAI understand encoding is pretty generic), or create a third-party dependency specifically for this, so others can rely on these encoders.
- replace `ChunkReader`, `Length` and others by simply `Read + Seek` (I have successfully done this and can read footers).
- replace `dyn` by generics whenever possible, especially in `basics.rs`.
- replace `ColumnReaderImpl<DataType>` by physical-type-specific implementations, as different types have different constraints:
  - booleans are bitmaps and have their own logic
  - int32, int64, int96, float, double have a compile-time bit-width and thus most serialization can be done on the stack
  - ByteArray and Fixed have a variable bit-width and will continue to be handled on the heap
- make the code less "stateful", so that we can more easily parallelize it.
I am also evaluating the allocation path: it seems to me that we need up to 4 allocations:
- when we read the buffer from the file (to a `Vec`)
- when we decompress (to a `Vec`)
- when we decode (to a `Vec`)
- when we convert to arrow buffers

We could try to remove at least one of these by decoding directly to an arrow buffer, at least when there are no rep or def levels.
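A minimal sketch of that last idea, assuming PLAIN-encoded int32 data and using a plain `Vec<i32>` as a stand-in for the target arrow buffer (the function name is illustrative):

```rust
// Decode PLAIN-encoded little-endian i32s straight into a caller-provided
// buffer, skipping the intermediate decode-to-Vec-then-copy step.
fn decode_plain_i32_into(bytes: &[u8], buffer: &mut Vec<i32>) {
    buffer.extend(
        bytes
            .chunks_exact(4)
            .map(|chunk| i32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]])),
    );
}
```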
Your input in all of this would be highly appreciated, as always!
from arrow2.
Ongoing experimental work is now available here: https://github.com/jorgecarleitao/parquet2
This handles the parallelism problem, on which we just delegate the behavior to downstream dependencies (i.e. that crate's responsibility is to be a library to read parquet, and specific implementations decide how to actually read it). This allows e.g. DataFusion, Ballista, and Polars to decide how they want to parallelize. The unit of work of this crate is a page, which is the smallest unit of work of parquet (AFAI understood).
There are two missing pieces of the puzzle now:
- decoding (RLE, etc.)
- deserialization (to arrow, to `Vec`, etc.)

I will start working on deserialization there. The direction is to have this crate optionally depend on parquet2 and implement the deserialization logic to convert an iterator of "Pages" into a `RecordBatch` (AFAI understand, the logical equivalent of a row group).
There is also minor work to be done on the Statistics, since statistics are type-dependent (min and max require interpreting bytes).
I have no idea how to handle rep and def levels. @nevi-me, can we assume that given a `Vec<Page>` and some metadata, we can reconstruct any arrow type into a `RecordBatch`? I.e. are the rep and def levels "independent" between row groups? Or must the rep and def levels be read across the whole column to make sense?
I am thinking in how a ListArray is reconstructed from a parquet group (generally speaking).
from arrow2.
Wow, thank you so much @nevi-me for that detailed explanation and for the offer!
I think that you are on the right track. There are some optimizations that I found while going through the reader which may enhance your thoughts about writing.
The design I used for reading the rep and def levels is the following: instead of decoding them to `Vec<i16>`, we keep the encoded runs and deal with them at the in-memory representation level. This idea came about for a few reasons:
- a bitpacked-encoded run of `bit_width=1` has the same byte representation as Arrow bitmaps (LSB), and thus we can just copy the bytes directly to the bitmaps without converting each item to `i16` and then setting bits on a bitmap based on the `i16`.
- for RLE encoding, we can use things like `buffer.extend_trusted_len_iter(repeat(value).take(repetition))`, which does not require materializing the vector.
- if the column is optional and a single RLE run covers the whole length (`repetition = len`), we can set the whole validity to `None` without materializing the defs to a `Vec`.
For the more complex cases we can always materialize them like you said, but keeping this flexibility when reading into arrow seems powerful.
The mechanism I designed for this is a small struct that stores an individual RLE/bitpacked run, with the hybrid decoder being an iterator over this struct. The corresponding use of this struct at the consumers is here. Atm it materializes the run because I have not written `rle_decode` as an iterator adapter for native types, but it is fairly simple to not materialize at that point.
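A sketch of that mechanism (the enum and names are illustrative, not parquet2's actual types): runs stay encoded, and a consumer only expands one when it has to:

```rust
// One run from the hybrid RLE/bitpacked encoding, kept in encoded form.
enum HybridRun<'a> {
    // A single value repeated `length` times.
    Rle { value: u8, length: usize },
    // Bitpacked values; for bit_width = 1 these bytes are already LSB
    // bitmaps and can be copied directly into an Arrow validity bitmap.
    Bitpacked { bits: &'a [u8], length: usize },
}

// Expand a run into individual values only when a consumer really needs them.
fn materialize(run: &HybridRun) -> Vec<u8> {
    match run {
        HybridRun::Rle { value, length } => {
            std::iter::repeat(*value).take(*length).collect()
        }
        HybridRun::Bitpacked { bits, length } => (0..*length)
            .map(|i| (bits[i / 8] >> (i % 8)) & 1)
            .collect(),
    }
}
```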
IMO this design fits well into writing, because the def levels can be wholly calculated by traversing the validity bitmap. I.e. I think that for writing non-nested types, RLE/bitpacking encoding is basically a `RleEncoder::new(array.validity())` that implements an iterator over bytes (so that an array can span multiple pages), or `io::Write`.
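To illustrate the "def levels from the validity bitmap" point, here is a sketch (not an existing API) that turns a validity sequence into (value, run_length) pairs, i.e. the RLE half of the hybrid encoding:

```rust
// For a non-nested column, the definition levels are exactly the validity:
// 1 where the slot is valid, 0 where it is null. Walk the validity and
// coalesce equal neighbours into (def_level, run_length) RLE runs.
fn validity_to_rle_runs(validity: &[bool]) -> Vec<(i16, usize)> {
    let mut runs: Vec<(i16, usize)> = Vec::new();
    for &valid in validity {
        let def = if valid { 1 } else { 0 };
        match runs.last_mut() {
            // Extend the current run if the level is unchanged.
            Some((value, len)) if *value == def => *len += 1,
            // Otherwise start a new run.
            _ => runs.push((def, 1)),
        }
    }
    runs
}
```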
The blocker is that we do not have the encoder for hybrid RLE/bitpacking yet. It should not be difficult, though, as we can always start with a poor encoding (e.g. always use bitpacking), and gradually improve it. I was hoping to have apache/parquet-format#170 reviewed before just to make sure that the ideas hold, so that we can then supplement that with how to perform encoding (and push the corresponding implementation to parquet2).
from arrow2.
Just to complement my previous comment on "without jeopardizing performance", for which I did not really give evidence.
Without any parallelism, on my VM: reading a parquet file with a single `int64` column with N entries with nulls (density of about 40%) generated from pyarrow:

parquet + arrow (master):
read_decompressed_pages_100000
                        time:   [1.4687 ms 1.4733 ms 1.4778 ms]

parquet2 (dee9dcd8) + arrow2:
read_decompressed_pages_100000
                        time:   [656.07 us 660.57 us 665.08 us]

So, things look fast, even without `unsafe`.
from arrow2.