Skip to content

Commit 0b3ed24

Browse files
committed
Norm PR comments
1 parent 95eddaa commit 0b3ed24

File tree

13 files changed

+1516
-660
lines changed

13 files changed

+1516
-660
lines changed

generator/.DevConfigs/9d07dc1e-d82d-4f94-8700-c7b57f872042.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"serviceName": "S3",
55
"type": "minor",
66
"changeLogMessages": [
7-
"Create OpenStreamWithResponseAsync API and implement multi part download"
7+
"Created new OpenStreamWithResponseAsync method on the Amazon.S3.Transfer.TransferUtility class. The new operation supports downloading in parallel parts of the S3 object in the background while reading from the stream for improved performance."
88
]
99
}
1010
]

sdk/src/Services/S3/Custom/Transfer/Internal/BufferedMultipartStream.cs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ internal class BufferedMultipartStream : Stream
4242
private bool _initialized = false;
4343
private bool _disposed = false;
4444
private DownloadDiscoveryResult _discoveryResult;
45+
private long _totalBytesRead = 0;
4546

4647
/// <summary>
4748
/// Gets the discovery result containing metadata from the initial GetObject response.
@@ -143,6 +144,12 @@ public override async Task<int> ReadAsync(byte[] buffer, int offset, int count,
143144
var bytesRead = await _partBufferManager.ReadAsync(buffer, offset, count, cancellationToken)
144145
.ConfigureAwait(false);
145146

147+
// Track total bytes read for Position property
148+
if (bytesRead > 0)
149+
{
150+
Interlocked.Add(ref _totalBytesRead, bytesRead);
151+
}
152+
146153
return bytesRead;
147154
}
148155

@@ -153,11 +160,25 @@ public override async Task<int> ReadAsync(byte[] buffer, int offset, int count,
153160
public override bool CanSeek => false;
154161
public override bool CanWrite => false;
155162

156-
public override long Length => throw new NotSupportedException("Length not supported for multipart download streams");
163+
public override long Length
164+
{
165+
get
166+
{
167+
if (!_initialized)
168+
throw new InvalidOperationException("Stream must be initialized before accessing Length");
169+
return _discoveryResult.ObjectSize;
170+
}
171+
}
172+
157173
public override long Position
158174
{
159-
get => throw new NotSupportedException("Position not supported for multipart download streams");
160-
set => throw new NotSupportedException("Position not supported for multipart download streams");
175+
get
176+
{
177+
if (!_initialized)
178+
throw new InvalidOperationException("Stream must be initialized before accessing Position");
179+
return Interlocked.Read(ref _totalBytesRead);
180+
}
181+
set => throw new NotSupportedException("Position setter not supported for read-only streams");
161182
}
162183

163184
public override void Flush() { }

sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs

Lines changed: 11 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -85,15 +85,17 @@ private async Task<StreamPartBuffer> BufferPartFromResponseAsync(
8585
CancellationToken cancellationToken)
8686
{
8787
StreamPartBuffer downloadedPart = null;
88-
byte[] partBuffer = null;
8988

9089
try
9190
{
9291
// Use ContentLength to determine exact bytes to read and allocate
9392
long expectedBytes = response.ContentLength;
9493
int initialBufferSize = (int)expectedBytes;
9594

96-
partBuffer = ArrayPool<byte>.Shared.Rent(initialBufferSize);
95+
downloadedPart = StreamPartBuffer.Create(partNumber, initialBufferSize);
96+
97+
// Get reference to the buffer for writing
98+
var partBuffer = downloadedPart.ArrayPoolBuffer;
9799

98100
int totalRead = 0;
99101

@@ -104,13 +106,6 @@ private async Task<StreamPartBuffer> BufferPartFromResponseAsync(
104106
int remainingBytes = (int)(expectedBytes - totalRead);
105107
int bufferSpace = partBuffer.Length - totalRead;
106108

107-
// Expand buffer if needed BEFORE reading
108-
if (bufferSpace == 0)
109-
{
110-
partBuffer = ExpandPartBuffer(partBuffer, totalRead);
111-
bufferSpace = partBuffer.Length - totalRead;
112-
}
113-
114109
// Read size is minimum of: remaining bytes needed, buffer space available, and configured buffer size
115110
int readSize = Math.Min(Math.Min(remainingBytes, bufferSpace), _config.BufferSize);
116111

@@ -130,36 +125,17 @@ private async Task<StreamPartBuffer> BufferPartFromResponseAsync(
130125
totalRead += bytesRead;
131126
}
132127

133-
// Create ArrayPool-based StreamPartBuffer (no copying!)
134-
downloadedPart = StreamPartBuffer.FromArrayPoolBuffer(
135-
partNumber,
136-
partBuffer, // Transfer ownership to StreamPartBuffer
137-
totalRead // Actual data length
138-
);
139-
140-
partBuffer = null; // Clear reference to prevent return in finally
128+
// Set the length to reflect actual bytes read
129+
downloadedPart.SetLength(totalRead);
130+
141131
return downloadedPart;
142132
}
143-
finally
133+
catch
144134
{
145-
// Only return partBuffer if ownership wasn't transferred
146-
if (partBuffer != null)
147-
ArrayPool<byte>.Shared.Return(partBuffer);
135+
// If something goes wrong, StreamPartBuffer.Dispose() will handle cleanup
136+
downloadedPart?.Dispose();
137+
throw;
148138
}
149139
}
150-
151-
private byte[] ExpandPartBuffer(byte[] currentBuffer, int validDataLength)
152-
{
153-
var newSize = Math.Max(currentBuffer.Length * 2, validDataLength + (int)_config.TargetPartSizeBytes);
154-
var expandedBuffer = ArrayPool<byte>.Shared.Rent(newSize);
155-
156-
// Single copy of valid data to expanded buffer
157-
Buffer.BlockCopy(currentBuffer, 0, expandedBuffer, 0, validDataLength);
158-
159-
// Return old buffer to pool
160-
ArrayPool<byte>.Shared.Return(currentBuffer);
161-
162-
return expandedBuffer;
163-
}
164140
}
165141
}

sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadCoordinator.cs

Lines changed: 41 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ internal class MultipartDownloadCoordinator : IDownloadCoordinator
4747
private Exception _downloadException;
4848
private bool _disposed = false;
4949
private bool _discoveryCompleted = false;
50-
private readonly object _lockObject = new object();
5150

5251
private string _savedETag;
5352
private int _discoveredPartCount;
@@ -67,22 +66,16 @@ public Exception DownloadException
6766
{
6867
get
6968
{
70-
lock (_lockObject)
71-
{
72-
return _downloadException;
73-
}
69+
return _downloadException;
7470
}
7571
}
7672

7773
public async Task<DownloadDiscoveryResult> DiscoverDownloadStrategyAsync(CancellationToken cancellationToken)
7874
{
7975
ThrowIfDisposed();
8076

81-
lock (_lockObject)
82-
{
83-
if (_discoveryCompleted)
84-
throw new InvalidOperationException("Discovery has already been performed");
85-
}
77+
if (_discoveryCompleted)
78+
throw new InvalidOperationException("Discovery has already been performed");
8679

8780
try
8881
{
@@ -91,19 +84,13 @@ public async Task<DownloadDiscoveryResult> DiscoverDownloadStrategyAsync(Cancell
9184
? await DiscoverUsingPartStrategyAsync(cancellationToken).ConfigureAwait(false)
9285
: await DiscoverUsingRangeStrategyAsync(cancellationToken).ConfigureAwait(false);
9386

94-
lock (_lockObject)
95-
{
96-
_discoveryCompleted = true;
97-
}
87+
_discoveryCompleted = true;
9888

9989
return result;
10090
}
10191
catch (Exception ex)
10292
{
103-
lock (_lockObject)
104-
{
105-
_downloadException = ex;
106-
}
93+
_downloadException = ex;
10794
throw;
10895
}
10996
}
@@ -161,10 +148,7 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, C
161148
}
162149
catch (Exception ex)
163150
{
164-
lock (_lockObject)
165-
{
166-
_downloadException = ex;
167-
}
151+
_downloadException = ex;
168152

169153
_dataHandler.OnDownloadComplete(ex);
170154
throw;
@@ -304,8 +288,8 @@ private async Task<DownloadDiscoveryResult> DiscoverUsingPartStrategyAsync(Cance
304288

305289
private async Task<DownloadDiscoveryResult> DiscoverUsingRangeStrategyAsync(CancellationToken cancellationToken)
306290
{
307-
// Get target part size for RANGE strategy
308-
var targetPartSize = _request.IsSetPartSize() ? _request.PartSize : _config.TargetPartSizeBytes;
291+
// Get target part size for RANGE strategy (already set in config from request or default)
292+
var targetPartSize = _config.TargetPartSizeBytes;
309293

310294
// SEP Ranged GET Step 1: "create a new GetObject request copying all fields in the original request.
311295
// Set range value to bytes=0-{targetPartSizeBytes-1} to request the first part."
@@ -393,62 +377,62 @@ private GetObjectRequest CreateGetObjectRequest()
393377
return request;
394378
}
395379

396-
private (long startByte, long endByte) CalculatePartRange(int partNumber, long objectSize)
380+
internal (long startByte, long endByte) CalculatePartRange(int partNumber, long objectSize)
397381
{
398-
var targetPartSize = _request.IsSetPartSize() ? _request.PartSize : _config.TargetPartSizeBytes;
382+
var targetPartSize = _config.TargetPartSizeBytes;
399383

400384
var startByte = (partNumber - 1) * targetPartSize;
401385
var endByte = Math.Min(startByte + targetPartSize - 1, objectSize - 1);
402386
return (startByte, endByte);
403387
}
404388

405-
private long ExtractTotalSizeFromContentRange(string contentRange)
389+
internal (long startByte, long endByte, long totalSize) ParseContentRange(string contentRange)
406390
{
407391
if (string.IsNullOrEmpty(contentRange))
408-
throw new InvalidOperationException("Content-Range header is missing from range request response");
409-
410-
// Format: "bytes 0-{end}/{total-size}" or "bytes 0-{end}/*"
411-
var parts = contentRange.Split('/');
412-
if (parts.Length == 2 && parts[1] != "*")
413-
{
414-
if (long.TryParse(parts[1], out var totalSize))
415-
{
416-
return totalSize;
417-
}
418-
}
392+
throw new InvalidOperationException("Content-Range header is missing");
393+
394+
// Format: "bytes {start}-{end}/{total-size}"
395+
var parts = contentRange.Replace("bytes ", "").Split('/');
396+
if (parts.Length != 2)
397+
throw new InvalidOperationException($"Invalid ContentRange format: {contentRange}");
398+
399+
// Parse byte range
400+
var rangeParts = parts[0].Split('-');
401+
if (rangeParts.Length != 2 ||
402+
!long.TryParse(rangeParts[0], out var startByte) ||
403+
!long.TryParse(rangeParts[1], out var endByte))
404+
throw new InvalidOperationException($"Unable to parse ContentRange byte range: {contentRange}");
405+
406+
// Parse total size - S3 always returns exact sizes, never wildcards
407+
if (parts[1] == "*")
408+
throw new InvalidOperationException($"Unexpected wildcard in ContentRange total size: {contentRange}. S3 always returns exact object sizes.");
409+
if (!long.TryParse(parts[1], out var totalSize))
410+
throw new InvalidOperationException($"Unable to parse ContentRange total size: {contentRange}");
411+
412+
return (startByte, endByte, totalSize);
413+
}
419414

420-
throw new InvalidOperationException($"Unable to parse Content-Range header: {contentRange}");
415+
internal long ExtractTotalSizeFromContentRange(string contentRange)
416+
{
417+
var (_, _, totalSize) = ParseContentRange(contentRange);
418+
return totalSize;
421419
}
422420

423-
private void ValidateContentRange(GetObjectResponse response, int partNumber, long objectSize)
421+
internal void ValidateContentRange(GetObjectResponse response, int partNumber, long objectSize)
424422
{
425423
// Ranged GET Step 7:
426424
// "validate that ContentRange matches with the requested range"
427425
if (_request.MultipartDownloadType == MultipartDownloadType.RANGE)
428426
{
429427
var (expectedStartByte, expectedEndByte) = CalculatePartRange(partNumber, objectSize);
430428

431-
// Parse actual ContentRange from response
432-
// Format: "bytes {start}-{end}/{total}"
433-
var contentRange = response.ContentRange;
434-
if (string.IsNullOrEmpty(contentRange))
429+
// Parse actual ContentRange from response using unified helper
430+
if (string.IsNullOrEmpty(response.ContentRange))
435431
{
436432
throw new InvalidOperationException($"ContentRange header missing from part {partNumber} response");
437433
}
438434

439-
var parts = contentRange.Replace("bytes ", "").Split('/');
440-
if (parts.Length != 2)
441-
{
442-
throw new InvalidOperationException($"Invalid ContentRange format: {contentRange}");
443-
}
444-
445-
var rangeParts = parts[0].Split('-');
446-
if (rangeParts.Length != 2 ||
447-
!long.TryParse(rangeParts[0], out var actualStartByte) ||
448-
!long.TryParse(rangeParts[1], out var actualEndByte))
449-
{
450-
throw new InvalidOperationException($"Unable to parse ContentRange: {contentRange}");
451-
}
435+
var (actualStartByte, actualEndByte, _) = ParseContentRange(response.ContentRange);
452436

453437
// Validate range matches what we requested
454438
if (actualStartByte != expectedStartByte || actualEndByte != expectedEndByte)

0 commit comments

Comments
 (0)