diff --git a/tutorials/multimodal/omni-fuse-data-curation/.env.example b/tutorials/multimodal/omni-fuse-data-curation/.env.example new file mode 100644 index 0000000000..f63f05150f --- /dev/null +++ b/tutorials/multimodal/omni-fuse-data-curation/.env.example @@ -0,0 +1,14 @@ +# Copy this file to .env and fill in real values. Do not commit .env. + +# Required for the API-first hybrid tutorial path. +NV_BUILD_API_KEY= + +# Defaults to NVIDIA Build's OpenAI-compatible API base URL. +NVIDIA_API_BASE_URL=https://integrate.api.nvidia.com/v1 + +# Optional if you use Hugging Face gated/local model paths outside the API backend. +HF_TOKEN= + +# Required for the LanguageBind fusion expert unless LanguageBind is cloned into +# ./third_party/LanguageBind. +LANGUAGEBIND_ROOT= diff --git a/tutorials/multimodal/omni-fuse-data-curation/.python-version b/tutorials/multimodal/omni-fuse-data-curation/.python-version new file mode 100644 index 0000000000..7eebfafa04 --- /dev/null +++ b/tutorials/multimodal/omni-fuse-data-curation/.python-version @@ -0,0 +1 @@ +3.12.11 diff --git a/tutorials/multimodal/omni-fuse-data-curation/.rayignore b/tutorials/multimodal/omni-fuse-data-curation/.rayignore new file mode 100644 index 0000000000..df18d9aaf1 --- /dev/null +++ b/tutorials/multimodal/omni-fuse-data-curation/.rayignore @@ -0,0 +1,12 @@ +.git/ +.venv/ +__pycache__/ +.pytest_cache/ +.ruff_cache/ +.mypy_cache/ +datasets/ +model_files/ +outputs/ +tutorials/multimodal/omni-fuse-data-curation/outputs/ +tutorials/multimodal/omni-fuse-data-curation/tmp/ +*.egg-info/ diff --git a/tutorials/multimodal/omni-fuse-data-curation/0_validate_inputs.py b/tutorials/multimodal/omni-fuse-data-curation/0_validate_inputs.py new file mode 100644 index 0000000000..4532c4878c --- /dev/null +++ b/tutorials/multimodal/omni-fuse-data-curation/0_validate_inputs.py @@ -0,0 +1,31 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Validate config, data manifests, API keys, and local model assets.""" + +from __future__ import annotations + +from utils import config_parser, load_tutorial_config, print_outputs, validate_inputs + + +def main() -> int: + parser = config_parser(__doc__ or "") + args = parser.parse_args() + config = load_tutorial_config(args.config) + print_outputs(validate_inputs(config)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tutorials/multimodal/omni-fuse-data-curation/1_sns.py b/tutorials/multimodal/omni-fuse-data-curation/1_sns.py new file mode 100644 index 0000000000..75694ad5a0 --- /dev/null +++ b/tutorials/multimodal/omni-fuse-data-curation/1_sns.py @@ -0,0 +1,39 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Run Symmetric Nucleus Subsampling over paired multimodal records.""" + +from __future__ import annotations + +from utils import config_parser, load_tutorial_config, print_outputs, run_sns + + +def main() -> int: + parser = config_parser(__doc__ or "") + args = parser.parse_args() + config = load_tutorial_config(args.config) + task = run_sns(config) + metadata = dict(getattr(task, "_metadata", {}) or {}) + print_outputs( + { + "run_dir": str(config.run_dir), + "sns_manifest_path": metadata.get("sns_manifest_path"), + "sns_records_path": metadata.get("sns_records_path"), + } + ) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tutorials/multimodal/omni-fuse-data-curation/2_embed.py b/tutorials/multimodal/omni-fuse-data-curation/2_embed.py new file mode 100644 index 0000000000..ab6c0e632f --- /dev/null +++ b/tutorials/multimodal/omni-fuse-data-curation/2_embed.py @@ -0,0 +1,39 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Run the Expert Embedding Engine for SNS-refined records.""" + +from __future__ import annotations + +from utils import config_parser, load_tutorial_config, print_outputs, run_eee + + +def main() -> int: + parser = config_parser(__doc__ or "") + args = parser.parse_args() + config = load_tutorial_config(args.config) + task = run_eee(config) + metadata = dict(getattr(task, "_metadata", {}) or {}) + print_outputs( + { + "run_dir": str(config.run_dir), + "embedding_metadata_path": metadata.get("embedding_metadata_path"), + "embedding_records_path": metadata.get("embedding_records_path"), + } + ) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tutorials/multimodal/omni-fuse-data-curation/3_project.py b/tutorials/multimodal/omni-fuse-data-curation/3_project.py new file mode 100644 index 0000000000..da34b1c3df --- /dev/null +++ b/tutorials/multimodal/omni-fuse-data-curation/3_project.py @@ -0,0 +1,41 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Train the Omni-Fuse projection network and project raw embeddings.""" + +from __future__ import annotations + +from utils import config_parser, load_tutorial_config, print_outputs, run_projection + + +def main() -> int: + parser = config_parser(__doc__ or "") + args = parser.parse_args() + config = load_tutorial_config(args.config) + task = run_projection(config) + metadata = dict(getattr(task, "_metadata", {}) or {}) + print_outputs( + { + "run_dir": str(config.run_dir), + "projection_model_path": metadata.get("projection_model_path"), + "projection_metrics_path": metadata.get("projection_metrics_path"), + "projected_embeddings_path": metadata.get("projected_embeddings_path"), + "projection_records_path": metadata.get("projection_records_path"), + } + ) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tutorials/multimodal/omni-fuse-data-curation/4_datablend.py b/tutorials/multimodal/omni-fuse-data-curation/4_datablend.py new file mode 100644 index 0000000000..ffff81e053 --- /dev/null +++ b/tutorials/multimodal/omni-fuse-data-curation/4_datablend.py @@ -0,0 +1,40 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Rank SNS-refined records into a query-conditioned datablend.""" + +from __future__ import annotations + +from utils import config_parser, load_tutorial_config, print_outputs, run_datablend + + +def main() -> int: + parser = config_parser(__doc__ or "") + args = parser.parse_args() + config = load_tutorial_config(args.config) + task = run_datablend(config) + metadata = dict(getattr(task, "_metadata", {}) or {}) + print_outputs( + { + "run_dir": str(config.run_dir), + "datablend_ranked_path": metadata.get("datablend_ranked_path"), + "datablend_topk_path": metadata.get("datablend_topk_path"), + "datablend_size": metadata.get("datablend_size"), + } + ) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tutorials/multimodal/omni-fuse-data-curation/README.md b/tutorials/multimodal/omni-fuse-data-curation/README.md new file mode 100644 index 0000000000..44e575629a --- /dev/null +++ b/tutorials/multimodal/omni-fuse-data-curation/README.md @@ -0,0 +1,214 @@ +# Omni-Fuse Data Curation + +Omni-Fuse (see paper [here](https://arxiv.org/pdf/2605.01163v1)) curates paired multimodal datasets by improving pair alignment and +then ranking the resulting records for a target data blend. This tutorial uses +NeMo Curator task/stage abstractions to implement the curation pipeline: + +1. Validate paired data manifests and model assets. +2. Apply Symmetric Nucleus Subsampling (SNS). +3. Run the Expert Embedding Engine (EEE). +4. Train/apply the projection network. +5. Export a query-ranked datablend. + +The tutorial is API-first hybrid. It uses NVIDIA API models where hosted +endpoints preserve the intended Omni-Fuse role, and local models where the +current implementation needs local model execution. + +## Setup + +Install the tutorial dependencies from the tutorial directory: + +```bash +cd tutorials/multimodal/omni-fuse-data-curation/ +uv sync --extra dev +``` + +You will need to do the following before you're able to run the tutorial: +- Ensure `ffmpeg` is installed and added to `PATH`. +- Log in to Hugging Face using `hf auth login` +- Copy `.env.example` to `.env` in this tutorial directory. +- Create an API key at `build.nvidia.com` and set the `NV_BUILD_API_KEY` variable in the `.env` file. +- Clone the [LanguageBind](https://github.com/pku-yuangroup/languagebind) repository. Either clone it to `third_party/` or set the `LANGUAGEBIND_ROOT` variable in the `.env` file. +- Download pre-trained weights for CG-DETR model from [Lighthouse](https://github.com/line/lighthouse#pre-trained-weights) and save it to `model_files/best.ckpt` We use `cg_detr/qvhighlight/clip/best.ckpt`. +- We recommend using GPUs as we run several local models +- Set the paths to the datasets you want to use in `configs/omni_fuse_hybrid.yaml` and change other settings as you see fit. + +### Data Layout + +This tutorial is bring-your-own data. Each pool contains raw files, text +annotations, and a `pair_mapping.jsonl` file: + +```text +my_pool/ + raw/ + annotations/ + pair_mapping.jsonl +``` + +Each mapping row must contain a raw path and either an annotation path or inline +annotation text: + +```json +{"id": "sample-1", "data_path": "raw/sample.jpg", "annotation_path": "annotations/sample.txt"} +{"id": "sample-2", "data_path": "raw/sample.wav", "annotation": "A person speaks over background music."} +``` + +Supported raw modalities are `text`, `image`, `audio`, and `video`. Configure +each pool in `configs/omni_fuse_hybrid.yaml`: + +```yaml +data_pools: + - name: "image_caption_pool" + modality: "image" + root_dir: "/path/to/image_pool" + mapping_file: "pair_mapping.jsonl" + n_samples: 1 +``` + +Use small `n_samples` values while validating the tutorial. + +### Model Backends + +The default config uses `sns.backend: hybrid` and `eee.backend: hybrid`. +If you wish to use strictly api-based or local models, you can change these to `api` or `local`. However, this won't work out of the box and you will have to modify code to fit your requirements. + +API-backed components: + +- Modality descriptions for backward SNS and the text-based EEE expert: + - `nvidia/nemotron-nano-12b-v2-vl` for text, image, and video. + - `google/gemma-3n-e4b-it` for audio. +- Text embeddings: + - `nvidia/llama-nemotron-embed-1b-v2`. + +Local components: + +- SNS multimodal similarity and MI gating: + - `nvidia/omni-embed-nemotron-3b`. +- SNS image forward extraction: + - `IDEA-Research/grounding-dino-tiny`. +- SNS audio forward extraction: + - `lighthouse-emnlp2024/AM-DETR`. +- SNS video forward extraction: + - CG-DETR from Lighthouse with `model_files/best.ckpt`. +- EEE fusion expert: + - LanguageBind. +- EEE end-to-end expert: + - `nvidia/omni-embed-nemotron-3b`. + + + +## Step 0: Validate Inputs + +```bash +python 0_validate_inputs.py --config configs/omni_fuse_hybrid.yaml +``` + +This checks the data manifests, API key availability, LanguageBind checkout, +and CG-DETR checkpoint path. + +## Step 1: Symmetric Nucleus Subsampling + +```bash +python 1_sns.py --config configs/omni_fuse_hybrid.yaml +``` + +SNS writes: + +```text +outputs//sns/manifest.jsonl +outputs//sns/records.jsonl +``` + +In hybrid mode, backward extraction uses API descriptions and API text +embeddings. Forward extraction for image/audio/video uses local +Grounding-DINO/AM-DETR/CG-DETR and local Omni-Embed MI gating. + +## Step 2: Expert Embeddings + +```bash +python 2_embed.py --config configs/omni_fuse_hybrid.yaml +``` + +EEE writes interleaved, raw, and annotation embeddings for each expert: + +```text +outputs//embeddings/text_based_*.npy +outputs//embeddings/fusion_*.npy +outputs//embeddings/e2e_*.npy +outputs//embeddings/metadata.json +outputs//embeddings/records.jsonl +``` + +The text-based expert uses NVIDIA API descriptions and text embeddings. The +fusion and e2e experts use LanguageBind and Omni-Embed locally. + +## Step 3: Projection + +```bash +python 3_project.py --config configs/omni_fuse_hybrid.yaml +``` + +The projection stage trains a small MLP over concatenated expert embeddings +using contrastive, cluster-bias, and scale-bias losses. It writes: + +```text +outputs//projection/model.json +outputs//projection/loss_history.json +outputs//projection/metrics.json +outputs//projection/projected_embeddings.npy +outputs//projection/annotation_embeddings.npy +``` + +## Step 4: Datablend Ranking + +```bash +python 4_datablend.py --config configs/omni_fuse_hybrid.yaml +``` + +The datablend stage embeds the query through the text-based expert and ranks +projected records by cosine similarity: + +```text +outputs//datablend/datablend_ranked.jsonl +outputs//datablend/datablend_topk.jsonl +``` + +## End-to-End Script + +Run every step in order: + +```bash +CONFIG=configs/omni_fuse_hybrid.yaml bash e2e.sh +``` + +Set `PYTHON_BIN` if you want to use a specific interpreter: + +```bash +PYTHON_BIN="uv run python" CONFIG=configs/omni_fuse_hybrid.yaml bash e2e.sh +``` + +## Output Layout + +```text +outputs// + config.resolved.json + sns/ + manifest.jsonl + records.jsonl + media/ + embeddings/ + metadata.json + records.jsonl + *_interleaved.npy + *_raw.npy + *_annotation.npy + projection/ + model.json + loss_history.json + metrics.json + projected_embeddings.npy + annotation_embeddings.npy + datablend/ + datablend_ranked.jsonl + datablend_topk.jsonl +``` diff --git a/tutorials/multimodal/omni-fuse-data-curation/configs/omni_fuse_hybrid.yaml b/tutorials/multimodal/omni-fuse-data-curation/configs/omni_fuse_hybrid.yaml new file mode 100644 index 0000000000..7cbf9d59c9 --- /dev/null +++ b/tutorials/multimodal/omni-fuse-data-curation/configs/omni_fuse_hybrid.yaml @@ -0,0 +1,76 @@ +experiment_id: "omni-fuse-hybrid-tutorial" +description: "API-first hybrid Omni-Fuse multimodal data curation tutorial" +output_dir: "../outputs" + +data_pools: + - name: "image_caption_pool" + modality: "image" + root_dir: "/path/to/image_pool" + mapping_file: "pair_mapping.jsonl" + n_samples: 1 + - name: "audio_caption_pool" + modality: "audio" + root_dir: "/path/to/audio_pool" + mapping_file: "pair_mapping.jsonl" + n_samples: 1 + - name: "video_instruction_pool" + modality: "video" + root_dir: "/path/to/video_pool" + mapping_file: "pair_mapping.jsonl" + n_samples: 1 + - name: "text_instruction_pool" + modality: "text" + root_dir: "/path/to/text_pool" + mapping_file: "pair_mapping.jsonl" + n_samples: 1 + +sns: + enabled: true + backend: "hybrid" + direction: "bidirectional" + mi_ratio: 0.75 + mi_eps: 0.05 + tau_forward_text: 0.05 + tau_forward_image: 0.10 + tau_forward_audio: 0.10 + tau_forward_video: 0.10 + tau_backward: 0.10 + nvidia_model: "nvidia/omni-embed-nemotron-3b" + grounding_dino_model_id: "IDEA-Research/grounding-dino-tiny" + amdetr_repo_id: "lighthouse-emnlp2024/AM-DETR" + cg_detr_checkpoint: "../model_files/best.ckpt" + require_forward_models: true + use_ann_components: true + +eee: + backend: "hybrid" + experts: ["text-based", "fusion", "e2e"] + embedding_dim: 2048 + batch_size: 1 + nvidia_api_base_url: "https://integrate.api.nvidia.com/v1" + nvidia_text_describer_model: "nvidia/nemotron-nano-12b-v2-vl" + nvidia_image_describer_model: "nvidia/nemotron-nano-12b-v2-vl" + nvidia_video_describer_model: "nvidia/nemotron-nano-12b-v2-vl" + nvidia_audio_describer_model: "google/gemma-3n-e4b-it" + nvidia_embedding_model: "nvidia/llama-nemotron-embed-1b-v2" + nvidia_multimodal_model: "nvidia/omni-embed-nemotron-3b" + +projection: + enabled: true + backend: "torch" + num_epochs: 1 + batch_size: 4 + hidden_layer_size: 256 + contrastive_loss_weight: 0.99 + bias_loss_weight: 0.01 + scale_loss_weight: 0.0001 + +datablend: + query: "high quality multimodal instruction tuning data" + top_k: 4 + include_metadata: true + +runtime: + device: "auto" + dtype: "float32" + offline_mode: false diff --git a/tutorials/multimodal/omni-fuse-data-curation/e2e.sh b/tutorials/multimodal/omni-fuse-data-curation/e2e.sh new file mode 100755 index 0000000000..5fab4f7e19 --- /dev/null +++ b/tutorials/multimodal/omni-fuse-data-curation/e2e.sh @@ -0,0 +1,27 @@ +#!/usr/bin/env bash +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -euo pipefail + +TUTORIAL_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +CONFIG="${CONFIG:-${TUTORIAL_DIR}/configs/omni_fuse_hybrid.yaml}" +PYTHON_BIN="${PYTHON_BIN:-python}" +read -r -a PYTHON_CMD <<< "${PYTHON_BIN}" + +"${PYTHON_CMD[@]}" "${TUTORIAL_DIR}/0_validate_inputs.py" --config "${CONFIG}" +"${PYTHON_CMD[@]}" "${TUTORIAL_DIR}/1_sns.py" --config "${CONFIG}" +"${PYTHON_CMD[@]}" "${TUTORIAL_DIR}/2_embed.py" --config "${CONFIG}" +"${PYTHON_CMD[@]}" "${TUTORIAL_DIR}/3_project.py" --config "${CONFIG}" +"${PYTHON_CMD[@]}" "${TUTORIAL_DIR}/4_datablend.py" --config "${CONFIG}" diff --git a/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/__init__.py b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/__init__.py new file mode 100644 index 0000000000..0c320a276a --- /dev/null +++ b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/__init__.py @@ -0,0 +1,20 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""NeMo Curator rewrite of the Omni-Fuse data curation pipeline.""" + +from omnifuse_tutorial.config.models import ExperimentConfig +from omnifuse_tutorial.pipeline import build_pipeline + +__all__ = ["ExperimentConfig", "build_pipeline"] diff --git a/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/compat/__init__.py b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/compat/__init__.py new file mode 100644 index 0000000000..32460e672a --- /dev/null +++ b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/compat/__init__.py @@ -0,0 +1,15 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Compatibility helpers for optional NeMo Curator imports.""" diff --git a/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/compat/curator.py b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/compat/curator.py new file mode 100644 index 0000000000..af48309ff0 --- /dev/null +++ b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/compat/curator.py @@ -0,0 +1,73 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Small compatibility layer around NeMo Curator APIs.""" + +from __future__ import annotations + +import copy +import inspect +from typing import Any + +import pandas as pd +from nemo_curator.pipeline import Pipeline as CuratorPipeline +from nemo_curator.tasks import DocumentBatch, EmptyTask + + +def records_from_task(task: Any) -> list[dict[str, Any]]: + """Return task data as records from a Curator task.""" + + data = task.data + if hasattr(data, "to_dict"): + try: + return [dict(row) for row in data.to_dict(orient="records")] + except TypeError: + return [dict(row) for row in data.to_dict("records")] + if isinstance(data, list): + return [dict(row) for row in data] + raise TypeError(f"Unsupported task data type: {type(data)!r}") + + +def make_document_batch( + task_id: str, + dataset_name: str, + records: list[dict[str, Any]], + metadata: dict[str, Any] | None = None, + stage_perf: list[Any] | None = None, +) -> DocumentBatch: + """Construct a NeMo Curator DocumentBatch.""" + + kwargs = { + "dataset_name": dataset_name, + "data": pd.DataFrame.from_records(records), + "_metadata": metadata or {}, + "_stage_perf": stage_perf or [], + } + if "task_id" in inspect.signature(DocumentBatch).parameters: + kwargs["task_id"] = task_id + + return DocumentBatch(**kwargs) + + +def make_empty_task() -> EmptyTask: + if callable(EmptyTask): + try: + return EmptyTask(task_id="empty", dataset_name="omnifuse", data=None) + except TypeError: + return EmptyTask() + return copy.deepcopy(EmptyTask) + + +def make_curator_pipeline(name: str, stages: list[Any], description: str | None = None) -> CuratorPipeline: + return CuratorPipeline(name=name, description=description, stages=stages) diff --git a/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/config/__init__.py b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/config/__init__.py new file mode 100644 index 0000000000..f2f6beb44c --- /dev/null +++ b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/config/__init__.py @@ -0,0 +1,15 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Configuration models and loading helpers.""" diff --git a/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/config/loader.py b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/config/loader.py new file mode 100644 index 0000000000..0cae7af71a --- /dev/null +++ b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/config/loader.py @@ -0,0 +1,79 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Configuration loading helpers.""" + +from __future__ import annotations + +import json +import os +from pathlib import Path +from typing import Any + +import yaml + +from omnifuse_tutorial.config.models import ExperimentConfig + + +def load_config(path: str | Path) -> ExperimentConfig: + config_path = Path(path) + _load_dotenv(Path.cwd() / ".env") + _load_dotenv(config_path.parent / ".env") + data = _load_mapping(config_path) + config = ExperimentConfig.from_dict(data) + return _resolve_relative_paths(config, config_path.parent) + + +def _load_mapping(path: Path) -> dict[str, Any]: + suffix = path.suffix.lower() + text = path.read_text(encoding="utf-8") + if suffix == ".json": + return json.loads(text) + if suffix in {".yaml", ".yml"}: + loaded = yaml.safe_load(text) + if not isinstance(loaded, dict): + raise ValueError(f"Config must be a mapping: {path}") + return loaded + raise ValueError(f"Unsupported config suffix: {path.suffix}") + + +def _resolve_relative_paths(config: ExperimentConfig, base_dir: Path) -> ExperimentConfig: + if not config.output_dir.is_absolute(): + config.output_dir = (base_dir / config.output_dir).resolve() + if config.runtime.cache_dir and not config.runtime.cache_dir.is_absolute(): + config.runtime.cache_dir = (base_dir / config.runtime.cache_dir).resolve() + if config.sns.sns_output_dir and not config.sns.sns_output_dir.is_absolute(): + config.sns.sns_output_dir = (base_dir / config.sns.sns_output_dir).resolve() + if config.sns.cg_detr_checkpoint and not config.sns.cg_detr_checkpoint.is_absolute(): + config.sns.cg_detr_checkpoint = (base_dir / config.sns.cg_detr_checkpoint).resolve() + if config.projection.save_weights_path and not config.projection.save_weights_path.is_absolute(): + config.projection.save_weights_path = (base_dir / config.projection.save_weights_path).resolve() + for pool in config.data_pools: + if not pool.root_dir.is_absolute(): + pool.root_dir = (base_dir / pool.root_dir).resolve() + return config + + +def _load_dotenv(path: Path) -> None: + if not path.exists(): + return + for raw_line in path.read_text(encoding="utf-8").splitlines(): + line = raw_line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, value = line.split("=", 1) + key = key.strip() + value = value.strip().strip('"').strip("'") + if key and key not in os.environ: + os.environ[key] = value diff --git a/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/config/models.py b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/config/models.py new file mode 100644 index 0000000000..011666d701 --- /dev/null +++ b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/config/models.py @@ -0,0 +1,320 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Configuration dataclasses for the standalone Omni-Fuse Curator pipeline.""" + +from __future__ import annotations + +from dataclasses import asdict, dataclass, field +from pathlib import Path +from typing import Any, Literal + +Modality = Literal["text", "image", "video", "audio", "point-cloud"] +SNSDirection = Literal["forward", "backward", "bidirectional"] +ExpertName = Literal["text-based", "fusion", "e2e"] + + +@dataclass +class DataPoolConfig: + name: str + modality: Modality + root_dir: Path + mapping_file: str = "pair_mapping.jsonl" + n_samples: int | None = None + shuffle: bool = False + max_file_size_mb: int | None = None + max_video_frames: int | None = None + max_audio_duration_seconds: int | None = None + + @classmethod + def from_dict(cls, value: dict[str, Any]) -> DataPoolConfig: + name = value.get("name", value.get("title")) + modality = value.get("modality", value.get("data_modality")) + root_dir = value.get("root_dir", value.get("data_root_dir")) + if name is None: + raise ValueError(f"Data pool missing name/title: {value}") + if modality is None: + raise ValueError(f"Data pool missing modality/data_modality: {value}") + if root_dir is None: + raise ValueError(f"Data pool missing root_dir/data_root_dir: {value}") + return cls( + name=str(name), + modality=modality, + root_dir=Path(root_dir), + mapping_file=str(value.get("mapping_file", "pair_mapping.jsonl")), + n_samples=value.get("n_samples"), + shuffle=bool(value.get("shuffle", False)), + max_file_size_mb=value.get("max_file_size_mb"), + max_video_frames=value.get("max_video_frames"), + max_audio_duration_seconds=value.get("max_audio_duration_seconds"), + ) + + +@dataclass +class SNSConfig: + enabled: bool = True + backend: Literal["auto", "hybrid", "local", "api"] = "auto" + direction: SNSDirection = "bidirectional" + mi_ratio: float = 0.95 + mi_eps: float = 0.05 + tau_forward_text: float = 0.30 + tau_forward_image: float = 0.30 + tau_forward_video: float = 0.20 + tau_forward_audio: float = 0.25 + tau_backward: float = 0.35 + grid_size: int = 5 + max_patches: int = 4 + max_video_segments: int = 5 + max_audio_segments: int = 5 + min_segment_duration: float = 2.0 + bbox_padding_px: int = 0 + reinject: bool = False + sns_output_dir: Path | None = None + grounding_dino_model_id: str = "IDEA-Research/grounding-dino-tiny" + cg_detr_checkpoint: Path = Path("model_files/best.ckpt") + amdetr_repo_id: str = "lighthouse-emnlp2024/AM-DETR" + require_forward_models: bool = True + use_ann_components: bool = True + nvidia_model: str = "nvidia/omni-embed-nemotron-3b" + + @classmethod + def from_dict(cls, value: dict[str, Any] | None) -> SNSConfig: + value = value or {} + sns_output_dir = value.get("sns_output_dir") + cg_detr_checkpoint = value.get("cg_detr_checkpoint") + cfg = cls(**{key: item for key, item in value.items() if key not in {"sns_output_dir", "cg_detr_checkpoint"}}) + if sns_output_dir: + cfg.sns_output_dir = Path(sns_output_dir) + if cg_detr_checkpoint: + cfg.cg_detr_checkpoint = Path(cg_detr_checkpoint) + return cfg + + +@dataclass +class EEEConfig: + experts: list[ExpertName] = field(default_factory=lambda: ["text-based", "fusion", "e2e"]) + backend: Literal["hybrid", "local", "api"] = "hybrid" + embedding_dim: int = 2048 + batch_size: int = 32 + text_prompt_base: str = "Describe this in detail." + text_prompt_prefix: str = "Focus specifically on the aspects highlighted in this annotation." + nvidia_api_key: str | None = None + nvidia_api_base_url: str = "https://integrate.api.nvidia.com/v1" + nvidia_text_describer_model: str = "nvidia/nemotron-nano-12b-v2-vl" + nvidia_image_describer_model: str = "nvidia/nemotron-nano-12b-v2-vl" + nvidia_video_describer_model: str = "nvidia/nemotron-nano-12b-v2-vl" + nvidia_audio_describer_model: str = "google/gemma-3n-e4b-it" + nvidia_embedding_model: str = "nvidia/llama-nemotron-embed-1b-v2" + nvidia_multimodal_model: str = "nvidia/omni-embed-nemotron-3b" + + @classmethod + def from_dict(cls, value: dict[str, Any] | None) -> EEEConfig: + value = dict(value or {}) + if "text_expert_backend" in value and "backend" not in value: + value["backend"] = value.pop("text_expert_backend") + cfg = cls(**value) + if not cfg.experts: + raise ValueError("eee.experts cannot be empty") + if cfg.backend not in {"hybrid", "local", "api"}: + raise ValueError(f"Unsupported eee.backend: {cfg.backend}") + return cfg + + +@dataclass +class ProjectionConfig: + enabled: bool = True + backend: Literal["auto", "linear", "torch"] = "auto" + num_epochs: int = 100 + batch_size: int = 128 + learning_rate: float = 1e-3 + contrastive_loss_weight: float = 0.99 + scale_loss_weight: float = 0.0001 + bias_loss_weight: float = 0.01 + contrastive_temperature: float = 0.07 + num_layers: int = 3 + hidden_layer_size: int = 512 + dropout: float = 0.1 + output_embeddings: bool = True + save_weights_path: Path | None = None + eval_recall_k: int = 10 + verbose: bool = False + + @classmethod + def from_dict(cls, value: dict[str, Any] | None) -> ProjectionConfig: + value = dict(value or {}) + save_weights_path = value.get("save_weights_path") + cfg = cls(**{key: item for key, item in value.items() if key != "save_weights_path"}) + if save_weights_path: + cfg.save_weights_path = Path(save_weights_path) + return cfg + + +@dataclass +class DatablendConfig: + query: str + top_k: int | None = None + blend_fraction: float | None = None + include_metadata: bool = True + + @classmethod + def from_dict(cls, value: dict[str, Any] | None) -> DatablendConfig: + if not value or not value.get("query"): + raise ValueError("datablend.query is required") + return cls( + query=str(value["query"]), + top_k=value.get("top_k"), + blend_fraction=value.get("blend_fraction"), + include_metadata=bool(value.get("include_metadata", True)), + ) + + +@dataclass +class RuntimeConfig: + device: str = "auto" + dtype: str = "float32" + offline_mode: bool = False + cache_dir: Path | None = None + + @classmethod + def from_dict(cls, value: dict[str, Any] | None) -> RuntimeConfig: + value = value or {} + cache_dir = value.get("cache_dir") + return cls( + device=value.get("device", "auto"), + dtype=value.get("dtype", "float32"), + offline_mode=bool(value.get("offline_mode", False)), + cache_dir=Path(cache_dir) if cache_dir else None, + ) + + +@dataclass +class ExperimentConfig: + experiment_id: str + output_dir: Path + data_pools: list[DataPoolConfig] + sns: SNSConfig + eee: EEEConfig + projection: ProjectionConfig + datablend: DatablendConfig + runtime: RuntimeConfig = field(default_factory=RuntimeConfig) + description: str = "" + embedsim_config_name: str = "" + reranking_enabled: bool = True + random_shuffle: bool = False + strict_data_validation: bool = False + downstream_eval: dict[str, Any] = field(default_factory=dict) + log_wandb: bool = False + log_local: bool = True + + @classmethod + def from_dict(cls, value: dict[str, Any]) -> ExperimentConfig: + value = dict(value) + data_pool_values = value.get("data_pools", value.get("data_pools_config", [])) + data_pools = [DataPoolConfig.from_dict(item) for item in data_pool_values] + if not data_pools: + raise ValueError("data_pools cannot be empty") + random_shuffle = bool(value.get("random_shuffle", False)) + if random_shuffle: + for pool in data_pools: + pool.shuffle = True + experiment_id = str(value.get("experiment_id", "")).strip() + if not experiment_id: + raise ValueError("experiment_id is required") + eee_value = _eee_value_from_experiment(value) + projection_value = value.get("projection", value.get("awn")) + datablend_value = ( + value.get("datablend") + or _datablend_from_downstream(value.get("downstream_eval")) + or {"query": "Describe the media content in detail"} + ) + return cls( + experiment_id=experiment_id, + description=str(value.get("description", "")), + output_dir=Path(value.get("output_dir", value.get("experiment_dir", "outputs"))), + data_pools=data_pools, + sns=SNSConfig.from_dict(value.get("sns")), + eee=EEEConfig.from_dict(eee_value), + projection=ProjectionConfig.from_dict(projection_value), + datablend=DatablendConfig.from_dict(datablend_value), + runtime=RuntimeConfig.from_dict(value.get("runtime")), + embedsim_config_name=str(value.get("embedsim_config_name", "")), + reranking_enabled=bool(value.get("reranking_enabled", True)), + random_shuffle=random_shuffle, + strict_data_validation=bool(value.get("strict_data_validation", False)), + downstream_eval=dict(value.get("downstream_eval") or {}), + log_wandb=bool(value.get("log_wandb", False)), + log_local=bool(value.get("log_local", True)), + ) + + @property + def run_dir(self) -> Path: + return self.output_dir / self.experiment_id + + def to_dict(self) -> dict[str, Any]: + value = asdict(self) + return _stringify_paths(value) + + +def _stringify_paths(value: Any) -> Any: + if isinstance(value, Path): + return str(value) + if isinstance(value, list): + return [_stringify_paths(item) for item in value] + if isinstance(value, dict): + return {key: _redact_or_stringify(key, item) for key, item in value.items()} + return value + + +def _redact_or_stringify(key: str, value: Any) -> Any: + lowered = key.lower() + if any(marker in lowered for marker in ("api_key", "token", "secret", "password")) and value: + return "***REDACTED***" + return _stringify_paths(value) + + +def _eee_value_from_experiment(value: dict[str, Any]) -> dict[str, Any] | None: + eee = dict(value.get("eee") or {}) + if "experts" in value and "experts" not in eee: + eee["experts"] = [item.value if hasattr(item, "value") else str(item) for item in value["experts"]] + if "text_expert_backend" in value and "backend" not in eee: + backend = value["text_expert_backend"] + eee["backend"] = backend.value if hasattr(backend, "value") else str(backend) + for key in ( + "text_prompt_base", + "text_prompt_prefix", + "nvidia_api_key", + "nvidia_api_base_url", + "nvidia_text_describer_model", + "nvidia_image_describer_model", + "nvidia_video_describer_model", + "nvidia_audio_describer_model", + "nvidia_embedding_model", + ): + if key in value and key not in eee: + eee[key] = value[key] + return eee or None + + +def _datablend_from_downstream(value: Any) -> dict[str, Any] | None: + if not isinstance(value, dict): + return None + query = value.get("query") + if not query: + return None + result: dict[str, Any] = {"query": query} + if value.get("train_count") is not None: + result["top_k"] = value["train_count"] + if value.get("blend_fraction") is not None: + result["blend_fraction"] = value["blend_fraction"] + return result diff --git a/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/data/__init__.py b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/data/__init__.py new file mode 100644 index 0000000000..17b16d0dc1 --- /dev/null +++ b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/data/__init__.py @@ -0,0 +1,15 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Data loading and artifact helpers.""" diff --git a/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/data/io.py b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/data/io.py new file mode 100644 index 0000000000..4867fe8030 --- /dev/null +++ b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/data/io.py @@ -0,0 +1,96 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Small stdlib I/O helpers used by pipeline stages.""" + +from __future__ import annotations + +import json +import math +import os +from pathlib import Path +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from collections.abc import Iterable + + +def ensure_dir(path: str | Path) -> Path: + path = Path(path) + path.mkdir(parents=True, exist_ok=True) + return path + + +def write_json(path: str | Path, payload: object) -> Path: + path = Path(path) + ensure_dir(path.parent) + path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8") + return path + + +def write_jsonl(path: str | Path, rows: Iterable[dict]) -> Path: + path = Path(path) + ensure_dir(path.parent) + with path.open("w", encoding="utf-8") as handle: + for row in rows: + handle.write(json.dumps(row, sort_keys=True) + "\n") + return path + + +def read_jsonl(path: str | Path) -> list[dict]: + rows: list[dict] = [] + with Path(path).open("r", encoding="utf-8") as handle: + for raw_line in handle: + line = raw_line.strip() + if line: + rows.append(json.loads(line)) + return rows + + +def write_npy(path: str | Path, rows: list[list[float]]) -> Path: + """Write a 2D float64 NumPy .npy file.""" + + import numpy as np + + path = Path(path) + ensure_dir(path.parent) + n_rows = len(rows) + n_cols = len(rows[0]) if rows else 0 + for row in rows: + if len(row) != n_cols: + raise ValueError("All rows must have the same length") + + array = np.empty((0, 0), dtype=np.float64) if n_rows == 0 else np.asarray(rows, dtype=np.float64) + with path.open("wb") as handle: + np.save(handle, array, allow_pickle=False) + return path + + +def cosine_similarity(left: list[float], right: list[float]) -> float: + if len(left) != len(right): + raise ValueError("Vectors must have matching dimensions") + dot = sum(a * b for a, b in zip(left, right)) + left_norm = math.sqrt(sum(a * a for a in left)) + right_norm = math.sqrt(sum(b * b for b in right)) + if left_norm == 0 or right_norm == 0: + return 0.0 + return dot / (left_norm * right_norm) + + +def stable_relpath(path: str | Path, start: str | Path | None = None) -> str: + path = Path(path) + try: + return os.path.relpath(path, start=start or Path.cwd()) + except ValueError: + return str(path) diff --git a/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/data/loader.py b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/data/loader.py new file mode 100644 index 0000000000..f043296933 --- /dev/null +++ b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/data/loader.py @@ -0,0 +1,136 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Paired data-pool loader.""" + +from __future__ import annotations + +import csv +import hashlib +import json +import random +from pathlib import Path +from typing import Any + +from omnifuse_tutorial.config.models import DataPoolConfig + + +def load_pool_records(pool: DataPoolConfig) -> list[dict[str, Any]]: + mapping_path = pool.root_dir / pool.mapping_file + if not mapping_path.exists(): + raise FileNotFoundError(f"Mapping file not found: {mapping_path}") + + if mapping_path.suffix.lower() == ".csv": + mapping_rows = _read_csv(mapping_path) + else: + mapping_rows = _read_jsonl(mapping_path) + + records = [_normalize_mapping_row(pool, row, index) for index, row in enumerate(mapping_rows)] + records = [record for record in records if record is not None] + if pool.shuffle: + rng = random.Random(0) # noqa: S311 - deterministic tutorial sampling, not security. + rng.shuffle(records) + if pool.n_samples is not None: + records = records[: pool.n_samples] + return records + + +def load_all_pools(pools: list[DataPoolConfig]) -> list[dict[str, Any]]: + records: list[dict[str, Any]] = [] + for pool in pools: + records.extend(load_pool_records(pool)) + return records + + +def _read_jsonl(path: Path) -> list[dict[str, Any]]: + rows: list[dict[str, Any]] = [] + with path.open("r", encoding="utf-8") as handle: + for raw_line in handle: + line = raw_line.strip() + if line: + rows.append(json.loads(line)) + return rows + + +def _read_csv(path: Path) -> list[dict[str, Any]]: + with path.open("r", encoding="utf-8", newline="") as handle: + return [dict(row) for row in csv.DictReader(handle)] + + +def _normalize_mapping_row(pool: DataPoolConfig, row: dict[str, Any], index: int) -> dict[str, Any] | None: + raw_rel = row.get("data_path") or row.get("raw_path") or row.get("path") + ann_rel = row.get("annotation_path") or row.get("caption_path") or row.get("label_path") + annotation_text = row.get("annotation") or row.get("caption") or row.get("text") + if not raw_rel: + raise ValueError(f"Mapping row missing data_path/raw_path/path in pool {pool.name}: {row}") + raw_path = _resolve(pool.root_dir, str(raw_rel)) + if not raw_path.exists(): + raise FileNotFoundError(f"Raw data file not found for pool {pool.name}: {raw_path}") + + if pool.max_file_size_mb is not None and raw_path.exists(): + max_bytes = pool.max_file_size_mb * 1024 * 1024 + if raw_path.stat().st_size > max_bytes: + return None + + annotation_path = _resolve(pool.root_dir, str(ann_rel)) if ann_rel else None + if annotation_text is None and annotation_path: + annotation_text = annotation_path.read_text(encoding="utf-8").strip() + if annotation_text is None: + raise ValueError(f"Mapping row has no annotation text/path in pool {pool.name}: {row}") + + raw_text = None + if pool.modality == "text" and raw_path.exists(): + raw_text = raw_path.read_text(encoding="utf-8").strip() + + record_id = row.get("id") or _stable_id(pool.name, str(raw_rel), str(ann_rel or annotation_text)) + metadata = { + key: value + for key, value in row.items() + if key + not in { + "id", + "data_path", + "raw_path", + "path", + "annotation_path", + "caption_path", + "label_path", + "annotation", + "caption", + "text", + } + } + return { + "pair_id": str(record_id), + "pool": pool.name, + "pool_index": index, + "modality": pool.modality, + "raw_path": str(raw_path), + "annotation_path": str(annotation_path) if annotation_path else None, + "annotation": str(annotation_text).strip(), + "raw_text": raw_text, + "metadata": metadata, + } + + +def _resolve(root: Path, value: str) -> Path: + path = Path(value) + if path.is_absolute(): + return path + return (root / path).resolve() + + +def _stable_id(*parts: str) -> str: + digest = hashlib.sha256("\x1f".join(parts).encode("utf-8")).hexdigest() + return digest[:16] diff --git a/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/datablend/__init__.py b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/datablend/__init__.py new file mode 100644 index 0000000000..f03e9c88c6 --- /dev/null +++ b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/datablend/__init__.py @@ -0,0 +1,15 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Datablend export package.""" diff --git a/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/datablend/ranker.py b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/datablend/ranker.py new file mode 100644 index 0000000000..2f934d0976 --- /dev/null +++ b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/datablend/ranker.py @@ -0,0 +1,64 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Query-based datablend ranking.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +from omnifuse_tutorial.config.models import DatablendConfig +from omnifuse_tutorial.data.io import cosine_similarity +from omnifuse_tutorial.eee.backends import EEEBackend +from omnifuse_tutorial.projection.trainer import ProjectionResult + + +@dataclass +class DatablendRanker: + config: DatablendConfig + backend: EEEBackend + + def rank(self, records: list[dict[str, Any]], projection: ProjectionResult) -> list[dict[str, Any]]: + query_embedding = self.backend.embed_query(self.config.query, expert="text-based") + rows: list[dict[str, Any]] = [] + for index, (record, projected) in enumerate(zip(records, projection.projected_raw)): + score = cosine_similarity(projected, query_embedding) + row = { + "rank": 0, + "score": score, + "pair_id": record["pair_id"], + "pool": record["pool"], + "modality": record["modality"], + "raw_path": record["raw_path"], + "annotation": record.get("sns_annotation") or record.get("annotation"), + "original_annotation": record.get("annotation"), + "source_index": index, + } + if self.config.include_metadata: + row["metadata"] = record.get("metadata", {}) + rows.append(row) + + rows.sort(key=lambda item: item["score"], reverse=True) + for rank, row in enumerate(rows, start=1): + row["rank"] = rank + return rows + + def select_top(self, ranked: list[dict[str, Any]]) -> list[dict[str, Any]]: + if self.config.top_k is not None: + return ranked[: self.config.top_k] + if self.config.blend_fraction is not None: + count = max(1, int(len(ranked) * self.config.blend_fraction)) + return ranked[:count] + return ranked diff --git a/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/eee/__init__.py b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/eee/__init__.py new file mode 100644 index 0000000000..1a60e555cb --- /dev/null +++ b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/eee/__init__.py @@ -0,0 +1,15 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Expert embedding engine package.""" diff --git a/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/eee/backends.py b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/eee/backends.py new file mode 100644 index 0000000000..aaf09d3a6b --- /dev/null +++ b/tutorials/multimodal/omni-fuse-data-curation/omnifuse_tutorial/eee/backends.py @@ -0,0 +1,715 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Embedding backends for the Expert Embedding Engine.""" + +from __future__ import annotations + +import base64 +import hashlib +import io +import math +import os +import re +import time +import wave +from collections.abc import Callable +from pathlib import Path +from typing import Any, Literal, Protocol + + +class EEEBackend(Protocol): + def embed_raw(self, record: dict[str, Any], expert: str) -> list[float]: ... + + def embed_annotation(self, record: dict[str, Any], expert: str) -> list[float]: ... + + def embed_query(self, query: str, expert: str = "text-based") -> list[float]: ... + + +BackendName = Literal["hybrid", "local", "api"] +BackendFactory = Callable[[Any, Any], EEEBackend] + +IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".bmp", ".gif"} +VIDEO_EXTENSIONS = {".mp4", ".mov", ".avi", ".mkv", ".webm", ".m4v"} +AUDIO_EXTENSIONS = {".wav", ".mp3"} +TEXT_EXTENSIONS = {".txt", ".md", ".json", ".csv"} +SUPPORTED_EXPERTS = {"text-based", "fusion", "e2e"} +PHI4_MULTIMODAL_MODEL = "microsoft/phi-4-multimodal-instruct" +GEMMA_3N_E4B_MODEL = "google/gemma-3n-e4b-it" +AUDIO_URL_CHAT_MODELS = {PHI4_MULTIMODAL_MODEL, GEMMA_3N_E4B_MODEL} +NVCF_ASSET_UPLOAD_THRESHOLD_BYTES = 180 * 1024 +AUDIO_INLINE_PREVIEW_BYTES = 160 * 1024 +NVCF_ASSET_BASE_URL = "https://api.nvcf.nvidia.com/v2/nvcf" +MIME_TYPES = { + ".jpg": "image/jpeg", + ".jpeg": "image/jpeg", + ".png": "image/png", + ".webp": "image/webp", + ".bmp": "image/bmp", + ".gif": "image/gif", + ".wav": "audio/wav", + ".mp3": "audio/mpeg", + ".mp4": "video/mp4", + ".mov": "video/quicktime", + ".avi": "video/x-msvideo", + ".mkv": "video/x-matroska", + ".webm": "video/webm", + ".m4v": "video/x-m4v", +} + + +class LocalEEEBackend: + """Full local EEE backend: text expert, LanguageBind fusion, Omni-Embed e2e.""" + + def __init__(self, config: Any | None = None, runtime: Any | None = None, embedding_dim: int = 2048): + from omnifuse_tutorial.eee.local_models import FullLocalEEEBackend + + self._backend = FullLocalEEEBackend(config=config, runtime=runtime, embedding_dim=embedding_dim) + + def embed_raw(self, record: dict[str, Any], expert: str) -> list[float]: + return self._backend.embed_raw(record, expert) + + def embed_annotation(self, record: dict[str, Any], expert: str) -> list[float]: + return self._backend.embed_annotation(record, expert) + + def embed_query(self, query: str, expert: str = "text-based") -> list[float]: + return self._backend.embed_query(query, expert) + + def unload(self) -> None: + self._backend.unload() + + +class NvidiaApiEEEBackend: + """NVIDIA API backend aligned with EmbedSim's API text expert path.""" + + def __init__( + self, + embedding_dim: int = 2048, + api_key: str | None = None, + api_base_url: str = "https://integrate.api.nvidia.com/v1", + text_model: str = "nvidia/nemotron-nano-12b-v2-vl", + image_model: str = "nvidia/nemotron-nano-12b-v2-vl", + video_model: str = "nvidia/nemotron-nano-12b-v2-vl", + audio_model: str = GEMMA_3N_E4B_MODEL, + embedding_model: str = "nvidia/llama-nemotron-embed-1b-v2", + timeout: int = 120, + batch_size: int = 4, + ): + self.embedding_dim = embedding_dim + self.api_key = api_key or os.environ.get("NV_BUILD_API_KEY") or os.environ.get("NVIDIA_API_KEY") + if not self.api_key: + raise ValueError("NVIDIA API key required. Set eee.nvidia_api_key or NV_BUILD_API_KEY.") + self.api_base_url = os.environ.get("NVIDIA_API_BASE_URL", api_base_url).rstrip("/") + self.text_model = text_model + self.image_model = image_model + self.video_model = video_model + self.audio_model = audio_model + self.embedding_model = embedding_model + self.timeout = timeout + self.batch_size = max(1, min(int(batch_size), 16)) + + def embed_raw(self, record: dict[str, Any], expert: str) -> list[float]: + _validate_expert(expert) + if expert == "text-based": + text = self._describe_raw(record) + else: + # The sibling API toggle is primarily for the text expert. For the + # other experts, preserve distinct expert spaces by embedding a + # modality-aware textual representation through the API encoder. + text = _raw_feature_text(record) + return self._embed_text(text, expert) + + def embed_annotation(self, record: dict[str, Any], expert: str) -> list[float]: + _validate_expert(expert) + annotation = _text_or_empty(record.get("sns_annotation")) or _text_or_empty(record.get("annotation")) + return self._embed_text(annotation, expert) + + def embed_query(self, query: str, expert: str = "text-based") -> list[float]: + _validate_expert(expert) + return self._embed_text(query, expert) + + def _describe_raw(self, record: dict[str, Any]) -> str: + modality = str(record.get("modality") or "text") + raw_text = _text_or_empty(record.get("sns_raw_text")) or _text_or_empty(record.get("raw_text")) + if raw_text: + return raw_text + if modality == "text": + return _read_text_path(record.get("raw_path")) or _raw_feature_text(record) + + raw_path = _path_or_none(record.get("raw_path")) + if raw_path is None or not raw_path.exists(): + return _raw_feature_text(record) + + prompt = _prompt_for_modality(modality) + if modality == "image": + return self._describe_file(raw_path, self.image_model, "image_url", prompt) + if modality == "audio": + return self._describe_file(raw_path, self.audio_model, "input_audio", prompt) + if modality == "video": + return self._describe_file(raw_path, self.video_model, "video_url", prompt) + return _raw_feature_text(record) + + def _describe_file(self, path: Path, model: str, content_type: str, prompt: str) -> str: + return describe_file_with_nvidia_api( + path=path, + model=model, + content_type=content_type, + prompt=prompt, + api_base_url=self.api_base_url, + headers=self._headers(), + timeout=self.timeout, + ) + + def _embed_text(self, text: str, expert: str) -> list[float]: + import requests + + response = requests.post( + f"{self.api_base_url}/embeddings", + headers=self._headers(), + json={ + "model": self.embedding_model, + "input": [text], + "input_type": "query" if expert == "text-based" else "passage", + "encoding_format": "float", + "truncate": "END", + }, + timeout=self.timeout, + ) + response.raise_for_status() + vector = response.json()["data"][0]["embedding"] + return _resize_and_normalize([float(item) for item in vector], self.embedding_dim) + + def _headers(self) -> dict[str, str]: + return { + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json", + "Accept": "application/json", + } + + +class HybridEEEBackend: + """API-first EEE backend with local fusion and true multimodal experts. + + The text-based expert uses NVIDIA API descriptions and text embeddings. + The fusion and e2e experts stay local because the current tutorial code + uses LanguageBind and Omni-Embed-Nemotron for those actual expert roles. + """ + + def __init__(self, config: Any, runtime: Any | None = None): + self.api = NvidiaApiEEEBackend( + embedding_dim=int(getattr(config, "embedding_dim", 2048)), + api_key=getattr(config, "nvidia_api_key", None), + api_base_url=getattr(config, "nvidia_api_base_url", "https://integrate.api.nvidia.com/v1"), + text_model=getattr(config, "nvidia_text_describer_model", "nvidia/nemotron-nano-12b-v2-vl"), + image_model=getattr(config, "nvidia_image_describer_model", "nvidia/nemotron-nano-12b-v2-vl"), + video_model=getattr(config, "nvidia_video_describer_model", "nvidia/nemotron-nano-12b-v2-vl"), + audio_model=getattr(config, "nvidia_audio_describer_model", GEMMA_3N_E4B_MODEL), + embedding_model=getattr(config, "nvidia_embedding_model", "nvidia/llama-nemotron-embed-1b-v2"), + batch_size=int(getattr(config, "batch_size", 4)), + ) + self.local = LocalEEEBackend( + config=config, + runtime=runtime, + embedding_dim=int(getattr(config, "embedding_dim", 2048)), + ) + + def embed_raw(self, record: dict[str, Any], expert: str) -> list[float]: + expert = _validate_expert(expert) + if expert == "text-based": + return self.api.embed_raw(record, expert) + return self.local.embed_raw(record, expert) + + def embed_annotation(self, record: dict[str, Any], expert: str) -> list[float]: + expert = _validate_expert(expert) + if expert == "text-based": + return self.api.embed_annotation(record, expert) + return self.local.embed_annotation(record, expert) + + def embed_query(self, query: str, expert: str = "text-based") -> list[float]: + expert = _validate_expert(expert) + if expert == "text-based": + return self.api.embed_query(query, expert) + return self.local.embed_query(query, expert) + + def describe_record(self, record: dict[str, Any]) -> str: + return self.api._describe_raw(record) + + def unload(self) -> None: + self.local.unload() + + +def describe_file_with_nvidia_api( + *, + path: Path, + model: str, + content_type: str, + prompt: str, + api_base_url: str, + headers: dict[str, str], + timeout: int, +) -> str: + if content_type == "input_audio" and model in AUDIO_URL_CHAT_MODELS: + return _describe_audio_url_chat_file(path, model, prompt, api_base_url, headers, timeout) + return _describe_chat_completion_file(path, model, content_type, prompt, api_base_url, headers, timeout) + + +def _describe_chat_completion_file( + path: Path, + model: str, + content_type: str, + prompt: str, + api_base_url: str, + headers: dict[str, str], + timeout: int, +) -> str: + mime = MIME_TYPES.get(path.suffix.lower(), "application/octet-stream") + if content_type == "input_audio": + encoded = base64.b64encode(path.read_bytes()).decode("utf-8") + media_content = {"type": "input_audio", "input_audio": {"data": encoded, "format": _audio_format(path)}} + content: str | list[dict[str, Any]] = [{"type": "text", "text": prompt}, media_content] + request_headers = headers + elif content_type in {"image_url", "video_url"} and path.stat().st_size > NVCF_ASSET_UPLOAD_THRESHOLD_BYTES: + asset_id = _upload_nvcf_asset(path, mime, headers, timeout) + request_headers = _headers_with_nvcf_asset(headers, asset_id) + tag_name = "img" if content_type == "image_url" else "video" + content = f'{prompt}\n<{tag_name} src="data:{mime};asset_id,{asset_id}" />' + else: + encoded = base64.b64encode(path.read_bytes()).decode("utf-8") + media_content = {content_type: {"url": f"data:{mime};base64,{encoded}"}, "type": content_type} + content = [{"type": "text", "text": prompt}, media_content] + request_headers = headers + url = f"{api_base_url}/chat/completions" + payload = { + "model": model, + "messages": [{"role": "user", "content": content}], + "max_tokens": 512, + "temperature": 0.2, + "stream": False, + } + response = _post_nvidia_json_with_retries(url, request_headers, payload, timeout) + response = _resolve_nvidia_response(response, api_base_url, request_headers, timeout, model, url) + return _response_text(response.json(), model, url) + + +def _describe_audio_url_chat_file( + path: Path, + model: str, + prompt: str, + api_base_url: str, + headers: dict[str, str], + timeout: int, +) -> str: + audio_format = _audio_format(path) + mime = "audio/wav" if audio_format == "wav" else "audio/mpeg" + request_headers = dict(headers) + used_asset = False + if path.stat().st_size > NVCF_ASSET_UPLOAD_THRESHOLD_BYTES: + asset_id = _upload_nvcf_asset(path, mime, headers, timeout) + request_headers = _headers_with_nvcf_asset(headers, asset_id) + used_asset = True + content: str | list[dict[str, Any]] = f'{prompt}\n