Skip to content

Pipeline resumability via source-level counter checkpointing#2063

Open
abhinavg4 wants to merge 9 commits into
mainfrom
abhinavg/resumability
Open

Pipeline resumability via source-level counter checkpointing#2063
abhinavg4 wants to merge 9 commits into
mainfrom
abhinavg/resumability

Conversation

@abhinavg4

Copy link
Copy Markdown
Contributor

Discussion (Design Doc)

#2034

Supersedes #2033, which was inadvertently squash-merged into the
abhinavg/sentinel-task feature branch by a stale auto-merge rule when its
base was retargeted. The feature branch was reset to the approved state and
resumability is tracked here instead. Same diff, stacked on #2062.

What

Adds opt-in pipeline resumability via source-level counter checkpointing,
built on top of the sentinel-task refactor in #2062 (this PR is stacked on
that branch — review/merge #2062 first).

Pipeline.run(checkpoint_path=...) tracks which source partitions have fully
drained through the pipeline and skips already-completed ones on a rerun, so an
interrupted run resumes without reprocessing finished work.

How

  • Sentinels (on Refactor empty/sentinel tasks: EmptyTask class + SentinelTask base #2062's SentinelTask base): NoneTask / FailedTask are
    bare subclasses — no identity of their own (dataset_name "none"/"failed",
    task_id assigned by the adapter like any task).
  • backends/base.py process_batch (always-on): a returned None
    ("filter this slot") becomes a NoneTask via a single inline comprehension so
    every output is a real Task and gets a task_id; sentinels are stripped
    before the next stage.
  • _apply_resumability_counters (gated on _is_active, counter-only): a
    source stamps _source_id, skips completed sources, fires +1; a non-source
    fires -1/0 per output (NoneTask-1, FailedTask → no delta, so its
    source stays pending and reruns). Counters key on the parent's identity —
    which is why the sentinels need none of their own. Ambiguous M→K batches
    warn + skip rather than misattribute.
  • LMDB actor + client. lmdb is a (locked) dependency but stays opt-in at
    import time: ACTOR_NAME lives in resumability_client so the always-imported
    worker path never imports lmdb; it loads only when resumability is used.
  • SLURM-array safe. State lives in <checkpoint_path>/.nemo_curator_metadata
    with one LMDB file per writer (<host>-<pid>), not a single shared file
    (LMDB can't be safely shared across hosts on a networked FS like Lustre). Each
    actor writes only its own file and reads the union of completed sources
    across all files on startup, so the tasks of a SLURM array can checkpoint to
    the same directory without contention.

Testing

  • tests/tasks/test_sentinels.pySentinelTask hierarchy (bare construction,
    payload rejection, EmptyTask rooted at "0", task_id not user-settable).
  • tests/backends/test_resumability_adapter.py — counter math + an end-to-end
    NoneNoneTask→strip case (actor RPCs mocked).
  • tests/utils/test_resumability_actor.py — counter dedup, anomaly recovery,
    lifecycle, and multi-writer union (SLURM-array safety).
  • Verified end-to-end in the nemo-curator container: a
    Source → Flaky(random FailedTask) → Sink pipeline re-run against one on-disk
    LMDB checkpoint converges, skips already-completed sources on resume, and
    processes each source exactly once.

@copy-pr-bot

copy-pr-bot Bot commented Jun 10, 2026

Copy link
Copy Markdown

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@abhinavg4

Copy link
Copy Markdown
Contributor Author

/ok to test a560bc1

Comment on lines +42 to +44
def _is_active() -> bool:
"""True if a resumability actor is registered in this Ray cluster."""
return _actor() is not None

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name / import is really vague, can we be more specific, is_resumability_actor_active()
Same for all methods in this file.

Also, I'm wondering if global _actor is safe versus creating an actor in it's namespace separately.
See https://github.com/NVIDIA-NeMo/Curator/blob/80ad7844ab4124579eb933a918d2d08895d38452/nemo_curator/stages/deduplication/id_generator.py

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with the rename. About the global _actor,it should be ok. We have here: ACTOR_NAME = "nemo_curator_resumability"

Comment thread nemo_curator/pipeline/pipeline.py Outdated
Comment on lines +237 to +244
checkpoint_path (str | Path, optional): Directory used for
resumability. When set, completed source partitions are tracked
across runs and skipped on rerun; the tracking state lives in a
``.nemo_curator_metadata`` subdirectory. Multiple independent
runs (e.g. the tasks of a SLURM array) may point at the same
directory — each writes its own LMDB file, so there is no
shared-file contention. The actor lifecycle is owned by this
method; executors are not modified.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit ai slop too long a substring..

Comment thread nemo_curator/pipeline/pipeline.py Outdated
# The executor's ray.shutdown() may have run in its own
# finally:; reconnect to clean up the detached actor.
try:
ray.init(ignore_reinit_error=True)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, do with ray.init() and then do this...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plan is to remove ray.init() from this file and ensure that the user is able to do it via Ray client

Comment thread nemo_curator/tasks/sentinels.py Outdated
``EmptyTask`` seeds a pipeline (the implicit root id ``"0"``). The resumability
layer adds two more markers on the same :class:`SentinelTask` base:

- ``NoneTask`` — this slot was intentionally filtered. The resumability counter

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- ``NoneTask`` this slot was intentionally filtered. The resumability counter
- ``NoneTask`` - this task was intentionally filtered. The resumability counter

Comment thread nemo_curator/tasks/sentinels.py Outdated
- ``NoneTask`` — this slot was intentionally filtered. The resumability counter
treats it as a consumed branch (decrements). The adapter auto-wraps a
returned ``None`` as a ``NoneTask``.
- ``FailedTask`` — this slot failed and should be retried on resume. The counter

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- ``FailedTask``this slot failed and should be retried on resume. The counter
- ``FailedTask``this slot failed and should be retried on resume. The resumability counter

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Per-writer LMDB owner that tracks per-source pending counters for

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit Too much AI slop here in the docstrings

Comment thread tests/backends/test_resumability_functional.py
# running writers: distinct hosts, or distinct pids on one host). A
# rerun whose pid recycles simply reopens and appends to its old file,
# which is safe (sequential in time).
wid = writer_id or f"{socket.gethostname()}-{os.getpid()}"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm how does this help SLURM Arrays (or even a singular job), on a retry won't we end up getting a different path (pid / hostname) and therefore the retry will think checkpoint doesn't exist and therefore?

I think in this case we might be missing the docstring of what is writer_id or when to specify it.. lol

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

writer_id is ckpt path, will add in the docstring

Comment thread nemo_curator/utils/resumability_actor.py
Comment thread nemo_curator/utils/resumability_actor.py
The `isinstance(t, (NoneTask, FailedTask))` filter appeared in three places
(strip-before-next-stage, the counter's "real" outputs, and the source list).
Extract a single `_is_sentinel` helper so the marker definition lives in one
place. Behavior-identical.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>
@abhinavg4

Copy link
Copy Markdown
Contributor Author

/ok to test 8f1dc0b

Comment thread nemo_curator/utils/resumability_actor.py
Bird's-eye docstring pass — these read fine in isolation but were wrong against
the merged codebase:

- base.py _post_process_task_ids: dropped the stale "until per-slot sentinels
  (NoneTask/FailedTask) land in a later PR" — those sentinels exist now; the
  filter+fan-out-in-one-batch case is still ambiguous, so describe the actual
  fall-through + the per-input-slot workaround instead of a future PR.
- pipeline.py: the sink flag is used by the resumability counters now (this
  branch), not "a follow-up PR"; point at _apply_resumability_counters.
- resumability_client._flush_deltas: there is no "watchdog poll" (the actor
  never raises); fixed, and corrected the stale "_max_pending_calls" ->
  "max_pending_calls" (Ray 2.54 name).
- resumability_actor: rename the dedup key "task_hash" -> "task_id" (it is the
  output task id, never a computed hash) across docstrings, comments, and the
  apply_deltas loop var, matching the client and the codebase's canonical term.

Docstrings/comments + one internal variable rename only; no behavior change.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>
@abhinavg4

Copy link
Copy Markdown
Contributor Author

/ok to test ec2073b

Comment thread nemo_curator/backends/base.py Outdated
return self._source_counters(output_tasks)

# No outputs at all. Filtering is expressed as None -> NoneTask (a kept
# slot), so a stage that emits nothing is degenerate; there is no output

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do we mean by "is degenerate".. can this ever happen? because we have a NoneTask so not output_tasks shouldn't be valid right?

Comment thread nemo_curator/backends/base.py
Tighten the resumability docstrings and inline comments to the load-bearing
facts, cutting repetition and over-explanation (~170 fewer lines): collapse the
actor module/class docstring duplication, condense the counter-logic comments
and _post_process_task_ids docstring, shorten the client/pipeline/sentinels
docstrings, and reduce the test module docstrings to a line or two. Also drop a
couple of commit-centric phrasings ("PR-A", a stale cross-reference) for
timeless wording. Comments/docstrings only — no behavior change.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>
@abhinavg4

Copy link
Copy Markdown
Contributor Author

/ok to test a3c1d54

@VibhuJawa VibhuJawa left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did an intial review to help my understanding . Left some comments

Comment thread nemo_curator/backends/base.py
Comment thread nemo_curator/backends/base.py Outdated
per_task: list[tuple[str, str, int]] = []
real = [t for t in output_tasks if not _is_sentinel(t)]

if len(input_tasks) == 1 and len(output_tasks) != 1:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question:

Suggested change
if len(input_tasks) == 1 and len(output_tasks) != 1:
if len(input_tasks) == 1 and len(output_tasks) > 1:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks right

Comment thread nemo_curator/backends/base.py Outdated
Comment thread nemo_curator/backends/base.py Outdated
its own last id segment. Drop already-completed sources; each survivor fires ``+1``."""
sources = [t for t in output_tasks if not _is_sentinel(t)]
for t in sources:
t._source_id = t.task_id.rsplit("_", 1)[-1]

@VibhuJawa VibhuJawa Jun 23, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m a little worried that this is relying on task_id’s string encoding too directly.

task_id is effectively an id path, but here we’re parsing it with rsplit("_", 1) and treating the last path segment as the source identity. That works for the current common source shape, but it makes the resumability logic depend on delimiter
details that are not really owned by this code. It also maybe fragile for cases like N→N source stages where outputs without get_deterministic_id() can all get suffix 0, which would make multiple source partitions share _source_id == "0". (Which is an adverse case i guess ??)

Minor ask: Could we centralize this behind a small task-id helper/API instead of open-coding string parsing? For example, something like:

task.source_id = TaskId.parse(task.task_id).leaf()
 # or
task_id.get_last_segment()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Answering both of the above.

We will always have _ delimited cases. The task ID is controlled by base.py only. Slightly above this function. I'm not sure if there's a good way to make resumability independent of this delimiter. But I'm comfortable with it since we assign task ID and we always use '_'

For the case when get_deterministic_id() all give the same value. This is an adverse case. Like this will break a ton of other stuff too (like writers and stuff). In the curator, we do not (Cannot?) ensure that these id's are unique across tasks. We just rely on the user to ensure this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor ask: Could we centralize this behind a small task-id helper/API instead of open-coding string parsing?

Great call out yes.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Praateek also asked this somewhere. But yeah, I can make TaskId as a class or something. Or actually I can put this inside task. So we can do task.get_source_id()

Comment thread nemo_curator/backends/base.py
Comment thread nemo_curator/pipeline/pipeline.py Outdated
Comment thread nemo_curator/pipeline/pipeline.py Outdated
from nemo_curator.utils.resumability_actor import ResumabilityActor
from nemo_curator.utils.resumability_client import ACTOR_NAME

ray.init(ignore_reinit_error=True)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on this. I don't think we should ray.init here because any subsequent ray.init will be a no-op that doesn't pass env vars and if the client hasn't been started can lead to weird behavior w.r.t the actor getting killed by the shutdowns if any.

Comment thread nemo_curator/pipeline/pipeline.py Outdated
name=ACTOR_NAME,
lifetime="detached",
get_if_exists=True,
max_pending_calls=100,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the rationale behind setting this value?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No rationale. Just not to overburden the actor. Should we increase this as per you?

``lifetime="detached"`` and closed at end-of-run; ``apply_deltas`` is
fire-and-forget and never raises."""

def __init__(self, base_dir: str, map_size: int = _DEFAULT_MAP_SIZE, writer_id: str | None = None):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just note that creating a ray actor is asynchronous/lazy so if the db hasn't started up properly and we start calling methods that rely on the db being open we can run into issues. Check out how idgen actor solves this by calling ray.wait on another method to ensure init is completed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this is addressed by a comment in pipeline.py also

Comment thread nemo_curator/utils/resumability_actor.py
@abhinavg4

Copy link
Copy Markdown
Contributor Author

/ok to test 6cd99ca

@abhinavg4

Copy link
Copy Markdown
Contributor Author

/ok to test ff296a4

1. is_resumable gate. Add `is_resumable: bool = True` to ProcessingStage —
   resumable by DEFAULT. Only stages whose input→output mapping isn't
   source-attributable opt out (set False): the dedup ShuffleStage, LSHStage,
   and ConnectedComponentsStage (shuffle / fan-in). Pipeline.run(checkpoint_path=...)
   raises if any stage in the pipeline is not resumable, naming the offenders.

2. Actor lifecycle no longer calls ray.init/ray.shutdown anywhere (per review:
   a ray.init here is a no-op that drops the executor's env vars and risks the
   actor being killed by stray shutdowns). Instead:
   - create_resumability_actor() requires an already-running cluster
     (ray.is_initialized) and raises a clear "start RayClient first" error
     otherwise; the executor owns the ray.init that wraps execute().
   - the actor is detached + namespaced (namespace == name, like id_generator)
     so workers find it by (name, namespace) and it survives executor shutdowns.
   - actor.wait() (ray.get after spawn) blocks until the checkpoint scan is done
     (creating a Ray actor is lazy/async) and surfaces __init__ errors.
   - shutdown always ray.kill()s even if close() fails; no-op if Ray is already
     down (durable LMDB rows + actor dies with the cluster).

Also: drop max_pending_calls=100 (a cap that raises would drop fire-and-forget
deltas under load); guard apply_deltas against a closed env; note checkpoint_path
must be a LOCAL path (Ayush); TODO to rename client helpers.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>
@abhinavg4

Copy link
Copy Markdown
Contributor Author

/ok to test 8b4638f

- Rename worker-side client helpers to resumability-specific names
  (praateek/Vibhu): _actor->_resumability_actor, _is_active->
  is_resumability_actor_active, _flush_deltas->flush_resumability_deltas,
  _skip_completed_sources->completed_resumability_sources. Drop the stale
  TODO and the stale max_pending_calls mention in the flush docstring.
- Key resumability counter deltas on the OUTPUT task_id via
  Task.get_source_id() (Vibhu), never the parent's id, so a source's +1
  can't collide with a downstream delta for the same partition.
- Fan-out branch condition !=1 -> >1 in _apply_resumability_counters
  (Vibhu); move `real` into that branch (only used there).
- Reword the empty-output comment (drop "degenerate"); fix the Task
  docstring Attributes (sarah): match field order, add data/_metadata.
- Update test patch targets to the renamed helpers; fix the actor-lookup
  assertion to the namespaced get_actor(name=, namespace=) signature.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>
@abhinavg4

Copy link
Copy Markdown
Contributor Author

/ok to test 28a994c

…2e test

RayClient.start() launches the cluster and sets RAY_ADDRESS but does NOT
ray.init() the driver; the driver is only connected lazily inside
executor.execute(). create_resumability_actor() ran before execute(), so it
saw ray uninitialized and raised "requires a running Ray cluster" on every
resumable run. (Caught only by a real end-to-end run — every existing
resumability test mocks Ray or uses the undecorated actor class.)

Fix (per review): the pipeline owns the Ray session around the actor, using
the pattern Ray recommends for this:
- require a pre-existing cluster (RAY_ADDRESS) so our session shutdown can't
  tear down the cluster (and the detached actor) out from under the run;
- `with ray.init(): create_resumability_actor(...)` to spawn the detached
  actor, then disconnect BEFORE executor.execute() so the executor's own
  ray.init(runtime_env=...) runs un-nested and its env vars still propagate
  (a nested ray.init's runtime_env is silently dropped);
- a final `with ray.init(): shutdown_resumability_actor()` to close+kill it.
Executors are untouched. The RayClient-required check moves from
create_resumability_actor to Pipeline.run.

Add tests/backends/test_resumability_e2e.py: a real RayActorPoolExecutor +
pipeline.run(checkpoint_path=...) over two runs sharing one checkpoint dir
(using the autouse shared_ray_cluster fixture), asserting completed sources
are skipped and a failed one reruns. This exercises the full pipeline -> Ray
-> actor -> LMDB path and fails on exactly the bug above, so CI now covers it.

Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>
@abhinavg4

Copy link
Copy Markdown
Contributor Author

/ok to test 22c7d57

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants