diff --git a/fastparquet/api.py b/fastparquet/api.py
index 3f6f7815..df43bd30 100644
--- a/fastparquet/api.py
+++ b/fastparquet/api.py
@@ -196,6 +196,7 @@ def __init__(self, fn, verify=False, open_with=default_open, root=False,
                                  "a filesystem compatible with fsspec") from e
         self.open = open_with
         self._statistics = None
+        self.global_cats = {}

     def _parse_header(self, f, verify=True):
         if self.fn and self.fn.endswith("_metadata"):
@@ -318,7 +319,8 @@ def __getitem__(self, item):
         new_pf.__setstate__(
             {"fn": self.fn, "open": self.open, "fmd": fmd,
              "pandas_nulls": self.pandas_nulls, "_base_dtype": self._base_dtype,
-             "tz": self.tz, "_columns_dtype": self._columns_dtype}
+             "tz": self.tz, "_columns_dtype": self._columns_dtype,
+             "global_cats": {}}  # fresh empty dict for the slice
         )
         new_pf._set_attrs()
         return new_pf
@@ -389,7 +391,7 @@ def read_row_group_file(self, rg, columns, categories, index=None,
             f, rg, columns, categories, self.schema, self.cats,
             selfmade=self.selfmade, index=index, assign=assign,
             scheme=self.file_scheme, partition_meta=partition_meta,
-            row_filter=row_filter
+            row_filter=row_filter, global_cats=self.global_cats
         )
         if ret:
             return df
@@ -1011,7 +1013,7 @@ def __getstate__(self):
             self.fmd.row_groups = []
         return {"fn": self.fn, "open": self.open, "fmd": self.fmd,
                 "pandas_nulls": self.pandas_nulls, "_base_dtype": self._base_dtype,
-                "tz": self.tz}
+                "tz": self.tz, "global_cats": self.global_cats}

     def __setstate__(self, state):
         self.__dict__.update(state)
diff --git a/fastparquet/core.py b/fastparquet/core.py
index 1dee1318..c9bf4800 100644
--- a/fastparquet/core.py
+++ b/fastparquet/core.py
@@ -200,7 +200,7 @@ def read_dictionary_page(file_obj, schema_helper, page_header, column_metadata,

 def read_data_page_v2(infile, schema_helper, se, data_header2, cmd,
                       dic, assign, num, use_cat, file_offset, ph, idx=None,
-                      selfmade=False, row_filter=None):
+                      selfmade=False, row_filter=None, remap_array=None):
     """
     :param infile: open file
     :param schema_helper:
@@ -211,6 +211,7 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd,
     :param assign: output array (all of it)
     :param num: offset, rows so far
     :param use_cat: output is categorical?
+    :param remap_array: array for remapping categorical indices
     :return: None

     test data "/Users/mdurant/Downloads/datapage_v2.snappy.parquet"
@@ -338,6 +339,9 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd,
         if bit_width in [8, 16, 32] and selfmade:
             # special fastpath for cats
             outbytes = raw_bytes[pagefile.tell():]
+            if remap_array is not None:
+                # Apply remapping to outbytes.
+                outbytes = remap_array[outbytes]
             if len(outbytes) == assign[num:num+data_header2.num_values].nbytes:
                 assign[num:num+data_header2.num_values].view('uint8')[row_filter] = outbytes[row_filter]
             else:
@@ -358,6 +362,9 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd,
                 encoding.NumpyIO(assign[num:num+data_header2.num_values].view('uint8')),
                 itemsize=bit_width
             )
+            if remap_array is not None:
+                # Apply remapping after reading
+                assign[num:num+data_header2.num_values] = remap_array[assign[num:num+data_header2.num_values]]
         else:
             temp = np.empty(data_header2.num_values, assign.dtype)
             encoding.read_rle_bit_packed_hybrid(
@@ -367,6 +374,8 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd,
                 encoding.NumpyIO(temp.view('uint8')),
                 itemsize=bit_width
             )
+            if remap_array is not None:
+                temp = remap_array[temp]
             if not nullable:
                 assign[num:num+data_header2.num_values][nulls[row_filter]] = None
             assign[num:num+data_header2.num_values][~nulls[row_filter]] = temp[row_filter]
@@ -429,7 +438,7 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd,

 def read_col(column, schema_helper, infile, use_cat=False,
              selfmade=False, assign=None, catdef=None,
-             row_filter=None):
+             row_filter=None, global_cats=None):
     """Using the given metadata, read one column in one row-group.

     Parameters
     ----------
@@ -443,10 +452,20 @@ def read_col(column, schema_helper, infile, use_cat=False,
     use_cat: bool (False)
         If this column is encoded throughout with dict encoding, give back
         a pandas categorical column; otherwise, decode to values
+    selfmade: bool (False)
+        Whether the data was created by fastparquet
+    assign: numpy array
+        Where to store the result
+    catdef: pandas.Categorical or CategoricalDtype
+        If reading a categorical column, the categorical definition (categories and
+        ordering).
     row_filter: bool array or None
         if given, selects which of the values read are to be written into
         the output. Effectively implies NULLs, even for a required column.
+    global_cats: dict or None
+        Optional dictionary for storing global categorical values across row groups.
+        Format: {col_path: array}
     """
     cmd = column.meta_data
     try:
@@ -480,6 +499,15 @@ def read_col(column, schema_helper, infile, use_cat=False,
     row_idx = [0]  # map/list objects
     dic = None
     index_off = 0  # how far through row_filter we are
+
+    # Initialize tracking variables for categorical dictionaries
+    # Only set up global dictionary tracking if reading as categorical and global_cats was provided
+    remap_dict = {}  # maps local dictionary codes to global category codes
+    if use_cat and global_cats is not None:
+        path_str = ".".join(cmd.path_in_schema)
+        # Register this column in global_cats if not already present
+        if path_str not in global_cats:
+            global_cats[path_str] = None

     while num < rows:
         off = infile.tell()
@@ -497,7 +525,44 @@ def read_col(column, schema_helper, infile, use_cat=False,
                 ddt = [kv.value.decode() for kv in (cmd.key_value_metadata or [])
                        if kv.key == b"label_dtype"]
                 ddt = ddt[0] if ddt else None
-                catdef._set_categories(pd.Index(dic, dtype=ddt), fastpath=True)
+
+                if global_cats is not None:
+                    # Check if categorical values are consistent with the global dictionary.
+                    if global_cats[path_str] is None:
+                        # This is the first dictionary for this column, save it as global
+                        global_cats[path_str] = dic
+                    else:
+                        # Dictionary already defined for this column, check for inconsistency.
+                        global_dict = global_cats[path_str]
+                        new_values = []
+                        # Build remap_dict in a single comprehension,
+                        # appending new values to new_values at the same time:
+                        # - Use walrus operator (:=) to store found_idx from global_dict lookup.
+                        # - When found_idx is -1, append val to new_values and use its new position.
+                        # - Only include indices that need remapping (found_idx != i).
+                        remap_dict = {i: (len(global_dict) + len(new_values) - 1)
+                                      if found_idx == -1 and not new_values.append(val) else found_idx
+                                      for (i,), val in np.ndenumerate(dic)
+                                      if (found_idx := next((j
+                                                             for (j,), gval in np.ndenumerate(global_dict)
+                                                             if val == gval), -1)) != i
+                                      }
+                        if remap_dict:
+                            # If any remapping is needed, create a complete remap array.
+                            # Initialize with identity mapping (no change)
+                            remap_array = np.arange(len(dic), dtype=np.int32)
+                            # Update indices that need remapping
+                            remap_array[list(remap_dict)] = list(remap_dict.values())
+                            if new_values:
+                                # Add new values to global dictionary
+                                global_cats[path_str] = np.append(global_dict, new_values)
+                            # Update categories to the (possibly extended) global dictionary
+                            catdef._set_categories(pd.Index(global_cats[path_str], dtype=ddt), fastpath=True)
+
+                # No remapping needed (or no global tracking): set categories from this page's dictionary
+                if global_cats is None or not remap_dict:
+                    catdef._set_categories(pd.Index(dic, dtype=ddt), fastpath=True)
+
                 if np.iinfo(assign.dtype).max < len(dic):
                     raise RuntimeError('Assigned array dtype (%s) cannot accommodate '
                                        'number of category labels (%i)' %
@@ -509,7 +574,7 @@ def read_col(column, schema_helper, infile, use_cat=False,
         if ph.type == parquet_thrift.PageType.DATA_PAGE_V2:
             num += read_data_page_v2(infile, schema_helper, se, ph.data_page_header_v2, cmd, dic,
                                      assign, num, use_cat, off, ph, row_idx, selfmade=selfmade,
-                                     row_filter=row_filter)
+                                     row_filter=row_filter, remap_array=remap_array if remap_dict else None)
             continue
         if (selfmade and hasattr(cmd, 'statistics') and
                 getattr(cmd.statistics, 'null_count', 1) == 0):
@@ -563,6 +628,9 @@ def read_col(column, schema_helper, infile, use_cat=False,
                     part[defi == max_defi] = dic[val]
                 elif not use_cat:
                     part[defi == max_defi] = convert(val, se, dtype=assign.dtype)
+                elif remap_dict:
+                    # Apply remapping of categorical codes
+                    part[defi == max_defi] = remap_array[val]
                 else:
                     part[defi == max_defi] = val
             else:
@@ -582,6 +650,9 @@ def read_col(column, schema_helper, infile, use_cat=False,
             piece[:] = dic[val]
         elif not use_cat:
             piece[:] = convert(val, se, dtype=assign.dtype)
+        elif remap_dict:
+            # Apply remapping of categorical codes
+            piece[:] = remap_array[val]
         else:
             piece[:] = val

@@ -589,7 +660,8 @@ def read_col(column, schema_helper, infile, use_cat=False,


 def read_row_group_arrays(file, rg, columns, categories, schema_helper, cats,
-                          selfmade=False, assign=None, row_filter=False):
+                          selfmade=False, assign=None, row_filter=False,
+                          global_cats=None):
     """
     Read a row group and return as a dict of arrays

@@ -615,7 +687,7 @@ def read_row_group_arrays(file, rg, columns, categories, schema_helper, cats,
         read_col(column, schema_helper, file, use_cat=name+'-catdef' in out,
                  selfmade=selfmade, assign=out[name],
                  catdef=out.get(name+'-catdef', None),
-                 row_filter=row_filter)
+                 row_filter=row_filter, global_cats=global_cats)

         if _is_map_like(schema_helper, column):
             # TODO: could be done in fast loop in _assemble_objects?
@@ -634,7 +706,8 @@ def read_row_group_arrays(file, rg, columns, categories, schema_helper, cats,

 def read_row_group(file, rg, columns, categories, schema_helper, cats,
                    selfmade=False, index=None, assign=None,
-                   scheme='hive', partition_meta=None, row_filter=False):
+                   scheme='hive', partition_meta=None, row_filter=False,
+                   global_cats=None):
     """
     Access row-group in a file and read some columns into a data-frame.
     """
@@ -642,7 +715,8 @@ def read_row_group(file, rg, columns, categories, schema_helper, cats,
     if assign is None:
         raise RuntimeError('Going with pre-allocation!')
     read_row_group_arrays(file, rg, columns, categories, schema_helper,
-                          cats, selfmade, assign=assign, row_filter=row_filter)
+                          cats, selfmade, assign=assign, row_filter=row_filter,
+                          global_cats=global_cats)

     for cat in cats:
         if cat not in assign:
diff --git a/fastparquet/test/test_output.py b/fastparquet/test/test_output.py
index aa045043..283a0ca2 100644
--- a/fastparquet/test/test_output.py
+++ b/fastparquet/test/test_output.py
@@ -1214,3 +1214,91 @@ def test_attrs_roundtrip(tempdir):
     df.to_parquet(path=fn, engine="fastparquet")
     df2 = pd.read_parquet(fn, engine="fastparquet")
     assert df2.attrs == attrs
+
+
+def test_append_different_categorical_simple(tempdir):
+    """Test for issue #949: wrong categories data when appending with categorical columns"""
+    fn = os.path.join(str(tempdir), 'test.parquet')
+    # First DataFrame with a categorical column
+    df1 = pd.DataFrame({
+        "col1": [1, 4, 7],
+        "col2": [2, 5, 8]
+    })
+    df1["col2"] = df1["col2"].astype("category")
+    write(fn, df1, write_index=False, file_scheme='simple')
+    # Second DataFrame to append
+    df2 = pd.DataFrame({
+        "col1": [4, 7, 10],
+        "col2": [5, 8, 11]
+    })
+    df2["col2"] = df2["col2"].astype("category")
+    write(fn, df2, append=True, write_index=False, file_scheme='simple')
+    # Read back again - this should maintain correct categorical values
+    df_combined = pd.read_parquet(fn, engine="fastparquet")
+    # Expected result when concatenating the two dataframes
+    expected = pd.concat([df1, df2], ignore_index=True)
+    expected["col2"] = expected["col2"].astype("category")
+    assert_frame_equal(df_combined, expected)
+
+
+def test_append_different_categorical_multi(tempdir):
+    """Test for issue #949: wrong categories data when appending with categorical columns"""
+    # Testing ParquetFile slicing as well.
+    # Set random seed for reproducibility
+    np.random.seed(42)
+    # Create initial DataFrame with categorical columns
+    def create_test_df(start_idx, rows, cats1, cats2):
+        cat1 = [cats1[i % len(cats1)] for i in range(rows)]
+        cat2 = [cats2[i % len(cats2)] for i in range(rows)]
+        df = pd.DataFrame({
+            'cat_col1': cat1,
+            'cat_col2': cat2,
+            'value': np.random.rand(rows)
+        })
+        df['cat_col1'] = df['cat_col1'].astype('category')
+        df['cat_col2'] = df['cat_col2'].astype('category')
+        return df
+    # Initial categories
+    cats1 = ['A', 'B', 'C']
+    cats2 = [10, 20, 30]
+    # First dataframe
+    fn = os.path.join(str(tempdir), 'test_parquet')
+    df1 = create_test_df(0, 5, cats1, cats2)
+    write(fn, df1, file_scheme='hive', write_index=False)
+    # New categories for second dataframe (overlapping + new values)
+    cats1_2 = ['B', 'C', 'D']  # B,C overlap with first df, D is new
+    cats2_2 = [30, 40, 50]
+    # Create second dataframe
+    df2 = create_test_df(len(df1), 6, cats1_2, cats2_2)
+    # Append second dataframe
+    write(fn, df2, file_scheme='hive', append=True, write_index=False)
+    # New categories for third dataframe (different ordering + new values)
+    cats1_3 = ['E', 'C', 'A']  # A,C from first, E is new
+    cats2_3 = [60, 70, 50]  # Mixed order
+    # Create third dataframe
+    df3 = create_test_df(len(df1)+len(df2), 7, cats1_3, cats2_3)
+    # Append third dataframe
+    write(fn, df3, file_scheme='hive', append=True, write_index=False)
+    # Combine all original dataframes for comparison
+    expected_df = pd.concat([df1, df2, df3], axis=0, ignore_index=True)
+    expected_df['cat_col1'] = expected_df['cat_col1'].astype('category')
+    expected_df['cat_col2'] = expected_df['cat_col2'].astype('category')
+    pf = ParquetFile(fn)
+    actual_df = pf.to_pandas()
+    # Assert that the dataframes are equal
+    assert_frame_equal(expected_df, actual_df)
+    # Test slicing.
+    actual_df_subset = pf[1:].to_pandas()
+    expected_df_subset = pd.concat([df2, df3], axis=0, ignore_index=True)
+    expected_df_subset['cat_col1'] = expected_df_subset['cat_col1'].astype('category')
+    expected_df_subset['cat_col2'] = expected_df_subset['cat_col2'].astype('category')
+    try:
+        # Code to manage new categorical values in fastparquet does not reorder them.
+        # Code in pandas concat seems to do so.
+        actual_df_subset['cat_col1'] = actual_df_subset['cat_col1'].cat.reorder_categories(
+            expected_df_subset['cat_col1'].cat.categories
+        )
+    except ValueError:
+        raise AssertionError("failed to reorder categories")
+    assert_frame_equal(expected_df_subset, actual_df_subset)
+
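
Reviewer note, outside the diff: the core idea of the patch is that read_col keeps one global list of category values per column path in global_cats, and a row group's local dictionary codes are translated into positions in that global list (via remap_array) whenever its dictionary differs from the global one. Below is a minimal standalone sketch of that remapping, assuming nothing about fastparquet internals: build_remap is a hypothetical helper, and the example values simply mirror test_append_different_categorical_simple above.

    import numpy as np

    def build_remap(global_cats, local_dic):
        # Illustrative only: extend the global category list with any values not
        # seen before, and return (updated_global_cats, remap) such that, for every
        # local dictionary code c, updated_global_cats[remap[c]] == local_dic[c].
        cats = list(global_cats)
        lookup = {v: i for i, v in enumerate(cats)}
        remap = np.empty(len(local_dic), dtype=np.int32)
        for code, val in enumerate(local_dic):
            if val not in lookup:
                lookup[val] = len(cats)
                cats.append(val)
            remap[code] = lookup[val]
        return np.asarray(cats), remap

    # Row group 1 stored categories [2, 5, 8]; the appended row group stored [5, 8, 11].
    g = np.array([2, 5, 8])
    g, remap = build_remap(g, np.array([5, 8, 11]))
    print(g)                           # [ 2  5  8 11]
    print(remap[np.array([0, 1, 2])])  # [1 2 3]: the appended row group's local codes,
                                       # re-expressed as indices into the global list

The patch does effectively this inside read_col, with one refinement: remap_array is only materialised when at least one code actually changes (remap_dict non-empty), so appending data whose dictionary already matches the global one stays on the existing fast path.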