Skip to content

Commit f901ac8

Browse files
committed
Add progress tracking
1 parent 407aeb6 commit f901ac8

File tree

8 files changed

+593
-13
lines changed

8 files changed

+593
-13
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{
2+
"services": [
3+
{
4+
"serviceName": "S3",
5+
"type": "patch",
6+
"changeLogMessages": [
7+
"Added progress tracking events to multipart download operations"
8+
]
9+
}
10+
]
11+
}

sdk/src/Services/S3/Custom/Transfer/Internal/IDownloadManager.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,9 @@ internal interface IDownloadManager : IDisposable
4646
/// </summary>
4747
/// <param name="discoveryResult">Results from the discovery phase.</param>
4848
/// <param name="cancellationToken">A token to cancel the download operation.</param>
49+
/// <param name="progressCallback">Optional callback for progress tracking events.</param>
4950
/// <returns>A task that completes when all downloads finish or an error occurs.</returns>
50-
Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, CancellationToken cancellationToken);
51+
Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, CancellationToken cancellationToken, EventHandler<WriteObjectProgressArgs> progressCallback = null);
5152

5253
/// <summary>
5354
/// Exception that occurred during downloads, if any.

sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadCommand.cs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ internal partial class MultipartDownloadCommand : BaseCommand<TransferUtilityDow
3636
private readonly IAmazonS3 _s3Client;
3737
private readonly TransferUtilityDownloadRequest _request;
3838
private readonly TransferUtilityConfig _config;
39+
40+
// Track last known transferred bytes from coordinator's progress events
41+
private long _lastKnownTransferredBytes;
3942

4043
private static Logger Logger
4144
{
@@ -111,5 +114,89 @@ private FileDownloadConfiguration CreateConfiguration()
111114
_request.FilePath
112115
);
113116
}
117+
118+
#region Event Firing Methods
119+
120+
/// <summary>
121+
/// Fires the DownloadInitiatedEvent to notify subscribers that the download has started.
122+
/// This event is fired exactly once at the beginning of the download operation.
123+
/// </summary>
124+
private void FireTransferInitiatedEvent()
125+
{
126+
var transferInitiatedEventArgs = new DownloadInitiatedEventArgs(_request, _request.FilePath);
127+
_request.OnRaiseTransferInitiatedEvent(transferInitiatedEventArgs);
128+
}
129+
130+
/// <summary>
131+
/// Fires the DownloadCompletedEvent to notify subscribers that the download completed successfully.
132+
/// This event is fired exactly once when all parts have been downloaded and assembled.
133+
/// Downloads are complete, so transferred bytes equals total bytes.
134+
/// </summary>
135+
/// <param name="response">The unified TransferUtilityDownloadResponse containing S3 metadata</param>
136+
/// <param name="totalBytes">The total number of bytes in the file</param>
137+
private void FireTransferCompletedEvent(TransferUtilityDownloadResponse response, long totalBytes)
138+
{
139+
var transferCompletedEventArgs = new DownloadCompletedEventArgs(
140+
_request,
141+
response,
142+
_request.FilePath,
143+
totalBytes,
144+
totalBytes);
145+
_request.OnRaiseTransferCompletedEvent(transferCompletedEventArgs);
146+
}
147+
148+
/// <summary>
149+
/// Fires the DownloadFailedEvent to notify subscribers that the download failed.
150+
/// This event is fired exactly once when an error occurs during the download.
151+
/// Uses the last known transferred bytes from progress tracking.
152+
/// </summary>
153+
/// <param name="totalBytes">Total file size if known, otherwise -1</param>
154+
private void FireTransferFailedEvent(long totalBytes = -1)
155+
{
156+
var eventArgs = new DownloadFailedEventArgs(
157+
_request,
158+
_request.FilePath,
159+
System.Threading.Interlocked.Read(ref _lastKnownTransferredBytes),
160+
totalBytes);
161+
_request.OnRaiseTransferFailedEvent(eventArgs);
162+
}
163+
164+
#endregion
165+
166+
#region Progress Tracking
167+
168+
/// <summary>
169+
/// Callback for part download progress.
170+
/// Forwards the aggregated progress events from the coordinator to the user's progress callback.
171+
/// The coordinator has already aggregated progress across all concurrent part downloads.
172+
/// Tracks the last known transferred bytes for failure reporting.
173+
/// </summary>
174+
/// <param name="sender">The event sender (coordinator)</param>
175+
/// <param name="e">Aggregated progress information from the coordinator</param>
176+
internal void DownloadPartProgressEventCallback(object sender, WriteObjectProgressArgs e)
177+
{
178+
// Track last known transferred bytes using Exchange (not Add).
179+
//
180+
// Why Exchange? The coordinator already aggregates increments from concurrent parts:
181+
// Coordinator receives: Part 1: +512 bytes, Part 2: +1024 bytes, Part 3: +768 bytes
182+
// Coordinator aggregates: 0 -> 512 -> 1536 -> 2304 (using Interlocked.Add)
183+
// Coordinator passes to us: e.TransferredBytes = 2304 (pre-aggregated total)
184+
//
185+
// We receive the TOTAL (e.TransferredBytes = 2304), not an increment (+768).
186+
// Using Add here would incorrectly accumulate totals: 0 + 2304 + 2304 + ... = wrong!
187+
// Using Exchange correctly stores the latest total: 2304 (overwrite previous value).
188+
//
189+
// Compare to other commands (SimpleUploadCommand, DownloadCommand) which receive
190+
// INCREMENTS directly from SDK streams and must use Add to accumulate them.
191+
System.Threading.Interlocked.Exchange(ref _lastKnownTransferredBytes, e.TransferredBytes);
192+
193+
// Set the Request property to enable access to the original download request
194+
e.Request = _request;
195+
196+
// Forward the coordinator's aggregated progress event to the user
197+
_request.OnRaiseProgressEvent(e);
198+
}
199+
200+
#endregion
114201
}
115202
}

sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs

Lines changed: 88 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,17 @@ internal class MultipartDownloadManager : IDownloadManager
5454
private string _savedETag;
5555
private int _discoveredPartCount;
5656

57+
// Progress tracking fields for multipart download aggregation
58+
private long _totalTransferredBytes = 0;
59+
private long _totalObjectSize = 0;
60+
private EventHandler<WriteObjectProgressArgs> _userProgressCallback;
61+
62+
// Atomic flag to ensure completion event fires exactly once
63+
// Without this, concurrent parts completing simultaneously can both see
64+
// transferredBytes >= _totalObjectSize and fire duplicate completion events
65+
// Uses int instead of bool because Interlocked.CompareExchange requires reference types
66+
private int _completionEventFired = 0; // 0 = false, 1 = true
67+
5768
private Logger Logger
5869
{
5970
get { return Logger.GetLogger(typeof(TransferUtility)); }
@@ -133,13 +144,17 @@ public async Task<DownloadDiscoveryResult> DiscoverDownloadStrategyAsync(Cancell
133144
}
134145

135146
/// <inheritdoc/>
136-
public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, CancellationToken cancellationToken)
147+
public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, CancellationToken cancellationToken, EventHandler<WriteObjectProgressArgs> progressCallback = null)
137148
{
138149
ThrowIfDisposed();
139150

140151
if (discoveryResult == null)
141152
throw new ArgumentNullException(nameof(discoveryResult));
142153

154+
// Store for progress aggregation
155+
_userProgressCallback = progressCallback;
156+
_totalObjectSize = discoveryResult.ObjectSize;
157+
143158
Logger.DebugFormat("MultipartDownloadManager: Starting downloads - TotalParts={0}, IsSinglePart={1}",
144159
discoveryResult.TotalParts, discoveryResult.IsSinglePart);
145160

@@ -151,10 +166,27 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, C
151166
// Prepare the data handler (e.g., create temp files for file-based downloads)
152167
await _dataHandler.PrepareAsync(discoveryResult, cancellationToken).ConfigureAwait(false);
153168

169+
// Create delegate once and reuse for all parts
170+
var wrappedCallback = progressCallback != null
171+
? new EventHandler<WriteObjectProgressArgs>(DownloadPartProgressEventCallback)
172+
: null;
173+
174+
// Attach progress callback to Part 1's response if provided
175+
if (wrappedCallback != null)
176+
{
177+
discoveryResult.InitialResponse.WriteObjectProgressEvent += wrappedCallback;
178+
}
179+
154180
// Process Part 1 from InitialResponse (applies to both single-part and multipart)
155181
Logger.DebugFormat("MultipartDownloadManager: Buffering Part 1 from discovery response");
156182
await _dataHandler.ProcessPartAsync(1, discoveryResult.InitialResponse, cancellationToken).ConfigureAwait(false);
157183

184+
// Detach the event handler after processing to prevent memory leak
185+
if (wrappedCallback != null)
186+
{
187+
discoveryResult.InitialResponse.WriteObjectProgressEvent -= wrappedCallback;
188+
}
189+
158190
if (discoveryResult.IsSinglePart)
159191
{
160192
// Single-part: Part 1 is the entire object
@@ -169,7 +201,7 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, C
169201

170202
for (int partNum = 2; partNum <= discoveryResult.TotalParts; partNum++)
171203
{
172-
var task = CreateDownloadTaskAsync(partNum, discoveryResult.ObjectSize, internalCts.Token);
204+
var task = CreateDownloadTaskAsync(partNum, discoveryResult.ObjectSize, wrappedCallback, internalCts.Token);
173205
downloadTasks.Add(task);
174206
}
175207

@@ -245,7 +277,7 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, C
245277

246278

247279

248-
private async Task CreateDownloadTaskAsync(int partNumber, long objectSize, CancellationToken cancellationToken)
280+
private async Task CreateDownloadTaskAsync(int partNumber, long objectSize, EventHandler<WriteObjectProgressArgs> progressCallback, CancellationToken cancellationToken)
249281
{
250282
Logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for buffer space", partNumber);
251283

@@ -301,6 +333,12 @@ private async Task CreateDownloadTaskAsync(int partNumber, long objectSize, Canc
301333
}
302334

303335
response = await _s3Client.GetObjectAsync(getObjectRequest, cancellationToken).ConfigureAwait(false);
336+
337+
// Attach progress callback to response if provided
338+
if (progressCallback != null)
339+
{
340+
response.WriteObjectProgressEvent += progressCallback;
341+
}
304342

305343
Logger.DebugFormat("MultipartDownloadManager: [Part {0}] GetObject response received - ContentLength={1}",
306344
partNumber, response.ContentLength);
@@ -553,6 +591,53 @@ internal void ValidateContentRange(GetObjectResponse response, int partNumber, l
553591
}
554592
}
555593

594+
/// <summary>
595+
/// Creates progress args with aggregated values for multipart downloads.
596+
/// </summary>
597+
private WriteObjectProgressArgs CreateProgressArgs(long incrementTransferred, long transferredBytes, bool completed = false)
598+
{
599+
string filePath = (_request as TransferUtilityDownloadRequest)?.FilePath;
600+
601+
return new WriteObjectProgressArgs(
602+
_request.BucketName,
603+
_request.Key,
604+
filePath,
605+
_request.VersionId,
606+
incrementTransferred,
607+
transferredBytes,
608+
_totalObjectSize,
609+
completed
610+
);
611+
}
612+
613+
/// <summary>
614+
/// Progress aggregation callback that combines progress across all concurrent part downloads.
615+
/// Uses thread-safe counter increment to handle concurrent updates.
616+
/// Detects completion naturally when transferred bytes reaches total size.
617+
/// Uses atomic flag to ensure completion event fires exactly once.
618+
/// </summary>
619+
private void DownloadPartProgressEventCallback(object sender, WriteObjectProgressArgs e)
620+
{
621+
long transferredBytes = Interlocked.Add(ref _totalTransferredBytes, e.IncrementTransferred);
622+
623+
// Use atomic CompareExchange to ensure only first thread fires completion
624+
bool isComplete = false;
625+
if (transferredBytes >= _totalObjectSize)
626+
{
627+
// CompareExchange returns the original value before the exchange
628+
// If original value was 0 (false), we're the first thread and should fire completion
629+
int originalValue = Interlocked.CompareExchange(ref _completionEventFired, 1, 0);
630+
if (originalValue == 0) // Was false, now set to true
631+
{
632+
isComplete = true;
633+
}
634+
}
635+
636+
// Create and fire aggregated progress event
637+
var aggregatedArgs = CreateProgressArgs(e.IncrementTransferred, transferredBytes, isComplete);
638+
_userProgressCallback?.Invoke(this, aggregatedArgs);
639+
}
640+
556641
private void ThrowIfDisposed()
557642
{
558643
if (_disposed)

sdk/src/Services/S3/Custom/Transfer/Internal/_async/MultipartDownloadCommand.async.cs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ public override async Task<TransferUtilityDownloadResponse> ExecuteAsync(Cancell
3333
{
3434
// Validate request parameters
3535
ValidateRequest();
36+
37+
// Fire initiated event before starting any network operations
38+
FireTransferInitiatedEvent();
3639

3740
// Create configuration from request settings
3841
var config = CreateConfiguration();
@@ -54,19 +57,23 @@ public override async Task<TransferUtilityDownloadResponse> ExecuteAsync(Cancell
5457
dataHandler,
5558
RequestEventHandler))
5659
{
60+
long totalBytes = -1;
5761
try
5862
{
5963
// Step 1: Discover download strategy (PART or RANGE) and get metadata
6064
Logger.DebugFormat("MultipartDownloadCommand: Discovering download strategy");
6165
var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(cancellationToken)
6266
.ConfigureAwait(false);
6367

68+
totalBytes = discoveryResult.ObjectSize;
69+
70+
6471
Logger.DebugFormat("MultipartDownloadCommand: Discovered {0} part(s), total size: {1} bytes, IsSinglePart={2}",
6572
discoveryResult.TotalParts, discoveryResult.ObjectSize, discoveryResult.IsSinglePart);
6673

6774
// Step 2: Start concurrent downloads for all parts
68-
Logger.DebugFormat("MultipartDownloadCommand: Starting downloads for {0} part(s)", discoveryResult.TotalParts);
69-
await coordinator.StartDownloadsAsync(discoveryResult, cancellationToken)
75+
Logger.DebugFormat("Starting downloads for {0} part(s)", discoveryResult.TotalParts);
76+
await coordinator.StartDownloadsAsync(discoveryResult, cancellationToken, DownloadPartProgressEventCallback)
7077
.ConfigureAwait(false);
7178

7279
// Step 2b: Wait for all downloads to complete before returning
@@ -110,11 +117,18 @@ await coordinator.StartDownloadsAsync(discoveryResult, cancellationToken)
110117
mappedResponse.ChecksumSHA256 = null;
111118
}
112119

120+
// Fire completed event
121+
FireTransferCompletedEvent(mappedResponse, totalBytes);
122+
113123
return mappedResponse;
114124
}
115125
catch (Exception ex)
116126
{
117127
Logger.Error(ex, "Exception during multipart download");
128+
129+
// Fire failed event
130+
FireTransferFailedEvent(totalBytes);
131+
118132
throw;
119133
}
120134
}

0 commit comments

Comments
 (0)