Skip to content

Commit be338ea

Browse files
authored
Merge branch 'main' into slurm_array
2 parents 3fc2dbc + 40987a3 commit be338ea

15 files changed

Lines changed: 319 additions & 21 deletions

File tree

.github/workflows/cicd-main.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ jobs:
6262
strategy:
6363
fail-fast: false
6464
matrix:
65-
folder: ["backends", "config", "core", "models", "pipelines", "stages-audio", "stages-common", "stages-deduplication", "stages-image", "stages-interleaved", "stages-math_stages", "stages-synthetic", "stages-text", "stages-video", "eval", "tasks", "utils"]
65+
folder: ["backends", "benchmarking", "config", "core", "models", "pipelines", "stages-audio", "stages-common", "stages-deduplication", "stages-image", "stages-interleaved", "stages-math_stages", "stages-synthetic", "stages-text", "stages-video", "eval", "tasks", "utils"]
6666
needs: [pre-flight, cicd-wait-in-queue]
6767
runs-on: ubuntu-latest
6868
name: Unit_Test_${{ matrix.folder }}_CPU

benchmarking/README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,9 +145,13 @@ paths:
145145
host_path: /path/to/model_weights
146146
container_path: /model_weights # optional override
147147

148-
# Optional: Global timeout for all entries (seconds)
148+
# Optional: Global timeout for entries that omit timeout_s (seconds)
149149
default_timeout_s: 7200
150150

151+
# Optional: Maximum allowed effective timeout for any entry (seconds).
152+
# Defaults to 14340 (3h59m).
153+
max_timeout_s: 14340
154+
151155
# Optional: Delete scratch directories after each entry completes
152156
# The path {session_entry_dir}/scratch is automatically created when an entry starts and can be used by benchmark
153157
#scripts for writing temp files. This directory is automatically cleaned up on completion of the entry if

benchmarking/nightly-benchmark.yaml

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,11 @@ datasets:
127127
path: "{datasets_path}/fleurs"
128128
# Timeout knobs consumed by tools/generate_ci_tests.py to compute each SLURM job's wall-clock:
129129
# max(entry.timeout_s (or default_timeout_s) + cleanup_timeout_s, min_timeout_s)
130-
# default_timeout_s: per-entry default; cleanup_timeout_s: post-run cleanup buffer; min_timeout_s:
131-
# floor covering container setup. (default_timeout_s is also the runner's per-entry default.)
130+
# default_timeout_s: per-entry default; max_timeout_s: per-entry ceiling after config merging;
131+
# cleanup_timeout_s: post-run cleanup buffer; min_timeout_s: floor covering container setup.
132+
# max_timeout_s is 3h59m so a max-length entry plus the 60s cleanup buffer fits in 4h.
132133
default_timeout_s: 7200
134+
max_timeout_s: 14340
133135
cleanup_timeout_s: 60
134136
min_timeout_s: 600
135137

@@ -739,10 +741,10 @@ entries:
739741
- name: audio_readspeech_xenna
740742
enabled: true
741743
script: audio_readspeech_benchmark.py
742-
# Hang guard only (~4h). Full dataset with all filters runs ~3.24h on 4×A100.
744+
# Hang guard only (3h30m). Full dataset with all filters runs ~3.24h on 4×A100.
743745
# Performance regressions should be tracked via a `requirements` check on
744746
# `time_taken_s` once a stable baseline is established (follow-up).
745-
timeout_s: 14400
747+
timeout_s: 12600
746748
args: >-
747749
--benchmark-results-path={session_entry_dir}
748750
--scratch-output-path={session_entry_dir}/scratch
@@ -775,9 +777,9 @@ entries:
775777
- name: audio_readspeech_raydata
776778
enabled: true
777779
script: audio_readspeech_benchmark.py
778-
# Hang guard only (~4h). Mirrors xenna config so the two executors are
780+
# Hang guard only (3h30m). Mirrors xenna config so the two executors are
779781
# directly comparable on the same workload (full dataset, all filters).
780-
timeout_s: 14400
782+
timeout_s: 12600
781783
args: >-
782784
--benchmark-results-path={session_entry_dir}
783785
--scratch-output-path={session_entry_dir}/scratch

benchmarking/runner/session.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ class Session:
4141
entries: list[Entry] = field(default_factory=list)
4242
sinks: list[Sink] = field(default_factory=list)
4343
default_timeout_s: int = 7200
44+
# Maximum allowed per-entry timeout after default_timeout_s has been applied.
45+
# 3h59m keeps generated CI wall-clock below common 4h limits once cleanup time is added.
46+
max_timeout_s: int = 14340
4447
# object store size is either a value in bytes (int), a fraction of total system memory (float), or None or the
4548
# value "default" (string) both representing the default object store size as used by "ray start".
4649
object_store_size: int | float | str | None = 0.5
@@ -55,7 +58,7 @@ class Session:
5558
path_resolver: PathResolver = None
5659
dataset_resolver: DatasetResolver = None
5760

58-
def __post_init__(self) -> None: # noqa: C901
61+
def __post_init__(self) -> None: # noqa: C901, PLR0912
5962
"""Post-initialization checks and updates for dataclass."""
6063
names = [entry.name for entry in self.entries]
6164
if len(names) != len(set(names)):
@@ -75,15 +78,27 @@ def __post_init__(self) -> None: # noqa: C901
7578
)
7679
raise ValueError(msg)
7780

81+
if not isinstance(self.max_timeout_s, int) or isinstance(self.max_timeout_s, bool) or self.max_timeout_s <= 0:
82+
msg = f"Invalid max_timeout_s: {self.max_timeout_s}; must be a positive integer."
83+
raise ValueError(msg)
84+
7885
# Update delete_scratch for each entry that has not been set to the session-level delete_scratch setting
7986
for entry in self.entries:
8087
if entry.delete_scratch is None:
8188
entry.delete_scratch = self.delete_scratch
8289

83-
# Update timeout_s for each entry that has not been set to the session-level default_timeout_s
90+
# Update timeout_s for each entry that has not been set to the session-level
91+
# default_timeout_s, then enforce the session-level maximum against effective values.
8492
for entry in self.entries:
8593
if entry.timeout_s is None:
8694
entry.timeout_s = self.default_timeout_s
95+
if entry.timeout_s > self.max_timeout_s:
96+
msg = (
97+
f"Entry '{entry.name}' has timeout_s={entry.timeout_s}, which exceeds "
98+
f"max_timeout_s={self.max_timeout_s}. Entry timeouts are validated after "
99+
"all YAML files have been merged and default_timeout_s has been applied."
100+
)
101+
raise ValueError(msg)
87102

88103
# Update object store size for each entry that has not been set.
89104
for entry in self.entries:

benchmarking/tools/generate_ci_tests.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
# Fallbacks used only when nightly-benchmark.yaml omits the corresponding key; the YAML config
2929
# is the source of truth for these timeout knobs.
3030
DEFAULT_TIMEOUT_S = 7200 # mirrors Session.default_timeout_s in runner/session.py
31+
# Slurm caps CI jobs at 4h, so this default leaves room for the 60s cleanup buffer.
32+
DEFAULT_MAX_TIMEOUT_S = 14340 # mirrors Session.max_timeout_s in runner/session.py
3133
DEFAULT_CLEANUP_TIMEOUT_S = 60
3234
DEFAULT_MIN_TIMEOUT_S = 600
3335

@@ -70,7 +72,14 @@ def session_name_from_env(env: Mapping[str, str] = os.environ) -> str | None:
7072
return None
7173

7274

73-
def generate_job(entry: dict, scope: str, default_timeout_s: int, cleanup_timeout_s: int, min_timeout_s: int) -> dict:
75+
def generate_job( # noqa: PLR0913
76+
entry: dict,
77+
scope: str,
78+
default_timeout_s: int,
79+
cleanup_timeout_s: int,
80+
min_timeout_s: int,
81+
max_timeout_s: int,
82+
) -> dict:
7483
"""
7584
Generate a GitLab CI job for a single benchmark entry.
7685
@@ -80,14 +89,20 @@ def generate_job(entry: dict, scope: str, default_timeout_s: int, cleanup_timeou
8089
default_timeout_s: Timeout used for entries that omit "timeout_s"
8190
cleanup_timeout_s: Buffer added on top of every entry's timeout for post-run cleanup
8291
min_timeout_s: Floor on the generated job time to cover container setup overhead
92+
max_timeout_s: Maximum allowed effective entry timeout before cleanup time is added
8393
8494
Returns:
8595
job: Dictionary defining the GitLab CI job
8696
"""
8797
ray = entry.get("ray", {})
98+
entry_timeout_s = entry.get("timeout_s", default_timeout_s)
99+
if entry_timeout_s > max_timeout_s:
100+
msg = f"Entry '{entry['name']}' has timeout_s={entry_timeout_s}, which exceeds max_timeout_s={max_timeout_s}"
101+
raise ValueError(msg)
102+
88103
# SLURM wall-clock = entry's effective timeout + a fixed cleanup buffer, floored at
89104
# min_timeout_s so short entries get enough time for container setup before their run starts.
90-
timeout_s = max(entry.get("timeout_s", default_timeout_s) + cleanup_timeout_s, min_timeout_s)
105+
timeout_s = max(entry_timeout_s + cleanup_timeout_s, min_timeout_s)
91106
time_str = seconds_to_time(timeout_s)
92107

93108
return {
@@ -124,6 +139,7 @@ def generate_pipeline(curator_dir: str, scope: str, session_name: str | None = N
124139
default_timeout_s = config.get("default_timeout_s", DEFAULT_TIMEOUT_S)
125140
cleanup_timeout_s = config.get("cleanup_timeout_s", DEFAULT_CLEANUP_TIMEOUT_S)
126141
min_timeout_s = config.get("min_timeout_s", DEFAULT_MIN_TIMEOUT_S)
142+
max_timeout_s = config.get("max_timeout_s", DEFAULT_MAX_TIMEOUT_S)
127143

128144
pipeline = {
129145
"include": ["curator/curator_ci_template.yml"],
@@ -137,7 +153,9 @@ def generate_pipeline(curator_dir: str, scope: str, session_name: str | None = N
137153
if not entry.get("enabled", True):
138154
continue
139155

140-
pipeline[entry["name"]] = generate_job(entry, scope, default_timeout_s, cleanup_timeout_s, min_timeout_s)
156+
pipeline[entry["name"]] = generate_job(
157+
entry, scope, default_timeout_s, cleanup_timeout_s, min_timeout_s, max_timeout_s
158+
)
141159
job_count += 1
142160

143161
if job_count == 0:

nemo_curator/backends/ray_data/adapter.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,23 @@ def _process_batch_internal(self, batch: dict[str, Any]) -> dict[str, Any]:
7373
# For Task objects, we return them in the 'item' column
7474
return {"item": results}
7575

76+
def _build_resource_kwargs(self, ray_stage_spec: dict) -> dict[str, float]:
77+
"""Build num_cpus/num_gpus kwargs for map_batches.
78+
79+
Checks ray_stage_spec for RAY_NUM_CPUS first so stages can request a
80+
different CPU reservation for Ray Data (e.g. cpus=1.0 to enable stage
81+
fusion) without changing resources.cpus used by other executors.
82+
"""
83+
kwargs: dict[str, float] = {}
84+
ray_num_cpus = ray_stage_spec.get(RayStageSpecKeys.RAY_NUM_CPUS)
85+
if ray_num_cpus is not None:
86+
kwargs["num_cpus"] = ray_num_cpus # type: ignore[reportArgumentType]
87+
elif self.stage.resources.cpus > 0:
88+
kwargs["num_cpus"] = self.stage.resources.cpus # type: ignore[reportArgumentType]
89+
if self.stage.resources.gpus > 0:
90+
kwargs["num_gpus"] = self.stage.resources.gpus # type: ignore[reportArgumentType]
91+
return kwargs
92+
7693
def process_dataset(self, dataset: Dataset) -> Dataset:
7794
"""Process a Ray Data dataset through this stage.
7895
@@ -107,10 +124,7 @@ def process_dataset(self, dataset: Dataset) -> Dataset:
107124
if max_calls is not None:
108125
map_batches_kwargs["max_calls"] = max_calls
109126

110-
if self.stage.resources.cpus > 0:
111-
map_batches_kwargs["num_cpus"] = self.stage.resources.cpus # type: ignore[reportArgumentType]
112-
if self.stage.resources.gpus > 0:
113-
map_batches_kwargs["num_gpus"] = self.stage.resources.gpus # type: ignore[reportArgumentType]
127+
map_batches_kwargs.update(self._build_resource_kwargs(ray_stage_spec))
114128

115129
# Per-stage ray_remote_args (e.g. runtime_env with different pip versions per stage).
116130
ray_remote_args = copy.deepcopy(ray_stage_spec.get(RayStageSpecKeys.RAY_REMOTE_ARGS) or {})

nemo_curator/backends/utils.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ class RayStageSpecKeys(str, Enum):
134134
MAX_WORKERS = "max_workers"
135135
INITIAL_WORKERS = "initial_workers"
136136
RAY_REMOTE_ARGS = "ray_remote_args"
137+
RAY_NUM_CPUS = "ray_num_cpus"
137138

138139

139140
def get_worker_metadata_and_node_id() -> tuple[NodeInfo, WorkerMetadata]:

nemo_curator/stages/video/clipping/clip_extraction_stages.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class ClipTranscodingStage(ProcessingStage[VideoTask, VideoTask]):
3838
software (libx264, libopenh264) and hardware (NVENC) encoding with configurable parameters.
3939
4040
Args:
41-
num_cpus_per_worker: Number of CPUs per worker.
41+
num_cpus_per_worker: Number of CPUs per worker for Xenna scheduling. Does not affect Ray Data CPU scheduling; use ray_data_num_cpus for that.
4242
encoder: Video encoder to use.
4343
encoder_threads: Number of threads per encoder.
4444
encode_batch_size: Number of clips to encode in parallel.
@@ -48,6 +48,7 @@ class ClipTranscodingStage(ProcessingStage[VideoTask, VideoTask]):
4848
num_clips_per_chunk: Number of clips per chunk. If the number of clips is larger than this, the clips will be split into chunks, and created VideoTasks for each chunk.
4949
verbose: Whether to print verbose logs.
5050
ffmpeg_verbose: Whether to print FFmpeg verbose logs.
51+
ray_data_num_cpus: CPU cores reserved per Ray Data actor for this stage. Defaults to 1.0 on the CPU encoder path to enable stage fusion with upstream stages. Set to None to fall back to resources.cpus. Does not affect Xenna scheduling.
5152
"""
5253

5354
num_cpus_per_worker: float = 6.0
@@ -61,6 +62,9 @@ class ClipTranscodingStage(ProcessingStage[VideoTask, VideoTask]):
6162
ffmpeg_verbose: bool = False
6263
verbose: bool = False
6364
name: str = "clip_transcoding"
65+
ray_data_num_cpus: float | None = (
66+
None # CPU reservation for Ray Data scheduler; set to 1.0 on CPU path to enable stage fusion
67+
)
6468

6569
def setup(self, worker_metadata: WorkerMetadata | None = None) -> None: # noqa: ARG002
6670
"""Setup method called once before processing begins.
@@ -83,6 +87,11 @@ def __post_init__(self) -> None:
8387
self.resources = Resources(gpus=1)
8488
else:
8589
self.resources = Resources(cpus=self.num_cpus_per_worker)
90+
if self.ray_data_num_cpus is None:
91+
# Default to 1.0 so Ray Data fuses this stage with VideoReaderStage
92+
# and FixedStrideExtractorStage. Kept separate from resources.cpus
93+
# so Xenna scheduling is unaffected.
94+
self.ray_data_num_cpus = 1.0
8695

8796
def inputs(self) -> tuple[list[str], list[str]]:
8897
return ["data"], ["source_bytes"]
@@ -92,9 +101,10 @@ def outputs(self) -> tuple[list[str], list[str]]:
92101

93102
def ray_stage_spec(self) -> dict[str, Any]:
94103
"""Ray stage specification for this stage."""
95-
return {
96-
RayStageSpecKeys.IS_FANOUT_STAGE: True,
97-
}
104+
spec: dict[str, Any] = {RayStageSpecKeys.IS_FANOUT_STAGE: True}
105+
if self.ray_data_num_cpus is not None:
106+
spec[RayStageSpecKeys.RAY_NUM_CPUS] = self.ray_data_num_cpus
107+
return spec
98108

99109
def process(self, task: VideoTask) -> VideoTask:
100110
video = task.data

nemo_curator/stages/video/clipping/video_frame_extraction.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
import subprocess
1616
from dataclasses import dataclass
1717
from pathlib import Path
18+
from typing import Any
1819

1920
import numpy as np
2021
import numpy.typing as npt
2122
from loguru import logger
2223

2324
from nemo_curator.backends.base import WorkerMetadata
25+
from nemo_curator.backends.utils import RayStageSpecKeys
2426
from nemo_curator.stages.base import ProcessingStage
2527
from nemo_curator.stages.resources import Resources
2628
from nemo_curator.tasks.video import VideoTask
@@ -100,6 +102,9 @@ class VideoFrameExtractionStage(ProcessingStage[VideoTask, VideoTask]):
100102
decoder_mode: str = "pynvc"
101103
verbose: bool = False
102104
name: str = "video_frame_extraction"
105+
ray_data_num_cpus: float | None = (
106+
None # CPU reservation for Ray Data scheduler; set to 1.0 on CPU path to enable stage fusion
107+
)
103108

104109
def inputs(self) -> tuple[list[str], list[str]]:
105110
return ["data"], []
@@ -130,6 +135,16 @@ def __post_init__(self) -> None:
130135
self.resources = Resources(gpu_memory_gb=10)
131136
else:
132137
self.resources = Resources(cpus=4.0)
138+
if self.ray_data_num_cpus is None:
139+
# Default to 1.0 so Ray Data fuses this stage with VideoReaderStage.
140+
# Kept separate from resources.cpus so Xenna scheduling is unaffected.
141+
self.ray_data_num_cpus = 1.0
142+
143+
def ray_stage_spec(self) -> dict[str, Any]:
144+
"""Ray stage specification for this stage."""
145+
if self.ray_data_num_cpus is not None:
146+
return {RayStageSpecKeys.RAY_NUM_CPUS: self.ray_data_num_cpus}
147+
return {}
133148

134149
def process(self, task: VideoTask) -> VideoTask:
135150
width, height = self.output_hw

tests/backends/ray_data/test_adapter.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,16 @@ def test_process_dataset_rejects_managed_ray_remote_args(self):
130130
with pytest.raises(ValueError, match="must not override Curator-managed map_batches arguments"):
131131
_map_batches_kwargs(stage)
132132

133+
def test_build_resource_kwargs_uses_ray_num_cpus_from_spec_over_resources_cpus(self):
134+
stage = ConfigurableActorStage(ray_stage_spec={RayStageSpecKeys.RAY_NUM_CPUS: 1.0})
135+
kwargs = _map_batches_kwargs(stage)
136+
assert kwargs["num_cpus"] == 1.0
137+
138+
def test_build_resource_kwargs_falls_back_to_resources_cpus_when_ray_num_cpus_absent(self):
139+
stage = ConfigurableActorStage()
140+
kwargs = _map_batches_kwargs(stage)
141+
assert kwargs["num_cpus"] == stage.resources.cpus
142+
133143

134144
def _map_batches_kwargs(stage: ProcessingStage) -> dict[str, object]:
135145
dataset = RecordingDataset()

0 commit comments

Comments
 (0)