From a28d9069377e3400e0b819e434402b37cd74095e Mon Sep 17 00:00:00 2001
From: Martin Durant
Date: Fri, 23 Jan 2026 10:16:33 -0500
Subject: [PATCH 1/6] Maybe fix column selection

---
 fsspec/caching.py            |  3 +-
 fsspec/parquet.py            | 61 ++++++++++++++++++++++----------
 fsspec/tests/test_parquet.py | 68 ++++++++++++++----------------------
 3 files changed, 70 insertions(+), 62 deletions(-)

diff --git a/fsspec/caching.py b/fsspec/caching.py
index e91317ed8..080f0cd39 100644
--- a/fsspec/caching.py
+++ b/fsspec/caching.py
@@ -25,7 +25,7 @@

 T = TypeVar("T")

-logger = logging.getLogger("fsspec")
+logger = logging.getLogger("fsspec.caching")

 Fetcher = Callable[[int, int], bytes]  # Maps (start, end) to bytes
 MultiFetcher = Callable[[list[int, int]], bytes]  # Maps [(start, end)] to bytes
@@ -662,6 +662,7 @@ def nblocks(self, value):
         pass

     def _fetch(self, start: int | None, stop: int | None) -> bytes:
+        logger.debug("Known parts request %s %s", start, stop)
         if start is None:
             start = 0
         if stop is None:
diff --git a/fsspec/parquet.py b/fsspec/parquet.py
index 0bca67fde..be85fddf2 100644
--- a/fsspec/parquet.py
+++ b/fsspec/parquet.py
@@ -465,6 +465,10 @@ def _parquet_byte_ranges(
             # Input row_groups contains row-group indices
             row_group_indices = row_groups
             row_groups = pf.row_groups
+        if column_set is not None:
+            column_set = [
+                _ if isinstance(_, list) else _.split(".") for _ in column_set
+            ]

         # Loop through column chunks to add required byte ranges
         for r, row_group in enumerate(row_groups):
@@ -475,10 +479,9 @@ def _parquet_byte_ranges(
                 fn = self._row_group_filename(row_group, pf)

             for column in row_group.columns:
-                name = column.meta_data.path_in_schema[0]
-                # Skip this column if we are targeting a
-                # specific columns
-                if column_set is None or name in column_set:
+                name = column.meta_data.path_in_schema
+                # Skip this column if we are targeting specific columns
+                if column_set is None or _cmp(name, column_set):
                     file_offset0 = column.meta_data.dictionary_page_offset
                     if file_offset0 is None:
                         file_offset0 = column.meta_data.data_page_offset
@@ -550,6 +553,10 @@ def _parquet_byte_ranges(
                 if not isinstance(ind, dict)
             ]
             column_set |= set(md_index)
+        if column_set is not None:
+            column_set = [
+                _ if isinstance(_, list) else _.split(".") for _ in column_set
+            ]

         # Loop through column chunks to add required byte ranges
         for r in range(md.num_row_groups):
@@ -559,22 +566,38 @@ def _parquet_byte_ranges(
             row_group = md.row_group(r)
             for c in range(row_group.num_columns):
                 column = row_group.column(c)
-                name = column.path_in_schema
-                # Skip this column if we are targeting a
-                # specific columns
-                split_name = name.split(".")[0]
-                if (
-                    column_set is None
-                    or name in column_set
-                    or split_name in column_set
-                ):
-                    file_offset0 = column.dictionary_page_offset
-                    if file_offset0 is None:
-                        file_offset0 = column.data_page_offset
-                    num_bytes = column.total_compressed_size
-                    if file_offset0 < footer_start:
+                name = column.path_in_schema.split(".")
+                # Skip this column if we are targeting specific columns
+                if column_set is None or _cmp(name, column_set):
+                    meta = column.to_dict()
+                    # Any offset could be the first one
+                    file_offset0 = min(
+                        _
+                        for _ in [
+                            meta.get("dictionary_page_offset"),
+                            meta.get("data_page_offset"),
+                            meta.get("index_page_offset"),
+                        ]
+                        if _ is not None
+                    )
+                    if footer_start is None or file_offset0 < footer_start:
                         data_starts.append(file_offset0)
                         data_ends.append(
-                            min(file_offset0 + num_bytes, footer_start)
+                            min(
+                                meta["total_compressed_size"] + file_offset0,
+                                footer_start
(meta["total_compressed_size"] + file_offset0), + ) ) + data_starts.append(footer_start) + data_ends.append(footer_start + len(footer)) return data_starts, data_ends + + +def _cmp(name, column_set): + lname = len(name) + return any( + (len(_) >= lname and all(a == b for a, b in zip(name, _))) + or (len(_) < lname and all(a == b for a, b in zip(_, name))) + for _ in column_set + ) diff --git a/fsspec/tests/test_parquet.py b/fsspec/tests/test_parquet.py index a28f54a3b..d6772903f 100644 --- a/fsspec/tests/test_parquet.py +++ b/fsspec/tests/test_parquet.py @@ -1,7 +1,10 @@ import os +import random import pytest +import fsspec.utils + try: import fastparquet except ImportError: @@ -11,13 +14,13 @@ except ImportError: pq = None -from fsspec.core import url_to_fs from fsspec.parquet import ( - _get_parquet_byte_ranges, open_parquet_file, open_parquet_files, ) +pd = pytest.importorskip("pandas") + # Define `engine` fixture FASTPARQUET_MARK = pytest.mark.skipif(not fastparquet, reason="fastparquet not found") PYARROW_MARK = pytest.mark.skipif(not pq, reason="pyarrow not found") @@ -43,7 +46,6 @@ def test_open_parquet_file( tmpdir, engine, columns, max_gap, max_block, footer_sample_size, range_index ): # Pandas required for this test - pd = pytest.importorskip("pandas") if columns == ["z"] and engine == "fastparquet": columns = ["z.a"] # fastparquet is more specific @@ -52,9 +54,9 @@ def test_open_parquet_file( nrows = 40 df = pd.DataFrame( { - "x": [i * 7 % 5 for i in range(nrows)], "y": [[0, i] for i in range(nrows)], # list "z": [{"a": i, "b": "cat"} for i in range(nrows)], # struct + "x": [i * 7 % 5 for i in range(nrows)], }, index=pd.Index([10 * i for i in range(nrows)], name="myindex"), ) @@ -66,40 +68,6 @@ def test_open_parquet_file( # "Traditional read" (without `open_parquet_file`) expect = pd.read_parquet(path, columns=columns, engine=engine) - # Use `_get_parquet_byte_ranges` to re-write a - # place-holder file with all bytes NOT required - # to read `columns` set to b"0". The purpose of - # this step is to make sure the read will fail - # if the correct bytes have not been accurately - # selected by `_get_parquet_byte_ranges`. If this - # test were reading from remote storage, we would - # not need this logic to capture errors. 
-    fs = url_to_fs(path)[0]
-    data = _get_parquet_byte_ranges(
-        [path],
-        fs,
-        columns=columns,
-        engine=engine,
-        max_gap=max_gap,
-        max_block=max_block,
-        footer_sample_size=footer_sample_size,
-    )[path]
-    file_size = fs.size(path)
-    with open(path, "wb") as f:
-        f.write(b"0" * file_size)
-
-        if footer_sample_size == 8 and columns is not None:
-            # We know 8 bytes is too small to include
-            # the footer metadata, so there should NOT
-            # be a key for the last 8 bytes of the file
-            bad_key = (file_size - 8, file_size)
-            assert bad_key not in data
-
-        for (start, stop), byte_data in data.items():
-            f.seek(start)
-            f.write(byte_data)
-
-    # Read back the modified file with `open_parquet_file`
     with open_parquet_file(
         path,
         columns=columns,
@@ -151,10 +119,9 @@
     )


+@pytest.mark.filterwarnings("ignore:.*Not enough data.*")
 @FASTPARQUET_MARK
 def test_with_filter(tmpdir):
-    import pandas as pd
-
     df = pd.DataFrame(
         {
             "a": [10, 1, 2, 3, 7, 8, 9],
@@ -180,10 +147,9 @@
     pd.testing.assert_frame_equal(expect, result)


+@pytest.mark.filterwarnings("ignore:.*Not enough data.*")
 @FASTPARQUET_MARK
 def test_multiple(tmpdir):
-    import pandas as pd
-
     df = pd.DataFrame(
         {
             "a": [10, 1, 2, 3, 7, 8, 9],
@@ -238,3 +204,21 @@ def test_multiple(tmpdir):
     dfs = [pd.read_parquet(f, engine="fastparquet", columns=["a"]) for f in ofs]
     result = pd.concat(dfs).reset_index(drop=True)
     assert expect.equals(result)
+
+
+@pytest.mark.parametrize("n", [1_000_000])
+def test_nested(n, tmpdir, engine):
+    fsspec.utils.setup_logging(logger_name="fsspec.caching")
+    path = os.path.join(str(tmpdir), "test.parquet")
+    pa = pytest.importorskip("pyarrow")
+    flat = pa.array([random.random() for _ in range(n)])
+    a = random.random()
+    b = random.random()
+    nested = pa.array([{"a": a, "b": b} for _ in range(n)])
+    table = pa.table({"flat": flat, "nested": nested})
+    pq.write_table(table, path)
+    with open_parquet_file(path, columns=["nested.a"], engine=engine) as fh:
+        print(list(fh.cache.data), os.path.getsize(path))
+        col = pd.read_parquet(fh, engine=engine, columns=["nested.a"])
+    name = "a" if engine == "pyarrow" else "nested.a"
+    assert (col[name] == a).all()

From b9fc3b74ca70693487300db90759bb40f5df2eb1 Mon Sep 17 00:00:00 2001
From: Martin Durant
Date: Fri, 23 Jan 2026 10:35:23 -0500
Subject: [PATCH 2/6] simplify

---
 fsspec/parquet.py | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/fsspec/parquet.py b/fsspec/parquet.py
index be85fddf2..061e86681 100644
--- a/fsspec/parquet.py
+++ b/fsspec/parquet.py
@@ -595,9 +595,4 @@ def _parquet_byte_ranges(


 def _cmp(name, column_set):
-    lname = len(name)
-    return any(
-        (len(_) >= lname and all(a == b for a, b in zip(name, _)))
-        or (len(_) < lname and all(a == b for a, b in zip(_, name)))
-        for _ in column_set
-    )
+    return any(all(a == b for a, b in zip(name, _)) for _ in column_set)

From 4a2ad9fbb2ba30609eeb0cb2595d2fc55a637818 Mon Sep 17 00:00:00 2001
From: Martin Durant
Date: Fri, 23 Jan 2026 10:38:56 -0500
Subject: [PATCH 3/6] more test

---
 fsspec/tests/test_parquet.py | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/fsspec/tests/test_parquet.py b/fsspec/tests/test_parquet.py
index d6772903f..8698cbd53 100644
--- a/fsspec/tests/test_parquet.py
+++ b/fsspec/tests/test_parquet.py
@@ -3,8 +3,6 @@

 import pytest

-import fsspec.utils
-
 try:
     import fastparquet
 except ImportError:
@@ -206,9 +204,8 @@ def test_multiple(tmpdir):
     dfs = [pd.read_parquet(f, engine="fastparquet", columns=["a"]) for f in ofs]
     result = pd.concat(dfs).reset_index(drop=True)
     assert expect.equals(result)


-@pytest.mark.parametrize("n", [1_000_000])
+@pytest.mark.parametrize("n", [100, 10_000, 1_000_000])
 def test_nested(n, tmpdir, engine):
-    fsspec.utils.setup_logging(logger_name="fsspec.caching")
     path = os.path.join(str(tmpdir), "test.parquet")
     pa = pytest.importorskip("pyarrow")
     flat = pa.array([random.random() for _ in range(n)])

From bd34c977653e135897448c3e3428a0d79f740c5b Mon Sep 17 00:00:00 2001
From: Martin Durant
Date: Fri, 23 Jan 2026 10:39:15 -0500
Subject: [PATCH 4/6] remove debug

---
 fsspec/tests/test_parquet.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/fsspec/tests/test_parquet.py b/fsspec/tests/test_parquet.py
index 8698cbd53..f08aa177d 100644
--- a/fsspec/tests/test_parquet.py
+++ b/fsspec/tests/test_parquet.py
@@ -215,7 +215,6 @@ def test_nested(n, tmpdir, engine):
     table = pa.table({"flat": flat, "nested": nested})
     pq.write_table(table, path)
     with open_parquet_file(path, columns=["nested.a"], engine=engine) as fh:
-        print(list(fh.cache.data), os.path.getsize(path))
         col = pd.read_parquet(fh, engine=engine, columns=["nested.a"])
     name = "a" if engine == "pyarrow" else "nested.a"
     assert (col[name] == a).all()

From 8ba3aae512bac48d036cd8c6a7c6be71e3aa2158 Mon Sep 17 00:00:00 2001
From: Martin Durant
Date: Fri, 23 Jan 2026 10:41:20 -0500
Subject: [PATCH 5/6] Avoid pandas 3 (for now)

---
 pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 0bf377a97..395289e01 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -97,7 +97,7 @@ test_full = [
     'notebook',
     'numpy',
     'ocifs',
-    'pandas',
+    'pandas <3.0.0',
    'panel',
     'paramiko',
     'pyarrow',

From e249e04b8659bb7d830915df5b5858aa07b70a1f Mon Sep 17 00:00:00 2001
From: Martin Durant
Date: Sun, 25 Jan 2026 15:58:19 -0500
Subject: [PATCH 6/6] fastparquet skip pandas 3

---
 fsspec/tests/test_parquet.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/fsspec/tests/test_parquet.py b/fsspec/tests/test_parquet.py
index f08aa177d..6b62edfe5 100644
--- a/fsspec/tests/test_parquet.py
+++ b/fsspec/tests/test_parquet.py
@@ -18,9 +18,12 @@
 )

 pd = pytest.importorskip("pandas")
+pd_gt_3 = pd.__version__ > "3"

 # Define `engine` fixture
-FASTPARQUET_MARK = pytest.mark.skipif(not fastparquet, reason="fastparquet not found")
+FASTPARQUET_MARK = pytest.mark.skipif(
+    pd_gt_3 or not fastparquet, reason="fastparquet not found or pandas>=3"
+)
 PYARROW_MARK = pytest.mark.skipif(not pq, reason="pyarrow not found")
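
Note on the column-matching change (illustrative, not part of the patches): PATCH 1/6 normalizes both the requested column names and each chunk's path_in_schema into lists of dot-separated components, and PATCH 2/6 reduces _cmp to a mutual-prefix test. Because zip() stops at the shorter list, a chunk is selected whenever its path and some requested path agree on every component they share. A self-contained sketch of that behaviour, using invented column names:

    def _cmp(name, column_set):
        # zip() truncates to the shorter path, so this is True when the
        # chunk path and some requested path agree on their common prefix
        return any(all(a == b for a, b in zip(name, _)) for _ in column_set)

    requested = [c.split(".") for c in ["nested.a"]]    # user asked for "nested.a"
    assert _cmp("nested.a".split("."), requested)       # exact leaf match
    assert _cmp("nested".split("."), requested)         # parent struct matches too
    assert not _cmp("nested.b".split("."), requested)   # sibling leaf is skipped
    assert not _cmp("flat".split("."), requested)       # unrelated column is skipped

This is why requesting "nested.a" now pulls only that leaf's byte ranges, where the old code compared whole strings (or only the first path component) and could miss or over-select nested columns.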
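
Similarly, the byte-range arithmetic in the pyarrow branch of PATCH 1/6 takes the chunk's first byte to be the smallest page offset actually present (a dictionary page, when one exists, precedes the data pages, and any of the three offsets may be None), then clamps the end of the range to the footer when its position is known. A worked example with made-up offsets:

    # Hypothetical column-chunk metadata; pyarrow reports absent pages as None
    meta = {
        "dictionary_page_offset": None,
        "data_page_offset": 4_096,
        "index_page_offset": None,
        "total_compressed_size": 1_024,
    }
    footer_start = 9_000  # None when the footer position is not yet known

    # First byte of the chunk: the smallest offset that is actually present
    file_offset0 = min(
        v
        for v in (
            meta["dictionary_page_offset"],
            meta["data_page_offset"],
            meta["index_page_offset"],
        )
        if v is not None
    )
    # Last byte: never extend the range past the start of the footer
    end = min(
        meta["total_compressed_size"] + file_offset0,
        footer_start or (meta["total_compressed_size"] + file_offset0),
    )
    assert (file_offset0, end) == (4_096, 5_120)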