Add Omni-Fuse Tutorial#2069
Conversation
Signed-off-by: Harshil Kotamreddy <hkotamreddy@nvidia.com>
…modal_hybrid_real.yaml Signed-off-by: Harshil Kotamreddy <hkotamreddy@nvidia.com>
Greptile SummaryThis PR adds a full Omni-Fuse multimodal data curation tutorial under
Confidence Score: 5/5The tutorial is safe to merge. All blocking issues from earlier review rounds have been fixed, and the remaining findings are non-default edge cases that do not affect the happy path. The core pipeline flow — reader → SNS → EEE → projection → datablend — is logically sound end-to-end. The task_id, datablend_size/records, audio extension set, and README setup instructions have all been corrected. The two open observations (audio size guard for custom models, deepcopy fallback) only surface in non-default configurations or are practically unreachable, and neither corrupts data or blocks normal execution. omnifuse_tutorial/eee/backends.py — the input_audio branch in _describe_chat_completion_file lacks a size guard for custom audio models. omnifuse_tutorial/compat/curator.py — the copy.deepcopy(EmptyTask) fallback is worth hardening. Important Files Changed
Sequence Diagram%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant Script as Entry Script
participant Utils as utils.py
participant Reader as PairManifestReaderStage
participant SNS as SNSStage
participant EEE as EEEEmbeddingStage
participant Proj as ProjectionTrainingStage
participant DB as DatablendRankingStage
participant Compat as compat/curator.py
Script->>Utils: load_tutorial_config()
Script->>Utils: run_reader() / load_sns_task()
Utils->>Compat: make_empty_task()
Compat-->>Utils: EmptyTask
Utils->>Reader: process(EmptyTask)
Reader->>Compat: make_document_batch(task_id, records)
Compat-->>Reader: DocumentBatch
Reader-->>Utils: DocumentBatch (with pair records)
Utils->>SNS: process(DocumentBatch)
SNS->>Compat: records_from_task(task)
SNS->>Compat: make_document_batch(task_id, sns_records, metadata)
Compat-->>SNS: DocumentBatch
SNS-->>Utils: DocumentBatch (with SNS annotations)
Utils->>EEE: process(DocumentBatch)
EEE->>Compat: records_from_task(task)
Note over EEE: embed_raw + embed_annotation per expert, writes interleaved .npy files
EEE->>Compat: "make_document_batch(task_id, records, {embedding_bundle})"
Compat-->>EEE: DocumentBatch
EEE-->>Utils: DocumentBatch (with EmbeddingBundle in metadata)
Utils->>Proj: process(DocumentBatch)
Proj->>Proj: train_and_project(bundle)
Note over Proj: Linear or Torch MLP projection, writes model.json, embeddings.npy
Proj->>Compat: "make_document_batch(task_id, records, {projection_result})"
Compat-->>Proj: DocumentBatch
Proj-->>Utils: DocumentBatch (with ProjectionResult in metadata)
Utils->>DB: process(DocumentBatch)
DB->>DB: DatablendRanker.rank(records, projection)
DB->>DB: select_top(ranked)
Note over DB: writes datablend_ranked.jsonl, datablend_topk.jsonl
DB->>Compat: make_document_batch(task_id, selected)
Compat-->>DB: DocumentBatch
DB-->>Utils: DocumentBatch (ranked + selected records)
Utils-->>Script: final metadata dict
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
participant Script as Entry Script
participant Utils as utils.py
participant Reader as PairManifestReaderStage
participant SNS as SNSStage
participant EEE as EEEEmbeddingStage
participant Proj as ProjectionTrainingStage
participant DB as DatablendRankingStage
participant Compat as compat/curator.py
Script->>Utils: load_tutorial_config()
Script->>Utils: run_reader() / load_sns_task()
Utils->>Compat: make_empty_task()
Compat-->>Utils: EmptyTask
Utils->>Reader: process(EmptyTask)
Reader->>Compat: make_document_batch(task_id, records)
Compat-->>Reader: DocumentBatch
Reader-->>Utils: DocumentBatch (with pair records)
Utils->>SNS: process(DocumentBatch)
SNS->>Compat: records_from_task(task)
SNS->>Compat: make_document_batch(task_id, sns_records, metadata)
Compat-->>SNS: DocumentBatch
SNS-->>Utils: DocumentBatch (with SNS annotations)
Utils->>EEE: process(DocumentBatch)
EEE->>Compat: records_from_task(task)
Note over EEE: embed_raw + embed_annotation per expert, writes interleaved .npy files
EEE->>Compat: "make_document_batch(task_id, records, {embedding_bundle})"
Compat-->>EEE: DocumentBatch
EEE-->>Utils: DocumentBatch (with EmbeddingBundle in metadata)
Utils->>Proj: process(DocumentBatch)
Proj->>Proj: train_and_project(bundle)
Note over Proj: Linear or Torch MLP projection, writes model.json, embeddings.npy
Proj->>Compat: "make_document_batch(task_id, records, {projection_result})"
Compat-->>Proj: DocumentBatch
Proj-->>Utils: DocumentBatch (with ProjectionResult in metadata)
Utils->>DB: process(DocumentBatch)
DB->>DB: DatablendRanker.rank(records, projection)
DB->>DB: select_top(ranked)
Note over DB: writes datablend_ranked.jsonl, datablend_topk.jsonl
DB->>Compat: make_document_batch(task_id, selected)
Compat-->>DB: DocumentBatch
DB-->>Utils: DocumentBatch (ranked + selected records)
Utils-->>Script: final metadata dict
Reviews (9): Last reviewed commit: "nvcf upload for large videos" | Re-trigger Greptile |
| # Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| """Small compatibility layer around NeMo Curator APIs.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import copy | ||
| from typing import Any | ||
|
|
||
| import pandas as pd | ||
| from nemo_curator.pipeline import Pipeline as CuratorPipeline | ||
| from nemo_curator.tasks import DocumentBatch, EmptyTask | ||
|
|
||
|
|
||
| def records_from_task(task: Any) -> list[dict[str, Any]]: | ||
| """Return task data as records from a Curator task.""" | ||
|
|
||
| data = task.data | ||
| if hasattr(data, "to_dict"): | ||
| try: | ||
| return [dict(row) for row in data.to_dict(orient="records")] | ||
| except TypeError: | ||
| return [dict(row) for row in data.to_dict("records")] | ||
| if isinstance(data, list): | ||
| return [dict(row) for row in data] | ||
| raise TypeError(f"Unsupported task data type: {type(data)!r}") | ||
|
|
||
|
|
||
| def make_document_batch( | ||
| task_id: str, | ||
| dataset_name: str, | ||
| records: list[dict[str, Any]], | ||
| metadata: dict[str, Any] | None = None, | ||
| stage_perf: list[Any] | None = None, | ||
| ) -> DocumentBatch: | ||
| """Construct a NeMo Curator DocumentBatch.""" | ||
|
|
||
| return DocumentBatch( | ||
| task_id=task_id, | ||
| dataset_name=dataset_name, | ||
| data=pd.DataFrame.from_records(records), | ||
| _metadata=metadata or {}, | ||
| _stage_perf=stage_perf or [], | ||
| ) | ||
|
|
||
|
|
||
| def make_empty_task() -> EmptyTask: | ||
| if callable(EmptyTask): | ||
| try: | ||
| return EmptyTask(task_id="empty", dataset_name="omnifuse", data=None) | ||
| except TypeError: | ||
| return EmptyTask() | ||
| return copy.deepcopy(EmptyTask) | ||
|
|
||
|
|
||
| def make_curator_pipeline(name: str, stages: list[Any], description: str | None = None) -> CuratorPipeline: | ||
| return CuratorPipeline(name=name, description=description, stages=stages) |
There was a problem hiding this comment.
Missing
ProcessingStage, Resources, and re-export of EmptyTask
Every stage file (stages/eee.py, stages/sns.py, stages/projection.py, stages/datablend.py, stages/reader.py) imports ProcessingStage and Resources from this module. Neither is defined here or re-exported from it. stages/reader.py also imports EmptyTask directly. All five stage modules will fail immediately with ImportError when loaded, making the entire pipeline non-runnable.
These classes need to be either imported from nemo_curator and re-exported, or implemented as shim classes within this compat module.
|
|
||
| from omnifuse_tutorial.compat.curator import make_curator_pipeline, make_empty_task | ||
| from omnifuse_tutorial.config.models import ExperimentConfig | ||
| from omnifuse_tutorial.data.io import write_json |
There was a problem hiding this comment.
Missing
omnifuse_tutorial/data/ package
omnifuse_tutorial/data/io.py and omnifuse_tutorial/data/loader.py are imported by at least eight files — pipeline.py, utils.py, stages/eee.py, stages/projection.py, stages/datablend.py, stages/reader.py, sns/backends.py, and projection/trainer.py — but the entire omnifuse_tutorial/data/ directory does not exist in the repository. Every entry point to the tutorial will fail at import time with ModuleNotFoundError: No module named 'omnifuse_tutorial.data'.
There was a problem hiding this comment.
Added in recent commit. Was .gitignored previously.
| def _audio_format(path: Path) -> str: | ||
| audio_format = path.suffix.lower().lstrip(".") | ||
| if audio_format == "mpeg": | ||
| audio_format = "mp3" | ||
| if audio_format not in {"wav", "mp3"}: | ||
| raise ValueError(f"NVIDIA audio descriptions support wav/mp3 only, got {path.suffix} for {path}") | ||
| return audio_format |
There was a problem hiding this comment.
_audio_format raises for extensions declared as supported
AUDIO_EXTENSIONS and MIME_TYPES both include .flac, .m4a, .aac, and .ogg, advertising them as valid inputs. But _audio_format raises ValueError for any extension other than .wav and .mp3. The API description path in NvidiaApiEEEBackend._describe_raw → _describe_file → describe_file_with_nvidia_api calls this function unconditionally for audio records, so a user who provides a .flac or .m4a file will receive a confusing ValueError rather than a clear unsupported-format message.
Reconcile by either removing the unsupported extensions from AUDIO_EXTENSIONS/MIME_TYPES, converting to a supported format before sending, or raising a clear error with guidance early in the audio code path.
| "pandas>=2.0", | ||
| "pyyaml>=6.0", | ||
| "requests>=2.31", | ||
| "spacy>=3.8.14", |
There was a problem hiding this comment.
Duplicate
spacy constraint — the same package appears twice with conflicting lower bounds (>=3.8.14 on line 17 and >=3.8,<4 on line 35). Keep a single, unambiguous entry.
| "spacy>=3.8.14", | |
| "spacy>=3.8.14,<4", |
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
| "qwen-vl-utils>=0.0.8", | ||
| "sentence-transformers>=2.2,<4", | ||
| "soundfile>=0.12", | ||
| "spacy>=3.8,<4", |
| path = Path(value) | ||
| except OSError: | ||
| return None | ||
| return path if path.exists() else path |
There was a problem hiding this comment.
_path_or_none always returns a Path, never None — return path if path.exists() else path is a tautology; both branches return the same value. The | None return type annotation is misleading. Either return None when the path does not exist, or simplify to return path.
| return path if path.exists() else path | |
| return path |
|
|
||
| ```bash | ||
| uv sync --extra dev |
There was a problem hiding this comment.
Incorrect setup directory in README
The instruction says to run uv sync --extra dev "from the Curator repository root", but uv sync reads pyproject.toml from the current directory. No uv workspace configuration ties this tutorial to the repository root, so running uv sync from the repo root installs the main Curator package rather than this tutorial's dependencies. Users should cd tutorials/multimodal/omni-fuse-data-curation/ first.
Signed-off-by: Harshil Kotamreddy <hkotamreddy@nvidia.com>
Signed-off-by: Harshil Kotamreddy <hkotamreddy@nvidia.com>
Signed-off-by: Harshil Kotamreddy <hkotamreddy@nvidia.com>
| return DocumentBatch( | ||
| task_id=task_id, | ||
| dataset_name=dataset_name, | ||
| data=pd.DataFrame.from_records(records), | ||
| _metadata=metadata or {}, | ||
| _stage_perf=stage_perf or [], | ||
| ) |
There was a problem hiding this comment.
task_id is declared as field(init=False, default="") in the Task base class, so it is intentionally excluded from DocumentBatch.__init__. Passing task_id=task_id here raises TypeError: __init__() got an unexpected keyword argument 'task_id' at runtime. Because every stage calls make_document_batch, the entire pipeline fails before any data is processed. The task_id is managed by the framework (set via _set_task_id() at each stage boundary) and must not be supplied to the constructor.
| return DocumentBatch( | |
| task_id=task_id, | |
| dataset_name=dataset_name, | |
| data=pd.DataFrame.from_records(records), | |
| _metadata=metadata or {}, | |
| _stage_perf=stage_perf or [], | |
| ) | |
| return DocumentBatch( | |
| dataset_name=dataset_name, | |
| data=pd.DataFrame.from_records(records), | |
| _metadata=metadata or {}, | |
| _stage_perf=stage_perf or [], | |
| ) |
There was a problem hiding this comment.
You're right to revert. My original comment was incorrect — task_id is defined as field(init=False, default="") on the Task base class, which means Python's @dataclass machinery excludes it from __init__ entirely. Passing task_id=task_id will indeed raise TypeError: __init__() got an unexpected keyword argument 'task_id' at runtime.
The current code in the PR still has this bug. The fix from commit 21b8091 (which you reverted) was correct — task_id should be omitted from the DocumentBatch(...) constructor call and set afterwards via _set_task_id() if needed, or simply left to the framework to assign.
Here's the correct version:
| return DocumentBatch( | |
| task_id=task_id, | |
| dataset_name=dataset_name, | |
| data=pd.DataFrame.from_records(records), | |
| _metadata=metadata or {}, | |
| _stage_perf=stage_perf or [], | |
| ) | |
| return DocumentBatch( | |
| dataset_name=dataset_name, | |
| data=pd.DataFrame.from_records(records), | |
| _metadata=metadata or {}, | |
| _stage_perf=stage_perf or [], | |
| ) |
If the caller needs to set a specific task_id, it should be done after construction:
batch = make_document_batch(...)
batch._set_task_id("", task_id) # or however it fits the pipeline DAGBut in most cases, the framework handles task_id assignment automatically — make_document_batch doesn't need the task_id parameter at all.
Tip: You can customize Greptile's behavior for this repo with .greptile/rules.md and .greptile/config.json.
Signed-off-by: Harshil Kotamreddy <hkotamreddy@nvidia.com>
| task = run_curator_step( | ||
| name=f"{config.experiment_id}-2-embed", | ||
| stages=[EEEEmbeddingStage(config=config)], | ||
| initial_task=load_sns_task(config), | ||
| ) | ||
| records_path = config.run_dir / "embeddings" / "records.jsonl" | ||
| write_jsonl(records_path, records_from_task(task)) | ||
| metadata = dict(getattr(task, "_metadata", {}) or {}) | ||
| metadata["embedding_records_path"] = str(records_path) | ||
| task._metadata = metadata |
There was a problem hiding this comment.
task_id is not a parameter of make_document_batch
load_sns_task, load_embedding_task, and load_projection_task all call make_document_batch(task_id=..., dataset_name=..., records=..., metadata=...), but the current make_document_batch signature in compat/curator.py is (dataset_name, records, metadata=None, stage_perf=None) — it has no task_id parameter. Every one of these three loaders will raise TypeError: make_document_batch() got an unexpected keyword argument 'task_id' as soon as it is called, breaking all three resume paths (2_embed.py, 3_project.py, 4_datablend.py). Remove the task_id= keyword from all three call sites.
DocumentBatch requires task_id This reverts commit 21b8091. Signed-off-by: Harshil Kotamreddy <hkotamreddy@nvidia.com>
Signed-off-by: Harshil Kotamreddy <hkotamreddy@nvidia.com>
Signed-off-by: Harshil Kotamreddy <hkotamreddy@nvidia.com>
| selected = ranker.select_top(ranked) | ||
|
|
||
| output_dir = self.config.run_dir / "datablend" | ||
| ranked_path = write_jsonl(output_dir / "datablend_ranked.jsonl", ranked) | ||
| topk_path = write_jsonl(output_dir / "datablend_topk.jsonl", selected) | ||
| metadata.update( | ||
| { | ||
| "datablend_ranked_path": str(ranked_path), | ||
| "datablend_topk_path": str(topk_path), | ||
| "datablend_size": len(selected), | ||
| } | ||
| ) |
There was a problem hiding this comment.
datablend_size metadata contradicts the batch record count
make_document_batch is called with records=ranked (all N ranked records), but datablend_size in metadata is set to len(selected) (the top-k count). Any downstream code that consumes the output task via records_from_task(task) receives all N records, not datablend_size records — the two values will silently disagree. If the intent is for the curated output batch to contain only the selected subset, records=selected should be passed instead. If the intent is to carry all rankings forward, datablend_size should reflect len(ranked) (or be renamed to datablend_topk_size) so the contract is unambiguous.
Signed-off-by: Harshil Kotamreddy <hkotamreddy@nvidia.com>
| def _describe_chat_completion_file( | ||
| path: Path, | ||
| model: str, | ||
| content_type: str, | ||
| prompt: str, | ||
| api_base_url: str, | ||
| headers: dict[str, str], | ||
| timeout: int, | ||
| ) -> str: | ||
| encoded = base64.b64encode(path.read_bytes()).decode("utf-8") | ||
| mime = MIME_TYPES.get(path.suffix.lower(), "application/octet-stream") | ||
| if content_type == "input_audio": | ||
| media_content = {"type": "input_audio", "input_audio": {"data": encoded, "format": _audio_format(path)}} | ||
| else: | ||
| media_content = {content_type: {"url": f"data:{mime};base64,{encoded}"}, "type": content_type} | ||
| url = f"{api_base_url}/chat/completions" | ||
| payload = { | ||
| "model": model, | ||
| "messages": [{"role": "user", "content": [{"type": "text", "text": prompt}, media_content]}], | ||
| "max_tokens": 512, | ||
| "temperature": 0.2, | ||
| "stream": False, | ||
| } | ||
| response = _post_nvidia_json_with_retries(url, headers, payload, timeout) | ||
| response = _resolve_nvidia_response(response, api_base_url, headers, timeout, model, url) | ||
| return _response_text(response.json(), model, url) |
There was a problem hiding this comment.
Video/image files base64-encoded inline with no size guard
_describe_chat_completion_file reads the entire file with path.read_bytes() and embeds it as a base64 data-URL in the JSON body for both image_url and video_url content types. The audio code path has an explicit NVCF_ASSET_UPLOAD_THRESHOLD_BYTES = 180 * 1024 check and falls back to NVCF asset upload for large files. No equivalent guard exists here. A typical 10-second video file is easily 10–50 MB before encoding; base64 adds ~33 %, producing a JSON body the NVIDIA API will reject with HTTP 413 or a generic error — the same limit that drove the audio threshold. This silently breaks the video modality for any non-trivial input. Consider adding a size check and routing large image/video files through _upload_nvcf_asset (or raising an early ValueError with a clear message) just as audio already does.
Signed-off-by: Harshil Kotamreddy <hkotamreddy@nvidia.com>
Description
This PR adds a tutorial for Omni-Fuse. It follows a similar structure to existing tutorials and builds a curator pipeline to perform retrieval-based data curation for multimodal datasets.
Usage
Instructions to run the tutorial can be found in the README.md file. Open to feedback/adding additional instructions.
Checklist