Polars integration library for Dagster.

[Project moved] The dagster-polars integration has been merged into Dagster's monorepo. Please create dagster-polars related issues there and tag @danielgafni.

API documentation can be found here.

License: Apache License 2.0
The current implementation is unnecessarily susceptible to breaking changes. Say a new version of Polars (or pyarrow.dataset) removes or renames a given write_parquet argument. Because the arguments are currently hard-coded in the PolarsParquetIOManager, this would break user scripts when they upgrade to the newer version of Polars.

I think a good solution would be switching to a dictionary (called something along the lines of polars_io_args -- I'm sure there's a better name) as follows:
```python
from dagster import asset


@asset(
    metadata={
        "polars_io_args": {
            "compression": "snappy",
        },
    },
)
def some_asset():
    ...
```
The dump_df_to_path and scan_df_from_path methods would then forward these args to Polars, without hard-coding their names:
```python
from typing import Union

import fsspec
import polars as pl
import pyarrow.dataset as ds
from dagster import InputContext, OutputContext
from upath import UPath


class PolarsParquetIOManager(BasePolarsUPathIOManager):
    ...

    def dump_df_to_path(self, context: OutputContext, df: pl.DataFrame, path: UPath):
        assert context.metadata is not None
        io_args = context.metadata.get("polars_io_args", {})
        with path.open("wb") as file:
            # forward the user-supplied kwargs straight to Polars
            df.write_parquet(file, **io_args)

    def scan_df_from_path(self, path: UPath, context: InputContext) -> pl.LazyFrame:
        assert context.metadata is not None
        io_args = context.metadata.get("polars_io_args", {})
        io_args.setdefault("format", "parquet")  # sets format to Parquet if not defined already

        fs: Union[fsspec.AbstractFileSystem, None] = None
        try:
            fs = path._accessor._fs
        except AttributeError:
            pass

        return pl.scan_pyarrow_dataset(
            ds.dataset(
                str(path),
                filesystem=fs,
                **io_args,
            ),
            allow_pyarrow_filter=context.metadata.get("allow_pyarrow_filter", True),
        )
```
Finally, this might also enable (in the future) 'hybrid' IO managers which save the same asset in two ways, because you can separate each IO manager's args into different keys, even if the names of the child args clash/overlap.

The only problem with the above solution is that I can't think of a "clean" way that also allows you to pass allow_pyarrow_filter values, because io_args is unpacked in a different place.
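Purely as an illustration (the key names below are hypothetical, not existing dagster-polars API), the metadata could be namespaced per IO manager, with write-time, dataset-time, and scan-time options in their own sub-dicts so each one gets unpacked in the right place:

```python
from dagster import asset


@asset(
    metadata={
        # hypothetical per-IO-manager namespaces, so args for different managers never clash
        "polars_parquet_io_args": {
            "write": {"compression": "snappy"},       # forwarded to df.write_parquet(...)
            "dataset": {"format": "parquet"},         # forwarded to pyarrow.dataset.dataset(...)
            "scan": {"allow_pyarrow_filter": False},  # forwarded to pl.scan_pyarrow_dataset(...)
        },
        "polars_delta_io_args": {
            "write": {"mode": "overwrite"},
        },
    },
)
def some_asset():
    ...
```

Namespacing like this would also keep a hypothetical 'hybrid' setup unambiguous, since each IO manager would only read its own key.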
Hi there! I'm really liking this project.
As I was trying to read through your code, I noticed that there seems to be a lot of duplication for the metadata columns, where you've got the same functions in base.py and utils.py.

To make the code easier to read, it would be great if you removed these functions from base.py and kept them in utils.py only.
We are currently on dagster-polars==0.1.5 and we tried to upgrade to 2.2, but are getting the following failures on CI:

polars.exceptions.ColumnNotFoundError: describe
FAILED

We are running in an x86 Docker container.

Polars version:

polars==0.20.6

Platform:

root@50efadb560a1:/src# uname -a
Linux 50efadb560a1 6.2.0-39-generic #40~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Thu Nov 16 10:53:04 UTC 2 x86_64 x86_64 x86_64 GNU/Linux

Here is the full relevant stack trace:
polars.exceptions.ColumnNotFoundError: describe
Stack Trace:
File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/api.py", line 764, in job_execution_iterator
for event in job_context.executor.execute(job_context, execution_plan):
File "/usr/local/lib/python3.10/dist-packages/dagster/_core/executor/in_process.py", line 55, in execute
yield from iter(
File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/api.py", line 875, in __iter__
yield from self.iterator(
File "/usr/local/lib/python3.10/dist-packages/dagster/_core/executor/in_process.py", line 26, in inprocess_execution_iterator
yield from inner_plan_execution_iterator(
File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/execute_plan.py", line 121, in inner_plan_execution_iterator
for step_event in check.generator(
File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/execute_plan.py", line 359, in dagster_event_sequence_for_step
raise dagster_user_error.user_exception
File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
yield
File "/usr/local/lib/python3.10/dist-packages/dagster/_utils/__init__.py", line 467, in iterate_with_context
next_output = next(iterator)
File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/execute_step.py", line 738, in _gen_fn
gen_output = output_manager.handle_output(output_context, output.value)
File "/usr/local/lib/python3.10/dist-packages/dagster/_core/storage/upath_io_manager.py", line 448, in handle_output
custom_metadata = self.get_metadata(context=context, obj=obj)
File "/usr/local/lib/python3.10/dist-packages/dagster_polars/io_managers/base.py", line 317, in get_metadata
return get_polars_metadata(context, df) if df is not None else {"missing": MetadataValue.bool(True)}
File "/usr/local/lib/python3.10/dist-packages/dagster_polars/io_managers/utils.py", line 125, in get_polars_metadata
"stats": MetadataValue.json(get_polars_df_stats(df)),
File "/usr/local/lib/python3.10/dist-packages/dagster_polars/io_managers/utils.py", line 108, in get_polars_df_stats
return {
File "/usr/local/lib/python3.10/dist-packages/dagster_polars/io_managers/utils.py", line 109, in <dictcomp>
col: {stat: describe[col][i] for i, stat in enumerate(describe["describe"].to_list())}
File "/usr/local/lib/python3.10/dist-packages/polars/dataframe/frame.py", line 1731, in __getitem__
return self.get_column(item)
File "/usr/local/lib/python3.10/dist-packages/polars/dataframe/frame.py", line 7068, in get_column
return wrap_s(self._df.get_column(name))
The above exception occurred during handling of the following exception:
Seems like the problem lies here, but looking at the blame, it hasn't changed for 7 months. Is this function invoked somewhere new? Do all of our DataFrames have to have a describe column? If so, where is this documented?
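For context, one plausible explanation (an assumption on my part, not verified against the Polars changelog) is that newer Polars releases renamed the label column returned by DataFrame.describe() from "describe" to "statistic", so the describe["describe"] indexing seen in the traceback now raises ColumnNotFoundError. A version-tolerant variant of the stats helper might look like this (hypothetical rewrite, not the current dagster-polars code):

```python
import polars as pl


def get_polars_df_stats(df: pl.DataFrame) -> dict:
    described = df.describe()
    # Older Polars named the label column "describe"; newer releases use "statistic".
    label_col = "statistic" if "statistic" in described.columns else "describe"
    labels = described[label_col].to_list()
    return {
        col: {stat: described[col][i] for i, stat in enumerate(labels)}
        for col in described.columns
        if col != label_col
    }
```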
Some parameters, like overwrite_schema from deltalake, should be set from the IOManager config, not only from the metadata.
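A minimal sketch of what that could look like, assuming the Delta IO manager exposes pydantic-style config fields like recent dagster-polars versions do (the field name and write kwargs below are illustrative, and the exact deltalake option name depends on the polars/deltalake versions installed):

```python
import polars as pl
from dagster import OutputContext
from upath import UPath


class MyPolarsDeltaIOManager(BasePolarsUPathIOManager):
    # Hypothetical config field: a manager-wide default that
    # per-asset metadata can still override.
    overwrite_schema: bool = False

    def dump_df_to_path(self, context: OutputContext, df: pl.DataFrame, path: UPath):
        metadata = context.metadata or {}
        overwrite_schema = metadata.get("overwrite_schema", self.overwrite_schema)
        df.write_delta(
            str(path),
            mode="overwrite",
            # Passed through to deltalake's writer; newer deltalake versions
            # use schema_mode="overwrite" instead of overwrite_schema=True.
            delta_write_options={"overwrite_schema": overwrite_schema},
        )
```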
https://arrow.apache.org/docs/python/generated/pyarrow.parquet.Statistics.html

Currently dagster-polars is calculating similar statistics manually. The pyarrow statistics might be useful, and they would always be consistent with the actual Parquet file, not the polars DataFrame.
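For illustration, the footer statistics can be read back with pyarrow without touching the DataFrame at all (a sketch, not existing dagster-polars code; the function name is made up):

```python
import pyarrow.parquet as pq


def parquet_column_stats(path: str) -> dict:
    # Collect per-row-group min/max/null_count from the Parquet footer metadata,
    # instead of recomputing statistics from the loaded polars DataFrame.
    meta = pq.ParquetFile(path).metadata
    stats: dict = {}
    for rg in range(meta.num_row_groups):
        row_group = meta.row_group(rg)
        for i in range(row_group.num_columns):
            column = row_group.column(i)
            s = column.statistics  # pyarrow.parquet.Statistics or None
            if s is None:
                continue
            stats.setdefault(column.path_in_schema, []).append(
                {"min": s.min, "max": s.max, "null_count": s.null_count}
            )
    return stats
```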
The latest dagster version (1.6.5) now depends on pendulum 3.0; the pinned pendulum version should be bumped accordingly.
Hi there!

I've found this integration very useful and hope it'll get much more steam in the future. As such, it would be great to have expanded docs that ease new users into the following:

- The load_from_path method of the base IO manager does a collect() if the input is a LazyFrame, but given that any upstream asset would have to be materialized to write to disk anyway, how would one receive a lazy frame as an input in the first place? This bit is rather ambiguous to me.
- The dump_df_to_path function also only works with pl.DataFrame, but there is presumably a use case for the user simply returning a LazyFrame as the output of an asset. Perhaps it might be helpful to have a collect() in dump_df_to_path (or to use sink_parquet) for those scenarios? See the sketch after this list.
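As a purely illustrative sketch (not existing dagster-polars behavior), the output side could accept both eager and lazy frames along these lines:

```python
from typing import Union

import polars as pl
from dagster import OutputContext
from upath import UPath


class PolarsParquetIOManager(BasePolarsUPathIOManager):
    ...

    def dump_df_to_path(
        self, context: OutputContext, df: Union[pl.DataFrame, pl.LazyFrame], path: UPath
    ):
        if isinstance(df, pl.LazyFrame):
            # Either materialize the lazy query here...
            df = df.collect()
            # ...or, for local paths, stream it with df.sink_parquet(str(path))
            # to avoid holding the whole result in memory.
        with path.open("wb") as file:
            df.write_parquet(file)
```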
I'm seeing this with dagster 1.5.5, pyarrow 13.0.0, dagster-polars 0.1.3. It could be a local problem, but looking through versions there may be a difference in behavior between pyarrow versions, so asking here anyway.

The context is a Polars Parquet IO manager pointing at Google Cloud Storage.
AttributeError: module 'pyarrow' has no attribute 'dataset'
Stack Trace:
File "/Users/seandavis/Library/Caches/pypoetry/virtualenvs/dagster-infra-h7rMZSN8-py3.9/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
yield
File "/Users/seandavis/Library/Caches/pypoetry/virtualenvs/dagster-infra-h7rMZSN8-py3.9/lib/python3.9/site-packages/dagster/_core/execution/plan/inputs.py", line 831, in _load_input_with_input_manager
value = input_manager.load_input(context)
File "/Users/seandavis/Library/Caches/pypoetry/virtualenvs/dagster-infra-h7rMZSN8-py3.9/lib/python3.9/site-packages/dagster/_core/storage/upath_io_manager.py", line 402, in load_input
return self._load_single_input(path, context)
File "/Users/seandavis/Library/Caches/pypoetry/virtualenvs/dagster-infra-h7rMZSN8-py3.9/lib/python3.9/site-packages/dagster/_core/storage/upath_io_manager.py", line 239, in _load_single_input
obj = self.load_from_path(context=context, path=path)
File "/Users/seandavis/Library/Caches/pypoetry/virtualenvs/dagster-infra-h7rMZSN8-py3.9/lib/python3.9/site-packages/dagster_polars/io_managers/base.py", line 193, in load_from_path
ldf = self.scan_df_from_path(path=path, context=context) # type: ignore
File "/Users/seandavis/Library/Caches/pypoetry/virtualenvs/dagster-infra-h7rMZSN8-py3.9/lib/python3.9/site-packages/dagster_polars/io_managers/parquet.py", line 83, in scan_df_from_path
ds = pyarrow.dataset.dataset(
File "/Users/seandavis/Library/Caches/pypoetry/virtualenvs/dagster-infra-h7rMZSN8-py3.9/lib/python3.9/site-packages/pyarrow/__init__.py", line 317, in __getattr__
raise AttributeError(
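If it helps, my best guess (an assumption, not a confirmed diagnosis) is that `import pyarrow` alone does not load the `pyarrow.dataset` submodule, so `pyarrow.dataset.dataset(...)` falls through to pyarrow's `__getattr__` and raises this AttributeError. Importing the submodule explicitly avoids it:

```python
import pyarrow.dataset as ds  # explicit submodule import; "import pyarrow" alone is not enough

# hypothetical local file, just to show the call shape
dataset = ds.dataset("some_asset.parquet", format="parquet")
```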
Hi,

As far as I am aware, a partitioned LazyFrame/DataFrame can only be read back as DataFramePartitions|LazyFramePartitions. Is it possible to read partitioned parquet as a single LazyFrame?

polars 0.20.5 supports wildcard reads of all parquet files under a folder, like:

pl.scan_parquet("folder/*.parquet")

so on the polars side, having the user read each partitioned parquet as a dictionary of files is not necessary anymore.