From 65b22dab5f273945e5efa806476bc7b6372331ef Mon Sep 17 00:00:00 2001
From: nightcityblade
Date: Wed, 10 Jun 2026 23:11:58 +0800
Subject: [PATCH 1/2] test: cover remote pairwise file paths

Signed-off-by: nightcityblade
---
 .../semantic/test_pairwise_io.py | 29 +++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/tests/stages/deduplication/semantic/test_pairwise_io.py b/tests/stages/deduplication/semantic/test_pairwise_io.py
index e8d16b1d4d..c1c3fd6472 100644
--- a/tests/stages/deduplication/semantic/test_pairwise_io.py
+++ b/tests/stages/deduplication/semantic/test_pairwise_io.py
@@ -110,3 +110,32 @@ def test_process_finds_all_centroid_files(self, tmp_path: Path):
         assert result[2].task_id == "pairwise_centroid_2"
         assert result[2]._metadata == {"centroid_id": 2, "filetype": "parquet"}
         assert result[2].data == [str(centroid_2_dir / "file4.parquet"), str(centroid_2_dir / "file5.parquet")]
+
+    def test_process_restores_protocol_for_remote_listings(self):
+        """Remote fsspec listings may strip protocols; tasks should keep full URLs."""
+
+        class FakeRemoteFs:
+            def unstrip_protocol(self, path: str) -> str:
+                return path if path.startswith("gs://") else f"gs://{path}"
+
+            def ls(self, _path: str) -> list[str]:
+                return ["bucket/kmeans/centroid=7"]
+
+            def expand_path(self, path: str, _recursive: bool = False) -> list[str]:
+                return [path]
+
+            def isdir(self, _path: str) -> bool:
+                return True
+
+            def find(self, _path: str, _maxdepth: int | None, _withdirs: bool, _detail: bool) -> list[str]:
+                return ["bucket/kmeans/centroid=7/part.0.parquet"]
+
+        stage = ClusterWiseFilePartitioningStage("gs://bucket/kmeans")
+        stage.fs = FakeRemoteFs()
+        stage.path_normalizer = stage.fs.unstrip_protocol
+
+        empty_task = _EmptyTask(task_id="test", dataset_name="test", data=None)
+        result = stage.process(empty_task)
+
+        assert len(result) == 1
+        assert result[0].data == ["gs://bucket/kmeans/centroid=7/part.0.parquet"]

From 4cbaa5f41aee95ad529f2b717bfccc31cc5f6b8a Mon Sep 17 00:00:00 2001
From: nightcityblade
Date: Thu, 11 Jun 2026 11:05:00 +0800
Subject: [PATCH 2/2] test: align fake remote fs kwargs

---
 tests/stages/deduplication/semantic/test_pairwise_io.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/tests/stages/deduplication/semantic/test_pairwise_io.py b/tests/stages/deduplication/semantic/test_pairwise_io.py
index c1c3fd6472..689463c76c 100644
--- a/tests/stages/deduplication/semantic/test_pairwise_io.py
+++ b/tests/stages/deduplication/semantic/test_pairwise_io.py
@@ -121,13 +121,18 @@ def unstrip_protocol(self, path: str) -> str:
             def ls(self, _path: str) -> list[str]:
                 return ["bucket/kmeans/centroid=7"]
 
-            def expand_path(self, path: str, _recursive: bool = False) -> list[str]:
+            def expand_path(self, path: str, recursive: bool = False) -> list[str]:
+                assert recursive is False
                 return [path]
 
             def isdir(self, _path: str) -> bool:
                 return True
 
-            def find(self, _path: str, _maxdepth: int | None, _withdirs: bool, _detail: bool) -> list[str]:
+            def find(self, path: str, maxdepth: int | None, withdirs: bool, detail: bool) -> list[str]:
+                assert path == "gs://bucket/kmeans/centroid=7"
+                assert maxdepth == 1
+                assert withdirs is False
+                assert detail is False
                 return ["bucket/kmeans/centroid=7/part.0.parquet"]
 
         stage = ClusterWiseFilePartitioningStage("gs://bucket/kmeans")