Skip to content

Commit 52ae28f

Browse files
committed
initial file based download
1 parent af2d862 commit 52ae28f

26 files changed

+5745
-123
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{
2+
"services": [
3+
{
4+
"serviceName": "S3",
5+
"type": "minor",
6+
"changeLogMessages": [
7+
"Created new DownloadWithResponseAsync method on the Amazon.S3.Transfer.TransferUtility class. The new operation supports downloading in parallel parts of the S3 object to a file for improved performance."
8+
]
9+
}
10+
]
11+
}

sdk/src/Services/S3/Custom/Model/GetObjectResponse.cs

Lines changed: 55 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -910,6 +910,59 @@ private void ValidateWrittenStreamSize(long bytesWritten)
910910
}
911911

912912
#if BCL || NETSTANDARD
913+
/// <summary>
914+
/// Copies data from ResponseStream to destination stream with progress tracking and validation.
915+
/// Internal method to enable reuse across different download scenarios.
916+
/// </summary>
917+
/// <param name="destinationStream">Stream to write data to</param>
918+
/// <param name="filePath">File path for progress event reporting (can be null)</param>
919+
/// <param name="bufferSize">Buffer size for reading/writing operations</param>
920+
/// <param name="cancellationToken">Cancellation token</param>
921+
/// <param name="validateSize">Whether to validate copied bytes match ContentLength</param>
922+
internal async System.Threading.Tasks.Task WriteResponseStreamAsync(
923+
Stream destinationStream,
924+
string filePath,
925+
int bufferSize,
926+
System.Threading.CancellationToken cancellationToken,
927+
bool validateSize = true)
928+
{
929+
long current = 0;
930+
#if NETSTANDARD
931+
Stream stream = this.ResponseStream;
932+
#else
933+
Stream stream = new BufferedStream(this.ResponseStream);
934+
#endif
935+
byte[] buffer = new byte[bufferSize];
936+
int bytesRead = 0;
937+
long totalIncrementTransferred = 0;
938+
939+
while ((bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken)
940+
.ConfigureAwait(continueOnCapturedContext: false)) > 0)
941+
{
942+
cancellationToken.ThrowIfCancellationRequested();
943+
944+
await destinationStream.WriteAsync(buffer, 0, bytesRead, cancellationToken)
945+
.ConfigureAwait(continueOnCapturedContext: false);
946+
current += bytesRead;
947+
totalIncrementTransferred += bytesRead;
948+
949+
if (totalIncrementTransferred >= AWSSDKUtils.DefaultProgressUpdateInterval)
950+
{
951+
this.OnRaiseProgressEvent(filePath, totalIncrementTransferred, current, this.ContentLength, completed: false);
952+
totalIncrementTransferred = 0;
953+
}
954+
}
955+
956+
if (validateSize)
957+
{
958+
ValidateWrittenStreamSize(current);
959+
}
960+
961+
// Encrypted objects may have size smaller than the total amount of data transferred due to padding.
962+
// Instead of changing the file size or the total downloaded size, pass a flag that indicates transfer is complete.
963+
this.OnRaiseProgressEvent(filePath, totalIncrementTransferred, current, this.ContentLength, completed: true);
964+
}
965+
913966
/// <summary>
914967
/// Writes the content of the ResponseStream a file indicated by the filePath argument.
915968
/// </summary>
@@ -930,37 +983,8 @@ public async System.Threading.Tasks.Task WriteResponseStreamToFileAsync(string f
930983

931984
try
932985
{
933-
long current = 0;
934-
#if NETSTANDARD
935-
Stream stream = this.ResponseStream;
936-
#else
937-
Stream stream = new BufferedStream(this.ResponseStream);
938-
#endif
939-
byte[] buffer = new byte[S3Constants.DefaultBufferSize];
940-
int bytesRead = 0;
941-
long totalIncrementTransferred = 0;
942-
while ((bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken)
943-
.ConfigureAwait(continueOnCapturedContext: false)) > 0)
944-
{
945-
cancellationToken.ThrowIfCancellationRequested();
946-
947-
await downloadStream.WriteAsync(buffer, 0, bytesRead, cancellationToken)
948-
.ConfigureAwait(continueOnCapturedContext: false);
949-
current += bytesRead;
950-
totalIncrementTransferred += bytesRead;
951-
952-
if (totalIncrementTransferred >= AWSSDKUtils.DefaultProgressUpdateInterval)
953-
{
954-
this.OnRaiseProgressEvent(filePath, totalIncrementTransferred, current, this.ContentLength, completed:false);
955-
totalIncrementTransferred = 0;
956-
}
957-
}
958-
959-
ValidateWrittenStreamSize(current);
960-
961-
// Encrypted objects may have size smaller than the total amount of data trasnfered due to padding.
962-
// Instead of changing the file size or the total downloaded size, pass a flag that indicate that the transfer is complete.
963-
this.OnRaiseProgressEvent(filePath, totalIncrementTransferred, current, this.ContentLength, completed:true);
986+
await WriteResponseStreamAsync(downloadStream, filePath, S3Constants.DefaultBufferSize, cancellationToken, validateSize: true)
987+
.ConfigureAwait(continueOnCapturedContext: false);
964988
}
965989
finally
966990
{
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*******************************************************************************
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use
4+
* this file except in compliance with the License. A copy of the License is located at
5+
*
6+
* http://aws.amazon.com/apache2.0
7+
*
8+
* or in the "license" file accompanying this file.
9+
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
10+
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
* *****************************************************************************
13+
* __ _ _ ___
14+
* ( )( \/\/ )/ __)
15+
* /__\ \ / \__ \
16+
* (_)(_) \/\/ (___/
17+
*
18+
* AWS SDK for .NET
19+
* API Version: 2006-03-01
20+
*
21+
*/
22+
using System;
23+
using System.IO;
24+
using System.Security.Cryptography;
25+
26+
namespace Amazon.S3.Transfer.Internal
27+
{
28+
/// <summary>
29+
/// Handles atomic file operations for multipart downloads using SEP-compliant temporary file pattern.
30+
/// Creates .s3tmp.{uniqueId} files and ensures atomic commits to prevent partial file corruption.
31+
/// </summary>
32+
internal class AtomicFileHandler : IDisposable
33+
{
34+
private string _tempFilePath;
35+
private bool _disposed = false;
36+
37+
/// <summary>
38+
/// Creates a temporary file with unique identifier for atomic operations.
39+
/// Pattern: {destinationPath}.s3tmp.{8-char-unique-id}
40+
/// Uses FileMode.CreateNew for atomic file creation (no race condition).
41+
/// </summary>
42+
public string CreateTemporaryFile(string destinationPath)
43+
{
44+
if (string.IsNullOrEmpty(destinationPath))
45+
throw new ArgumentException("Destination path cannot be null or empty", nameof(destinationPath));
46+
47+
// Create directory if it doesn't exist (Directory.CreateDirectory is idempotent)
48+
var directory = Path.GetDirectoryName(destinationPath);
49+
if (!string.IsNullOrEmpty(directory))
50+
{
51+
Directory.CreateDirectory(directory);
52+
}
53+
54+
// Try up to 100 times to create unique file atomically
55+
for (int attempt = 0; attempt < 100; attempt++)
56+
{
57+
var uniqueId = GenerateUniqueId(8);
58+
var tempPath = $"{destinationPath}.s3tmp.{uniqueId}";
59+
60+
try
61+
{
62+
// FileMode.CreateNew fails atomically if file exists - no race condition
63+
using (var stream = new FileStream(tempPath, FileMode.CreateNew, FileAccess.Write))
64+
{
65+
// File created successfully - immediately close it
66+
}
67+
68+
_tempFilePath = tempPath;
69+
return tempPath;
70+
}
71+
catch (IOException) when (attempt < 99)
72+
{
73+
// File exists, try again with new ID
74+
continue;
75+
}
76+
}
77+
78+
throw new InvalidOperationException("Unable to generate unique temporary file name after 100 attempts");
79+
}
80+
81+
/// <summary>
82+
/// Atomically commits the temporary file to the final destination.
83+
/// Uses File.Replace for atomic replacement when destination exists, or File.Move for new files.
84+
/// This prevents data loss if the process crashes during commit.
85+
/// </summary>
86+
public void CommitFile(string tempPath, string destinationPath)
87+
{
88+
if (string.IsNullOrEmpty(tempPath))
89+
throw new ArgumentException("Temp path cannot be null or empty", nameof(tempPath));
90+
if (string.IsNullOrEmpty(destinationPath))
91+
throw new ArgumentException("Destination path cannot be null or empty", nameof(destinationPath));
92+
93+
if (!File.Exists(tempPath))
94+
throw new FileNotFoundException($"Temporary file not found: {tempPath}");
95+
96+
try
97+
{
98+
// Use File.Replace for atomic replacement when overwriting existing file
99+
// This prevents data loss if process crashes between delete and move operations
100+
// File.Replace is atomic on Windows (ReplaceFile API) and Unix (rename syscall)
101+
if (File.Exists(destinationPath))
102+
{
103+
File.Replace(tempPath, destinationPath, null);
104+
}
105+
else
106+
{
107+
// For new files, File.Move is sufficient and atomic on same volume
108+
File.Move(tempPath, destinationPath);
109+
}
110+
111+
if (_tempFilePath == tempPath)
112+
_tempFilePath = null; // Successfully committed
113+
}
114+
catch (Exception ex)
115+
{
116+
throw new InvalidOperationException($"Failed to commit temporary file {tempPath} to {destinationPath}", ex);
117+
}
118+
}
119+
120+
/// <summary>
121+
/// Cleans up temporary file in case of failure or cancellation.
122+
/// Safe to call multiple times - File.Delete() is idempotent (no-op if file doesn't exist).
123+
/// </summary>
124+
public void CleanupOnFailure(string tempPath = null)
125+
{
126+
var pathToClean = string.IsNullOrEmpty(tempPath) ? _tempFilePath : tempPath;
127+
128+
if (string.IsNullOrEmpty(pathToClean))
129+
return;
130+
131+
try
132+
{
133+
// File.Delete() is idempotent - doesn't throw if file doesn't exist
134+
File.Delete(pathToClean);
135+
136+
if (_tempFilePath == pathToClean)
137+
_tempFilePath = null;
138+
}
139+
catch (IOException)
140+
{
141+
// Log warning but don't throw - cleanup is best effort
142+
// In production, this would use proper logging infrastructure
143+
}
144+
catch (UnauthorizedAccessException)
145+
{
146+
// Log warning but don't throw - cleanup is best effort
147+
}
148+
}
149+
150+
/// <summary>
151+
/// Generates a cryptographically secure unique identifier of specified length.
152+
/// Uses base32 encoding to avoid filesystem-problematic characters.
153+
/// </summary>
154+
private string GenerateUniqueId(int length)
155+
{
156+
const string base32Chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"; // RFC 4648 base32
157+
158+
using (var rng = RandomNumberGenerator.Create())
159+
{
160+
var bytes = new byte[length];
161+
rng.GetBytes(bytes);
162+
163+
var result = new char[length];
164+
for (int i = 0; i < length; i++)
165+
{
166+
result[i] = base32Chars[bytes[i] % base32Chars.Length];
167+
}
168+
169+
return new string(result);
170+
}
171+
}
172+
173+
public void Dispose()
174+
{
175+
if (!_disposed)
176+
{
177+
// Cleanup any remaining temp file
178+
CleanupOnFailure();
179+
_disposed = true;
180+
}
181+
}
182+
}
183+
}

sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs

Lines changed: 34 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ public BufferedPartDataHandler(
5757
_config = config ?? throw new ArgumentNullException(nameof(config));
5858
}
5959

60+
public Task PrepareAsync(DownloadDiscoveryResult discoveryResult, CancellationToken cancellationToken)
61+
{
62+
// No preparation needed for buffered handler - buffers are created on demand
63+
return Task.CompletedTask;
64+
}
65+
6066
/// <inheritdoc/>
6167
public async Task ProcessPartAsync(
6268
int partNumber,
@@ -127,55 +133,39 @@ private async Task<StreamPartBuffer> BufferPartFromResponseAsync(
127133
// Get reference to the buffer for writing
128134
var partBuffer = downloadedPart.ArrayPoolBuffer;
129135

130-
int totalRead = 0;
131-
int chunkCount = 0;
132-
133-
// Read response stream into buffer in chunks based on ContentLength.
134-
// Example: For a 10MB part with 8KB BufferSize:
135-
// - Loop 1: remainingBytes=10MB, readSize=8KB → reads 8KB at offset 0
136-
// - Loop 2: remainingBytes=9.992MB, readSize=8KB → reads 8KB at offset 8KB
137-
// - ...continues until totalRead reaches 10MB (1,280 iterations)
138-
while (totalRead < expectedBytes)
136+
// Create a MemoryStream wrapper around the pooled buffer
137+
// writable: true allows WriteResponseStreamAsync to write to it
138+
// The MemoryStream starts at position 0 and can grow up to initialBufferSize
139+
using (var memoryStream = new MemoryStream(partBuffer, 0, initialBufferSize, writable: true))
139140
{
140-
// Calculate how many bytes we still need to read
141-
int remainingBytes = (int)(expectedBytes - totalRead);
142-
143-
// Read in chunks up to BufferSize, but never exceed remaining bytes
144-
int readSize = Math.Min(remainingBytes, _config.BufferSize);
145-
146-
// Read directly into buffer at current position
147-
int bytesRead = await response.ResponseStream.ReadAsync(
148-
partBuffer,
149-
totalRead,
150-
readSize,
151-
cancellationToken).ConfigureAwait(false);
141+
Logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Reading response stream into buffer",
142+
partNumber);
143+
144+
// Use GetObjectResponse's stream copy logic which includes:
145+
// - Progress tracking with events
146+
// - Size validation (ContentLength vs bytes read)
147+
// - Buffered reading with proper chunk sizes
148+
await response.WriteResponseStreamAsync(
149+
memoryStream,
150+
null, // destination identifier (not needed for memory stream)
151+
_config.BufferSize,
152+
cancellationToken,
153+
validateSize: true)
154+
.ConfigureAwait(false);
152155

153-
if (bytesRead == 0)
154-
{
155-
var errorMessage = $"Unexpected end of stream while downloading part {partNumber}. " +
156-
$"Expected {expectedBytes} bytes but only received {totalRead} bytes. " +
157-
$"This indicates a network error or S3 service issue.";
158-
159-
Logger.Error(null, "BufferedPartDataHandler: [Part {0}] {1}",
160-
partNumber, errorMessage);
161-
162-
throw new IOException(errorMessage);
163-
}
156+
int totalRead = (int)memoryStream.Position;
164157

165-
totalRead += bytesRead;
166-
chunkCount++;
167-
}
168-
169-
Logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Read {1} bytes in {2} chunks from response stream",
170-
partNumber, totalRead, chunkCount);
158+
Logger.DebugFormat("BufferedPartDataHandler: [Part {0}] Read {1} bytes from response stream",
159+
partNumber, totalRead);
171160

172-
// Set the length to reflect actual bytes read
173-
downloadedPart.SetLength(totalRead);
161+
// Set the length to reflect actual bytes read
162+
downloadedPart.SetLength(totalRead);
174163

175-
if (totalRead != expectedBytes)
176-
{
177-
Logger.Error(null, "BufferedPartDataHandler: [Part {0}] Size mismatch - Expected {1} bytes, read {2} bytes",
178-
partNumber, expectedBytes, totalRead);
164+
if (totalRead != expectedBytes)
165+
{
166+
Logger.Error(null, "BufferedPartDataHandler: [Part {0}] Size mismatch - Expected {1} bytes, read {2} bytes",
167+
partNumber, expectedBytes, totalRead);
168+
}
179169
}
180170

181171
return downloadedPart;

0 commit comments

Comments
 (0)