-
Notifications
You must be signed in to change notification settings - Fork 292
Pipeline resumability via source-level counter checkpointing #2063
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
3a6ef8d
a1b3195
b7423a1
8f1dc0b
ec2073b
a3c1d54
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -17,14 +17,23 @@ | |||||
| from dataclasses import dataclass | ||||||
| from typing import TYPE_CHECKING, Any | ||||||
|
|
||||||
| from loguru import logger | ||||||
|
|
||||||
| from nemo_curator.core.utils import ignore_ray_head_node | ||||||
| from nemo_curator.tasks import Task | ||||||
| from nemo_curator.tasks.sentinels import FailedTask, NoneTask | ||||||
| from nemo_curator.utils.performance_utils import StageTimer | ||||||
| from nemo_curator.utils.resumability_client import _flush_deltas, _is_active, _skip_completed_sources | ||||||
|
|
||||||
| if TYPE_CHECKING: | ||||||
| from nemo_curator.stages.base import ProcessingStage | ||||||
|
|
||||||
|
|
||||||
| def _is_sentinel(task: Task) -> bool: | ||||||
| """A payload-less marker (NoneTask/FailedTask), stripped before the next stage.""" | ||||||
| return isinstance(task, (NoneTask, FailedTask)) | ||||||
|
|
||||||
|
|
||||||
| @dataclass | ||||||
| class NodeInfo: | ||||||
| """Generic node information for setup_on_node calls across backends. | ||||||
|
|
@@ -85,9 +94,20 @@ def process_batch(self, tasks: list[Task]) -> list[Task]: | |||||
| # Use the batch processing logic | ||||||
| results = self.stage.process_batch(tasks) | ||||||
|
|
||||||
| # Guarantee every emitted task has a task_id (derived id, or uuid fallback). | ||||||
| # Replace a returned None ("filter this slot") with a NoneTask so every | ||||||
| # output gets a task_id; sentinels are stripped again below. | ||||||
| results = [NoneTask() if r is None else r for r in results] | ||||||
|
|
||||||
| # Assign every emitted task a task_id (derived, or uuid fallback). | ||||||
| results = self._post_process_task_ids(tasks, results) | ||||||
|
|
||||||
| # Opt-in resumability: fire per-source deltas (no-op when no actor registered). | ||||||
| if _is_active(): | ||||||
| results = self._apply_resumability_counters(tasks, results) | ||||||
|
|
||||||
| # Sentinels never propagate to the next stage. | ||||||
| results = [r for r in results if not _is_sentinel(r)] | ||||||
|
|
||||||
| # Log performance stats and add to result tasks | ||||||
| _, stage_perf_stats = self._timer.log_stats() | ||||||
| # Consume and attach any custom metrics recorded by the stage during this call | ||||||
|
|
@@ -100,41 +120,22 @@ def process_batch(self, tasks: list[Task]) -> list[Task]: | |||||
| return results | ||||||
|
|
||||||
| def _post_process_task_ids(self, input_tasks: list[Task], output_tasks: list[Task | None]) -> list[Task]: | ||||||
| """Assign a deterministic ``task_id`` to every emitted task. | ||||||
|
|
||||||
| This is the single place task ids are assigned — it runs for every | ||||||
| stage on every backend (all backend adapters subclass this), so it | ||||||
| makes no difference whether a stage defines ``process`` or overrides | ||||||
| ``process_batch``. ``task_id`` is the task's id path (parents + own segment); ids are | ||||||
| re-derived at each stage boundary so the same object passing through | ||||||
| N stages gets N ids. | ||||||
|
|
||||||
| The input→output mapping decides each output's PARENT; whether the | ||||||
| stage is a source decides each output's SEGMENT (content id vs index) | ||||||
| — the two are independent. ``None`` outputs (Curator's "return None to | ||||||
| filter") are NOT removed before the length check — keeping them in | ||||||
| place preserves positional alignment for filter stages — and are then | ||||||
| dropped from the returned list. | ||||||
|
|
||||||
| - single input → every output is its child (fan-out): ``parent_<seg>`` | ||||||
| - ``len(output) == len(input)`` → positional 1:1: each ``parent_i_<seg>``; | ||||||
| a ``None`` slot just means input ``i`` was filtered. | ||||||
| - any other (ambiguous) cardinality across a batch → a random ``uuid`` | ||||||
| prefixed with ``"r"`` (e.g. ``"r3f9a…"``), so ``task_id`` is never | ||||||
| empty even when a derived id is not possible. The ``"r"`` prefix flags | ||||||
| the id as non-deterministic / ancestry-not-tracked (see | ||||||
| ``Task.task_id`` docstring). | ||||||
|
|
||||||
| ``seg`` is the output's content id (``Task.get_deterministic_id()``) | ||||||
| for a source stage when available, else the positional index — so a | ||||||
| source partition keeps a stable id across reorderings regardless of | ||||||
| whether the source is 1→N or N→N. | ||||||
|
|
||||||
| Note: a stage that BOTH filters and fans out within a single batch | ||||||
| (returning a flat list rather than a per-input slot) cannot be mapped | ||||||
| positionally; if its length happens to equal the input length the 1:1 | ||||||
| assumption may misattribute parents. That combination is unsupported | ||||||
| until per-slot sentinels (NoneTask/FailedTask) land in a later PR. | ||||||
| """Assign a deterministic ``task_id`` (parent id + own segment) to every | ||||||
| emitted task. Runs once per stage on every backend, so ``process`` vs | ||||||
| ``process_batch`` makes no difference; ids are re-derived at each stage | ||||||
| boundary, so one object passing through N stages gets N ids. | ||||||
|
|
||||||
| - single input → fan-out: each output is ``parent_<seg>`` | ||||||
| - ``len(output) == len(input)`` → positional 1:1: ``parent_i_<seg>``; a | ||||||
| ``None`` slot means input ``i`` was filtered (kept for alignment, then | ||||||
| dropped from the result) | ||||||
| - any other cardinality → a random ``"r"``-prefixed uuid (non-deterministic, | ||||||
| ancestry-not-tracked; see ``Task.task_id``) | ||||||
|
|
||||||
| ``seg`` is the content id (``get_deterministic_id()``) for a source stage, | ||||||
| else the positional index. A stage that both filters and fans out in one | ||||||
| batch can't be mapped positionally and falls to the ``"r"`` case — return | ||||||
| one value (or ``None``) per input to stay positional. | ||||||
| """ | ||||||
| is_source = getattr(self.stage, "is_source_stage", False) | ||||||
|
|
||||||
|
|
@@ -168,6 +169,85 @@ def _post_process_task_ids(self, input_tasks: list[Task], output_tasks: list[Tas | |||||
| task.task_id = "r" + uuid.uuid4().hex | ||||||
| return out | ||||||
|
|
||||||
| # Resumability (opt-in): stamp _source_id, fire per-source deltas, drop | ||||||
| # completed sources. task_ids are already assigned; sentinels stripped by caller. | ||||||
| def _apply_resumability_counters(self, input_tasks: list[Task], output_tasks: list[Task]) -> list[Task]: # noqa: C901 | ||||||
| # Dedup key is always an OUTPUT task_id, never the input's: the source | ||||||
| # already keyed its +1 on that id, and an output id is one level deeper, | ||||||
| # so it's unique to the (task, stage) that produced it. | ||||||
| stage = self.stage | ||||||
| if getattr(stage, "is_source_stage", False): | ||||||
| return self._source_counters(output_tasks) | ||||||
|
|
||||||
| # No outputs to key on (filtering uses None->NoneTask, so this is degenerate): skip. | ||||||
| if not output_tasks: | ||||||
| return output_tasks | ||||||
|
|
||||||
| # Pre-source: inputs have no _source_id yet; nothing to track. | ||||||
| if all(not t._source_id for t in input_tasks): | ||||||
| return output_tasks | ||||||
|
Comment on lines
+187
to
+188
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Which tasks are these? What's a pre-source stage? Is it the initial_tasks? |
||||||
|
|
||||||
| is_sink = stage.is_sink_stage | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. General question: why would the user ever want something other than source stage being the first stage and sink stage being the last stage of the pipeline? Like if the last stage failed but the second to last stage was the sink stage, they just don't want to rerun the last stage?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For sync stage not being the last stage, I think PDF pipelines are a good example. I have this metadata stage called stagePerfLogging at the end, which is needed because I cannot do stuff similar to benchmarking, since the pipeline.run never returns. As for the source stage, I don't have an example in mind, but we don't need to force this assumption hence, I would prefer for us to keep it relaxed. From a user's perspective, if they don't specify, the default is that the first stage is source and the last stage is sync.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also for source we have a case where user might provide initial task and no source stage is defined. |
||||||
| per_task: list[tuple[str, str, int]] = [] | ||||||
| real = [t for t in output_tasks if not _is_sentinel(t)] | ||||||
|
|
||||||
| if len(input_tasks) == 1 and len(output_tasks) != 1: | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Question:
Suggested change
|
||||||
| # Fan-out (1->N): parent consumed (-1); each real child continues | ||||||
| # (+1, or 0 at a sink); each FailedTask keeps the source open (+1); | ||||||
| # NoneTask contributes 0. | ||||||
| parent = input_tasks[0] | ||||||
| n_failed = sum(1 for t in output_tasks if isinstance(t, FailedTask)) | ||||||
| continuing = 0 if is_sink else len(real) | ||||||
| delta = continuing + n_failed - 1 | ||||||
| # Key on output[0].task_id (not parent.task_id, which collides with the | ||||||
| # source's +1). Non-source children are indexed positionally, so | ||||||
| # output[0] is always "<parent>_0". | ||||||
| per_task.append((output_tasks[0].task_id, parent._source_id, delta)) | ||||||
| for c in real: | ||||||
| if not c._source_id: | ||||||
| c._source_id = parent._source_id | ||||||
|
Comment on lines
+194
to
+208
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The fan-out delta The 1:1 positional branch handles |
||||||
| elif len(output_tasks) == len(input_tasks): | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We're using the number of inputs and outputs as a proxy for whether a stage is 1:1 or fan-in/fan-out type stages. However in some cases (like shuffle) it might be the case that by chance the number of inputs and output tasks are identical. However since the inputs are completely shuffled I don't think we can make assumptions about reusambility. |
||||||
| # Positional 1:1; each delta keys on the output id (r.task_id). | ||||||
| for parent, r in zip(input_tasks, output_tasks, strict=True): | ||||||
| sid = parent._source_id | ||||||
| if isinstance(r, NoneTask): # filtered -> consumed | ||||||
| per_task.append((r.task_id, sid, -1)) | ||||||
| continue | ||||||
| if isinstance(r, FailedTask): # failed -> source stays open (no sink test) | ||||||
| per_task.append((r.task_id, sid, 0)) | ||||||
| continue | ||||||
| per_task.append((r.task_id, sid, -1 if is_sink else 0)) # real: sink -1, else 0 | ||||||
| if not r._source_id: | ||||||
| r._source_id = sid | ||||||
| else: | ||||||
| # M->K (M!=K): can't attribute parents; skip (source stays pending -> reprocessed). | ||||||
| logger.warning( | ||||||
| f"resumability: {type(stage).__name__} produced {len(output_tasks)} outputs " | ||||||
| f"for {len(input_tasks)} inputs; can't attribute sources, skipping counter " | ||||||
| f"update for this batch." | ||||||
| ) | ||||||
| return output_tasks | ||||||
|
|
||||||
| _flush_deltas(per_task) | ||||||
| return output_tasks | ||||||
|
|
||||||
| def _source_counters(self, output_tasks: list[Task]) -> list[Task]: | ||||||
| """Source stage: each output is a source partition; its ``_source_id`` is | ||||||
| its own last id segment. Drop already-completed sources; each survivor fires ``+1``.""" | ||||||
| sources = [t for t in output_tasks if not _is_sentinel(t)] | ||||||
| for t in sources: | ||||||
| t._source_id = t.task_id.rsplit("_", 1)[-1] | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The correct extraction for sources is to strip just the parent prefix (
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we even have multiple delimited cases here, @abhinavg4 ? Or will it be like
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I’m a little worried that this is relying on
Minor ask: Could we centralize this behind a small task-id helper/API instead of open-coding string parsing? For example, something like: task.source_id = TaskId.parse(task.task_id).leaf()
# or
task_id.get_last_segment()
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Answering both of the above. We will always have For the case when get_deterministic_id() all give the same value. This is an adverse case. Like this will break a ton of other stuff too (like writers and stuff). In the curator, we do not (Cannot?) ensure that these id's are unique across tasks. We just rely on the user to ensure this.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor ask: Could we centralize this behind a small task-id helper/API instead of open-coding string parsing? Great call out yes.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think Praateek also asked this somewhere. But yeah, I can make TaskId as a class or something. Or actually I can put this inside task. So we can do task.get_source_id() |
||||||
| completed = _skip_completed_sources([t._source_id for t in sources]) | ||||||
| per_task: list[tuple[str, str, int]] = [] | ||||||
| survivors: list[Task] = [] | ||||||
| for t in sources: | ||||||
| if t._source_id in completed: | ||||||
| continue | ||||||
| per_task.append((t.task_id, t._source_id, +1)) | ||||||
| survivors.append(t) | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am a little confused why the source ID here is the last part of the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is since source stage was the 3rd stage in your example. So each task_id is of one of the following format: 0_1_2_{sid}_2_1 For the {sid}, the index purely depends on what index does source stage happens. It always starts with zero since an empty task. For most cases, the task_id will have this format: 0_{sid}_0_0_0_0: Single fanout at source and then filters. |
||||||
| _flush_deltas(per_task) | ||||||
| return survivors | ||||||
|
|
||||||
| def setup_on_node(self, node_info: NodeInfo | None = None, worker_metadata: WorkerMetadata | None = None) -> None: | ||||||
| """Setup the stage on a node. | ||||||
|
|
||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -12,6 +12,7 @@ | |||||||||||||||||||||||||||||||||||||||
| # See the License for the specific language governing permissions and | ||||||||||||||||||||||||||||||||||||||||
| # limitations under the License. | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| from pathlib import Path | ||||||||||||||||||||||||||||||||||||||||
| from typing import Any | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| from loguru import logger | ||||||||||||||||||||||||||||||||||||||||
|
|
@@ -107,8 +108,9 @@ def build(self) -> None: | |||||||||||||||||||||||||||||||||||||||
| # 3. Source / sink defaults: at most one stage may be explicitly | ||||||||||||||||||||||||||||||||||||||||
| # marked; if none, the first stage is the source and the last is | ||||||||||||||||||||||||||||||||||||||||
| # the sink. The source flag activates content-based ids in the | ||||||||||||||||||||||||||||||||||||||||
| # default ``process_batch``; the sink flag is used by the | ||||||||||||||||||||||||||||||||||||||||
| # resumability layer in a follow-up PR. | ||||||||||||||||||||||||||||||||||||||||
| # default ``process_batch``; the sink flag tells the resumability | ||||||||||||||||||||||||||||||||||||||||
| # counters that a sink consumes its outputs (see | ||||||||||||||||||||||||||||||||||||||||
| # ``BaseStageAdapter._apply_resumability_counters``). | ||||||||||||||||||||||||||||||||||||||||
| self._assign_source_sink_roles() | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def _assign_source_sink_roles(self) -> None: | ||||||||||||||||||||||||||||||||||||||||
|
|
@@ -222,18 +224,32 @@ def describe(self) -> str: | |||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| return "\n".join(lines) | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def run(self, executor: BaseExecutor | None = None, initial_tasks: list[Task] | None = None) -> list[Task] | None: | ||||||||||||||||||||||||||||||||||||||||
| def run( | ||||||||||||||||||||||||||||||||||||||||
| self, | ||||||||||||||||||||||||||||||||||||||||
| executor: BaseExecutor | None = None, | ||||||||||||||||||||||||||||||||||||||||
| initial_tasks: list[Task] | None = None, | ||||||||||||||||||||||||||||||||||||||||
| checkpoint_path: str | Path | None = None, | ||||||||||||||||||||||||||||||||||||||||
| ) -> list[Task] | None: | ||||||||||||||||||||||||||||||||||||||||
| """Run the pipeline. | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| Args: | ||||||||||||||||||||||||||||||||||||||||
| executor (BaseExecutor): Executor to use | ||||||||||||||||||||||||||||||||||||||||
| initial_tasks (list[Task], optional): Initial tasks to start the pipeline with. Defaults to None. | ||||||||||||||||||||||||||||||||||||||||
| checkpoint_path (str | Path, optional): Resumability directory. When | ||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lets note that this must be local not a remote path |
||||||||||||||||||||||||||||||||||||||||
| set, completed source partitions are tracked (in a | ||||||||||||||||||||||||||||||||||||||||
| ``.nemo_curator_metadata`` subdir) and skipped on rerun. Multiple | ||||||||||||||||||||||||||||||||||||||||
| runs (e.g. a SLURM array) may share the directory — each writes | ||||||||||||||||||||||||||||||||||||||||
| its own LMDB file, so there is no contention. | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| Returns: | ||||||||||||||||||||||||||||||||||||||||
| list[Task] | None: List of tasks | ||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||
| self.build() | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| if checkpoint_path is not None: | ||||||||||||||||||||||||||||||||||||||||
| checkpoint_path = Path(checkpoint_path).absolute() | ||||||||||||||||||||||||||||||||||||||||
| checkpoint_path.mkdir(parents=True, exist_ok=True) | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| if executor is None: | ||||||||||||||||||||||||||||||||||||||||
| from nemo_curator.backends.xenna import XennaExecutor | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
|
|
@@ -263,4 +279,41 @@ def run(self, executor: BaseExecutor | None = None, initial_tasks: list[Task] | | |||||||||||||||||||||||||||||||||||||||
| if initial_tasks: | ||||||||||||||||||||||||||||||||||||||||
| assign_root_task_ids(initial_tasks) | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| return executor.execute(self.stages, initial_tasks) | ||||||||||||||||||||||||||||||||||||||||
| if checkpoint_path is None: | ||||||||||||||||||||||||||||||||||||||||
| return executor.execute(self.stages, initial_tasks) | ||||||||||||||||||||||||||||||||||||||||
| return self._run_with_resumability(executor, initial_tasks, checkpoint_path) | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def _run_with_resumability( | ||||||||||||||||||||||||||||||||||||||||
| self, | ||||||||||||||||||||||||||||||||||||||||
| executor: BaseExecutor, | ||||||||||||||||||||||||||||||||||||||||
| initial_tasks: list[Task] | None, | ||||||||||||||||||||||||||||||||||||||||
| checkpoint_path: Path, | ||||||||||||||||||||||||||||||||||||||||
| ) -> list[Task] | None: | ||||||||||||||||||||||||||||||||||||||||
| """Own the resumability-actor lifecycle (executors unmodified): spawn it | ||||||||||||||||||||||||||||||||||||||||
| ``lifetime="detached"`` so it survives executor-local ``ray.shutdown()``, | ||||||||||||||||||||||||||||||||||||||||
| run, then close. The actor never raises, so there's no error path here.""" | ||||||||||||||||||||||||||||||||||||||||
| import ray | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| from nemo_curator.utils.resumability_actor import ResumabilityActor | ||||||||||||||||||||||||||||||||||||||||
| from nemo_curator.utils.resumability_client import ACTOR_NAME | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| ray.init(ignore_reinit_error=True) | ||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this needed? Shouldn't there have already been a
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We do not enforce that right now, right? Like if the user forgets to include that, still our pipelines run. Ideally, I can add a check in the pipeline.run saying please either start a Ray client with RayClient.start() or SlurmClient with SLurmClient.start(). Would you prefer that? I would personally prefer that tbh.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah good point, it might be nice to enforce it in
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can't do IOW, the following doesn't work since X won't be propogated. ray.init()
ray.init(env_vars=X)
ray.shutdown()
ray.shutdown()
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so you might have to do with ray.init():
start_actor()
executor.execute()
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 on this. I don't think we should ray.init here because any subsequent ray.init will be a no-op that doesn't pass env vars and if the client hasn't been started can lead to weird behavior w.r.t the actor getting killed by the shutdowns if any. |
||||||||||||||||||||||||||||||||||||||||
| ResumabilityActor.options( # type: ignore[attr-defined] | ||||||||||||||||||||||||||||||||||||||||
| name=ACTOR_NAME, | ||||||||||||||||||||||||||||||||||||||||
| lifetime="detached", | ||||||||||||||||||||||||||||||||||||||||
| get_if_exists=True, | ||||||||||||||||||||||||||||||||||||||||
| max_pending_calls=100, | ||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what's the rationale behind setting this value? |
||||||||||||||||||||||||||||||||||||||||
| ).remote(str(checkpoint_path)) | ||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+300
to
+306
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The A lightweight fix is to call a trivially-cheap method on the handle and
Suggested change
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed here, in fact we got bit by this (for a different reason)... If during a retry The ray recommended way is to have a Again see
|
||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||
| return executor.execute(self.stages, initial_tasks) | ||||||||||||||||||||||||||||||||||||||||
| finally: | ||||||||||||||||||||||||||||||||||||||||
| # The executor's ray.shutdown() may have run in its own | ||||||||||||||||||||||||||||||||||||||||
| # finally:; reconnect to clean up the detached actor. | ||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||
| ray.init(ignore_reinit_error=True) | ||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above, do |
||||||||||||||||||||||||||||||||||||||||
| actor_handle = ray.get_actor(ACTOR_NAME) | ||||||||||||||||||||||||||||||||||||||||
| ray.get(actor_handle.close.remote(), timeout=10) # type: ignore[attr-defined] | ||||||||||||||||||||||||||||||||||||||||
| ray.kill(actor_handle) | ||||||||||||||||||||||||||||||||||||||||
| except Exception as e: # noqa: BLE001 | ||||||||||||||||||||||||||||||||||||||||
| logger.warning(f"resumability actor cleanup failed: {e}") | ||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+310
to
+319
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
With The safest fix is to enqueue a no-op drain call after
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree here!
Comment on lines
+313
to
+319
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
On a subsequent The fix is to unconditionally call |
||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: Why don't we just error here ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmm, output_tasks can be empty, right? At the end of the pipeline or something?