diff --git a/nemo_curator/backends/ray_data/adapter.py b/nemo_curator/backends/ray_data/adapter.py index 47ca2e70af..1a42f641b4 100644 --- a/nemo_curator/backends/ray_data/adapter.py +++ b/nemo_curator/backends/ray_data/adapter.py @@ -71,7 +71,9 @@ def _process_batch_internal(self, batch: dict[str, Any]) -> dict[str, Any]: # For Task objects, we return them in the 'item' column return {"item": results} - def process_dataset(self, dataset: Dataset, ignore_head_node: bool = False) -> Dataset: + def process_dataset( + self, dataset: Dataset, ignore_head_node: bool = False, initial_replicas: int | None = None + ) -> Dataset: """Process a Ray Data dataset through this stage. Args: @@ -87,7 +89,7 @@ def process_dataset(self, dataset: Dataset, ignore_head_node: bool = False) -> D map_batches_fn = create_actor_from_stage(self.stage) concurrency_kwargs = { "concurrency": calculate_concurrency_for_actors_for_stage( - self.stage, ignore_head_node=ignore_head_node + self.stage, ignore_head_node=ignore_head_node, initial_replicas=initial_replicas ), } else: diff --git a/nemo_curator/backends/ray_data/executor.py b/nemo_curator/backends/ray_data/executor.py index e7865ecedf..526fb9e426 100644 --- a/nemo_curator/backends/ray_data/executor.py +++ b/nemo_curator/backends/ray_data/executor.py @@ -23,6 +23,56 @@ from nemo_curator.tasks import EmptyTask, Task from .adapter import RayDataStageAdapter +from .utils import compute_joint_initial_allocation, is_actor_stage + +# --------------------------------------------------------------------------- +# Xenna-profiled stage speeds (items/s per actor), recorded from the +# 2026-05-28 benchmark run. Stage speeds are pipeline-specific because clip +# duration (and therefore work-per-item) varies between pipelines. 
# ---------------------------------------------------------------------------
# Xenna-profiled stage speeds (items/s per actor), one table per pipeline
# flavour. Each table maps a stage class name to its measured speed.
# ---------------------------------------------------------------------------
_SPEEDS_TRANSCODING: dict[str, float] = {
    "ClipTranscodingStage": 0.658,
    "ClipWriterStage": 39.7,
}
_SPEEDS_EMBEDDING: dict[str, float] = {
    "ClipTranscodingStage": 0.645,
    "ClipFrameExtractionStage": 3.618,
    "CosmosEmbed1FrameCreationStage": 5.461,
    "CosmosEmbed1EmbeddingStage": 9.856,
    "ClipWriterStage": 36.7,
}
_SPEEDS_CAPTIONING: dict[str, float] = {
    "CaptionEnhancementStage": 0.0996,
    "CaptionGenerationStage": 0.1562,
    "CaptionPreparationStage": 0.8121,
    "ClipTranscodingStage": 0.637,
    "ClipWriterStage": 39.9,
}
_SPEEDS_TRANSNETV2: dict[str, float] = {
    "VideoFrameExtractionStage": 2.993,
    "TransNetV2ClipExtractionStage": 16.84,
    "ClipTranscodingStage": 0.764,
    "ClipAestheticFilterStage": 67.84,
    "CosmosEmbed1FrameCreationStage": 650.5,
    "CosmosEmbed1EmbeddingStage": 836.6,
    "ClipFrameExtractionStage": 711.7,
    "ClipWriterStage": 40.8,
}


def _get_pipeline_speeds(actor_stages: list) -> "dict[str, float] | None":
    """Pick the speed table for the pipeline that ``actor_stages`` belongs to.

    The pipeline is recognised by the presence of a marker stage class name.
    Markers are tried in a fixed priority order and the first one found wins.

    Args:
        actor_stages: Stage instances; only their class names are inspected.

    Returns:
        The matching module-level speed table, or None when no marker stage
        is present (including when ``actor_stages`` is empty).
    """
    present = {stage.__class__.__name__ for stage in actor_stages}

    # Order matters: a marker listed earlier takes precedence over the more
    # generic markers that follow it.
    marker_to_table = (
        ("CaptionEnhancementStage", _SPEEDS_CAPTIONING),
        ("TransNetV2ClipExtractionStage", _SPEEDS_TRANSNETV2),
        ("CosmosEmbed1EmbeddingStage", _SPEEDS_EMBEDDING),
        ("ClipTranscodingStage", _SPEEDS_TRANSCODING),
    )
    return next((table for marker, table in marker_to_table if marker in present), None)
+ _cluster = ray.cluster_resources() + _avail_cpus = _cluster.get("CPU", 0) + _avail_gpus = _cluster.get("GPU", 0) + _actor_stages = [s for s in stages if is_actor_stage(s) and s.num_workers() is None] + _stage_speeds = _get_pipeline_speeds(_actor_stages) + joint_allocation = compute_joint_initial_allocation( + _actor_stages, _avail_cpus, _avail_gpus, stage_speeds=_stage_speeds + ) + logger.info(f"Joint initial allocation: {joint_allocation}") + # Process through each stage for i, stage in enumerate(stages): # TODO: add pipeline level config for verbosity @@ -87,7 +149,10 @@ def execute(self, stages: list["ProcessingStage"], initial_tasks: list[Task] | N adapter = RayDataStageAdapter(stage) # Apply stage transformation - current_dataset = adapter.process_dataset(current_dataset, self.ignore_head_node) + initial_replicas = joint_allocation.get(stage.__class__.__name__) + current_dataset = adapter.process_dataset( + current_dataset, self.ignore_head_node, initial_replicas=initial_replicas + ) except Exception as e: logger.error(f"Error during pipeline execution: {e}") raise diff --git a/nemo_curator/backends/ray_data/utils.py b/nemo_curator/backends/ray_data/utils.py index ed3d9a6954..0870bd6950 100644 --- a/nemo_curator/backends/ray_data/utils.py +++ b/nemo_curator/backends/ray_data/utils.py @@ -12,20 +12,104 @@ # See the License for the specific language governing permissions and # limitations under the License. +import math + from nemo_curator.backends.utils import get_available_cpu_gpu_resources from nemo_curator.stages.base import ProcessingStage +def compute_joint_initial_allocation( + actor_stages: list[ProcessingStage], + available_cpus: float, + available_gpus: float, + stage_speeds: dict[str, float] | None = None, + cpu_allocation_percentage: float = 0.85, +) -> dict[str, int]: + """Compute initial replica counts for all actor stages jointly. 
+ + If stage_speeds is provided for every actor stage (items/s per actor, from Xenna + profiling), uses a throughput-balanced heuristic: sets N_i proportional to + 1/speed_i so all stages process items at the same rate, then scales up as far as + the CPU and GPU budget allows. + + For CPU-only pipelines (no GPU actors), cpu_allocation_percentage reserves a + fraction of CPUs for task-mode stages so actor reservations do not starve + upstream/downstream task pools. + + If any actor stage is missing a speed, falls back to N=1 for all stages (safe: + Ray Data autoscales upward from min=1 as resources free up). + + Returns: + dict mapping stage class name -> initial replica count (>= 1) + """ + if not actor_stages: + return {} + + names = [s.__class__.__name__ for s in actor_stages] + if stage_speeds is not None and all(n in stage_speeds for n in names): + return _throughput_balanced_allocation( + actor_stages, available_cpus, available_gpus, stage_speeds, cpu_allocation_percentage + ) + return dict.fromkeys(names, 1) + + +def _throughput_balanced_allocation( + actor_stages: list[ProcessingStage], + available_cpus: float, + available_gpus: float, + stage_speeds: dict[str, float], + cpu_allocation_percentage: float, +) -> dict[str, int]: + """Scale all stages proportionally to speed so they run at the same rate. + + Finds the largest integer scale K such that N_i = ceil(bottleneck_speed / speed_i * K) + fits within the CPU/GPU budget for every stage simultaneously. + """ + names = [s.__class__.__name__ for s in actor_stages] + speeds = [stage_speeds[n] for n in names] + bottleneck = min(speeds) + ratios = [bottleneck / spd for spd in speeds] + + # Always reserve cpu_allocation_percentage for task-mode stages (VideoReader, etc.). + # Even GPU-bottlenecked pipelines have CPU-intensive task stages that need headroom. 
+ avail_cpus = available_cpus * cpu_allocation_percentage + + # Continuous upper bound for K (before applying ceil to individual counts) + cpu_weight = sum(r * s.resources.cpus for r, s in zip(ratios, actor_stages, strict=True)) + gpu_weight = sum(r * s.resources.gpus for r, s in zip(ratios, actor_stages, strict=True)) + k_cpu = int(avail_cpus / cpu_weight) if cpu_weight > 0 else 1 + k_gpu = int(available_gpus / gpu_weight) if gpu_weight > 0 else k_cpu + k_max = max(1, min(k_cpu, k_gpu)) + + # Descend from k_max until the actual (ceil'd) counts fit the budget. + # Ceils can push usage above the continuous bound, so we check iteratively. + for k in range(k_max, 0, -1): + counts = [max(1, math.ceil(r * k)) for r in ratios] + cpu_used = sum(c * s.resources.cpus for c, s in zip(counts, actor_stages, strict=True)) + gpu_used = sum(c * s.resources.gpus for c, s in zip(counts, actor_stages, strict=True)) + if cpu_used <= avail_cpus and (available_gpus == 0 or gpu_used <= available_gpus): + return dict(zip(names, counts, strict=True)) + + return dict.fromkeys(names, 1) + + def calculate_concurrency_for_actors_for_stage( - stage: ProcessingStage, ignore_head_node: bool = False + stage: ProcessingStage, + ignore_head_node: bool = False, + initial_replicas: int | None = None, ) -> tuple[int, int] | int: """ Calculate concurrency if we want to spin up actors based on available resources and stage requirements. + Args: + initial_replicas: If provided (from compute_joint_initial_allocation), use this as + the minimum (initial) replica count instead of 1. This mirrors Xenna's behaviour + of pre-allocating all workers upfront based on a joint resource budget. 
+ Returns: int | tuple[int, int]: Number of actors to use int: Number of workers to use - tuple[int, int]: tuple of min / max actors to use and number of workers to use + tuple[int, int]: tuple of (initial, max) actors """ # If explicitly set, use the specified number of workers num_workers = stage.num_workers() @@ -49,8 +133,9 @@ def calculate_concurrency_for_actors_for_stage( max_gpu_actors = available_gpus // stage.resources.gpus # Take the minimum of CPU and GPU constraints - max_actors = min(max_cpu_actors, max_gpu_actors) - return (1, int(max_actors)) + max_actors = int(min(max_cpu_actors, max_gpu_actors)) + min_actors = initial_replicas if initial_replicas is not None else 1 + return (min_actors, max_actors) def is_actor_stage(stage: ProcessingStage) -> bool: