From d55ce777d674c5b16bfe8a2c1158f5296226da6c Mon Sep 17 00:00:00 2001 From: yohplala Date: Tue, 25 Mar 2025 14:25:01 +0100 Subject: [PATCH] Fix categorical data appending - ticket #949. --- .github/workflows/main.yaml | 29 +++++- .github/workflows/test_wheel.yaml | 2 +- .github/workflows/wheel.yml | 12 +-- ci/environment-py310.yml | 1 - docs/source/details.rst | 4 +- fastparquet/api.py | 10 +- fastparquet/core.py | 91 +++++++++++++++-- fastparquet/dataframe.py | 15 ++- fastparquet/test/test_api.py | 49 +++++++++- fastparquet/test/test_dataframe.py | 22 ++--- fastparquet/test/test_output.py | 98 ++++++++++++++++++- .../test_partition_filters_specialstrings.py | 46 +++++---- fastparquet/writer.py | 44 ++++++--- pyproject.toml | 6 ++ setup.cfg | 11 --- setup.py | 7 +- 16 files changed, 346 insertions(+), 101 deletions(-) diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index dcf72914..79b31135 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -23,8 +23,13 @@ jobs: with: fetch-depth: 0 + - name: Fetch upstream tags + run: | + git remote add upstream https://github.com/dask/fastparquet.git + git fetch upstream --tags + - name: Setup conda - uses: mamba-org/provision-with-micromamba@main + uses: conda-incubator/setup-miniconda@v3 with: environment-file: ci/environment-${{ matrix.CONDA_ENV }}.yml @@ -53,8 +58,13 @@ jobs: with: fetch-depth: 0 + - name: Fetch upstream tags + run: | + git remote add upstream https://github.com/dask/fastparquet.git + git fetch upstream --tags + - name: Setup conda - uses: mamba-org/provision-with-micromamba@main + uses: conda-incubator/setup-miniconda@v3 with: environment-file: ci/environment-${{ matrix.CONDA_ENV }}.yml @@ -82,8 +92,13 @@ jobs: with: fetch-depth: 0 + - name: Fetch upstream tags + run: | + git remote add upstream https://github.com/dask/fastparquet.git + git fetch upstream --tags + - name: Setup conda - uses: mamba-org/provision-with-micromamba@main + uses: conda-incubator/setup-miniconda@v3 with: environment-file: ci/environment-py310.yml @@ -94,6 +109,7 @@ jobs: pip install hypothesis pip install pytest-localserver pytest-xdist pytest-asyncio pip install -e . 
--no-deps # Install fastparquet + pip install versioneer # Needed for pandas build git clone https://github.com/pandas-dev/pandas cd pandas python setup.py build_ext -j 4 @@ -117,8 +133,13 @@ jobs: with: fetch-depth: 0 + - name: Fetch upstream tags + run: | + git remote add upstream https://github.com/dask/fastparquet.git + git fetch upstream --tags + - name: Setup conda - uses: mamba-org/provision-with-micromamba@main + uses: conda-incubator/setup-miniconda@v3 with: environment-file: ci/environment-py310win.yml diff --git a/.github/workflows/test_wheel.yaml b/.github/workflows/test_wheel.yaml index ff03aff0..23cea75d 100644 --- a/.github/workflows/test_wheel.yaml +++ b/.github/workflows/test_wheel.yaml @@ -58,7 +58,7 @@ jobs: python -m pip install delvewheel cython - name: Build wheels - uses: joerick/cibuildwheel@v2.16.5 + uses: joerick/cibuildwheel@v2.21.3 - name: Install wheels shell: bash -l {0} diff --git a/.github/workflows/wheel.yml b/.github/workflows/wheel.yml index 7e0d042b..90473ace 100644 --- a/.github/workflows/wheel.yml +++ b/.github/workflows/wheel.yml @@ -50,7 +50,7 @@ jobs: python -m pip install delvewheel cython - name: Build wheels - uses: joerick/cibuildwheel@v2.18.1 + uses: joerick/cibuildwheel@v2.21.3 - uses: actions/upload-artifact@v3 with: @@ -105,7 +105,7 @@ jobs: python -m pip install delvewheel cython - name: Build wheels - uses: joerick/cibuildwheel@v2.18.1 + uses: joerick/cibuildwheel@v2.21.3 - uses: actions/upload-artifact@v3 with: @@ -160,7 +160,7 @@ jobs: python -m pip install delvewheel cython - name: Build wheels - uses: joerick/cibuildwheel@v2.18.1 + uses: joerick/cibuildwheel@v2.21.3 - uses: actions/upload-artifact@v3 with: @@ -215,7 +215,7 @@ jobs: python -m pip install delvewheel cython - name: Build wheels - uses: joerick/cibuildwheel@v2.18.1 + uses: joerick/cibuildwheel@v2.21.3 - uses: actions/upload-artifact@v3 with: @@ -246,10 +246,10 @@ jobs: - name: Setup Python uses: actions/setup-python@v5 with: - python-version: "3.12" + python-version: "3.13" - name: Build wheels - uses: joerick/cibuildwheel@v2.18.1 + uses: joerick/cibuildwheel@v2.21.3 - uses: actions/upload-artifact@v3 with: diff --git a/ci/environment-py310.yml b/ci/environment-py310.yml index bf765296..4dee9657 100644 --- a/ci/environment-py310.yml +++ b/ci/environment-py310.yml @@ -18,6 +18,5 @@ dependencies: - orjson - ujson - python-rapidjson - - versioneer - meson-python - pyarrow diff --git a/docs/source/details.rst b/docs/source/details.rst index d915b428..decd75cf 100644 --- a/docs/source/details.rst +++ b/docs/source/details.rst @@ -194,7 +194,7 @@ split data data on the values of those columns. This is done by writing a directory structure with *key=value* names. Multiple partition columns can be chosen, leading to a multi-level directory tree. -Consider the following directory tree from this `Spark example `_: +Consider the following directory tree from this `Spark example `_: table/ gender=male/ @@ -261,4 +261,4 @@ the file system implementation. .. 
raw:: html \ No newline at end of file + async src="//gc.zgo.at/count.js"> diff --git a/fastparquet/api.py b/fastparquet/api.py index c54e5eb5..df43bd30 100644 --- a/fastparquet/api.py +++ b/fastparquet/api.py @@ -196,6 +196,7 @@ def __init__(self, fn, verify=False, open_with=default_open, root=False, "a filesystem compatible with fsspec") from e self.open = open_with self._statistics = None + self.global_cats = {} def _parse_header(self, f, verify=True): if self.fn and self.fn.endswith("_metadata"): @@ -260,7 +261,7 @@ def columns(self): @property def statistics(self): - if self._statistics is None: + if not hasattr(self, '_statistics') or self._statistics is None: self._statistics = statistics(self) return self._statistics @@ -318,7 +319,8 @@ def __getitem__(self, item): new_pf.__setstate__( {"fn": self.fn, "open": self.open, "fmd": fmd, "pandas_nulls": self.pandas_nulls, "_base_dtype": self._base_dtype, - "tz": self.tz, "_columns_dtype": self._columns_dtype} + "tz": self.tz, "_columns_dtype": self._columns_dtype, + "global_cats": {}} # fresh empty dict for the slice ) new_pf._set_attrs() return new_pf @@ -389,7 +391,7 @@ def read_row_group_file(self, rg, columns, categories, index=None, f, rg, columns, categories, self.schema, self.cats, selfmade=self.selfmade, index=index, assign=assign, scheme=self.file_scheme, partition_meta=partition_meta, - row_filter=row_filter + row_filter=row_filter, global_cats=self.global_cats ) if ret: return df @@ -1011,7 +1013,7 @@ def __getstate__(self): self.fmd.row_groups = [] return {"fn": self.fn, "open": self.open, "fmd": self.fmd, "pandas_nulls": self.pandas_nulls, "_base_dtype": self._base_dtype, - "tz": self.tz} + "tz": self.tz, "global_cats": self.global_cats} def __setstate__(self, state): self.__dict__.update(state) diff --git a/fastparquet/core.py b/fastparquet/core.py index 79c17762..c9bf4800 100644 --- a/fastparquet/core.py +++ b/fastparquet/core.py @@ -200,7 +200,7 @@ def read_dictionary_page(file_obj, schema_helper, page_header, column_metadata, def read_data_page_v2(infile, schema_helper, se, data_header2, cmd, dic, assign, num, use_cat, file_offset, ph, idx=None, - selfmade=False, row_filter=None): + selfmade=False, row_filter=None, remap_array=None): """ :param infile: open file :param schema_helper: @@ -211,6 +211,7 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd, :param assign: output array (all of it) :param num: offset, rows so far :param use_cat: output is categorical? + :param remap_array: array for remapping categorical indices :return: None test data "/Users/mdurant/Downloads/datapage_v2.snappy.parquet" @@ -338,6 +339,9 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd, if bit_width in [8, 16, 32] and selfmade: # special fastpath for cats outbytes = raw_bytes[pagefile.tell():] + if remap_array is not None: + # Apply remapping to outbytes. 
+ outbytes = remap_array[outbytes] if len(outbytes) == assign[num:num+data_header2.num_values].nbytes: assign[num:num+data_header2.num_values].view('uint8')[row_filter] = outbytes[row_filter] else: @@ -358,6 +362,9 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd, encoding.NumpyIO(assign[num:num+data_header2.num_values].view('uint8')), itemsize=bit_width ) + if remap_array is not None: + # Apply remapping after reading + assign[num:num+data_header2.num_values] = remap_array[assign[num:num+data_header2.num_values]] else: temp = np.empty(data_header2.num_values, assign.dtype) encoding.read_rle_bit_packed_hybrid( @@ -367,6 +374,8 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd, encoding.NumpyIO(temp.view('uint8')), itemsize=bit_width ) + if remap_array is not None: + temp = remap_array[temp] if not nullable: assign[num:num+data_header2.num_values][nulls[row_filter]] = None assign[num:num+data_header2.num_values][~nulls[row_filter]] = temp[row_filter] @@ -429,7 +438,7 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd, def read_col(column, schema_helper, infile, use_cat=False, selfmade=False, assign=None, catdef=None, - row_filter=None): + row_filter=None, global_cats=None): """Using the given metadata, read one column in one row-group. Parameters @@ -443,10 +452,20 @@ def read_col(column, schema_helper, infile, use_cat=False, use_cat: bool (False) If this column is encoded throughout with dict encoding, give back a pandas categorical column; otherwise, decode to values + selfmade: bool (False) + If data created by fastparquet + assign: numpy array + Where to store the result + catdef: pandas.Categorical or CategoricalDtype + If reading a categorical column, the categorical definition (categories and + ordering). row_filter: bool array or None if given, selects which of the values read are to be written into the output. Effectively implies NULLs, even for a required column. + global_cats: dict or None + Optional dictionary for storing global categorical values across row groups. + Format: {col_path: array} """ cmd = column.meta_data try: @@ -480,6 +499,15 @@ def read_col(column, schema_helper, infile, use_cat=False, row_idx = [0] # map/list objects dic = None index_off = 0 # how far through row_filter we are + + # Initialize tracking variables for categorical dictionaries + # Only set up global dictionary tracking if using categorical and global_cats is provided + remap_dict = {} # Dictionary for collecting mappings + if use_cat and global_cats is not None: + path_str = ".".join(cmd.path_in_schema) + # Register this column in global_cats if not already present + if path_str not in global_cats: + global_cats[path_str] = None while num < rows: off = infile.tell() @@ -497,7 +525,44 @@ def read_col(column, schema_helper, infile, use_cat=False, ddt = [kv.value.decode() for kv in (cmd.key_value_metadata or []) if kv.key == b"label_dtype"] ddt = ddt[0] if ddt else None - catdef._set_categories(pd.Index(dic, dtype=ddt), fastpath=True) + + if global_cats is not None: + # Check if categorical values are consistent with global dictionary. + if global_cats[path_str] is None: + # This is the first dictionary for this column, save it as global + global_cats[path_str] = dic + else: + # Dictionary already defined for this column, check for inconsistency. 
+ global_dict = global_cats[path_str] + new_values = [] + # Build remap_dict in a single comprehension, + # appending new values to new_values at the same time: + # - Use walrus operator (:=) to store found_idx from global_dict lookup. + # - When found_idx is -1, append val to new_values and use its new position. + # - Only include indices that need remapping (found_idx != i). + remap_dict = {i: (len(global_dict) + len(new_values) - 1) + if found_idx == -1 and not new_values.append(val) else found_idx + for (i,), val in np.ndenumerate(dic) + if (found_idx := next((j + for (j,), gval in np.ndenumerate(global_dict) + if val == gval), -1)) != i + } + if remap_dict: + # If any remapping is needed, create a complete remap array. + # Initialize with identity mapping (no change) + remap_array = np.arange(len(dic), dtype=np.int32) + # Update indices that need remapping + remap_array[list(remap_dict)] = list(remap_dict.values()) + if new_values: + # Add new values to global dictionary + global_cats[path_str] = np.append(global_dict, new_values) + # Update categories + catdef._set_categories(pd.Index(global_cats[path_str], dtype=ddt), fastpath=True) + + # Normal case - always set categories for this dictionary + if global_cats is None or not remap_dict: + catdef._set_categories(pd.Index(dic, dtype=ddt), fastpath=True) + if np.iinfo(assign.dtype).max < len(dic): raise RuntimeError('Assigned array dtype (%s) cannot accommodate ' 'number of category labels (%i)' % @@ -509,7 +574,7 @@ def read_col(column, schema_helper, infile, use_cat=False, if ph.type == parquet_thrift.PageType.DATA_PAGE_V2: num += read_data_page_v2(infile, schema_helper, se, ph.data_page_header_v2, cmd, dic, assign, num, use_cat, off, ph, row_idx, selfmade=selfmade, - row_filter=row_filter) + row_filter=row_filter, remap_array=remap_array if remap_dict else None) continue if (selfmade and hasattr(cmd, 'statistics') and getattr(cmd.statistics, 'null_count', 1) == 0): @@ -563,6 +628,9 @@ def read_col(column, schema_helper, infile, use_cat=False, part[defi == max_defi] = dic[val] elif not use_cat: part[defi == max_defi] = convert(val, se, dtype=assign.dtype) + elif remap_dict: + # Apply remapping of categorical codes + part[defi == max_defi] = remap_array[val] else: part[defi == max_defi] = val else: @@ -582,6 +650,9 @@ def read_col(column, schema_helper, infile, use_cat=False, piece[:] = dic[val] elif not use_cat: piece[:] = convert(val, se, dtype=assign.dtype) + elif remap_dict: + # Apply remapping of categorical codes + piece[:] = remap_array[val] else: piece[:] = val @@ -589,7 +660,8 @@ def read_col(column, schema_helper, infile, use_cat=False, def read_row_group_arrays(file, rg, columns, categories, schema_helper, cats, - selfmade=False, assign=None, row_filter=False): + selfmade=False, assign=None, row_filter=False, + global_cats=None): """ Read a row group and return as a dict of arrays @@ -615,7 +687,7 @@ def read_row_group_arrays(file, rg, columns, categories, schema_helper, cats, read_col(column, schema_helper, file, use_cat=name+'-catdef' in out, selfmade=selfmade, assign=out[name], catdef=out.get(name+'-catdef', None), - row_filter=row_filter) + row_filter=row_filter, global_cats=global_cats) if _is_map_like(schema_helper, column): # TODO: could be done in fast loop in _assemble_objects? 
@@ -632,10 +704,10 @@ def read_row_group_arrays(file, rg, columns, categories, schema_helper, cats, for k in remains: out[k][:] = None - def read_row_group(file, rg, columns, categories, schema_helper, cats, selfmade=False, index=None, assign=None, - scheme='hive', partition_meta=None, row_filter=False): + scheme='hive', partition_meta=None, row_filter=False, + global_cats=None): """ Access row-group in a file and read some columns into a data-frame. """ @@ -643,7 +715,8 @@ def read_row_group(file, rg, columns, categories, schema_helper, cats, if assign is None: raise RuntimeError('Going with pre-allocation!') read_row_group_arrays(file, rg, columns, categories, schema_helper, - cats, selfmade, assign=assign, row_filter=row_filter) + cats, selfmade, assign=assign, row_filter=row_filter, + global_cats=global_cats) for cat in cats: if cat not in assign: diff --git a/fastparquet/dataframe.py b/fastparquet/dataframe.py index 1e2aa583..a4f0f45a 100644 --- a/fastparquet/dataframe.py +++ b/fastparquet/dataframe.py @@ -144,14 +144,19 @@ def cat(col): # validation due to being an out-of-bounds datetime. xref # https://github.com/dask/fastparquet/issues/778 dtype = np.dtype(t) - d = np.zeros(size, dtype=dtype) if dtype.kind == "M" else np.empty(size, dtype=dtype) - if d.dtype.kind == "M" and str(col) in timezones: + if dtype.kind == "M": + d = np.zeros(size, dtype=dtype) # 1) create the DatetimeIndex in UTC as no datetime conversion is needed and # it works with d uninitialised data (no NonExistentTimeError or AmbiguousTimeError) # 2) convert to timezone (if UTC=noop, if None=remove tz, if other=change tz) - index = DatetimeIndex(d, tz="UTC").tz_convert( - tz_to_dt_tz(timezones[str(col)])) + if str(col) in timezones: + index = DatetimeIndex(d, tz="UTC").tz_convert( + tz_to_dt_tz(timezones[str(col)])) + else: + index = DatetimeIndex(d, tz=None) + d = index._data._ndarray else: + d = np.empty(size, dtype=dtype) index = Index(d) views[col] = d else: @@ -238,7 +243,7 @@ def set_cats(values, i=i, col=col, **kwargs): views[col] = block.values._codes views[col+'-catdef'] = block.values elif getattr(block.dtype, 'tz', None): - arr = np.asarray(block.values, dtype='M8[ns]') + arr = block.values._ndarray if len(arr.shape) > 1: # pandas >= 1.3 does this for some reason arr = arr.squeeze(axis=0) diff --git a/fastparquet/test/test_api.py b/fastparquet/test/test_api.py index 62cf749c..6ffdce5c 100644 --- a/fastparquet/test/test_api.py +++ b/fastparquet/test/test_api.py @@ -44,14 +44,23 @@ def test_statistics(tempdir): p = ParquetFile(fn) s = statistics(p) - expected = {'distinct_count': {'x': [None, None], + expected1 = {'distinct_count': {'x': [None, None], 'y': [None, None], 'z': [None, None]}, 'max': {'x': [2, 3], 'y': [2.0, 1.0], 'z': ['b', 'c']}, 'min': {'x': [1, 3], 'y': [1.0, 1.0], 'z': ['a', 'c']}, 'null_count': {'x': [0, 0], 'y': [0, 0], 'z': [0, 0]}} - assert s == expected + assert s == expected1 + + expected2 = {'distinct_count': {'x': [None], + 'y': [None], + 'z': [None]}, + 'max': {'x': [3], 'y': [1.0], 'z': ['c']}, + 'min': {'x': [3], 'y': [1.0], 'z': ['c']}, + 'null_count': {'x': [0], 'y': [0], 'z': [0]}} + + assert p[-1].statistics == expected2 def test_logical_types(tempdir): @@ -1546,3 +1555,39 @@ def test_read_a_non_pandas_parquet_file(tempdir): assert parquet_file.count() == 2 assert parquet_file.head(1).equals(pd.DataFrame({"foo": [0], "bar": ["a"]})) + + +def test_gh929(tempdir): + idx = pd.date_range("2024-01-01", periods=4, freq="h", tz="Europe/Brussels") + df = pd.DataFrame(index=idx, 
data={"index_as_col": idx}) + + df.to_parquet(f"{tempdir}/test_datetimetz_index.parquet", engine="fastparquet") + result = pd.read_parquet(f"{tempdir}/test_datetimetz_index.parquet", engine="fastparquet") + assert result.index.equals(df.index) + + +def test_writing_to_buffer_does_not_close(): + df = pd.DataFrame({"val": [1, 2]}) + buffer = io.BytesIO() + write(buffer, df, file_scheme="simple") + assert not buffer.closed + parquet_file = ParquetFile(buffer) + assert parquet_file.count() == 2 + + +@pytest.fixture() +def pandas_string(): + if pd.__version__.split(".") < ["3"]: + pytest.skip("'string' type coming in pandas 3.0.0") + original = pd.options.future.infer_string + pd.options.future.infer_string = True + yield + pd.options.future.infer_string = original + + +def test_auto_string(tempdir, pandas_string): + fn = f"{tempdir}/test.parquet" + df = pd.DataFrame({"a": ["some", "strings"]}) + df.to_parquet(fn, engine="fastparquet") + + diff --git a/fastparquet/test/test_dataframe.py b/fastparquet/test/test_dataframe.py index 24da294c..6066090a 100644 --- a/fastparquet/test/test_dataframe.py +++ b/fastparquet/test/test_dataframe.py @@ -66,8 +66,8 @@ def np_empty_mock(shape, dtype): def test_empty_tz_nonutc(): df, views = empty(types=[DatetimeTZDtype(unit="ns", tz="CET")], size=8784, cols=['a'], timezones={'a': 'CET', 'index': 'CET'}, index_types=["datetime64[ns]"], index_names=["index"]) - assert df.index.tz.zone == "CET" - assert df.a.dtype.tz.zone == "CET" + assert str(df.index.tz) == "CET" + assert str(df.a.dtype.tz) == "CET" # non-regression test for https://github.com/dask/fastparquet/issues/778 @@ -91,18 +91,18 @@ def test_timestamps(): views['t'].dtype.kind == "M" df, views = empty('M8', 100, cols=['t'], timezones={'t': z}) - assert df.t.dt.tz.zone == z + assert str(df.t.dt.tz) == z views['t'].dtype.kind == "M" # one time column, one normal df, views = empty('M8,i', 100, cols=['t', 'i'], timezones={'t': z}) - assert df.t.dt.tz.zone == z + assert str(df.t.dt.tz) == z views['t'].dtype.kind == "M" views['i'].dtype.kind == 'i' # no effect of timezones= on non-time column df, views = empty('M8,i', 100, cols=['t', 'i'], timezones={'t': z, 'i': z}) - assert df.t.dt.tz.zone == z + assert str(df.t.dt.tz) == z assert df.i.dtype.kind == 'i' views['t'].dtype.kind == "M" views['i'].dtype.kind == 'i' @@ -111,22 +111,22 @@ def test_timestamps(): z2 = 'US/Central' df, views = empty('M8,M8', 100, cols=['t1', 't2'], timezones={'t1': z, 't2': z}) - assert df.t1.dt.tz.zone == z - assert df.t2.dt.tz.zone == z + assert str(df.t1.dt.tz) == z + assert str(df.t2.dt.tz) == z df, views = empty('M8,M8', 100, cols=['t1', 't2'], timezones={'t1': z}) - assert df.t1.dt.tz.zone == z + assert str(df.t1.dt.tz) == z assert df.t2.dt.tz is None df, views = empty('M8,M8', 100, cols=['t1', 't2'], timezones={'t1': z, 't2': 'UTC'}) - assert df.t1.dt.tz.zone == z + assert str(df.t1.dt.tz) == z assert str(df.t2.dt.tz) == 'UTC' df, views = empty('M8,M8', 100, cols=['t1', 't2'], timezones={'t1': z, 't2': z2}) - assert df.t1.dt.tz.zone == z - assert df.t2.dt.tz.zone == z2 + assert str(df.t1.dt.tz) == z + assert str(df.t2.dt.tz) == z2 def test_pandas_hive_serialization(tmpdir): diff --git a/fastparquet/test/test_output.py b/fastparquet/test/test_output.py index 01c265a9..283a0ca2 100644 --- a/fastparquet/test/test_output.py +++ b/fastparquet/test/test_output.py @@ -174,15 +174,15 @@ def test_roundtrip_complex(tempdir, scheme,): @pytest.mark.parametrize('df', [ makeMixedDataFrame(), pd.DataFrame({'x': pd.date_range('3/6/2012 00:00', 
- periods=10, freq='H', tz='Europe/London')}), + periods=10, freq='h', tz='Europe/London')}), pd.DataFrame({'x': pd.date_range('3/6/2012 00:00', - periods=10, freq='H', tz='Europe/Berlin')}), + periods=10, freq='h', tz='Europe/Berlin')}), pd.DataFrame({'x': pd.date_range('3/6/2012 00:00', - periods=10, freq='H', tz='UTC')}), + periods=10, freq='h', tz='UTC')}), pd.DataFrame({'x': pd.date_range('3/6/2012 00:00', - periods=10, freq='H', tz=datetime.timezone.min)}), + periods=10, freq='h', tz=datetime.timezone.min)}), pd.DataFrame({'x': pd.date_range('3/6/2012 00:00', - periods=10, freq='H', tz=datetime.timezone.max)}) + periods=10, freq='h', tz=datetime.timezone.max)}) ]) def test_datetime_roundtrip(tempdir, df, capsys): fname = os.path.join(tempdir, 'test.parquet') @@ -1214,3 +1214,91 @@ def test_attrs_roundtrip(tempdir): df.to_parquet(path=fn, engine="fastparquet") df2 = pd.read_parquet(fn, engine="fastparquet") assert df2.attrs == attrs + + +def test_append_different_categorical_simple(tempdir): + """Test for issue #949: wrong categories data when appending with categorical columns""" + fn = os.path.join(str(tempdir), 'test.parquet') + # First DataFrame with a categorical column + df1 = pd.DataFrame({ + "col1": [1, 4, 7], + "col2": [2, 5, 8] + }) + df1["col2"] = df1["col2"].astype("category") + write(fn, df1, write_index=False, file_scheme='simple') + # Second DataFrame to append + df2 = pd.DataFrame({ + "col1": [4, 7, 10], + "col2": [5, 8, 11] + }) + df2["col2"] = df2["col2"].astype("category") + write(fn, df2, append=True, write_index=False, file_scheme='simple') + # Read back again - this should maintain correct categorical values + df_combined = pd.read_parquet(fn, engine="fastparquet") + # Expected result when concatenating the two dataframes + expected = pd.concat([df1, df2], ignore_index=True) + expected["col2"] = expected["col2"].astype("category") + assert_frame_equal(df_combined, expected) + + +def test_append_different_categorical_multi(tempdir): + """Test for issue #949: wrong categories data when appending with categorical columns""" + # Testing ParquetFile slicing as well. 
+ # Set random seed for reproducibility + np.random.seed(42) + # Create initial DataFrame with categorical columns + def create_test_df(start_idx, rows, cats1, cats2): + cat1 = [cats1[i % len(cats1)] for i in range(rows)] + cat2 = [cats2[i % len(cats2)] for i in range(rows)] + df = pd.DataFrame({ + 'cat_col1': cat1, + 'cat_col2': cat2, + 'value': np.random.rand(rows) + }) + df['cat_col1'] = df['cat_col1'].astype('category') + df['cat_col2'] = df['cat_col2'].astype('category') + return df + # Initial categories + cats1 = ['A', 'B', 'C'] + cats2 = [10, 20, 30] + # First dataframe + fn = os.path.join(str(tempdir), 'test_parquet') + df1 = create_test_df(0, 5, cats1, cats2) + write(fn, df1, file_scheme='hive', write_index=False) + # New categories for second dataframe (overlapping + new values) + cats1_2 = ['B', 'C', 'D'] # B,C overlap with first df, D is new + cats2_2 = [30, 40, 50] + # Create second dataframe + df2 = create_test_df(len(df1), 6, cats1_2, cats2_2) + # Append second dataframe + write(fn, df2, file_scheme='hive', append=True, write_index=False) + # New categories for third dataframe (different ordering + new values) + cats1_3 = ['E', 'C', 'A'] # A,C from first, E is new + cats2_3 = [60, 70, 50] # Mixed order + # Create third dataframe + df3 = create_test_df(len(df1)+len(df2), 7, cats1_3, cats2_3) + # Append third dataframe + write(fn, df3, file_scheme='hive', append=True, write_index=False) + # Combine all original dataframes for comparison + expected_df = pd.concat([df1, df2, df3], axis=0, ignore_index=True) + expected_df['cat_col1'] = expected_df['cat_col1'].astype('category') + expected_df['cat_col2'] = expected_df['cat_col2'].astype('category') + pf = ParquetFile(fn) + actual_df = pf.to_pandas() + # Assert that the dataframes are equal + assert_frame_equal(expected_df, actual_df) + # Test slicing. + actual_df_subset = pf[1:].to_pandas() + expected_df_subset = pd.concat([df2, df3], axis=0, ignore_index=True) + expected_df_subset['cat_col1'] = expected_df_subset['cat_col1'].astype('category') + expected_df_subset['cat_col2'] = expected_df_subset['cat_col2'].astype('category') + try: + # Code to manage new categorical values in fastparquet does not reorder them. + # Code in pandas concat seems to do so. 
+ actual_df_subset['cat_col1'] = actual_df_subset['cat_col1'].cat.reorder_categories( + expected_df_subset['cat_col1'].cat.categories + ) + except ValueError: + raise AssertionError("failed to reorder categories") + assert_frame_equal(expected_df_subset, actual_df_subset) + diff --git a/fastparquet/test/test_partition_filters_specialstrings.py b/fastparquet/test/test_partition_filters_specialstrings.py index 059481b8..fa2c3d4c 100644 --- a/fastparquet/test/test_partition_filters_specialstrings.py +++ b/fastparquet/test/test_partition_filters_specialstrings.py @@ -37,26 +37,30 @@ def frame_symbol_dtTrade_type_strike(days=1 * 252, @pytest.mark.parametrize('input_symbols,input_days,file_scheme,input_columns,' 'partitions,filters', [ - (['NOW', 'SPY', 'VIX'], 2 * 252, 'hive', 2, - ['symbol', 'year'], [('symbol', '==', 'SPY')]), - (['now', 'SPY', 'VIX'], 2 * 252, 'hive', 2, - ['symbol', 'year'], [('symbol', '==', 'SPY')]), - (['TODAY', 'SPY', 'VIX'], 2 * 252, 'hive', 2, - ['symbol', 'year'], [('symbol', '==', 'SPY')]), - (['VIX*', 'SPY', 'VIX'], 2 * 252, 'hive', 2, - ['symbol', 'year'], [('symbol', '==', 'SPY')]), - (['QQQ*', 'SPY', 'VIX'], 2 * 252, 'hive', 2, - ['symbol', 'year'], [('symbol', '==', 'SPY')]), - (['QQQ!', 'SPY', 'VIX'], 2 * 252, 'hive', 2, - ['symbol', 'year'], [('symbol', '==', 'SPY')]), - (['Q%QQ', 'SPY', 'VIX'], 2 * 252, 'hive', 2, - ['symbol', 'year'], [('symbol', '==', 'SPY')]), - (['NOW', 'SPY', 'VIX'], 10, 'hive', 2, - ['symbol', 'dtTrade'], [('symbol', '==', 'SPY')]), + # (['NOW', 'SPY', 'VIX'], 2 * 252, 'hive', 2, + # ['symbol', 'year'], [('symbol', '==', 'SPY')]), + # (['now', 'SPY', 'VIX'], 2 * 252, 'hive', 2, + # ['symbol', 'year'], [('symbol', '==', 'SPY')]), + # (['TODAY', 'SPY', 'VIX'], 2 * 252, 'hive', 2, + # ['symbol', 'year'], [('symbol', '==', 'SPY')]), + # (['VIX*', 'SPY', 'VIX'], 2 * 252, 'hive', 2, + # ['symbol', 'year'], [('symbol', '==', 'SPY')]), + # (['QQQ*', 'SPY', 'VIX'], 2 * 252, 'hive', 2, + # ['symbol', 'year'], [('symbol', '==', 'SPY')]), + # (['QQQ!', 'SPY', 'VIX'], 2 * 252, 'hive', 2, + # ['symbol', 'year'], [('symbol', '==', 'SPY')]), + # (['Q%QQ', 'SPY', 'VIX'], 2 * 252, 'hive', 2, + # ['symbol', 'year'], [('symbol', '==', 'SPY')]), + # (['NOW', 'SPY', 'VIX'], 10, 'hive', 2, + # ['symbol', 'dtTrade'], [('symbol', '==', 'SPY')]), (['NOW', 'SPY', 'VIX'], 10, 'hive', 2, ['symbol', 'dtTrade'], [('dtTrade', '==', '2005-01-02 00:00:00')]), + (['NOW', 'SPY', 'VIX'], 10, 'hive', 2, + ['symbol', 'dtTrade'], + [('dtTrade', '==', + pd.to_datetime('2005-01-02 00:00:00'))]), ] ) def test_frame_write_read_verify(tempdir, input_symbols, input_days, @@ -88,15 +92,9 @@ def test_frame_write_read_verify(tempdir, input_symbols, input_days, # Filter Input Frame to Match What Should Be Expected from parquet read # Handle either string or non-string inputs / works for timestamps - filterStrings = [] + filtered_input_df = input_df for name, operator, value in filters: - if isinstance(value, str): - value = "'{}'".format(value) - else: - value = value.__repr__() - filterStrings.append("{} {} {}".format(name, operator, value)) - filters_expression = " and ".join(filterStrings) - filtered_input_df = input_df.query(filters_expression) + filtered_input_df = filtered_input_df[filtered_input_df[name] == value] # Check to Ensure Columns Match for col in filtered_output_df.columns: diff --git a/fastparquet/writer.py b/fastparquet/writer.py index b023eb2e..a30d40db 100644 --- a/fastparquet/writer.py +++ b/fastparquet/writer.py @@ -181,7 +181,7 @@ def find_type(data, 
fixed_text=None, object_encoding=None, times='int64', "LogicalType", TIMESTAMP=ThriftObject.from_fields( "TimestampType", - isAdjustedToUTC=True, + isAdjustedToUTC=tz, unit=ThriftObject.from_fields("TimeUnit", MICROS={}) ) ) @@ -195,7 +195,7 @@ def find_type(data, fixed_text=None, object_encoding=None, times='int64', "LogicalType", TIMESTAMP=ThriftObject.from_fields( "TimestampType", - isAdjustedToUTC=True, + isAdjustedToUTC=tz, unit=ThriftObject.from_fields("TimeUnit", MILLIS={}) ) ) @@ -214,7 +214,7 @@ def find_type(data, fixed_text=None, object_encoding=None, times='int64', elif dtype.kind == "m": type, converted_type, width = (parquet_thrift.Type.INT64, parquet_thrift.ConvertedType.TIME_MICROS, None) - elif "string" in str(dtype): + elif "str" in str(dtype): type, converted_type, width = (parquet_thrift.Type.BYTE_ARRAY, parquet_thrift.ConvertedType.UTF8, None) @@ -283,7 +283,7 @@ def convert(data, se): raise ValueError('Error converting column "%s" to bytes using ' 'encoding %s. Original error: ' '%s' % (data.name, ct, e)) - elif str(dtype) == "string": + elif "str" in str(dtype): try: if converted_type == parquet_thrift.ConvertedType.UTF8: # TODO: into bytes in one step @@ -315,12 +315,30 @@ def convert(data, se): out['ns'] = ns out['day'] = day elif dtype.kind == "M": - out = data.values.view("int64") + part = str(dtype).split("[")[1][:-1].split(",")[0] + if converted_type: + factor = time_factors[(converted_type, part)] + else: + unit = [k for k, v in se.logicalType.TIMESTAMP.unit._asdict().items() if v is not None][0] + factor = time_factors[(unit, part)] + try: + out = data.values.view("int64") * factor + except KeyError: + breakpoint() else: raise ValueError("Don't know how to convert data type: %s" % dtype) return out +time_factors = { + ("NANOS", "ns"): 1, + (parquet_thrift.ConvertedType.TIMESTAMP_MICROS, "us"): 1, + (parquet_thrift.ConvertedType.TIMESTAMP_MICROS, "ns"): 1000, + (parquet_thrift.ConvertedType.TIMESTAMP_MILLIS, "ms"): 1, + (parquet_thrift.ConvertedType.TIMESTAMP_MILLIS, "s"): 1000, +} + + def infer_object_encoding(data): """Guess object type from first 10 non-na values by iteration""" if data.empty: @@ -449,7 +467,7 @@ def _rows_per_page(data, selement, has_nulls=True, page_size=None): bytes_per_element = 4 elif isinstance(data.dtype, BaseMaskedDtype) and data.dtype in pdoptional_to_numpy_typemap: bytes_per_element = np.dtype(pdoptional_to_numpy_typemap[data.dtype]).itemsize - elif data.dtype == "object" or str(data.dtype) == "string": + elif data.dtype == "object" or "str" in str(data.dtype): dd = data.iloc[:1000] d2 = dd[dd.notnull()] try: @@ -957,11 +975,8 @@ def write_simple(fn, data, fmd, row_group_offsets=None, compression=None, if isinstance(data, pd.DataFrame): data = iter_dataframe(data, row_group_offsets) mode = 'rb+' if append else 'wb' - if hasattr(fn, "write"): - of = fn - else: - of = open_with(fn, mode) - with of as f: + + def write_to_file(f): if append: f.seek(-8, 2) head_size = struct.unpack('= 0.29.23", "numpy>=2.0.0rc1"] +build-backend = "setuptools.build_meta" + +[tool.setuptools_scm] +write_to = "fastparquet/_version.py" +version_scheme = "guess-next-dev" +local_scheme = "no-local-version" diff --git a/setup.cfg b/setup.cfg index 815d65f0..adfa160e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -3,14 +3,3 @@ test=pytest [tool:pytest] addopts = -vv - -# See the docstring in versioneer.py for instructions. Note that you must -# re-run 'versioneer.py setup' after changing this section, and commit the -# resulting files. 
- -[versioneer] -VCS = git -style = pep440 -versionfile_source = fastparquet/_version.py -versionfile_build = fastparquet/_version.py -tag_prefix = "" diff --git a/setup.py b/setup.py index 3fb7749c..5c0736ec 100644 --- a/setup.py +++ b/setup.py @@ -48,11 +48,7 @@ def fix_exts(sources): setup( name='fastparquet', - use_scm_version={ - 'version_scheme': 'guess-next-dev', - 'local_scheme': 'no-local-version', - 'write_to': 'fastparquet/_version.py' - }, + use_scm_version=True, description='Python support for Parquet file format', author='Martin Durant', author_email='mdurant@anaconda.com', @@ -69,6 +65,7 @@ def fix_exts(sources): 'Programming Language :: Python :: 3.10', 'Programming Language :: Python :: 3.11', 'Programming Language :: Python :: 3.12', + 'Programming Language :: Python :: 3.13', 'Programming Language :: Python :: Implementation :: CPython', ], packages=['fastparquet'],
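
For context, a minimal sketch of the scenario this patch addresses (issue #949): two frames whose shared categorical column carries different category sets are written with append=True and then read back, so the per-row-group dictionary codes have to be remapped onto one merged category set (the global_cats / remap_array machinery added in core.py above). It mirrors the new test_append_different_categorical_simple; the file path and the concrete values below are illustrative only and not part of the patch.

    import pandas as pd
    from fastparquet import ParquetFile, write

    # First row group: categories {2, 5, 8}
    df1 = pd.DataFrame({"col1": [1, 4, 7], "col2": pd.Categorical([2, 5, 8])})
    write("example.parquet", df1, write_index=False, file_scheme="simple")

    # Appended row group: overlapping but different categories {5, 8, 11}
    df2 = pd.DataFrame({"col1": [4, 7, 10], "col2": pd.Categorical([5, 8, 11])})
    write("example.parquet", df2, append=True, write_index=False, file_scheme="simple")

    # With the remapping in place, values round-trip as if the frames were concatenated.
    out = ParquetFile("example.parquet").to_pandas()
    assert out["col2"].tolist() == [2, 5, 8, 5, 8, 11]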