From 31298ce1cf2d1636cd45e0d850ea75d6d9bc23a6 Mon Sep 17 00:00:00 2001 From: Denis Nikulin <denis.nikulin@akvelon.com> Date: Thu, 31 Oct 2024 09:31:40 +0400 Subject: [PATCH] Fixed an issue when the timeline update job stops working if network exception occurs (#5022) --- src/Agent.Worker/JobRunner.cs | 7 ++- .../JobServerQueue.cs | 50 +++++++++---------- 2 files changed, 29 insertions(+), 28 deletions(-) diff --git a/src/Agent.Worker/JobRunner.cs b/src/Agent.Worker/JobRunner.cs index 959834bff3..dfdd1ad8d2 100644 --- a/src/Agent.Worker/JobRunner.cs +++ b/src/Agent.Worker/JobRunner.cs @@ -557,7 +557,12 @@ private async Task ShutdownQueue(bool throwOnFailure) } catch (AggregateException ex) { - ExceptionsUtil.HandleAggregateException((AggregateException)ex, Trace.Error); + ExceptionsUtil.HandleAggregateException(ex, Trace.Error); + + if (throwOnFailure) + { + throw; + } } catch (Exception ex) when (!throwOnFailure) { diff --git a/src/Microsoft.VisualStudio.Services.Agent/JobServerQueue.cs b/src/Microsoft.VisualStudio.Services.Agent/JobServerQueue.cs index 8720ec4b57..087a47ec22 100644 --- a/src/Microsoft.VisualStudio.Services.Agent/JobServerQueue.cs +++ b/src/Microsoft.VisualStudio.Services.Agent/JobServerQueue.cs @@ -437,15 +437,13 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false) { bool shouldDrain = ForceDrainTimelineQueue; - List<PendingTimelineRecord> pendingUpdates = new List<PendingTimelineRecord>(); + var pendingUpdates = new List<PendingTimelineRecord>(); foreach (var timeline in _allTimelines) { - ConcurrentQueue<TimelineRecord> recordQueue; - if (_timelineUpdateQueue.TryGetValue(timeline, out recordQueue)) + if (_timelineUpdateQueue.TryGetValue(timeline, out ConcurrentQueue<TimelineRecord> recordQueue)) { - List<TimelineRecord> records = new List<TimelineRecord>(); - TimelineRecord record; - while (recordQueue.TryDequeue(out record)) + var records = new List<TimelineRecord>(); + while (recordQueue.TryDequeue(out TimelineRecord record)) { records.Add(record); // process at most 25 timeline records update for each timeline. @@ -470,8 +468,7 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false) { foreach (var update in pendingUpdates) { - List<TimelineRecord> bufferedRecords; - if (_bufferedRetryRecords.TryGetValue(update.TimelineId, out bufferedRecords)) + if (_bufferedRetryRecords.TryGetValue(update.TimelineId, out List<TimelineRecord> bufferedRecords)) { update.PendingRecords.InsertRange(0, bufferedRecords); } @@ -484,7 +481,7 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false) { try { - Timeline newTimeline = await _jobServer.CreateTimelineAsync(_scopeIdentifier, _hubName, _planId, detailTimeline.Details.Id, default(CancellationToken)); + Timeline newTimeline = await _jobServer.CreateTimelineAsync(_scopeIdentifier, _hubName, _planId, detailTimeline.Details.Id, CancellationToken.None); _allTimelines.Add(newTimeline.Id); pendingSubtimelineUpdate = true; } @@ -502,7 +499,7 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false) try { - await _jobServer.UpdateTimelineRecordsAsync(_scopeIdentifier, _hubName, _planId, update.TimelineId, update.PendingRecords, default(CancellationToken)); + await _jobServer.UpdateTimelineRecordsAsync(_scopeIdentifier, _hubName, _planId, update.TimelineId, update.PendingRecords, CancellationToken.None); if (_bufferedRetryRecords.Remove(update.TimelineId)) { Trace.Verbose("Cleanup buffered timeline record for timeline: {0}.", update.TimelineId); @@ -512,7 +509,7 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false) { Trace.Info("Catch exception during update timeline records, try to update these timeline records next time."); Trace.Error(ex); - _bufferedRetryRecords[update.TimelineId] = update.PendingRecords.ToList(); + _bufferedRetryRecords[update.TimelineId] = update.PendingRecords; if (update.TimelineId == _jobTimelineId) { mainTimelineRecordsUpdateErrors.Add(ex); @@ -532,26 +529,25 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false) } else { - if (mainTimelineRecordsUpdateErrors.Count > 0 && + if (ForceDrainTimelineQueue) + { + ForceDrainTimelineQueue = false; + } + } + } + + if (runOnce) + { + if (mainTimelineRecordsUpdateErrors.Count > 0 && _bufferedRetryRecords.ContainsKey(_jobTimelineId) && _bufferedRetryRecords[_jobTimelineId] != null && _bufferedRetryRecords[_jobTimelineId].Any(r => r.Variables.Count > 0)) - { - Trace.Info("Fail to update timeline records with output variables. Throw exception to fail the job since output variables are critical to downstream jobs."); - throw new AggregateException(StringUtil.Loc("OutputVariablePublishFailed"), mainTimelineRecordsUpdateErrors); - } - else - { - if (ForceDrainTimelineQueue) - { - ForceDrainTimelineQueue = false; - } - if (runOnce) - { - break; - } - } + { + Trace.Info("Fail to update timeline records with output variables. Throw exception to fail the job since output variables are critical to downstream jobs."); + throw new AggregateException(StringUtil.Loc("OutputVariablePublishFailed"), mainTimelineRecordsUpdateErrors); } + + break; } await Task.Delay(_delayForTimelineUpdateDequeue);