Handle Case Where Recently Launched Worker Does Not Immediately Heartbeat (#741)

* Handle Case Where Recently Launched Worker Does Not Immediately Heartbeat

There appears to be a race condition: when a recently launched worker has not
yet sent a heartbeat and the time since launch is still within the missed
heartbeat threshold, the JobActor treats the missing heartbeat as a failure and
resubmits the worker. If so, this can lead to mass resubmits when a new leader
is elected.

* Update upload-artifact to v4

* Temporarily fix tests
kmg-stripe authored Jan 13, 2025
1 parent 292c419 commit 86a0916
Showing 3 changed files with 11 additions and 7 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/nebula-ci.yml
@@ -48,7 +48,7 @@ jobs:
CI_BRANCH: ${{ github.ref }}
COVERALLS_REPO_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Upload Test Results
- uses: actions/upload-artifact@v3
+ uses: actions/upload-artifact@v4
if: always()
with:
name: Unit Test Results
@@ -59,7 +59,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Upload
- uses: actions/upload-artifact@v3
+ uses: actions/upload-artifact@v4
with:
name: Event File
path: ${{ github.event_path }}
@@ -1943,9 +1943,13 @@ public void checkHeartBeats(Instant currentTime) {
acceptedAt);
}
} else {
- // no heartbeat or heartbeat too old
- if (!workerMeta.getLastHeartbeatAt().isPresent() || Duration.between(workerMeta.getLastHeartbeatAt().get(), currentTime).getSeconds()
- > missedHeartBeatToleranceSecs) {
+ // no heartbeat in a timely manner since launched or heartbeat too old
+ // note: the worker has been launched
+ boolean noTimelyHeartbeatSinceLaunched = !workerMeta.getLastHeartbeatAt().isPresent()
+ && Duration.between(Instant.ofEpochSecond(workerMeta.getLaunchedAt()), currentTime).getSeconds() > missedHeartBeatToleranceSecs;
+ boolean heartbeatTooOld = workerMeta.getLastHeartbeatAt().isPresent()
+ && Duration.between(workerMeta.getLastHeartbeatAt().get(), currentTime).getSeconds() > missedHeartBeatToleranceSecs;
+ if (noTimelyHeartbeatSinceLaunched || heartbeatTooOld) {
this.numWorkerMissingHeartbeat.increment();

if (!workerMeta.getLastHeartbeatAt().isPresent()) {
@@ -823,9 +823,9 @@ public void testNoHeartBeatAfterLaunchResubmit() {
assertEquals(JobState.Accepted, resp4.getJobMetadata().get().getState());

// 1 original submissions and 0 resubmits because of worker not in launched state with HB timeouts
- verify(schedulerMock, times(2)).scheduleWorkers(any());
+ verify(schedulerMock, times(1)).scheduleWorkers(any());
// 1 kills due to resubmits
- verify(schedulerMock, times(1)).unscheduleAndTerminateWorker(eq(workerId2), any());
+ verify(schedulerMock, times(0)).unscheduleAndTerminateWorker(eq(workerId2), any());
} catch (Exception e) {
fail("unexpected exception " + e.getMessage());
}
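
The change to checkHeartBeats can be looked at in isolation. The following is a minimal sketch, not the project's code: `HeartbeatCheckSketch`, `WorkerInfo`, `missedHeartbeatOld` and `missedHeartbeatNew` are hypothetical names introduced here for illustration. It assumes the launch timestamp is stored in epoch seconds, as the diff's use of `Instant.ofEpochSecond` implies; if the real field is in milliseconds, `Instant.ofEpochMilli` would be needed instead.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

// Hypothetical, self-contained illustration of the old and new heartbeat checks.
class HeartbeatCheckSketch {

    // Stand-in for the worker metadata read in checkHeartBeats.
    // Assumption: launchedAtEpochSecs is expressed in epoch seconds.
    static final class WorkerInfo {
        final long launchedAtEpochSecs;
        final Optional<Instant> lastHeartbeatAt;

        WorkerInfo(long launchedAtEpochSecs, Optional<Instant> lastHeartbeatAt) {
            this.launchedAtEpochSecs = launchedAtEpochSecs;
            this.lastHeartbeatAt = lastHeartbeatAt;
        }
    }

    // Old condition: a worker with no heartbeat at all is flagged immediately,
    // no matter how recently it was launched.
    static boolean missedHeartbeatOld(WorkerInfo w, Instant now, long toleranceSecs) {
        return !w.lastHeartbeatAt.isPresent()
                || Duration.between(w.lastHeartbeatAt.get(), now).getSeconds() > toleranceSecs;
    }

    // New condition: a worker with no heartbeat is flagged only once the time
    // since launch exceeds the tolerance; a stale heartbeat is flagged as before.
    static boolean missedHeartbeatNew(WorkerInfo w, Instant now, long toleranceSecs) {
        boolean noTimelyHeartbeatSinceLaunched = !w.lastHeartbeatAt.isPresent()
                && Duration.between(Instant.ofEpochSecond(w.launchedAtEpochSecs), now).getSeconds() > toleranceSecs;
        boolean heartbeatTooOld = w.lastHeartbeatAt.isPresent()
                && Duration.between(w.lastHeartbeatAt.get(), now).getSeconds() > toleranceSecs;
        return noTimelyHeartbeatSinceLaunched || heartbeatTooOld;
    }

    public static void main(String[] args) {
        Instant now = Instant.ofEpochSecond(1_700_000_000L);
        long toleranceSecs = 60;

        // Launched 5 seconds ago and has not sent a heartbeat yet.
        WorkerInfo fresh = new WorkerInfo(now.getEpochSecond() - 5, Optional.empty());

        System.out.println("old=" + missedHeartbeatOld(fresh, now, toleranceSecs)
                + " new=" + missedHeartbeatNew(fresh, now, toleranceSecs));
        // prints "old=true new=false"
    }
}
```

Under these assumptions, a worker that was launched long ago and never sent a heartbeat, or whose last heartbeat is older than the tolerance, is still flagged by both versions; only the freshly launched case differs.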