fix: auto-detect Ray fanout stages #2025

Closed
nightcityblade wants to merge 1 commit into NVIDIA-NeMo:main from nightcityblade:fix/issue-1613-auto-fanout

Conversation

@nightcityblade

Contributor

Description

Closes #1613.

Automatically marks Ray Data stages as fanout stages when their process return annotation is list[...] or a union that includes list[...]. This lets stages like URLGenerationStage rely on the base ProcessingStage default instead of manually setting is_fanout_stage.

Usage

class MyFanoutStage(ProcessingStage[InputTask, OutputTask]):
    def process(self, task: InputTask) -> list[OutputTask]:
        ...

assert MyFanoutStage().ray_stage_spec() == {"is_fanout_stage": True}

Checklist

  • I am familiar with the Contributing Guide.
  • New or Existing tests cover these changes.
  • The documentation is up to date with these changes.

Tests:

  • uv run --python 3.12 ruff check nemo_curator/stages/base.py nemo_curator/stages/text/download/base/url_generation.py tests/stages/common/test_base.py
  • uv run --python 3.12 --with pytest --with ray --with loguru --with pandas --with pyarrow --with fsspec python -m pytest tests/stages/common/test_base.py tests/stages/text/download/base/test_url_generation.py (blocked on macOS by NeMo-Curator's Linux-only runtime check)

@nightcityblade nightcityblade requested a review from a team as a code owner May 24, 2026 13:27
@nightcityblade nightcityblade requested review from meatybobby and removed request for a team May 24, 2026 13:27
@copy-pr-bot

copy-pr-bot Bot commented May 24, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@greptile-apps

greptile-apps Bot commented May 24, 2026

Contributor

Greptile Summary

This PR auto-detects Ray Data fanout stages by inspecting the process method's return annotation at class definition time, removing the need for manual ray_stage_spec() overrides. A NameError/TypeError fallback handles from __future__ import annotations environments by checking the raw string annotation.

  • Adds _process_returns_list() and _annotation_includes_list() classmethods to ProcessingStage; ray_stage_spec() now returns {"is_fanout_stage": True} automatically when process is annotated to return list[...].
  • Removes boilerplate ray_stage_spec() overrides from five stages (ALMManifestReaderStage, ClusterWiseFilePartitioningStage, FilePartitioningStage, URLGenerationStage, ClipTranscodingStage) and corrects ClipTranscodingStage.process to return [task] on the early-exit path.
  • Adds regression tests for the string-annotation fallback, single-task stages, union-annotated stages, and every stage whose override was removed.

Confidence Score: 5/5

Safe to merge — all five manual ray_stage_spec overrides are covered by auto-detection, the previously incorrect bare-object early return in ClipTranscodingStage is fixed, and the string-annotation fallback is regression-tested.

The auto-detection logic is narrow and conservative: it fires only for bare list[T] return annotations (not unions), which matches every removed override exactly. The ClipTranscodingStage early-exit bug is corrected and tested. The get_type_hints fallback for unresolvable string annotations is covered by a dedicated regression test. No existing behaviour is changed for stages that don't return lists.

No files require special attention.

Important Files Changed

| Filename | Overview |
| --- | --- |
| nemo_curator/stages/base.py | Adds auto-detection logic via _process_returns_list / _annotation_includes_list; correctly handles resolved types, typing.List, and raw-string fallback. Logic and tests are consistent. |
| nemo_curator/stages/video/clipping/clip_extraction_stages.py | Return annotation updated to list[VideoTask] and the early-exit path corrected from return task to return [task]; test updated to match. |
| nemo_curator/stages/text/download/base/url_generation.py | Manual ray_stage_spec override removed; process already annotated list[FileGroupTask], so auto-detection covers it. |
| nemo_curator/stages/deduplication/semantic/pairwise_io.py | Manual ray_stage_spec override removed; process annotated list[FileGroupTask], auto-detection applies correctly. |
| nemo_curator/stages/file_partitioning.py | Manual ray_stage_spec override removed; process annotated list[FileGroupTask], auto-detection applies correctly. |
| nemo_curator/stages/audio/alm/alm_manifest_reader.py | Manual ray_stage_spec override removed; process annotated list[AudioBatch], auto-detection applies correctly. |
| tests/stages/common/test_base.py | New TestProcessingStageRaySpec class covers single-task, fanout, union, explicit-override, and string-annotation cases comprehensively. |

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A["ray_stage_spec() called"] --> B["_process_returns_list()"]
    B --> C{"get_type_hints(cls.process)\nsucceeds?"}
    C -- Yes --> D["return_annotation = hints['return']"]
    C -- "No (NameError/TypeError)" --> E["return_annotation = cls.process.__annotations__['return']"]
    D --> F["_annotation_includes_list(annotation)"]
    E --> F
    F --> G{"isinstance(annotation, str)?"}
    G -- Yes --> H{"annotation.strip().startswith\n('list[', 'List[', 'typing.List[')"}
    G -- No --> I{"get_origin(annotation) is list?"}
    H -- True --> J["return True → {'is_fanout_stage': True}"]
    H -- False --> K["return False → {}"]
    I -- True --> J
    I -- False --> K

Reviews (5): Last reviewed commit: "fix: return clip fanout tasks consistent..."

Comment thread nemo_curator/stages/base.py Outdated
    origin = get_origin(annotation)
    if origin is list:
        return True
    if origin in (UnionType, Union) or isinstance(annotation, UnionType):

Contributor

P2 The isinstance(annotation, UnionType) branch is unreachable: for any X | Y expression, get_origin(annotation) returns types.UnionType, so origin in (UnionType, Union) is already True before isinstance is evaluated. The isinstance guard can be removed without changing behaviour.

Suggested change
- if origin in (UnionType, Union) or isinstance(annotation, UnionType):
+ if origin in (UnionType, Union):

Contributor Author

Addressed in 947e029: removed the unreachable isinstance(annotation, UnionType) branch while refining the detection logic.

Comment on lines +313 to +316
try:
    return_annotation = get_type_hints(cls.process).get("return")
except (NameError, TypeError):
    return_annotation = cls.process.__annotations__.get("return")

Contributor

P2 String annotation fallback silently loses type information

When get_type_hints() raises NameError or TypeError (e.g. when from __future__ import annotations is active and a referenced type is not importable at inspection time), the fallback reads cls.process.__annotations__["return"] which is a raw string such as "list[OutputTask]". get_origin("list[OutputTask]") returns None, so _annotation_includes_list returns False and the stage is not auto-detected as a fanout stage even though it is one. The failure is silent — ray_stage_spec() simply returns {}. Stages that hit this path would need to manually override ray_stage_spec() as before.
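The failure mode described here can be reproduced in isolation. In this sketch, `ExampleStage` and the undefined name `MissingTask` are hypothetical; an explicit string annotation stands in for what `from __future__ import annotations` would produce.

```python
from typing import get_origin, get_type_hints


class ExampleStage:
    # "MissingTask" is deliberately undefined so get_type_hints() cannot resolve it.
    def process(self, task: object) -> "list[MissingTask]":
        return [task]


try:
    resolved = get_type_hints(ExampleStage.process).get("return")
    resolution_failed = False
except NameError:
    resolved = None
    resolution_failed = True

# The fallback sees the unevaluated annotation, which is a plain string.
raw = ExampleStage.process.__annotations__.get("return")

# get_origin() does not parse strings, so a check based only on it misses the list.
raw_origin = get_origin(raw)
```

Here `resolution_failed` is true, `raw` is the string `"list[MissingTask]"`, and `raw_origin` is `None`, which is why a check of the form `get_origin(annotation) is list` would not flag the stage as fanout on this path.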

Contributor Author

Addressed in 947e029: the fallback now handles raw string annotations like "list[OutputTask]" so list-returning stages are still detected when get_type_hints() cannot resolve the annotation. I added a regression test that forces that fallback path.

@svcnvidia-nemo-ci svcnvidia-nemo-ci added the waiting-on-maintainers Waiting on maintainers to respond label May 26, 2026

@sarahyurick sarahyurick left a comment

Contributor

Thanks @nightcityblade !

@@ -77,11 +77,6 @@ def process(self, task: _EmptyTask) -> list[FileGroupTask]:
            for i, url in enumerate(urls)
        ]

    def ray_stage_spec(self) -> dict[str, Any]:
        return {
            "is_fanout_stage": True,

Contributor

Can you do this for all stages to make sure that this PR works for all of them? And ensure that each existing stage has a pytest to check that it is being set?

Contributor Author

Thanks — I broadened the coverage beyond URLGenerationStage in the follow-up commit. Existing list-returning stages now have pytest coverage for the Ray fanout spec in common/text/audio/image tests (URLGenerationStage and FilePartitioning already had assertions; I added coverage for ALMManifestReaderStage, CreateInitialManifestFleursStage, AudioToDocumentStage, and ImageReaderStage). I also kept existing explicit ray_stage_spec overrides intact.

Contributor

Thanks, I see some test edits but let's remove the ray_stage_spec function from the actual stages and make sure the tests still pass. The ones I found:

Do all of these work with this PR? Are there any that should be considered exceptions/explicitly keep ray_stage_spec for some reason?


name = "MaybeFanoutProcessingStage"

def process(self, task: MockTask) -> MockTask | list[MockTask]:

Contributor

I am undecided what should happen in this case. Maybe it should be up to the user?

Contributor Author

Good point. I changed the inference to be conservative: Task | list[Task] is no longer auto-marked as a Ray fanout stage. If a maybe-fanout stage wants Ray fanout behavior, it can opt in by overriding ray_stage_spec(). I added tests for both the conservative default and explicit opt-in.

@svcnvidia-nemo-ci svcnvidia-nemo-ci added waiting-on-customer Waiting on the original author to respond and removed waiting-on-maintainers Waiting on maintainers to respond waiting-on-customer Waiting on the original author to respond labels Jun 2, 2026
@nightcityblade

nightcityblade commented Jun 5, 2026

Contributor Author

Thanks, I pushed a focused follow-up in ec6a75f.

What changed:

  • Removed redundant ray_stage_spec() fanout overrides from the list-returning stages that still had them on this branch: FilePartitioningStage, ALMManifestReaderStage, and ClusterWiseFilePartitioningStage.
  • Added a regression assertion for ClusterWiseFilePartitioningStage so it is covered by the base return-annotation inference too.
  • URLGenerationStage was already relying on the base inference from the previous commit, and the existing image/audio/Fleurs tests continue to cover those list-returning stages.

I intentionally did not remove the clip extraction override because that stage's process() returns a single VideoTask, not a list, so the new base inference would not mark it as fanout and removing it would change behavior. The other stage-specific ray_stage_spec() methods I left are for non-fanout Ray behavior (actor/RAFT/LSH/shuffle/max-calls), not redundant fanout inference.

Validation:

  • uv run ruff check nemo_curator/stages/file_partitioning.py nemo_curator/stages/audio/alm/alm_manifest_reader.py nemo_curator/stages/deduplication/semantic/pairwise_io.py tests/stages/deduplication/semantic/test_pairwise_io.py passed.
  • I attempted the relevant pytest set locally, but this machine is macOS and the package exits during test collection with the existing Linux-only guard; CI should be the source of truth for pytest here.

@nightcityblade nightcityblade force-pushed the fix/issue-1613-auto-fanout branch 2 times, most recently from 4e4581c to ec6a75f Compare June 5, 2026 03:10
@svcnvidia-nemo-ci svcnvidia-nemo-ci removed the waiting-on-customer Waiting on the original author to respond label Jun 5, 2026
@nightcityblade

Contributor Author

Followed up on the fanout-inference review and pushed one more cleanup commit.

What changed:

  • removed the remaining explicit ray_stage_spec() override from ClipTranscodingStage
  • updated ClipTranscodingStage.process() to declare its real return type (list[VideoTask]), so it now participates in the same inference path as the other fanout stages in this PR

On the remaining exceptions question: after this change, the stage-specific overrides I found are the ones that still carry non-fanout Ray behavior/config (for example actor-stage / backend-specific settings), rather than plain is_fanout_stage=True duplication.

Validation:

  • python3 -m compileall nemo_curator/stages/base.py nemo_curator/stages/video/clipping/clip_extraction_stages.py

I also tried to run the targeted pytest files, but the local environment here is missing ray, so test collection stops in tests/conftest.py before those stage tests can run.

@nightcityblade

Contributor Author

Thanks, I pushed a small follow-up in effa59a to address the Greptile finding on the clip fanout contract.

What changed:

  • Updated the no-clips early-return path in ClipTranscodingStage.process() to return [task], matching the list[VideoTask] annotation and Ray fanout behavior.
  • Updated the existing clip transcoding pytest to assert that the no-clips path also returns a one-item task list.

Validation:

  • uv run ruff check nemo_curator/stages/video/clipping/clip_extraction_stages.py tests/stages/video/clipping/test_clip_transcoding_stage.py passed.
  • python3 -m compileall nemo_curator/stages/video/clipping/clip_extraction_stages.py tests/stages/video/clipping/test_clip_transcoding_stage.py passed.
  • I also attempted the targeted pytest set for the fanout-related stage tests, but local collection still stops on macOS with the existing Linux-only package guard in nemo_curator/__init__.py, so CI/Linux should validate pytest execution.

Signed-off-by: nightcityblade <nightcityblade@gmail.com>
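The contract fixed in effa59a (every return path of a list-annotated process() yields a list) can be illustrated with a toy example. `VideoTask` here is a hypothetical dataclass and the "transcoding" step is a placeholder; neither reflects the real ClipTranscodingStage implementation.

```python
from dataclasses import dataclass, field


@dataclass
class VideoTask:
    """Hypothetical stand-in; the real VideoTask has a different shape."""

    name: str
    clips: list[str] = field(default_factory=list)
    transcoded: list[str] = field(default_factory=list)


def process(task: VideoTask) -> list[VideoTask]:
    if not task.clips:
        # Early exit: still wrap the task so the result matches the annotation.
        return [task]
    # Placeholder for the real transcoding work.
    task.transcoded = [f"{clip}.mp4" for clip in task.clips]
    return [task]


no_clips_result = process(VideoTask(name="empty"))
with_clips_result = process(VideoTask(name="full", clips=["a", "b"]))
```

Both results are one-item lists, so a caller that iterates over the output of a fanout stage does not need a special case for the no-clips path.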
@nightcityblade

Contributor Author

I refreshed the branch sign-offs and opened a replacement PR here: #2056. This branch now has the same focused 13-file diff with all local commits signed off.

Development

Successfully merging this pull request may close these issues.

Automatically detect when IS_FANOUT_STAGE should be set to True

3 participants