Skip to content

Aaftabv/qwen1967 local noio control#2093

Draft
mohammadaaftabv wants to merge 1 commit into
NVIDIA-NeMo:mainfrom
mohammadaaftabv:aaftabv/qwen1967-local-noio-control
Draft

Aaftabv/qwen1967 local noio control#2093
mohammadaaftabv wants to merge 1 commit into
NVIDIA-NeMo:mainfrom
mohammadaaftabv:aaftabv/qwen1967-local-noio-control

Conversation

@mohammadaaftabv

Copy link
Copy Markdown
Contributor

Description

Usage

# Add snippet demonstrating usage

Checklist

  • I am familiar with the Contributing Guide.
  • New or Existing tests cover these changes.
  • The documentation is up to date with these changes.

@copy-pr-bot

copy-pr-bot Bot commented Jun 22, 2026

Copy link
Copy Markdown

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@mohammadaaftabv mohammadaaftabv marked this pull request as draft June 22, 2026 11:07
@greptile-apps

greptile-apps Bot commented Jun 22, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR introduces end-to-end in-process audio ASR inference using Qwen3-Omni via vLLM, along with the scaffolding needed to run it efficiently at scale: a duration-aware bucketing scheduler (BatchPolicy / BucketQueueScheduler), a NeMo tarred dataset reader, a PayloadLifecycleGroupStage for memory-bounded waveform handling, worker-level GPU utilization sampling, and perf identity tracking across Ray/Xenna backends.

  • ASR pipeline: ASRStage (pluggable adapter) + QwenOmniASRAdapter (two-turn vLLM inference) with centralized chunk scheduling, result stitching, and language mapping.
  • Infra additions: NemoTarShardDiscoveryStage / NemoTarShardReaderStage for streaming NeMo tarred datasets; PayloadLifecycleGroupStage with a Ray actor admission controller to cap node memory; GpuUtilSampler for per-GPU windowed utilization metrics; worker perf identity stamped on every output task for attribution in AudioPerformanceSummary.
  • Backend changes: BaseStageAdapter gains centralized scheduler plan execution and TaskWindow support; Ray Data and Xenna adapters gain upstream prebatching and per-stage verbosity configuration.

Confidence Score: 4/5

The core inference and scheduling logic is well-structured and guarded; the main risks are observability gaps, a potential indefinite hang in the admission controller under worker crash, and a fragile private-module monkey-patch in the Xenna executor.

The new code is carefully guarded against most edge cases (empty batches, missing deps, teardown failures). All findings are quality/observability issues rather than correctness bugs on the hot path: silent NVML error suppression, a cross-module private-symbol import, a missing None guard in verbosity config parsing, a backwards brace-range expansion that silently emits zero shards, and a named Ray actor that retains stale budget after a worker crash.

nemo_curator/stages/payload_lifecycle.py (admission spin with no timeout), nemo_curator/backends/xenna/executor.py (private-module patch and verbosity None guard), nemo_curator/utils/gpu_sampler.py (silent NVML error suppression)

Important Files Changed

Filename Overview
nemo_curator/stages/audio/inference/asr/stage.py Generic ASR Curator stage with pluggable adapter; handles chunking, bucketing, chunk-result stitching, and language mapping. Guard in process_batch correctly enforces the scheduler path when batch_policy is enabled.
nemo_curator/models/asr/qwen_omni.py New Qwen3-Omni in-process vLLM ASR adapter with two-turn inference. Solid guards around missing deps, teardown, and empty-batch cases; trust_remote_code=True is hardcoded but expected for Qwen.
nemo_curator/stages/audio/io/nemo_tarred_reader.py NeMo tarred dataset reader with shard discovery and streaming audio decode; collision-safe ManifestIndex, resume support, and decode-based duration enforcement. Backwards brace-range expansion silently emits no paths.
nemo_curator/stages/payload_lifecycle.py New PayloadLifecycleGroupStage with byte-token admission actor for memory-bounded waveform lifecycle. Polling loop in _acquire has no timeout; a worker crash can leave stale budget in the named actor causing indefinite hangs.
nemo_curator/utils/gpu_sampler.py New background NVML GPU utilization sampler with per-GPU windowed mean. GPU read errors inside _loop are silently swallowed with no log, making transient NVML failures invisible.
nemo_curator/backends/base.py Extended with centralized scheduler plan types, TaskWindow processing, GPU sampler integration, and perf identity stamping; backward-compatible with existing stages.
nemo_curator/stages/audio/metrics/performance.py New 900-line audio pipeline performance summary; imports private _norm_uuid from gpu_sampler across module boundaries creating fragile coupling.
nemo_curator/backends/xenna/executor.py Adds verbosity config helpers, monitoring fail-open monkey-patch, and quieter defaults. _get_verbosity_config has no guard for None values; monkey-patch targets a private module with no stability contract.
nemo_curator/models/vllm_model.py Refactored to extract VLLMBase shared engine management; class-level _llm/_sampling_params are correctly shadowed by instance attributes on assignment.
nemo_curator/backends/ray_data/adapter.py Extended with prebatch planning for upstream batching; cleanly splits centralized-batching and preplanned-batch flows via separate map_batches functions.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant E as Executor (Xenna/RayData)
    participant BA as BaseStageAdapter
    participant AS as ASRStage
    participant BP as BatchPolicy (BucketQueueScheduler)
    participant AD as QwenOmniASRAdapter
    participant GS as GpuUtilSampler

    E->>AS: setup_on_node() - prefetch_weights (no GPU)
    E->>AS: setup() - adapter.setup() (load vLLM engine)
    GS-->>BA: background NVML polling starts

    E->>BA: process_batch(tasks)
    BA->>BA: build_scheduled_task_batch_plan(stage, tasks)
    BA->>AS: build_prebucketed_tasks(tasks) - chunk tasks
    BA->>BP: bucketize_with_costs(chunks, cost_fn) - SchedulerReadyBatches

    loop For each SchedulerReadyBatch
        BA->>BA: process_scheduler_ready_batch(ready_batch)
        BA->>AS: stage.process_batch(chunk_tasks)
        AS->>AD: adapter.transcribe_batch(items) [Turn 1]
        AD-->>AS: ASRResult list (pred_texts)
        AS->>AD: adapter.transcribe_batch(items) [Turn 2 if followup_prompt]
        AD-->>AS: ASRResult list (disfluency_texts)
        AS-->>BA: processed chunk tasks
        BA->>GS: window_stats(t0, t1) - GPU util metrics
        BA->>BA: stamp perf identity + GPU metrics on tasks
    end

    BA->>AS: assemble_prebucketed_task_results(parent_tasks, chunks)
    AS-->>E: assembled parent AudioTasks (pred_text written)
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
    participant E as Executor (Xenna/RayData)
    participant BA as BaseStageAdapter
    participant AS as ASRStage
    participant BP as BatchPolicy (BucketQueueScheduler)
    participant AD as QwenOmniASRAdapter
    participant GS as GpuUtilSampler

    E->>AS: setup_on_node() - prefetch_weights (no GPU)
    E->>AS: setup() - adapter.setup() (load vLLM engine)
    GS-->>BA: background NVML polling starts

    E->>BA: process_batch(tasks)
    BA->>BA: build_scheduled_task_batch_plan(stage, tasks)
    BA->>AS: build_prebucketed_tasks(tasks) - chunk tasks
    BA->>BP: bucketize_with_costs(chunks, cost_fn) - SchedulerReadyBatches

    loop For each SchedulerReadyBatch
        BA->>BA: process_scheduler_ready_batch(ready_batch)
        BA->>AS: stage.process_batch(chunk_tasks)
        AS->>AD: adapter.transcribe_batch(items) [Turn 1]
        AD-->>AS: ASRResult list (pred_texts)
        AS->>AD: adapter.transcribe_batch(items) [Turn 2 if followup_prompt]
        AD-->>AS: ASRResult list (disfluency_texts)
        AS-->>BA: processed chunk tasks
        BA->>GS: window_stats(t0, t1) - GPU util metrics
        BA->>BA: stamp perf identity + GPU metrics on tasks
    end

    BA->>AS: assemble_prebucketed_task_results(parent_tasks, chunks)
    AS-->>E: assembled parent AudioTasks (pred_text written)
Loading

Reviews (1): Last reviewed commit: "Enable eager vLLM mode for Qwen Omni" | Re-trigger Greptile

Comment on lines +88 to +93
try:
utils[k] = float(pynvml.nvmlDeviceGetUtilizationRates(handle).gpu)
mem = pynvml.nvmlDeviceGetMemoryInfo(handle)
mems[k] = 100.0 * float(mem.used) / float(mem.total) if mem.total else 0.0
except Exception: # noqa: BLE001
continue

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 GPU read errors in the sampling loop are silently swallowed with no log message. A recurring NVML failure (e.g., a driver bug or reset) would show up only as missing metrics with no diagnostic trace, making it very hard to distinguish "no samples in window" from "NVML kept failing".

Suggested change
try:
utils[k] = float(pynvml.nvmlDeviceGetUtilizationRates(handle).gpu)
mem = pynvml.nvmlDeviceGetMemoryInfo(handle)
mems[k] = 100.0 * float(mem.used) / float(mem.total) if mem.total else 0.0
except Exception: # noqa: BLE001
continue
try:
utils[k] = float(pynvml.nvmlDeviceGetUtilizationRates(handle).gpu)
mem = pynvml.nvmlDeviceGetMemoryInfo(handle)
mems[k] = 100.0 * float(mem.used) / float(mem.total) if mem.total else 0.0
except Exception as exc: # noqa: BLE001
logger.debug("GpuUtilSampler: NVML read error for handle {}: {}", k, exc)
continue

summarize_samples,
)
from nemo_curator.tasks import Task
from nemo_curator.utils.gpu_sampler import _norm_uuid

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Importing _norm_uuid (a private symbol denoted by the leading underscore) from gpu_sampler across module boundaries creates tight coupling. If the function is renamed or moved during a refactor, this silent inter-module dependency breaks without a clear import error at the call site. The function should either be made public (rename to norm_uuid) or re-declared in a shared utility module.

Suggested change
from nemo_curator.utils.gpu_sampler import _norm_uuid
from nemo_curator.utils.gpu_sampler import norm_uuid as _norm_uuid # see gpu_sampler.py

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Comment thread nemo_curator/backends/xenna/executor.py Outdated
Comment on lines +204 to +211
def _get_verbosity_config(self, key: str) -> VerbosityLevel:
"""Get Xenna verbosity level from enum, integer, or string config."""
value = self._get_pipeline_config(key)
if isinstance(value, VerbosityLevel):
return value
if isinstance(value, str):
return VerbosityLevel[value.upper()]
return VerbosityLevel(value)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 _get_verbosity_config falls through to VerbosityLevel(value) when value is neither a VerbosityLevel nor a str. If a user sets any verbosity key to null in their YAML, yaml.safe_load returns None, and VerbosityLevel(None) raises ValueError — not a graceful error. A None guard or a fallback to the default string prevents this.

Suggested change
def _get_verbosity_config(self, key: str) -> VerbosityLevel:
"""Get Xenna verbosity level from enum, integer, or string config."""
value = self._get_pipeline_config(key)
if isinstance(value, VerbosityLevel):
return value
if isinstance(value, str):
return VerbosityLevel[value.upper()]
return VerbosityLevel(value)
def _get_verbosity_config(self, key: str) -> VerbosityLevel:
"""Get Xenna verbosity level from enum, integer, or string config."""
value = self._get_pipeline_config(key)
if isinstance(value, VerbosityLevel):
return value
if isinstance(value, str):
return VerbosityLevel[value.upper()]
if value is None:
default = self._default_pipeline_config.get(key, "NONE")
return VerbosityLevel[str(default).upper()]
return VerbosityLevel(value)

Comment on lines +297 to +303
def _acquire(self, amount_bytes: int) -> None:
if amount_bytes <= 0:
return
while True:
if self._ray_get(self._admission.try_acquire.remote(self._node_id, amount_bytes)):
return
time.sleep(self.admission_poll_interval_s)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Unbounded admission spin with no timeout or deadline

_acquire loops forever polling try_acquire every admission_poll_interval_s. If a pipeline worker crashes between _acquire and the matching finally: _release — on a SIGKILL the finally block may not run — the admission actor retains the unreleased budget permanently for that node, and all other workers waiting on that node's budget hang indefinitely. The named actor (get_if_exists=True) persists the stale state across pipeline restarts in the same Ray session. Adding a configurable max_wait_s or clearing the actor state on pipeline initialization would prevent this hang.

Comment on lines +42 to +50
def _expand_nemo_path(pattern: str) -> list[str]:
"""Expand NeMo brace patterns like ``__OP_0..N_CL_``."""
match = re.search(r"_OP_(\d+)\.\.(\d+)_CL_", pattern)
if not match:
return [pattern]
start, end = int(match.group(1)), int(match.group(2))
prefix = pattern[: match.start()]
suffix = pattern[match.end() :]
return [f"{prefix}{i}{suffix}" for i in range(start, end + 1)]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 _expand_nemo_path silently returns an empty list when the brace-range end is less than the start (e.g. _OP_5..3_CL_). The subsequent len(manifest_paths) != len(tar_paths) check would not catch this if both patterns produce the same empty expansion — the pipeline would emit zero tasks for that corpus with no warning. A guard that logs on a backwards range surfaces this misconfiguration clearly.

Suggested change
def _expand_nemo_path(pattern: str) -> list[str]:
"""Expand NeMo brace patterns like ``__OP_0..N_CL_``."""
match = re.search(r"_OP_(\d+)\.\.(\d+)_CL_", pattern)
if not match:
return [pattern]
start, end = int(match.group(1)), int(match.group(2))
prefix = pattern[: match.start()]
suffix = pattern[match.end() :]
return [f"{prefix}{i}{suffix}" for i in range(start, end + 1)]
def _expand_nemo_path(pattern: str) -> list[str]:
"""Expand NeMo brace patterns like ``__OP_0..N_CL_``."""
match = re.search(r"_OP_(\d+)\.\.(\d+)_CL_", pattern)
if not match:
return [pattern]
start, end = int(match.group(1)), int(match.group(2))
if end < start:
logger.warning(
"NeMo path pattern {} has backwards range ({}..{}); no paths emitted",
pattern,
start,
end,
)
return []
prefix = pattern[: match.start()]
suffix = pattern[match.end() :]
return [f"{prefix}{i}{suffix}" for i in range(start, end + 1)]

Comment thread nemo_curator/backends/xenna/executor.py Outdated
Comment on lines +22 to +49
from nemo_curator.backends.base import (
BaseExecutor,
)
from nemo_curator.backends.utils import register_loguru_serializer
from nemo_curator.backends.xenna.adapter import create_named_xenna_stage_adapter
from nemo_curator.backends.xenna.adapter import (
create_named_xenna_stage_adapter,
)
from nemo_curator.stages.base import ProcessingStage
from nemo_curator.tasks import EmptyTask, Task
from nemo_curator.tasks import EmptyTask, Task, flatten_task_windows


def _patch_xenna_monitoring_fail_open() -> None:
"""Keep non-critical Xenna resource monitoring from failing the pipeline."""
try:
from cosmos_xenna.pipelines.private import monitoring
except Exception as exc: # noqa: BLE001
logger.debug(f"Could not import Xenna monitoring for fail-open patch: {exc}")
return

if getattr(monitoring.RayResourceMonitor.update, "_curator_fail_open", False):
return

original_update = monitoring.RayResourceMonitor.update

def update_fail_open(self): # noqa: ANN001, ANN202
try:
return original_update(self)
except Exception as exc: # noqa: BLE001

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Monkey-patching a private Xenna module at runtime

_patch_xenna_monitoring_fail_open imports from cosmos_xenna.pipelines.private — a module with no stability contract — and replaces RayResourceMonitor.update on the class. If _node_ids is renamed or the return-type contract changes in a Xenna update, the patch silently returns a wrong value ({}) instead of failing loudly. Consider tracking this as a known Xenna issue to be removed once the upstream fix lands, and verifying whether {node_id: None for ...} is the correct fallback return value.

Preserve the backend-visible payload lifecycle and local windowed bucketing
while integrating current main backend and pipeline semantics.

Harden payload admission liveness, reject reversed NeMo brace ranges, scope
Qwen-ASR helper dependencies to the CUDA audio extra, regenerate the lockfile,
and bring all changed Python files to a clean Ruff baseline.

Signed-off-by: Mohammad Aaftab <aaftaabv@gmail.com>
@mohammadaaftabv mohammadaaftabv force-pushed the aaftabv/qwen1967-local-noio-control branch from 968792a to 8c7a2ff Compare June 28, 2026 19:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant