fix: auto-detect Ray fanout stages#2056

Open
nightcityblade wants to merge 6 commits into
NVIDIA-NeMo:mainfrom
nightcityblade:fix/issue-1613-auto-fanout

Conversation

@nightcityblade

Contributor

Description

Closes #1613.

Automatically marks Ray Data stages as fanout stages when the return annotation of their process method is a concrete list[...]. This lets stages like URLGenerationStage rely on the base ProcessingStage default instead of manually setting is_fanout_stage.

Supersedes #2025 with the same branch after refreshing the DCO sign-offs.

Usage

class MyFanoutStage(ProcessingStage[InputTask, OutputTask]):
    def process(self, task: InputTask) -> list[OutputTask]:
        ...

assert MyFanoutStage().ray_stage_spec() == {"is_fanout_stage": True}

Checklist

  • I am familiar with the Contributing Guide.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.

Tests:

  • python3 -m compileall nemo_curator/stages/base.py nemo_curator/stages/video/clipping/clip_extraction_stages.py tests/stages/video/clipping/test_clip_transcoding_stage.py
  • Earlier targeted ruff checks passed on the changed stage/test files; targeted pytest collection is blocked locally on macOS by NeMo-Curator's Linux-only runtime check.

nightcityblade added 5 commits June 5, 2026 11:10
Signed-off-by: nightcityblade <nightcityblade@gmail.com>
Signed-off-by: nightcityblade <nightcityblade@gmail.com>
Signed-off-by: nightcityblade <nightcityblade@gmail.com>
Signed-off-by: nightcityblade <nightcityblade@gmail.com>
Signed-off-by: nightcityblade <nightcityblade@gmail.com>
@copy-pr-bot

copy-pr-bot Bot commented Jun 8, 2026


This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@greptile-apps

greptile-apps Bot commented Jun 8, 2026

Contributor

Greptile Summary

This PR eliminates boilerplate ray_stage_spec overrides across multiple stages by auto-detecting fanout stages at the base class level: if a stage's process method is annotated to return a concrete list[...], ray_stage_spec() automatically includes {"is_fanout_stage": True}. It also correctly changes ClipTranscodingStage.process to return list[VideoTask] to match the fanout contract it was already declaring.

  • ProcessingStage._process_returns_list inspects the return annotation via get_type_hints, falling back to __annotations__ on NameError/TypeError/AttributeError, and delegates to _annotation_includes_list which handles both resolved types (get_origin(annotation) is list) and string annotations (prefix match against list[, List[, typing.List[).
  • Four stages (FilePartitioningStage, URLGenerationStage, ClusterWiseFilePartitioningStage, ClipTranscodingStage) drop their manual overrides; ClipTranscodingStage.process is updated from returning a single VideoTask to list[VideoTask] for consistency.
  • Test coverage is thorough: six new fixture classes and a TestProcessingStageRaySpec suite exercise normal fanout, union/optional non-detection, explicit opt-in, string annotations, and the AttributeError fallback path.
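
The detection path summarized above can be sketched roughly as follows. This is a minimal illustration, not the code from nemo_curator/stages/base.py: SketchStage and the four example subclasses are hypothetical stand-ins, and only the helper names, the caught exceptions, and the string prefixes are taken from the summary.

```python
from typing import Any, List, Optional, get_origin, get_type_hints


class SketchStage:
    """Hypothetical stand-in for ProcessingStage; only the detection logic is sketched."""

    def process(self, task: Any) -> Any:
        raise NotImplementedError

    @staticmethod
    def _annotation_includes_list(annotation: Any) -> bool:
        if isinstance(annotation, str):
            # Unresolved string annotation: fall back to a prefix match.
            return annotation.strip().startswith(("list[", "List[", "typing.List["))
        # Resolved annotation: list[X] and typing.List[X] both have origin `list`.
        return get_origin(annotation) is list

    @classmethod
    def _process_returns_list(cls) -> bool:
        try:
            annotation = get_type_hints(cls.process).get("return")
        except (NameError, TypeError, AttributeError):
            # Resolution failed, so inspect the raw (possibly string) annotation.
            annotation = cls.process.__annotations__.get("return")
        return cls._annotation_includes_list(annotation)

    def ray_stage_spec(self) -> dict[str, Any]:
        return {"is_fanout_stage": True} if self._process_returns_list() else {}


class FanoutStage(SketchStage):
    def process(self, task: str) -> list[str]:
        return [task, task]


class OptionalStage(SketchStage):
    # A union around a list is not treated as a concrete list.
    def process(self, task: str) -> Optional[List[str]]:
        return None


class SingleStage(SketchStage):
    def process(self, task: str) -> str:
        return task


class UnresolvedStage(SketchStage):
    # MissingTask is deliberately undefined, which forces the string fallback.
    def process(self, task: str) -> "list[MissingTask]":
        return []
```

In this sketch, FanoutStage and UnresolvedStage report {"is_fanout_stage": True}, while OptionalStage and SingleStage report an empty spec.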

Confidence Score: 5/5

The change is safe to merge. The auto-detection is purely additive inference from the return annotation, and any stage that needs different behaviour can still override ray_stage_spec() directly.

All affected stages already had process returning a list, so the auto-detection preserves the exact same runtime contract they were manually asserting. The ClipTranscodingStage early-return path is correctly wrapped in a list and covered by an updated test. Exception handling in _process_returns_list guards against all known annotation-resolution failure modes. Test suite is comprehensive.
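
For stages where the inferred value is not what is wanted, overriding ray_stage_spec() might look like the sketch below. SketchStage, MaybeFanoutStage, and ListButNotFanoutStage are hypothetical names, and the base-class detection is simplified to the resolved-annotation case; the real ProcessingStage may differ.

```python
from typing import Any, Optional, get_origin, get_type_hints


class SketchStage:
    """Hypothetical stand-in for ProcessingStage, reduced to the spec logic."""

    def process(self, task: Any) -> Any:
        raise NotImplementedError

    def ray_stage_spec(self) -> dict[str, Any]:
        # Simplified auto-detection: only a resolved, concrete list[...] counts.
        annotation = get_type_hints(type(self).process).get("return")
        return {"is_fanout_stage": True} if get_origin(annotation) is list else {}


class MaybeFanoutStage(SketchStage):
    # Optional[list[...]] is a union, so auto-detection leaves this stage alone.
    def process(self, task: str) -> Optional[list[str]]:
        return [task] if task else None

    def ray_stage_spec(self) -> dict[str, Any]:
        # Explicit opt-in layered on top of whatever the base class inferred.
        return {**super().ray_stage_spec(), "is_fanout_stage": True}


class ListButNotFanoutStage(SketchStage):
    def process(self, task: str) -> list[str]:
        return [task]

    def ray_stage_spec(self) -> dict[str, Any]:
        # Explicit opt-out: ignore the inferred value entirely.
        return {}
```

The opt-in case mirrors the "explicit opt-in" scenario listed in the test coverage; the opt-out case is an assumption about how a stage could decline the inferred flag.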

No files require special attention.

Important Files Changed

| Filename | Overview |
| --- | --- |
| nemo_curator/stages/base.py | Adds _process_returns_list and _annotation_includes_list helpers; ray_stage_spec now auto-returns {"is_fanout_stage": True} when the process return annotation is a concrete list type. Exception handling covers NameError, TypeError, and AttributeError with a string-annotation fallback. |
| nemo_curator/stages/deduplication/semantic/pairwise_io.py | Removes manual ray_stage_spec override for ClusterWiseFilePartitioningStage; process already annotated list[FileGroupTask] so auto-detection handles it correctly. |
| nemo_curator/stages/file_partitioning.py | Removes manual ray_stage_spec override; process returns list[FileGroupTask], so auto-detection handles it correctly. |
| nemo_curator/stages/text/download/base/url_generation.py | Removes manual ray_stage_spec override; process returns list[FileGroupTask], picked up automatically. |
| nemo_curator/stages/video/clipping/clip_extraction_stages.py | Changes process return type from VideoTask to list[VideoTask], wraps the early-return case in a list, and removes the manual ray_stage_spec override. Behaviour is preserved and now consistently list-returning. |
| tests/stages/common/test_base.py | Adds six test fixture classes and a TestProcessingStageRaySpec suite covering: single-task stage, fanout detection, union-type non-detection, explicit opt-in, string annotations, and AttributeError fallback. |
| tests/stages/video/clipping/test_clip_transcoding_stage.py | Updates existing test assertions to expect a list return from process after the signature change. |
| tests/stages/deduplication/semantic/test_pairwise_io.py | Adds a test asserting ClusterWiseFilePartitioningStage.ray_stage_spec() returns {"is_fanout_stage": True} via the new auto-detection path. |
| tests/stages/audio/datasets/test_fleurs_create_initial_manifest.py | Adds a test asserting the FLEURS initial-manifest stage auto-detects as a fanout stage. |
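
The clip_extraction_stages.py change (wrapping the early return so that every path yields a list) follows a pattern that could be sketched as below. SketchTask, its clips field, and the chunking logic are invented for illustration and are not taken from ClipTranscodingStage.

```python
from dataclasses import dataclass, field


@dataclass
class SketchTask:
    """Hypothetical task type standing in for VideoTask."""

    clips: list[str] = field(default_factory=list)


def process(task: SketchTask, chunk_size: int = 2) -> list[SketchTask]:
    if not task.clips:
        # Early return: still a list, so the annotation and the runtime value agree.
        return [task]
    # Assumed fanout behaviour: split the clips into fixed-size chunks.
    return [
        SketchTask(clips=task.clips[i : i + chunk_size])
        for i in range(0, len(task.clips), chunk_size)
    ]
```

Keeping every return path list-shaped means callers never need to branch on whether they received one task or several.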

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A["ray_stage_spec() called"] --> B["_process_returns_list()"]
    B --> C{"get_type_hints(cls.process) succeeds?"}
    C -- "Yes" --> D["return_annotation = hints['return']"]
    C -- "No (NameError / TypeError / AttributeError)" --> E["return_annotation = cls.process.__annotations__['return']"]
    D --> F["_annotation_includes_list(annotation)"]
    E --> F
    F --> G{"annotation is str?"}
    G -- "Yes" --> H{"starts with 'list[' / 'List[' / 'typing.List['?"}
    G -- "No" --> I{"get_origin(annotation) is list?"}
    H -- "True" --> J["returns True"]
    H -- "False" --> K["returns False"]
    I -- "True" --> J
    I -- "False" --> K
    J --> L["ray_stage_spec returns {is_fanout_stage: True}"]
    K --> M["ray_stage_spec returns {}"]

Reviews (2): Last reviewed commit: "Merge remote-tracking branch 'upstream/m..."

Comment on lines +312 to +315
try:
    return_annotation = get_type_hints(cls.process).get("return")
except (NameError, TypeError):
    return_annotation = cls.process.__annotations__.get("return")

P2 Adding AttributeError to the except tuple ensures the fallback path is reached when get_type_hints encounters a dotted-attribute annotation referencing a missing symbol (e.g. some_module.MissingType). Without it, the exception propagates out of ray_stage_spec().

Suggested change
- try:
-     return_annotation = get_type_hints(cls.process).get("return")
- except (NameError, TypeError):
-     return_annotation = cls.process.__annotations__.get("return")
+ try:
+     return_annotation = get_type_hints(cls.process).get("return")
+ except (NameError, TypeError, AttributeError):
+     return_annotation = cls.process.__annotations__.get("return")

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
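
The failure mode described in this comment can be reproduced in isolation. The sketch below is an illustration only: process and resolve_return are hypothetical helpers, and math.MissingType stands in for the some_module.MissingType example.

```python
import math  # a real module that has no attribute named MissingType
from typing import get_type_hints


def process(task: str) -> "math.MissingType":
    return task


def resolve_return(func, catch):
    """Resolve the return annotation, falling back to the raw string on `catch`."""
    try:
        return get_type_hints(func).get("return")
    except catch:
        return func.__annotations__.get("return")


# With AttributeError in the tuple, the fallback yields the raw string.
fallback = resolve_return(process, (NameError, TypeError, AttributeError))

# Without it, the AttributeError raised while evaluating the annotation escapes.
try:
    resolve_return(process, (NameError, TypeError))
    escaped = False
except AttributeError:
    escaped = True
```

Evaluating the dotted name succeeds in finding math but fails on the attribute lookup, which is why NameError alone does not cover this case.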

@svcnvidia-nemo-ci svcnvidia-nemo-ci added the waiting-on-maintainers Waiting on maintainers to respond label Jun 10, 2026
@sarahyurick sarahyurick self-requested a review June 11, 2026 20:57
@sarahyurick

Contributor

Hi @nightcityblade, can you resolve the merge conflicts here?

@svcnvidia-nemo-ci svcnvidia-nemo-ci added waiting-on-customer Waiting on the original author to respond and removed waiting-on-maintainers Waiting on maintainers to respond labels Jun 16, 2026
…-fanout

# Conflicts:
#	nemo_curator/stages/audio/alm/alm_manifest_reader.py
#	nemo_curator/stages/base.py
#	nemo_curator/stages/deduplication/semantic/pairwise_io.py
#	nemo_curator/stages/file_partitioning.py
#	nemo_curator/stages/video/clipping/clip_extraction_stages.py
#	tests/stages/audio/alm/test_alm_manifest_reader.py
#	tests/stages/audio/io/test_convert.py
#	tests/stages/image/io/test_image_reader.py
@nightcityblade

Contributor Author

Resolved the merge conflicts by merging the latest main into fix/issue-1613-auto-fanout and keeping the fanout auto-detection changes aligned with the current base branch.

Also addressed the Greptile review feedback by catching AttributeError around get_type_hints(cls.process) and added coverage for that dotted missing-symbol annotation fallback.

Validation:

  • uv run python -m py_compile nemo_curator/stages/base.py tests/stages/common/test_base.py nemo_curator/stages/deduplication/semantic/pairwise_io.py nemo_curator/stages/file_partitioning.py nemo_curator/stages/video/clipping/clip_extraction_stages.py
  • uv run pytest tests/stages/common/test_base.py -q could not run on this macOS host because NeMo-Curator now raises its Linux-only platform guard during import.

@svcnvidia-nemo-ci svcnvidia-nemo-ci added waiting-on-maintainers Waiting on maintainers to respond and removed waiting-on-customer Waiting on the original author to respond labels Jun 17, 2026
@sarahyurick

Contributor

Hi @nightcityblade, can you check whether any of the changes in #2086 are relevant for this PR? Thanks in advance!

@svcnvidia-nemo-ci svcnvidia-nemo-ci added waiting-on-customer Waiting on the original author to respond and removed waiting-on-maintainers Waiting on maintainers to respond labels Jun 23, 2026

Labels

community-request waiting-on-customer Waiting on the original author to respond

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Automatically detect when IS_FANOUT_STAGE should be set to True

3 participants