From efc4cbe8d955c5ae04a2897cc7bcaff4cc720973 Mon Sep 17 00:00:00 2001 From: nightcityblade Date: Tue, 16 Jun 2026 11:15:06 +0800 Subject: [PATCH 1/4] Add option to drop deduplication id field Signed-off-by: nightcityblade --- .../stages/text/deduplication/removal.py | 4 ++++ .../text/deduplication/removal_workflow.py | 2 ++ .../stages/text/deduplication/semantic.py | 1 + .../deduplication/test_removal_workflow.py | 24 +++++++++++++++++++ 4 files changed, 31 insertions(+) diff --git a/nemo_curator/stages/text/deduplication/removal.py b/nemo_curator/stages/text/deduplication/removal.py index f177d306ab..19be1f69e3 100644 --- a/nemo_curator/stages/text/deduplication/removal.py +++ b/nemo_curator/stages/text/deduplication/removal.py @@ -43,6 +43,7 @@ class TextDuplicatesRemovalStage(ProcessingStage[DocumentBatch, DocumentBatch]): id_field: Field to use for deduplication within the input dataframe. Defaults to CURATOR_DEDUP_ID_STR. duplicate_id_field: Field to use for deduplication within the removal dataframe. Defaults to "id". read_kwargs: Additional arguments for reading parquet files + drop_id_field: Whether to drop the deduplication ID field from the output batch. 
""" ids_to_remove_path: str @@ -51,6 +52,7 @@ class TextDuplicatesRemovalStage(ProcessingStage[DocumentBatch, DocumentBatch]): # Optional parameters read_kwargs: dict[str, Any] | None = None + drop_id_field: bool = False def __post_init__(self): """Initialize parent class after dataclass initialization.""" @@ -84,6 +86,8 @@ def process(self, task: DocumentBatch) -> DocumentBatch: time_to_remove_t0 = time.perf_counter() removal_ids = set(removal_df[self.duplicate_id_field].tolist()) df = df[~df[self.id_field].isin(removal_ids)] + if self.drop_id_field: + df = df.drop(columns=[self.id_field]) removal_ids_time = time.perf_counter() - time_to_remove_t0 self._log_metrics( { diff --git a/nemo_curator/stages/text/deduplication/removal_workflow.py b/nemo_curator/stages/text/deduplication/removal_workflow.py index 078b2014d3..58a82d93f1 100644 --- a/nemo_curator/stages/text/deduplication/removal_workflow.py +++ b/nemo_curator/stages/text/deduplication/removal_workflow.py @@ -61,6 +61,7 @@ class TextDuplicatesRemovalWorkflow(WorkflowBase): output_kwargs: dict[str, Any] | None = None output_fields: list[str] | None = None output_mode: Literal["ignore", "overwrite", "append", "error"] | None = None + drop_id_field: bool = False def __post_init__(self): """Initialize parent class after dataclass initialization.""" @@ -120,6 +121,7 @@ def _generate_stages(self, initial_tasks: list[FileGroupTask] | None = None) -> id_field=self.id_field, duplicate_id_field=self.duplicate_id_field, read_kwargs=self.duplicate_id_read_kwargs, + drop_id_field=self.drop_id_field, ) ) diff --git a/nemo_curator/stages/text/deduplication/semantic.py b/nemo_curator/stages/text/deduplication/semantic.py index a50cfd160e..2738803739 100644 --- a/nemo_curator/stages/text/deduplication/semantic.py +++ b/nemo_curator/stages/text/deduplication/semantic.py @@ -369,6 +369,7 @@ def _run_duplicate_removal(self, executor: BaseExecutor) -> WorkflowRunResult | output_kwargs=self.write_kwargs, 
output_fields=self.output_fields, output_mode="ignore", + drop_id_field=self.use_id_generator and self.output_fields is None, ) return workflow.run(executor=executor) diff --git a/tests/stages/text/deduplication/test_removal_workflow.py b/tests/stages/text/deduplication/test_removal_workflow.py index 5682931675..a8172dc07f 100644 --- a/tests/stages/text/deduplication/test_removal_workflow.py +++ b/tests/stages/text/deduplication/test_removal_workflow.py @@ -281,6 +281,27 @@ def test_initial_tasks_partitioning(self, test_config: "TestTextDuplicateRemoval assert workflow_output.get_metadata("num_duplicates_removed") == expected_removed + +def test_removal_stage_can_drop_id_field(tmp_path: Path): + ids_to_remove_path = tmp_path / "ids_to_remove.parquet" + pd.DataFrame({"id": [1]}).to_parquet(ids_to_remove_path, index=False) + task = DocumentBatch( + task_id="task", + dataset_name="dataset", + data=pd.DataFrame({CURATOR_DEDUP_ID_STR: [1, 2], "text": ["drop", "keep"]}), + ) + + stage = TextDuplicatesRemovalStage( + ids_to_remove_path=str(ids_to_remove_path), + id_field=CURATOR_DEDUP_ID_STR, + drop_id_field=True, + ) + + result = stage.process(task).to_pandas() + + assert result.to_dict(orient="list") == {"text": ["keep"]} + + class TestTextDuplicatesRemovalWorkflowGenerateStages: def test_invalid_filetypes(self): read_invalid_file_type_workflow = TextDuplicatesRemovalWorkflow( @@ -340,6 +361,7 @@ def test_reader_stage(self, input_filetype: str, id_generator_path: str | None): assert stages[2].id_field == CURATOR_DEDUP_ID_STR assert stages[2].duplicate_id_field == "id" assert stages[2].read_kwargs == {} + assert not stages[2].drop_id_field # test for writer stage (stages[3]) - default output_filetype is parquet assert isinstance(stages[3], ParquetWriter) @@ -352,6 +374,7 @@ def test_writer_stage(self, output_filetype: str): output_path="output_path", output_filetype=output_filetype, id_generator_path=None, + drop_id_field=True, ) stages = 
workflow._generate_stages(initial_tasks=None) assert len(stages) == 4 @@ -359,6 +382,7 @@ def test_writer_stage(self, output_filetype: str): # reader stage assert isinstance(stages[1], ParquetReaderStage) # Default input_filetype is parquet assert isinstance(stages[2], TextDuplicatesRemovalStage) + assert stages[2].drop_id_field expected_write_stage = ParquetWriter if output_filetype == "parquet" else JsonlWriter assert isinstance(stages[3], expected_write_stage) From 2dbe71d376c9b0648b8164ca86daa739d1cfa524 Mon Sep 17 00:00:00 2001 From: nightcityblade Date: Wed, 17 Jun 2026 23:08:46 +0800 Subject: [PATCH 2/4] Validate dropped id output field conflict --- .../stages/text/deduplication/removal_workflow.py | 3 +++ .../text/deduplication/test_removal_workflow.py | 11 +++++++++++ 2 files changed, 14 insertions(+) diff --git a/nemo_curator/stages/text/deduplication/removal_workflow.py b/nemo_curator/stages/text/deduplication/removal_workflow.py index 58a82d93f1..09c28500eb 100644 --- a/nemo_curator/stages/text/deduplication/removal_workflow.py +++ b/nemo_curator/stages/text/deduplication/removal_workflow.py @@ -69,6 +69,9 @@ def __post_init__(self): logger.warning( f"Using {CURATOR_DEDUP_ID_STR} as id_field for removal stage, even though we are not using id generator." ) + if self.drop_id_field and self.output_fields and self.id_field in self.output_fields: + msg = f"Cannot drop id_field {self.id_field!r} when it is included in output_fields." 
+ raise ValueError(msg) def _generate_stages(self, initial_tasks: list[FileGroupTask] | None = None) -> list[ProcessingStage]: stages = [] diff --git a/tests/stages/text/deduplication/test_removal_workflow.py b/tests/stages/text/deduplication/test_removal_workflow.py index a8172dc07f..94f816c890 100644 --- a/tests/stages/text/deduplication/test_removal_workflow.py +++ b/tests/stages/text/deduplication/test_removal_workflow.py @@ -300,9 +300,20 @@ def test_removal_stage_can_drop_id_field(tmp_path: Path): result = stage.process(task).to_pandas() assert result.to_dict(orient="list") == {"text": ["keep"]} + assert CURATOR_DEDUP_ID_STR not in result.columns class TestTextDuplicatesRemovalWorkflowGenerateStages: + def test_drop_id_field_conflicts_with_output_fields(self): + with pytest.raises(ValueError, match="Cannot drop id_field"): + TextDuplicatesRemovalWorkflow( + input_path="input_path", + ids_to_remove_path="ids_to_remove_path", + output_path="output_path", + output_fields=["text", CURATOR_DEDUP_ID_STR], + drop_id_field=True, + ) + def test_invalid_filetypes(self): read_invalid_file_type_workflow = TextDuplicatesRemovalWorkflow( input_path="input_path", From 3ff725af3842124efe569e5a99500eb34c74e635 Mon Sep 17 00:00:00 2001 From: nightcityblade Date: Sat, 27 Jun 2026 11:05:25 +0800 Subject: [PATCH 3/4] docs: align semantic dedup tutorial output --- tutorials/text/deduplication/semantic/semantic_e2e.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tutorials/text/deduplication/semantic/semantic_e2e.ipynb b/tutorials/text/deduplication/semantic/semantic_e2e.ipynb index 258504f62c..573ecf2e2b 100644 --- a/tutorials/text/deduplication/semantic/semantic_e2e.ipynb +++ b/tutorials/text/deduplication/semantic/semantic_e2e.ipynb @@ -877,7 +877,7 @@ "\n", "We can control the schema of this by specifying the `output_fields` argument in the workflow definition.\n", "\n", - "If you had set `use_id_generator=True` then you'd see `_curator_dedup_id` here as 
well." + "If you had set `use_id_generator=True` and left `output_fields` unset, the generated `_curator_dedup_id` column would be dropped from the deduplicated output before writing. To keep it, include it explicitly in `output_fields`." ] }, { From d124fe027aededad75f0a71c68aa7b69d6abbbc6 Mon Sep 17 00:00:00 2001 From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Date: Mon, 29 Jun 2026 10:02:53 -0700 Subject: [PATCH 4/4] Apply suggestion from @sarahyurick Signed-off-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> --- tests/stages/text/deduplication/test_removal_workflow.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/stages/text/deduplication/test_removal_workflow.py b/tests/stages/text/deduplication/test_removal_workflow.py index fb07a4d91a..b034f10d13 100644 --- a/tests/stages/text/deduplication/test_removal_workflow.py +++ b/tests/stages/text/deduplication/test_removal_workflow.py @@ -286,7 +286,6 @@ def test_removal_stage_can_drop_id_field(tmp_path: Path): ids_to_remove_path = tmp_path / "ids_to_remove.parquet" pd.DataFrame({"id": [1]}).to_parquet(ids_to_remove_path, index=False) task = DocumentBatch( - task_id="task", dataset_name="dataset", data=pd.DataFrame({CURATOR_DEDUP_ID_STR: [1, 2], "text": ["drop", "keep"]}), )