[train][Preemption handling 2/n] Fan out preemption signal to workers#64099
liulehui wants to merge 5 commits into
Signed-off-by: Lehui Liu <lehui@anyscale.com>
Code Review
This pull request implements a mechanism to forward detected preemption signals from the PreemptionWatcher to individual worker actors, allowing the training loop to react to node drains. It introduces a thread-shared PreemptionContext on the worker's TrainContext and adds a mark_preempt RPC method to the worker actor to receive and store these signals. The feedback highlights a potential race condition where mark_preempt could be called before TrainContext is initialized, and suggests catching RuntimeError to queue the signal. Additionally, suggestions are provided to make internal dataclass fields private by setting init=False and to add defensive checks against unpopulated distributed contexts in the preemption callback.
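The mechanism summarized above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in this PR: `PreemptionInfo`, its fields, the `Worker` class, `init_train_context`, and the `_pending` queue are hypothetical names chosen for the example. Only `PreemptionContext`, `TrainContext`, `mark_preempt`, the `init=False` suggestion, and the idea of catching `RuntimeError` to queue an early signal come from the review text.

```python
import threading
from dataclasses import dataclass, field
from typing import Optional


@dataclass(frozen=True)
class PreemptionInfo:
    """Hypothetical payload describing a detected node drain."""
    node_id: str
    reason: str


@dataclass
class PreemptionContext:
    """Thread-shared holder for the preemption signal.

    Internal fields use init=False so callers cannot pass them to __init__.
    """
    _lock: threading.Lock = field(
        default_factory=threading.Lock, init=False, repr=False
    )
    _info: Optional[PreemptionInfo] = field(default=None, init=False)

    def set(self, info: PreemptionInfo) -> None:
        with self._lock:
            if self._info is None:  # keep the first signal received
                self._info = info

    def get(self) -> Optional[PreemptionInfo]:
        with self._lock:
            return self._info


@dataclass
class TrainContext:
    """Stand-in for the worker's train context (assumed shape)."""
    preemption_context: PreemptionContext = field(
        default_factory=PreemptionContext
    )


class Worker:
    """Hypothetical worker actor; in the PR, mark_preempt is an RPC method."""

    def __init__(self) -> None:
        self._train_context: Optional[TrainContext] = None
        self._pending: Optional[PreemptionInfo] = None
        self._init_lock = threading.Lock()

    def _get_train_context(self) -> TrainContext:
        if self._train_context is None:
            raise RuntimeError("TrainContext is not initialized")
        return self._train_context

    def init_train_context(self) -> None:
        with self._init_lock:
            self._train_context = TrainContext()
            if self._pending is not None:
                # Replay a signal that arrived before initialization.
                self._train_context.preemption_context.set(self._pending)
                self._pending = None

    def mark_preempt(self, info: PreemptionInfo) -> None:
        with self._init_lock:
            try:
                ctx = self._get_train_context()
            except RuntimeError:
                self._pending = info  # queue until the context exists
                return
            ctx.preemption_context.set(info)
```

In this sketch, a signal that arrives before `init_train_context` is held in `_pending` and replayed once the context exists, so the training loop still sees it through `preemption_context.get()`.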
f27329b to 41588c7
Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 41588c7.
pseudo-rnd-thoughts left a comment
Looks good to me. Do we have any end-to-end tests, such as release tests, to check that spot node preemption works fully?
    error=error,
    training_report=training_report,
    return_value=return_value,
    preemption_info=train_context.preemption_context.get(),
With the threading lock, do we know the effect as the number of nodes scales? Does this act like a sync barrier between all nodes, or is it async, so that minimal overhead is expected?
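As general background for the question above: a Python `threading.Lock` is local to one process, so it serializes threads inside a single worker and cannot by itself form a barrier across nodes. The sketch below illustrates only that general property. `LocalFlag` is a hypothetical stand-in, and whether the lock in this PR is used the same way depends on the actual implementation.

```python
import threading


class LocalFlag:
    """Hypothetical per-worker holder guarded by its own lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._value = None

    def set(self, value) -> None:
        with self._lock:
            self._value = value

    def get(self):
        with self._lock:
            return self._value


# Each "worker" owns an independent lock; nothing is shared between them.
workers = [LocalFlag() for _ in range(4)]

# Hold worker 0's lock to simulate a slow writer on that worker.
workers[0]._lock.acquire()
try:
    # Reads on the other workers do not wait on worker 0's lock.
    others = [w.get() for w in workers[1:]]
finally:
    workers[0]._lock.release()
```

Under this assumption, the cost of each `get()` is one uncontended lock acquisition per worker, independent of the number of nodes.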

Description
PreemptionContext in the TrainContext
Related issues
#63968
Additional information
Logs confirming that the preemption information was fanned out to workers: