Comments (4)
@rjzamora do you have time to poke at this?
Yes, I also find this concerning, so I definitely want to figure out what is going wrong.
Side Note: When we start implementing #10602, I have some ideas for how we can get rid of all of this ugly "adaptive aggregation" code without sacrificing our ability to split files.
I also encountered this in #10722
I ran into the same test_split_adaptive_aggregate_files test failing in #10729, but with a different traceback:
TypeError: int() argument must be a string, a bytes-like object or a real number, not 'NoneType'
when attempting to read in a partition with the fastparquet engine (full traceback below).
Full traceback:
_______ test_split_adaptive_aggregate_files[b-fastparquet-fastparquet] ________
[gw0] win32 -- Python 3.10.13 C:\Users\runneradmin\miniconda3\envs\test-environment\python.exe
column = column_index_length: null
column_index_offset: null
crypto_metadata: null
encrypted_column_metadata: null
file_offset:...otal_compressed_size: 37
total_uncompressed_size: null
type: 1
offset_index_length: null
offset_index_offset: null
schema_helper = <Parquet Schema with 3 entries>
infile = <fsspec.implementations.local.LocalFileOpener object at 0x000001AF9C49BA30>
use_cat = False, selfmade = True, assign = array([0]), catdef = None
row_filter = False
def read_col(column, schema_helper, infile, use_cat=False,
selfmade=False, assign=None, catdef=None,
row_filter=None):
"""Using the given metadata, read one column in one row-group.
Parameters
----------
column: thrift structure
Details on the column
schema_helper: schema.SchemaHelper
Based on the schema for this parquet data
infile: open file or string
If a string, will open; if an open object, will use as-is
use_cat: bool (False)
If this column is encoded throughout with dict encoding, give back
a pandas categorical column; otherwise, decode to values
row_filter: bool array or None
if given, selects which of the values read are to be written
into the output. Effectively implies NULLs, even for a required
column.
"""
cmd = column.meta_data
try:
> se = schema_helper.schema_element(cmd.path_in_schema)
C:\Users\runneradmin\miniconda3\envs\test-environment\lib\site-packages\fastparquet\core.py:450:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Parquet Schema with 3 entries>, name = ['d']
def schema_element(self, name):
"""Get the schema element with the given name or path"""
root = self.root
if isinstance(name, str):
name = name.split('.')
for part in name:
> root = root["children"][part]
E KeyError: 'd'
C:\Users\runneradmin\miniconda3\envs\test-environment\lib\site-packages\fastparquet\schema.py:119: KeyError
During handling of the above exception, another exception occurred:
tmpdir = local('C:\\Users\\runneradmin\\AppData\\Local\\Temp\\pytest-of-runneradmin\\pytest-0\\popen-gw0\\test_split_adaptive_aggregate_4')
write_engine = 'fastparquet', read_engine = 'fastparquet', aggregate_files = 'b'
@write_read_engines()
@pytest.mark.parametrize("aggregate_files", ["a", "b"])
def test_split_adaptive_aggregate_files(
tmpdir, write_engine, read_engine, aggregate_files
):
blocksize = "1MiB"
partition_on = ["a", "b"]
df_size = 100
df1 = pd.DataFrame(
{
"a": np.random.choice(["apple", "banana", "carrot"], size=df_size),
"b": np.random.choice(["small", "large"], size=df_size),
"c": np.random.random(size=df_size),
"d": np.random.randint(1, 100, size=df_size),
}
)
ddf1 = dd.from_pandas(df1, npartitions=9)
ddf1.to_parquet(
str(tmpdir),
engine=write_engine,
partition_on=partition_on,
write_index=False,
)
with pytest.warns(FutureWarning, match="Behavior may change"):
ddf2 = dd.read_parquet(
str(tmpdir),
engine=read_engine,
blocksize=blocksize,
split_row_groups="adaptive",
aggregate_files=aggregate_files,
)
# Check that files where aggregated as expected
if aggregate_files == "a":
assert ddf2.npartitions == 3
elif aggregate_files == "b":
assert ddf2.npartitions == 6
# Check that the final data is correct
> df2 = ddf2.compute().sort_values(["c", "d"])
dask\dataframe\io\tests\test_parquet.py:3132:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dask\base.py:342: in compute
(result,) = compute(self, traverse=False, **kwargs)
dask\base.py:628: in compute
results = schedule(dsk, keys, **kwargs)
dask\dataframe\io\parquet\core.py:96: in __call__
return read_parquet_part(
dask\dataframe\io\parquet\core.py:668: in read_parquet_part
df = engine.read_partition(
dask\dataframe\io\parquet\fastparquet.py:1075: in read_partition
return cls.pf_to_pandas(
dask\dataframe\io\parquet\fastparquet.py:1172: in pf_to_pandas
pf.read_row_group_file(
C:\Users\runneradmin\miniconda3\envs\test-environment\lib\site-packages\fastparquet\api.py:386: in read_row_group_file
core.read_row_group(
C:\Users\runneradmin\miniconda3\envs\test-environment\lib\site-packages\fastparquet\core.py:642: in read_row_group
read_row_group_arrays(file, rg, columns, categories, schema_helper,
C:\Users\runneradmin\miniconda3\envs\test-environment\lib\site-packages\fastparquet\core.py:612: in read_row_group_arrays
read_col(column, schema_helper, file, use_cat=name+'-catdef' in out,
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
column = column_index_length: null
column_index_offset: null
crypto_metadata: null
encrypted_column_metadata: null
file_offset:...otal_compressed_size: 37
total_uncompressed_size: null
type: 1
offset_index_length: null
offset_index_offset: null
schema_helper = <Parquet Schema with 3 entries>
infile = <fsspec.implementations.local.LocalFileOpener object at 0x000001AF9C49BA30>
use_cat = False, selfmade = True, assign = array([0]), catdef = None
row_filter = False
def read_col(column, schema_helper, infile, use_cat=False,
selfmade=False, assign=None, catdef=None,
row_filter=None):
"""Using the given metadata, read one column in one row-group.
Parameters
----------
column: thrift structure
Details on the column
schema_helper: schema.SchemaHelper
Based on the schema for this parquet data
infile: open file or string
If a string, will open; if an open object, will use as-is
use_cat: bool (False)
If this column is encoded throughout with dict encoding, give back
a pandas categorical column; otherwise, decode to values
row_filter: bool array or None
if given, selects which of the values read are to be written
into the output. Effectively implies NULLs, even for a required
column.
"""
cmd = column.meta_data
try:
se = schema_helper.schema_element(cmd.path_in_schema)
except KeyError:
# column not present in this row group
> assign[:] = None
E TypeError: int() argument must be a string, a bytes-like object or a real number, not 'NoneType'
C:\Users\runneradmin\miniconda3\envs\test-environment\lib\site-packages\fastparquet\core.py:453: TypeError
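The two-stage failure in the traceback above can be reproduced in isolation with a minimal sketch. It assumes only NumPy; `schema_root` and `fill_missing` are hypothetical stand-ins written for illustration, not fastparquet code. The lookup mirrors the `schema_element` loop shown in the traceback, and the fallback mirrors the `assign[:] = None` line. It is meant to show why the fallback raises for an integer output array and does not explain why column 'd' is missing from the schema in the first place.

```python
import numpy as np

# Hypothetical stand-in for the schema tree: only column "c" is present.
schema_root = {"children": {"c": {"children": {}}}}


def schema_element(root, name):
    """Walk a nested {"children": ...} mapping, as in the traceback's lookup."""
    if isinstance(name, str):
        name = name.split(".")
    for part in name:
        root = root["children"][part]  # KeyError if the column is absent
    return root


def fill_missing(assign, path):
    """Hypothetical helper: on a failed lookup, fill the output with None."""
    try:
        schema_element(schema_root, path)
    except KeyError:
        # Same fallback as in the traceback; only valid for dtypes that
        # can represent a missing value.
        assign[:] = None
    return assign


# Integer output array, like `assign = array([0])` in the traceback.
int_error = None
try:
    fill_missing(np.array([0]), ["d"])
except TypeError as exc:
    int_error = exc  # None cannot be converted to an integer

# A float output array accepts the same assignment and stores NaN.
float_out = fill_missing(np.array([0.0]), ["d"])
```

In this sketch the integer case ends with a `TypeError`, while the float case ends with an array of NaN. That suggests the fallback path only works when the output buffer's dtype has a missing-value representation.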
Thanks @jrbourbeau! I have been struggling to figure out what is going wrong beyond "the fastparquet engine is messing up somewhere with hive-partitioned data". This is helpful.