Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions nemo_curator/backends/ray_data/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ def _process_batch_internal(self, batch: dict[str, Any]) -> dict[str, Any]:
# For Task objects, we return them in the 'item' column
return {"item": results}

def process_dataset(self, dataset: Dataset, ignore_head_node: bool = False) -> Dataset:
def process_dataset(
self, dataset: Dataset, ignore_head_node: bool = False, initial_replicas: int | None = None
) -> Dataset:
"""Process a Ray Data dataset through this stage.

Args:
Expand All @@ -87,7 +89,7 @@ def process_dataset(self, dataset: Dataset, ignore_head_node: bool = False) -> D
map_batches_fn = create_actor_from_stage(self.stage)
concurrency_kwargs = {
"concurrency": calculate_concurrency_for_actors_for_stage(
self.stage, ignore_head_node=ignore_head_node
self.stage, ignore_head_node=ignore_head_node, initial_replicas=initial_replicas
),
}
else:
Expand Down
67 changes: 66 additions & 1 deletion nemo_curator/backends/ray_data/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,56 @@
from nemo_curator.tasks import EmptyTask, Task

from .adapter import RayDataStageAdapter
from .utils import compute_joint_initial_allocation, is_actor_stage

# ---------------------------------------------------------------------------
# Xenna-profiled stage speeds (items/s per actor), recorded from the
# 2026-05-28 benchmark run. Stage speeds are pipeline-specific because clip
# duration (and therefore work-per-item) varies between pipelines.
# ---------------------------------------------------------------------------
_SPEEDS_TRANSCODING: dict[str, float] = {
"ClipTranscodingStage": 0.658,
"ClipWriterStage": 39.7,
}
_SPEEDS_EMBEDDING: dict[str, float] = {
"ClipTranscodingStage": 0.645,
"ClipFrameExtractionStage": 3.618,
"CosmosEmbed1FrameCreationStage": 5.461,
"CosmosEmbed1EmbeddingStage": 9.856,
"ClipWriterStage": 36.7,
}
_SPEEDS_CAPTIONING: dict[str, float] = {
"CaptionEnhancementStage": 0.0996,
"CaptionGenerationStage": 0.1562,
"CaptionPreparationStage": 0.8121,
"ClipTranscodingStage": 0.637,
"ClipWriterStage": 39.9,
}
_SPEEDS_TRANSNETV2: dict[str, float] = {
"VideoFrameExtractionStage": 2.993,
"TransNetV2ClipExtractionStage": 16.84,
"ClipTranscodingStage": 0.764,
"ClipAestheticFilterStage": 67.84,
"CosmosEmbed1FrameCreationStage": 650.5,
"CosmosEmbed1EmbeddingStage": 836.6,
"ClipFrameExtractionStage": 711.7,
"ClipWriterStage": 40.8,
}


def _get_pipeline_speeds(actor_stages: list) -> "dict[str, float] | None":
"""Return the Xenna speed table that matches the given actor-stage set, or None."""
names = {s.__class__.__name__ for s in actor_stages}
if "CaptionEnhancementStage" in names:
return _SPEEDS_CAPTIONING
if "TransNetV2ClipExtractionStage" in names:
return _SPEEDS_TRANSNETV2
if "CosmosEmbed1EmbeddingStage" in names:
return _SPEEDS_EMBEDDING
if "ClipTranscodingStage" in names:
return _SPEEDS_TRANSCODING
return None
Comment on lines +63 to +74

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Fragile pipeline identity heuristic: The cascade of single-feature checks (CaptionEnhancementStage present → captioning, etc.) silently picks a wrong speed table if a future pipeline combines these stages in a novel way. For example, a pipeline containing both CaptionEnhancementStage and TransNetV2ClipExtractionStage would match captioning even though transnetv2 speeds are also in play. Since a mismatched table causes a silent fallback to N=1 (via the all(n in stage_speeds) guard), correctness is preserved, but it may be worth adding an explicit warning when the matched table doesn't contain every stage in the pipeline.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!



if TYPE_CHECKING:
from nemo_curator.stages.base import ProcessingStage
Expand Down Expand Up @@ -77,6 +127,18 @@ def execute(self, stages: list["ProcessingStage"], initial_tasks: list[Task] | N
execute_setup_on_node(stages, ignore_head_node=self.ignore_head_node)
logger.info(f"Setup on node complete for all stages. Starting Ray Data pipeline with {len(stages)} stages")

# Compute joint initial allocation across all actor stages so that
# total resource demand stays within cluster capacity (mirrors Xenna startup).
_cluster = ray.cluster_resources()
_avail_cpus = _cluster.get("CPU", 0)
_avail_gpus = _cluster.get("GPU", 0)
Comment on lines +132 to +134

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Resource source mismatch: ray.cluster_resources() returns total cluster resources (including head node and in-use allocations), while calculate_concurrency_for_actors_for_stage calls get_available_cpu_gpu_resources() which returns currently available resources. When ignore_head_node=True, the available count is further reduced by subtracting head-node CPUs/GPUs, making it almost certain that initial_replicas > max_actors. Ray Data raises a ValueError when given concurrency=(min, max) with min > max.

Suggested change
_cluster = ray.cluster_resources()
_avail_cpus = _cluster.get("CPU", 0)
_avail_gpus = _cluster.get("GPU", 0)
from nemo_curator.backends.utils import get_available_cpu_gpu_resources
_avail_cpus, _avail_gpus = get_available_cpu_gpu_resources(
init_and_shutdown=False, ignore_head_node=self.ignore_head_node
)

_actor_stages = [s for s in stages if is_actor_stage(s) and s.num_workers() is None]
_stage_speeds = _get_pipeline_speeds(_actor_stages)
joint_allocation = compute_joint_initial_allocation(
_actor_stages, _avail_cpus, _avail_gpus, stage_speeds=_stage_speeds
)
logger.info(f"Joint initial allocation: {joint_allocation}")

# Process through each stage
for i, stage in enumerate(stages):
# TODO: add pipeline level config for verbosity
Expand All @@ -87,7 +149,10 @@ def execute(self, stages: list["ProcessingStage"], initial_tasks: list[Task] | N
adapter = RayDataStageAdapter(stage)

# Apply stage transformation
current_dataset = adapter.process_dataset(current_dataset, self.ignore_head_node)
initial_replicas = joint_allocation.get(stage.__class__.__name__)
current_dataset = adapter.process_dataset(
current_dataset, self.ignore_head_node, initial_replicas=initial_replicas
)
except Exception as e:
logger.error(f"Error during pipeline execution: {e}")
raise
Expand Down
93 changes: 89 additions & 4 deletions nemo_curator/backends/ray_data/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,104 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import math

from nemo_curator.backends.utils import get_available_cpu_gpu_resources
from nemo_curator.stages.base import ProcessingStage


def compute_joint_initial_allocation(
actor_stages: list[ProcessingStage],
available_cpus: float,
available_gpus: float,
stage_speeds: dict[str, float] | None = None,
cpu_allocation_percentage: float = 0.85,
) -> dict[str, int]:
"""Compute initial replica counts for all actor stages jointly.

If stage_speeds is provided for every actor stage (items/s per actor, from Xenna
profiling), uses a throughput-balanced heuristic: sets N_i proportional to
1/speed_i so all stages process items at the same rate, then scales up as far as
the CPU and GPU budget allows.

For CPU-only pipelines (no GPU actors), cpu_allocation_percentage reserves a
fraction of CPUs for task-mode stages so actor reservations do not starve
upstream/downstream task pools.

If any actor stage is missing a speed, falls back to N=1 for all stages (safe:
Ray Data autoscales upward from min=1 as resources free up).

Returns:
dict mapping stage class name -> initial replica count (>= 1)
"""
if not actor_stages:
return {}

names = [s.__class__.__name__ for s in actor_stages]
if stage_speeds is not None and all(n in stage_speeds for n in names):
return _throughput_balanced_allocation(
actor_stages, available_cpus, available_gpus, stage_speeds, cpu_allocation_percentage
)
return dict.fromkeys(names, 1)


def _throughput_balanced_allocation(
actor_stages: list[ProcessingStage],
available_cpus: float,
available_gpus: float,
stage_speeds: dict[str, float],
cpu_allocation_percentage: float,
) -> dict[str, int]:
"""Scale all stages proportionally to speed so they run at the same rate.

Finds the largest integer scale K such that N_i = ceil(bottleneck_speed / speed_i * K)
fits within the CPU/GPU budget for every stage simultaneously.
"""
names = [s.__class__.__name__ for s in actor_stages]
speeds = [stage_speeds[n] for n in names]
bottleneck = min(speeds)
ratios = [bottleneck / spd for spd in speeds]

# Always reserve cpu_allocation_percentage for task-mode stages (VideoReader, etc.).
# Even GPU-bottlenecked pipelines have CPU-intensive task stages that need headroom.
avail_cpus = available_cpus * cpu_allocation_percentage

# Continuous upper bound for K (before applying ceil to individual counts)
cpu_weight = sum(r * s.resources.cpus for r, s in zip(ratios, actor_stages, strict=True))
gpu_weight = sum(r * s.resources.gpus for r, s in zip(ratios, actor_stages, strict=True))
k_cpu = int(avail_cpus / cpu_weight) if cpu_weight > 0 else 1
k_gpu = int(available_gpus / gpu_weight) if gpu_weight > 0 else k_cpu
k_max = max(1, min(k_cpu, k_gpu))

# Descend from k_max until the actual (ceil'd) counts fit the budget.
# Ceils can push usage above the continuous bound, so we check iteratively.
for k in range(k_max, 0, -1):
counts = [max(1, math.ceil(r * k)) for r in ratios]
cpu_used = sum(c * s.resources.cpus for c, s in zip(counts, actor_stages, strict=True))
gpu_used = sum(c * s.resources.gpus for c, s in zip(counts, actor_stages, strict=True))
if cpu_used <= avail_cpus and (available_gpus == 0 or gpu_used <= available_gpus):
return dict(zip(names, counts, strict=True))

return dict.fromkeys(names, 1)


def calculate_concurrency_for_actors_for_stage(
stage: ProcessingStage, ignore_head_node: bool = False
stage: ProcessingStage,
ignore_head_node: bool = False,
initial_replicas: int | None = None,
) -> tuple[int, int] | int:
"""
Calculate concurrency if we want to spin up actors based on available resources and stage requirements.

Args:
initial_replicas: If provided (from compute_joint_initial_allocation), use this as
the minimum (initial) replica count instead of 1. This mirrors Xenna's behaviour
of pre-allocating all workers upfront based on a joint resource budget.

Returns:
int | tuple[int, int]: Number of actors to use
int: Number of workers to use
tuple[int, int]: tuple of min / max actors to use and number of workers to use
tuple[int, int]: tuple of (initial, max) actors
"""
# If explicitly set, use the specified number of workers
num_workers = stage.num_workers()
Expand All @@ -49,8 +133,9 @@ def calculate_concurrency_for_actors_for_stage(
max_gpu_actors = available_gpus // stage.resources.gpus

# Take the minimum of CPU and GPU constraints
max_actors = min(max_cpu_actors, max_gpu_actors)
return (1, int(max_actors))
max_actors = int(min(max_cpu_actors, max_gpu_actors))
min_actors = initial_replicas if initial_replicas is not None else 1
return (min_actors, max_actors)
Comment on lines +136 to +138

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Even after fixing the resource source, initial_replicas could still exceed max_actors in rare timing windows (e.g., another actor briefly holds resources when calculate_concurrency_for_actors_for_stage queries available resources). Clamping min_actors to max_actors ensures the tuple is always valid without suppressing the intent of the heuristic.

Suggested change
max_actors = int(min(max_cpu_actors, max_gpu_actors))
min_actors = initial_replicas if initial_replicas is not None else 1
return (min_actors, max_actors)
max_actors = int(min(max_cpu_actors, max_gpu_actors))
min_actors = initial_replicas if initial_replicas is not None else 1
return (max(1, min(min_actors, max_actors)), max_actors)



def is_actor_stage(stage: ProcessingStage) -> bool:
Expand Down
Loading