Pipeline resumability via source-level counter checkpointing#2063
Pipeline resumability via source-level counter checkpointing#2063abhinavg4 wants to merge 6 commits into
Conversation
|
/ok to test a560bc1 |
… uniformly The per-source counter deltas were keyed on parent.task_id (the input task's id). But the source stage keys its +1 on the partition's OWN id, and that id is exactly the input id of the next stage — so a downstream delta reused the source's key. The actor treats "same key, different delta" as a conflicting re-fire and overwrites (pending += -old + new), driving the source's pending counter to -1 instead of 0. Result: a source whose output is filtered to a NoneTask never completed and re-ran on every resume. OK passthroughs escaped only by accident — returning the same object causes _post_process_task_ids to re-stamp parent.task_id to a deeper id before the counter reads it; a stage returning a NEW object hits the same collision. Fix: key every delta on the OUTPUT task id (one level deeper than the source's key, unique per (task, stage)), consistent with the source which already keys on its output partition. Specifically: - positional 1:1 keys on r.task_id: None -> -1, Failed -> 0 (no sink test; the source's +1 stays so it reruns), real -> -1 if sink else 0. - fan-out keys on output_tasks[0].task_id (always "<parent>_0": get_deterministic_id is consulted only for source stages, which never reach this branch). The net delta now counts FailedTask (+1, keep source open) and applies is_sink (real children leave at a sink): delta = (0 if is_sink else n_real) + n_failed - 1. sink and fan-out are independent. - empty output (a stage emitting nothing, not even a NoneTask) is skipped. Updated the adapter tests to the output-keyed contract and added fan-out (mixed real/None/Failed, sink fan-out) and empty-output cases. Verified end-to-end on an interactive node (RayActorPoolExecutor + on-disk LMDB): incremental resumption (5/10/15 -> only new sources run), None/Failed positional (only failed reruns), a new-object OK slot (completes, no longer re-runs), and fan-out with a Failed child (only that source reruns). 50 resumability unit tests pass. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>
|
/ok to test a1b3195 |
|
Pushed a correctness fix ( Bug: a source whose output is filtered to a Root cause: per-source counter deltas were keyed on Fix: key every delta on the output task id (one level deeper, unique per Verified on an interactive node (
50 resumability unit tests pass; adapter tests updated to the output-keyed contract with new fan-out / empty-output cases. |
| ray.init(ignore_reinit_error=True) | ||
| ResumabilityActor.options( # type: ignore[attr-defined] | ||
| name=ACTOR_NAME, | ||
| lifetime="detached", | ||
| get_if_exists=True, | ||
| max_pending_calls=100, | ||
| ).remote(str(checkpoint_path)) |
There was a problem hiding this comment.
Silent actor init failure when LMDB setup fails
The ActorHandle returned by .remote() is discarded. If ResumabilityActor.__init__ raises — for example because the LMDB file cannot be opened (bad permissions, disk full, path is read-only) — the exception is stored in the returned ObjectRef and never surfaced. The actor is placed in a DEAD state and removed from the Ray name registry, so _actor() subsequently returns None, _is_active() returns False, and all checkpointing silently does nothing for the entire run. The user passed checkpoint_path expecting resumability to be active, but gets no error and no indication it isn't working.
A lightweight fix is to call a trivially-cheap method on the handle and ray.get it immediately after construction; this surfaces any __init__ exception synchronously before the pipeline starts.
| ray.init(ignore_reinit_error=True) | |
| ResumabilityActor.options( # type: ignore[attr-defined] | |
| name=ACTOR_NAME, | |
| lifetime="detached", | |
| get_if_exists=True, | |
| max_pending_calls=100, | |
| ).remote(str(checkpoint_path)) | |
| ray.init(ignore_reinit_error=True) | |
| actor_handle = ResumabilityActor.options( # type: ignore[attr-defined] | |
| name=ACTOR_NAME, | |
| lifetime="detached", | |
| get_if_exists=True, | |
| max_pending_calls=100, | |
| ).remote(str(checkpoint_path)) | |
| # Verify the actor started successfully; surfaces any __init__ exception | |
| # (e.g. LMDB open failure) before the pipeline begins so the user is not | |
| # left believing checkpointing is active when it silently isn't. | |
| ray.get(actor_handle.are_completed.remote([]), timeout=30) # type: ignore[attr-defined] |
There was a problem hiding this comment.
Agreed here, in fact we got bit by this (for a different reason)...
If during a retry checkpoint_path contains a lot of data, and the constructor of ResumabilityActor is loading it up, the process is also async.
The ray recommended way is to have a def wait() inside your actor and then do ray.get(actor_handle.wait()) explictly before moving on to next line of code..
Again see
Resumability is hard to cover with pure unit tests because the contract spans the adapter's counter logic, the LMDB-backed actor, and the source-skip on a second run. This drives all three end-to-end WITHOUT a Ray cluster or executor: the real BaseStageAdapter.process_batch over a source->sink flow, with the worker-side client helpers pointed at a real (undecorated) ResumabilityActor instance, across two runs that share a checkpoint dir (distinct per-writer LMDB files + union read). Covers: completed sources are skipped and a failed source reruns; a sink that returns a NEW object still completes (locks the output-id keying fix — under the old parent-id keying the source's +1 and the sink's delta collide and the source never completes); and a None-filtered source completes (is consumed, not left pending like Failed). Runs in the CPU lane (ray + lmdb are deps); no cluster. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>
|
/ok to test b7423a1 |
Testing summaryResumability spans the adapter counter logic, the LMDB actor, and the 1. Unit tests (CI, mocked)
2. Functional test (CI, real actor — new in this PR)
3. Manual end-to-end matrix (on a Ray node,
|
| Group | Cases |
|---|---|
| linear 1→1 | transient fail, permanent fail, None completes, fully-resumed empty-stream, multi-source recover, multi-rerun-until-success |
| linear N-stage | fail at non-sink mid stage, None mid, new-object passthrough, fails at different depths |
| fan-out | failed child keeps source open, None child ok vs Failed blocks, fully-filtered source completes, mixed real/None/Failed children |
| fan-out at sink | is_sink term in the fan-out delta |
| fan-in / diamond | documented: not source-attributable → no resume (safe) |
| content-id source | resume is reorder-stable across runs |
| edges | empty source, all-filtered then empty rerun |
Known limitations pinned down (not regressions)
- Fan-in / aggregation (
M→K): one output descends from multiple sources, which source-level counting can't attribute → the counter logs a warning and skips → those sources reprocess every run (safe; no resume speedup). - Fully-resumed run (every source already complete) → the source stage emits nothing and
RayActorPoolExecutorraises"No tasks to process"rather than a clean no-op (pre-existing executor behavior surfaced by resume). - Index-based source ids are reorder-fragile — only
get_deterministic_id()content ids make resume safe under input reordering.
| try: | ||
| ray.init(ignore_reinit_error=True) | ||
| actor_handle = ray.get_actor(ACTOR_NAME) | ||
| ray.get(actor_handle.close.remote(), timeout=10) # type: ignore[attr-defined] | ||
| ray.kill(actor_handle) | ||
| except Exception as e: # noqa: BLE001 | ||
| logger.warning(f"resumability actor cleanup failed: {e}") |
There was a problem hiding this comment.
ray.kill skipped when close() times out — stale actor not cleaned up
ray.get(close.remote(), timeout=10) and ray.kill share the same try block. If close() exceeds the 10-second deadline (e.g., LMDB flushing a large write), GetTimeoutError is caught by except Exception, and ray.kill is never called. The actor remains alive with lifetime="detached".
On a subsequent pipeline.run(checkpoint_path=<different_path>) against the same Ray cluster, get_if_exists=True silently returns the stale actor — which was initialised with the old path. All checkpointing in the new run writes to the wrong LMDB location and reads a stale completed-sources set, so resumability silently does the wrong thing without any error.
The fix is to unconditionally call ray.kill even if close() didn't succeed, by nesting the close attempt inside its own try/except before the kill.
| def _is_active() -> bool: | ||
| """True if a resumability actor is registered in this Ray cluster.""" | ||
| return _actor() is not None |
There was a problem hiding this comment.
The name / import is really vague, can we be more specific, is_resumability_actor_active()
Same for all methods in this file.
Also, I'm wondering if global _actor is safe versus creating an actor in it's namespace separately.
See https://github.com/NVIDIA-NeMo/Curator/blob/80ad7844ab4124579eb933a918d2d08895d38452/nemo_curator/stages/deduplication/id_generator.py
| checkpoint_path (str | Path, optional): Directory used for | ||
| resumability. When set, completed source partitions are tracked | ||
| across runs and skipped on rerun; the tracking state lives in a | ||
| ``.nemo_curator_metadata`` subdirectory. Multiple independent | ||
| runs (e.g. the tasks of a SLURM array) may point at the same | ||
| directory — each writes its own LMDB file, so there is no | ||
| shared-file contention. The actor lifecycle is owned by this | ||
| method; executors are not modified. |
There was a problem hiding this comment.
nit ai slop too long a substring..
| # The executor's ray.shutdown() may have run in its own | ||
| # finally:; reconnect to clean up the detached actor. | ||
| try: | ||
| ray.init(ignore_reinit_error=True) |
There was a problem hiding this comment.
Same as above, do with ray.init() and then do this...
| ``EmptyTask`` seeds a pipeline (the implicit root id ``"0"``). The resumability | ||
| layer adds two more markers on the same :class:`SentinelTask` base: | ||
|
|
||
| - ``NoneTask`` — this slot was intentionally filtered. The resumability counter |
There was a problem hiding this comment.
| - ``NoneTask`` — this slot was intentionally filtered. The resumability counter | |
| - ``NoneTask`` - this task was intentionally filtered. The resumability counter |
| - ``NoneTask`` — this slot was intentionally filtered. The resumability counter | ||
| treats it as a consumed branch (decrements). The adapter auto-wraps a | ||
| returned ``None`` as a ``NoneTask``. | ||
| - ``FailedTask`` — this slot failed and should be retried on resume. The counter |
There was a problem hiding this comment.
| - ``FailedTask`` — this slot failed and should be retried on resume. The counter | |
| - ``FailedTask`` — this slot failed and should be retried on resume. The resumability counter |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| """Per-writer LMDB owner that tracks per-source pending counters for |
There was a problem hiding this comment.
nit Too much AI slop here in the docstrings
| def _new_actor(base_dir: Path, writer_id: str): # noqa: ANN202 (undecorated Ray actor class instance) | ||
| """A real actor instance (undecorated class — no Ray cluster needed), | ||
| writing its own ``<writer_id>.mdb`` and reading the union on startup.""" | ||
| cls = ResumabilityActor.__ray_metadata__.modified_class # type: ignore[attr-defined] | ||
| return cls(str(base_dir), writer_id=writer_id) |
There was a problem hiding this comment.
What does this do? Why are we tapping into __ray_metadata__? seems private?
| # running writers: distinct hosts, or distinct pids on one host). A | ||
| # rerun whose pid recycles simply reopens and appends to its old file, | ||
| # which is safe (sequential in time). | ||
| wid = writer_id or f"{socket.gethostname()}-{os.getpid()}" |
There was a problem hiding this comment.
Hmmm how does this help SLURM Arrays (or even a singular job), on a retry won't we end up getting a different path (pid / hostname) and therefore the retry will think checkpoint doesn't exist and therefore?
I think in this case we might be missing the docstring of what is writer_id or when to specify it.. lol
| subdir=False, | ||
| lock=False, # sole writer of this file → no inter-process lock needed | ||
| max_dbs=1, | ||
| map_size=map_size, |
There was a problem hiding this comment.
Can you explain this arg more?
| sync=True, | ||
| readahead=False, | ||
| ) | ||
| self._db = self._env.open_db(_COMPLETED_DB) |
There was a problem hiding this comment.
What's the _COMPLETED_DB string above? How does this work?
The `isinstance(t, (NoneTask, FailedTask))` filter appeared in three places (strip-before-next-stage, the counter's "real" outputs, and the source list). Extract a single `_is_sentinel` helper so the marker definition lives in one place. Behavior-identical. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>
|
/ok to test 8f1dc0b |
|
|
||
|
|
||
| @ray.remote(num_cpus=0, max_concurrency=1) | ||
| class ResumabilityActor: |
There was a problem hiding this comment.
TBH for better testing might be easier to break it into
class ResumabilityActorBase:
# all methods that can be unittestested w/o ray
......
@ray.remote(...)
class ResumabilityActor(ResumabilityActorBase):
# only ray related methodsnit see id_generator
Bird's-eye docstring pass — these read fine in isolation but were wrong against the merged codebase: - base.py _post_process_task_ids: dropped the stale "until per-slot sentinels (NoneTask/FailedTask) land in a later PR" — those sentinels exist now; the filter+fan-out-in-one-batch case is still ambiguous, so describe the actual fall-through + the per-input-slot workaround instead of a future PR. - pipeline.py: the sink flag is used by the resumability counters now (this branch), not "a follow-up PR"; point at _apply_resumability_counters. - resumability_client._flush_deltas: there is no "watchdog poll" (the actor never raises); fixed, and corrected the stale "_max_pending_calls" -> "max_pending_calls" (Ray 2.54 name). - resumability_actor: rename the dedup key "task_hash" -> "task_id" (it is the output task id, never a computed hash) across docstrings, comments, and the apply_deltas loop var, matching the client and the codebase's canonical term. Docstrings/comments + one internal variable rename only; no behavior change. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>
|
/ok to test ec2073b |
| return self._source_counters(output_tasks) | ||
|
|
||
| # No outputs at all. Filtering is expressed as None -> NoneTask (a kept | ||
| # slot), so a stage that emits nothing is degenerate; there is no output |
There was a problem hiding this comment.
what do we mean by "is degenerate".. can this ever happen? because we have a NoneTask so not output_tasks shouldn't be valid right?
| if all(not t._source_id for t in input_tasks): | ||
| return output_tasks |
There was a problem hiding this comment.
Which tasks are these? What's a pre-source stage? Is it the initial_tasks?
Tighten the resumability docstrings and inline comments to the load-bearing
facts, cutting repetition and over-explanation (~170 fewer lines): collapse the
actor module/class docstring duplication, condense the counter-logic comments
and _post_process_task_ids docstring, shorten the client/pipeline/sentinels
docstrings, and reduce the test module docstrings to a line or two. Also drop a
couple of commit-centric phrasings ("PR-A", a stale cross-reference) for
timeless wording. Comments/docstrings only — no behavior change.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>
|
/ok to test a3c1d54 |
VibhuJawa
left a comment
There was a problem hiding this comment.
Did an intial review to help my understanding . Left some comments
| if not output_tasks: | ||
| return output_tasks |
There was a problem hiding this comment.
Question: Why don't we just error here ?
There was a problem hiding this comment.
Hmmm, output_tasks can be empty, right? At the end of the pipeline or something?
| per_task: list[tuple[str, str, int]] = [] | ||
| real = [t for t in output_tasks if not _is_sentinel(t)] | ||
|
|
||
| if len(input_tasks) == 1 and len(output_tasks) != 1: |
There was a problem hiding this comment.
Question:
| if len(input_tasks) == 1 and len(output_tasks) != 1: | |
| if len(input_tasks) == 1 and len(output_tasks) > 1: |
| surviving source fires a ``+1``.""" | ||
| sources = [t for t in output_tasks if not isinstance(t, (NoneTask, FailedTask))] | ||
| for t in sources: | ||
| t._source_id = t.task_id.rsplit("_", 1)[-1] |
There was a problem hiding this comment.
Can we even have multiple delimited cases here, @abhinavg4 ? Or will it be like 0_1_2 ?
| its own last id segment. Drop already-completed sources; each survivor fires ``+1``.""" | ||
| sources = [t for t in output_tasks if not _is_sentinel(t)] | ||
| for t in sources: | ||
| t._source_id = t.task_id.rsplit("_", 1)[-1] |
There was a problem hiding this comment.
I’m a little worried that this is relying on task_id’s string encoding too directly.
task_id is effectively an id path, but here we’re parsing it with rsplit("_", 1) and treating the last path segment as the source identity. That works for the current common source shape, but it makes the resumability logic depend on delimiter
details that are not really owned by this code. It also maybe fragile for cases like N→N source stages where outputs without get_deterministic_id() can all get suffix 0, which would make multiple source partitions share _source_id == "0". (Which is an adverse case i guess ??)
Minor ask: Could we centralize this behind a small task-id helper/API instead of open-coding string parsing? For example, something like:
task.source_id = TaskId.parse(task.task_id).leaf()
# or
task_id.get_last_segment()There was a problem hiding this comment.
Answering both of the above.
We will always have _ delimited cases. The task ID is controlled by base.py only. Slightly above this function. I'm not sure if there's a good way to make resumability independent of this delimiter. But I'm comfortable with it since we assign task ID and we always use '_'
For the case when get_deterministic_id() all give the same value. This is an adverse case. Like this will break a ton of other stuff too (like writers and stuff). In the curator, we do not (Cannot?) ensure that these id's are unique across tasks. We just rely on the user to ensure this.
There was a problem hiding this comment.
Minor ask: Could we centralize this behind a small task-id helper/API instead of open-coding string parsing?
Great call out yes.
There was a problem hiding this comment.
I think Praateek also asked this somewhere. But yeah, I can make TaskId as a class or something. Or actually I can put this inside task. So we can do task.get_source_id()
| for c in real: | ||
| if not c._source_id: | ||
| c._source_id = parent._source_id | ||
| elif len(output_tasks) == len(input_tasks): |
There was a problem hiding this comment.
We're using the number of inputs and outputs as a proxy for whether a stage is 1:1 or fan-in/fan-out type stages. However in some cases (like shuffle) it might be the case that by chance the number of inputs and output tasks are identical. However since the inputs are completely shuffled I don't think we can make assumptions about reusambility.
| Args: | ||
| executor (BaseExecutor): Executor to use | ||
| initial_tasks (list[Task], optional): Initial tasks to start the pipeline with. Defaults to None. | ||
| checkpoint_path (str | Path, optional): Resumability directory. When |
There was a problem hiding this comment.
lets note that this must be local not a remote path
| from nemo_curator.utils.resumability_actor import ResumabilityActor | ||
| from nemo_curator.utils.resumability_client import ACTOR_NAME | ||
|
|
||
| ray.init(ignore_reinit_error=True) |
There was a problem hiding this comment.
+1 on this. I don't think we should ray.init here because any subsequent ray.init will be a no-op that doesn't pass env vars and if the client hasn't been started can lead to weird behavior w.r.t the actor getting killed by the shutdowns if any.
| name=ACTOR_NAME, | ||
| lifetime="detached", | ||
| get_if_exists=True, | ||
| max_pending_calls=100, |
There was a problem hiding this comment.
what's the rationale behind setting this value?
| ``lifetime="detached"`` and closed at end-of-run; ``apply_deltas`` is | ||
| fire-and-forget and never raises.""" | ||
|
|
||
| def __init__(self, base_dir: str, map_size: int = _DEFAULT_MAP_SIZE, writer_id: str | None = None): |
There was a problem hiding this comment.
just note that creating a ray actor is asynchronous/lazy so if the db hasn't started up properly and we start calling methods that rely on the db being open we can run into issues. Check out how idgen actor solves this by calling ray.wait on another method to ensure init is completed.
| METADATA_DIRNAME = ".nemo_curator_metadata" | ||
|
|
||
|
|
||
| @ray.remote(num_cpus=0, max_concurrency=1) |
There was a problem hiding this comment.
should this be 0 cpus? are we okay scheduling work on cpus sharing with this actor?
Discussion (Design Doc)
#2034
What
Adds opt-in pipeline resumability via source-level counter checkpointing,
built on top of the sentinel-task refactor in #2062 (this PR is stacked on
that branch — review/merge #2062 first).
Pipeline.run(checkpoint_path=...)tracks which source partitions have fullydrained through the pipeline and skips already-completed ones on a rerun, so an
interrupted run resumes without reprocessing finished work.
How
SentinelTaskbase):NoneTask/FailedTaskarebare subclasses — no identity of their own (
dataset_name"none"/"failed",task_idassigned by the adapter like any task).backends/base.pyprocess_batch(always-on): a returnedNone("filter this slot") becomes a
NoneTaskvia a single inline comprehension soevery output is a real
Taskand gets atask_id; sentinels are strippedbefore the next stage.
_apply_resumability_counters(gated on_is_active, counter-only): asource stamps
_source_id, skips completed sources, fires+1; a non-sourcefires
-1/0per output (NoneTask→-1,FailedTask→ no delta, so itssource stays pending and reruns). Counters key on the parent's identity —
which is why the sentinels need none of their own. Ambiguous
M→Kbatcheswarn + skip rather than misattribute.
lmdbis a (locked) dependency but stays opt-in atimport time:
ACTOR_NAMElives inresumability_clientso the always-importedworker path never imports
lmdb; it loads only when resumability is used.<checkpoint_path>/.nemo_curator_metadatawith one LMDB file per writer (
<host>-<pid>), not a single shared file(LMDB can't be safely shared across hosts on a networked FS like Lustre). Each
actor writes only its own file and reads the union of completed sources
across all files on startup, so the tasks of a SLURM array can checkpoint to
the same directory without contention.
Testing
tests/tasks/test_sentinels.py—SentinelTaskhierarchy (bare construction,payload rejection,
EmptyTaskrooted at"0",task_idnot user-settable).tests/backends/test_resumability_adapter.py— counter math + an end-to-endNone→NoneTask→strip case (actor RPCs mocked).tests/utils/test_resumability_actor.py— counter dedup, anomaly recovery,lifecycle, and multi-writer union (SLURM-array safety).
nemo-curatorcontainer: aSource → Flaky(random FailedTask) → Sinkpipeline re-run against one on-diskLMDB checkpoint converges, skips already-completed sources on resume, and
processes each source exactly once.