Skip to content

Conversation

@GarrettBeatty
Copy link
Contributor

@GarrettBeatty GarrettBeatty commented Nov 14, 2025

This PR implements multipart downloads for new api OpenStreamWithResponseAsync, which download parts concurrently in the background while buffering them in memory to provide sequential ordered access

OpenStreamWithResponseAsync will be a new api that returns metadata to the user about the response and implements multi part download behind the scenes. it looks like this for the user

using var response = await _transferUtility.OpenStreamWithResponseAsync(streamRequest);
// response contains metadata about the response and the actual response stream
// can access response.Etag and other metadata attributes now


var buffer = new byte[32 * 1024 * 1024]; // 32MB buffer
int bytesRead;

while ((bytesRead = await response.ResponseStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
   // read response stream. notice the user needs to do response.ResponseStream now
}

compared to old api (no response metadata and no multipart download)


using var s3Stream = await _transferUtility.OpenStreamAsync(streamRequest);
// s3stream is the literal responsestream


var buffer = new byte[32 * 1024 * 1024]; // 32MB buffer
int bytesRead;

while ((bytesRead = await s3stream.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
    // read response stream. notice user just does s3stream.ReadAsync

I am open to better naming suggestions than OpenStreamWithResponseAsync if people have better suggestions

Description

Overview of the classes. To review this PR, i recommend reviewing locally and starting with OpenStreamWithResponseCommand and following the flow of logic from there.

OpenStreamWithResponseCommand - Sets up the download and returns a response with the stream to the user.

BufferedMultipartStream - The Stream object users read from; delegates all the work to other components.

MultipartDownloadCoordinator - Figures out if the file needs multiple downloads and launches them concurrently in the background.

PartBufferManager - Makes sure parts are read in the correct order (1, 2, 3...) even though they download concurrently.

BufferedPartDataHandler - Downloads each part from S3 and stores it in memory using ArrayPool buffers. In future we will have FilePartDataHandler or similar to download to a file instead of a buffer

StreamPartBuffer - Holds the actual bytes for one part and returns the memory to the pool when done.

Motivation and Context

#3806

Testing

  1. Unit Tests
  2. Integration Tests
  3. Manual Tests using Add Dotnet Benchmark Runner awslabs/aws-crt-s3-benchmarks#103

Trying with 5Gb file using OpenStreamWithResponseAsync

Total bytes per run: 5,368,709,120

Run:1 Secs:2.363672 Gb/s:18.170740
Run:2 Secs:1.281866 Gb/s:33.505587
Run:3 Secs:1.082785 Gb/s:39.665944
Run:4 Secs:1.057797 Gb/s:40.602941
Run:5 Secs:0.963468 Gb/s:44.578213
Run:6 Secs:1.000351 Gb/s:42.934607
Run:7 Secs:0.979741 Gb/s:43.837787
Run:8 Secs:0.979618 Gb/s:43.843296
Run:9 Secs:0.978436 Gb/s:43.896265
Run:10 Secs:0.934025 Gb/s:45.983449

vs old result OpenStreamAsync

Total bytes per run: 5,368,709,120

Run:1 Secs:53.916500 Gb/s:0.796596
Run:2 Secs:53.691880 Gb/s:0.799929
Run:3 Secs:53.697803 Gb/s:0.799840
Run:4 Secs:53.691046 Gb/s:0.799941
Run:5 Secs:53.690165 Gb/s:0.799954
Run:6 Secs:53.691007 Gb/s:0.799942
Run:7 Secs:53.720662 Gb/s:0.799500
Run:8 Secs:53.690802 Gb/s:0.799945
Run:9 Secs:53.690605 Gb/s:0.799948
Run:10 Secs:53.690745 Gb/s:0.799946
  1. Dry run - d002a133-4e40-4cb6-a606-b4d80ebb4094 pass

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)

Checklist

  • My code follows the code style of this project
  • My change requires a change to the documentation
  • I have updated the documentation accordingly
  • I have read the README document
  • I have added tests to cover my changes
  • All new and existing tests passed

License

  • I confirm that this pull request can be released under the Apache 2 license

@GarrettBeatty GarrettBeatty changed the base branch from development to feature/transfermanager November 14, 2025 19:47
@GarrettBeatty GarrettBeatty changed the base branch from feature/transfermanager to gcbeatty/tuconfig November 14, 2025 19:48
Copilot finished reviewing on behalf of GarrettBeatty November 14, 2025 21:21
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements multipart download functionality with buffering for the OpenStreamWithResponseAsync API in the S3 Transfer Utility. The implementation follows the AWS S3 Enhancement Proposal (SEP) for multipart downloads, supporting both PART and RANGE strategies.

Key changes:

  • New OpenStreamWithResponseAsync methods that return stream and metadata together
  • Complete multipart download infrastructure with BufferedMultipartStream, MultipartDownloadCoordinator, and PartBufferManager
  • ArrayPool-based buffer management for efficient memory usage
  • Comprehensive test coverage for all new components

Reviewed Changes

Copilot reviewed 28 out of 28 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
sdk/src/Services/S3/Custom/Transfer/_async/TransferUtility.async.cs Adds public API methods for OpenStreamWithResponseAsync
sdk/src/Services/S3/Custom/Transfer/_async/ITransferUtility.async.cs Adds interface definitions for new methods
sdk/src/Services/S3/Custom/Transfer/Internal/_async/OpenStreamWithResponseCommand.async.cs Implements command execution logic
sdk/src/Services/S3/Custom/Transfer/Internal/OpenStreamWithResponseCommand.cs Base command class for the new operation
sdk/src/Services/S3/Custom/Transfer/Internal/BufferedMultipartStream.cs Main stream implementation supporting both single-part and multipart
sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadCoordinator.cs Orchestrates multipart downloads with PART/RANGE strategies
sdk/src/Services/S3/Custom/Transfer/Internal/PartBufferManager.cs Manages part buffers with sequential access control
sdk/src/Services/S3/Custom/Transfer/Internal/StreamPartBuffer.cs ArrayPool-based buffer container
sdk/src/Services/S3/Custom/Transfer/Internal/BufferedDataSource.cs Data source for buffered parts
sdk/src/Services/S3/Custom/Transfer/Internal/SinglePartStreamHandler.cs Stream handler for single-part downloads
sdk/src/Services/S3/Custom/Transfer/Internal/MultipartStreamHandler.cs Stream handler for multipart downloads
sdk/src/Services/S3/Custom/Transfer/Internal/StreamConfiguration.cs Configuration settings container
sdk/src/Services/S3/Custom/Transfer/Internal/RequestMapper.cs Centralizes GetObjectRequest creation
sdk/src/Services/S3/Custom/Transfer/Internal/BaseCommand.cs Refactored to use RequestMapper
Multiple interface files Define contracts for coordinator, buffer manager, handlers, and data sources
Multiple test files Comprehensive unit tests for all components
sdk/src/Services/S3/Properties/AssemblyInfo.cs Adds InternalsVisibleTo for Moq testing

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Copilot reviewed 34 out of 34 changed files in this pull request and generated 3 comments.

Comments suppressed due to low confidence (1)

sdk/src/Services/S3/Custom/Transfer/Internal/StreamPartBuffer.cs:1

  • The pragma directives for CA1031 are unnecessary here since the SuppressMessage attribute on line 132 already handles this warning. The pragma directives should be removed to avoid redundant suppression.
/*******************************************************************************

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Copilot reviewed 39 out of 39 changed files in this pull request and generated no new comments.

/// Buffers downloaded parts in memory using ArrayPool and IPartBufferManager.
/// Implements current streaming behavior for multipart downloads.
/// </summary>
internal class BufferedPartDataHandler : IPartDataHandler
Copy link
Contributor Author

@GarrettBeatty GarrettBeatty Nov 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for downloading to files in the future we can implement the IPartDataHandler logic in a FilePartDataHandler and just change the implementation of FilePartDataHandler's ProcessPartAsync to write to a file instead of a buffer. This will let use re-use the multi part download coordinator in that implementation

Copy link
Contributor Author

@GarrettBeatty GarrettBeatty Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/// <summary>
/// Waits for all of the tasks to complete or till any task fails or is canceled.
/// </summary>
protected static async Task<List<T>> WhenAllOrFirstExceptionAsync<T>(List<Task<T>> pendingTasks, CancellationToken cancellationToken)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to TaskHelpers.cs so i can use from a non Command class

int readSize = Math.Min(Math.Min(remainingBytes, bufferSpace), _config.BufferSize);

// Read directly into final destination at correct offset
int bytesRead = await response.ResponseStream.ReadAsync(
Copy link
Contributor Author

@GarrettBeatty GarrettBeatty Nov 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reading from the http response here is where the current performance bottleneck is. open to suggestions on how to make this faster in future

var response = await _s3Client.GetObjectAsync(getRequest, cancellationToken)
.ConfigureAwait(continueOnCapturedContext: false);
_responseStream = response.ResponseStream;
// TODO map and return response
Copy link
Contributor Author

@GarrettBeatty GarrettBeatty Nov 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not needed here since this is old api which isnt returning response object

#line hidden
this.Write(@"[assembly: InternalsVisibleTo(""AWSSDK.UnitTests.S3.NetFramework, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db5f59f098d27276c7833875a6263a3cc74ab17ba9a9df0b52aedbe7252745db7274d5271fd79c1f08f668ecfa8eaab5626fa76adc811d3c8fc55859b0d09d3bc0a84eecd0ba891f2b8a2fc55141cdcc37c2053d53491e650a479967c3622762977900eddbf1252ed08a2413f00a28f3a0752a81203f03ccb7f684db373518b4"")]
[assembly: InternalsVisibleTo(""AWSSDK.UnitTests.NetFramework, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db5f59f098d27276c7833875a6263a3cc74ab17ba9a9df0b52aedbe7252745db7274d5271fd79c1f08f668ecfa8eaab5626fa76adc811d3c8fc55859b0d09d3bc0a84eecd0ba891f2b8a2fc55141cdcc37c2053d53491e650a479967c3622762977900eddbf1252ed08a2413f00a28f3a0752a81203f03ccb7f684db373518b4"")]
[assembly: InternalsVisibleTo(""DynamicProxyGenAssembly2, PublicKey=0024000004800000940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a3602f3b3606cc9891605d02baa56104f4cfc0734aa39b93bf7852f7d9266654753cc297e7d2edfe0bac1cdcf9f717241550e0a7b191195b7667bb4f64bcb8e2121380fd1d9d46ad2d92d2d15605093924cceaf74c4861eff62abf69b9291ed0a340e113be11e6a7d3113e92484cf7045cc7"")]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/// <summary>
/// Calculates the MD5 checksum of a byte array.
/// </summary>
private static string CalculateChecksum(byte[] data)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we dont support full object checksum for multi part download (since s3 doesnt return it on getobject requests for individual parts) i manually calculate the checksum to verify everything is ok

/// A cancellation token that can be used by other objects or threads to receive notice of cancellation.
/// </param>
/// <returns>The task object representing the asynchronous operation.</returns>
Task<Stream> OpenStreamAsync(TransferUtilityOpenStreamRequest request, CancellationToken cancellationToken = default(CancellationToken));
Copy link
Contributor Author

@GarrettBeatty GarrettBeatty Nov 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

planning on marking OpenStreamAsync as deprecated in the future PR once we get usage out of this one

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would recommend updating the docs of the existing method encouraging users to use the new operations. Talk about the new operations use parallel downloads from S3 and memory buffering to improve performance.

{
internal GetObjectRequest ConvertToGetObjectRequest(BaseDownloadRequest request)
{
GetObjectRequest getRequest = new GetObjectRequest()
Copy link
Contributor Author

@GarrettBeatty GarrettBeatty Nov 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to RequestMapper.cs so i can call from a non command class (multipartdownloadcoordinator)

{
public override async Task<TransferUtilityOpenStreamResponse> ExecuteAsync(CancellationToken cancellationToken)
{
var bufferedStream = BufferedMultipartStream.Create(_s3Client, _request, _config, this.RequestEventHandler);
Copy link
Contributor Author

@GarrettBeatty GarrettBeatty Nov 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

passing in this.RequestEventHandler as a way to set the user agent string to include OpenStreamWithResponseCommand as the caller (similar what we do with other commands, if you check BaseCommand.cs). passing this in felt ugly but it was the only easy way i could think of to solve things

see https://github.com/aws/aws-sdk-net/pull/4130/files#r2532234671

if (wsArgs != null)
{
((Runtime.Internal.IAmazonWebServiceRequest)wsArgs.Request).UserAgentDetails.AddFeature(UserAgentFeatureId.S3_TRANSFER);
((Runtime.Internal.IAmazonWebServiceRequest)wsArgs.Request).UserAgentDetails.AddUserAgentComponent("md/" + this.GetType().Name);
Copy link
Contributor Author

@GarrettBeatty GarrettBeatty Nov 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we do this.GetType here that will set it as whatevever command name called it, which is why i pass this in from openstreamcommandasync

}

// Delegate data handling to the handler
await _dataHandler.ProcessPartAsync(partNumber, response, cancellationToken).ConfigureAwait(false);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/aws/aws-sdk-net/pull/4130/files#r2532203647 see here. right now we only implement for buffering parts to buffer but in future when we add downloading to a file we can implement this interface easily

/// <summary>
/// Next expected part number in the sequence.
/// </summary>
int NextExpectedPartNumber { get; }
Copy link
Contributor Author

@GarrettBeatty GarrettBeatty Nov 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

im only exposing this publicly to make the unit tests easier to read/write. in reality it could really be a private field. its easier to check that the next expected part number is getting updated in certain testing scenarios (such as when doing ReadAsync and it reads across multiple parts) rather than checking the individual buffers/bytes etc

/// A cancellation token that can be used by other objects or threads to receive notice of cancellation.
/// </param>
/// <returns>The task object representing the asynchronous operation with response metadata.</returns>
Task<TransferUtilityOpenStreamResponse> OpenStreamWithResponseAsync(string bucketName, string key, CancellationToken cancellationToken = default(CancellationToken));
Copy link
Contributor Author

@GarrettBeatty GarrettBeatty Nov 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OpenStreamWithResponse will be a new api that returns metadata to the user about the response and implements multi part download behind the scenes. it looks like this for the user

using var response = await _transferUtility.OpenStreamWithResponseAsync(streamRequest);
// response contains metadata about the response and the actual response stream
// can access response.Etag and other metadata attributes now


var buffer = new byte[32 * 1024 * 1024]; // 32MB buffer
int bytesRead;

while ((bytesRead = await response.ResponseStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
   // read response stream. notice the user needs to do response.ResponseStream now
}

compared to old api (no response metadata and no multipart download)


using var s3Stream = await _transferUtility.OpenStreamAsync(streamRequest);
// s3stream is the literal responsestream


var buffer = new byte[32 * 1024 * 1024]; // 32MB buffer
int bytesRead;

while ((bytesRead = await s3stream.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
    // read response stream. notice user just does s3stream.ReadAsync

I am open to better naming suggestions than OpenStreamWithResponse if people have better suggestions

@GarrettBeatty GarrettBeatty changed the title Multi Part Download OpenStreamWithResponseAsync Multi Part Download + OpenStreamWithResponseAsync Nov 16, 2025
@GarrettBeatty
Copy link
Contributor Author

GarrettBeatty commented Nov 19, 2025

@normj i addressed all of the current comments in 0b3ed24

i still need to test with the encryption client though, i will do that today

Base automatically changed from gcbeatty/tuconfig to feature/transfermanager November 19, 2025 17:54
@GarrettBeatty
Copy link
Contributor Author

this is ready for another review. the only thing i havent touched yet is anything encryption client related. will try out java's encryption client today to see if that works for part get

Copy link
Member

@normj normj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to use the SDK logger and add lots of logging. In particular when requests are made, what strategy is used when semaphores and autoresetsevent are being used. Imagine what logs you would want the customer to send use to help debug a hard to reproduce issue.

We also need to figure out what to do if the initial part size is too large to buffer and if we are doing part and the object is a single part how to just return back the stream instead of buffering. I'm okay punting those issues to future PRs but create the backlog items first so we don't forget.

// Process Part 1 from InitialResponse (applies to both single-part and multipart)
await _dataHandler.ProcessPartAsync(1, discoveryResult.InitialResponse, cancellationToken).ConfigureAwait(false);

if (discoveryResult.IsSinglePart)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we see it is a single part can we rework this so we just hand back the network stream instead of buffering the single part stream and then returning back to the user.

I think this is important to do before we release but okay doing this as a separate PR to the feature branch to stop this PR from growing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Im going to create a backlog for this. This needs to apply to both the first part and and other parts where applicable. For example, if the user sets concurrent downloads to 1, we would expect parts 1, 2, 3, etc all to be read in serial from the network stream directly. so this will require changes in multiple places to suppport.

i have made DOTNET-8400

///
/// SYNCHRONIZATION PRIMITIVES AND THEIR PURPOSES:
///
/// 1. _lockObject (object lock)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can get rid of the _lockObject by

  • Combining _downloadComplete and _downloadException as a Tuple<bool, Exception> so they can be set atomically.
  • Mark the new Tuple and _nextExpectedPartNumber as volatile
  • When incrementing _nextExpectedPartNumber use Interlocked.Increment(ref _nextExpectedPartNumber);

This will remove the concern of having to keep getting the _lockObject and making sure the lock is always being used correctly. For example at the start of ReadFromCurrentPartAsync you are reading _nextExpectedPartNumber without a lock.

Copy link
Contributor Author

@GarrettBeatty GarrettBeatty Nov 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

im trying to remember why i actually used the lock for the next expected part number. i dont think its actually needed for it and it can just be a regular int.

The tuple though must definitely be volatile

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i ended up removing the lock/volatile logic for _nextExpectedPartNumber. its only ever being updated by the consumer which should be on a single thread.

I made updates to use the tuple for the exception and download complete

public DownloadDiscoveryResult DiscoveryResult => _discoveryResult;

/// <summary>
/// Creates a new BufferedMultipartStream with dependency injection.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use doc references for BufferedMultipartStream

}

/// <summary>
/// Factory method to create BufferedMultipartStream with default dependencies.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use doc references for BufferedMultipartStream

/// <param name="request">Stream request parameters.</param>
/// <param name="transferConfig">Transfer utility configuration.</param>
/// <param name="requestEventHandler">Optional request event handler for user agent tracking.</param>
/// <returns>A new BufferedMultipartStream instance.</returns>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use doc references for BufferedMultipartStream

// Determine target part size from request or use 8MB default
long targetPartSize = request.IsSetPartSize()
? request.PartSize
: 8 * 1024 * 1024; // 8MB default
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this to a constants class

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed to use s3constants

Comment on lines 104 to 105
if (_disposed)
throw new ObjectDisposedException(nameof(BufferedMultipartStream));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Add/use the ThrowIfDisposed method (reuse it)

/// Allows implementations to perform cleanup or commit operations.
/// </summary>
/// <param name="exception">Exception if download failed, null if successful</param>
void OnDownloadComplete(Exception exception);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make the exception optional. No need to explicitly pass null when the download is successful

}

// Mark successful completion
_dataHandler.OnDownloadComplete(null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a fan of needing to explicitly pass null. Exception should be optional

Copy link
Contributor Author

@GarrettBeatty GarrettBeatty Nov 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think its safer personally to keep explicit null because it prevents developers from accidentally not passing the exception and marking the download as complete when in reality they just forgot to pass the exception

namespace Amazon.S3.Transfer.Internal
{
/// <summary>
/// Enhanced OpenStream command that uses BufferedMultipartStream for improved multipart download handling.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add doc refs

internal static class RequestMapper
{
/// <summary>
/// Maps a BaseDownloadRequest to GetObjectRequest.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add doc refs

using System.Diagnostics.CodeAnalysis;
using Amazon.S3.Model;

namespace Amazon.S3.Transfer.Internal
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add doc refs to all referenced classes

@GarrettBeatty
Copy link
Contributor Author

GarrettBeatty commented Nov 21, 2025

@normj - addressed your comments here. i made one commit for your comments, you can check.


@philasmar also addressed your comments, made multiple commits for those

Logger.DebugFormat("MultipartDownloadManager: Waiting for {0} download tasks to complete", expectedTaskCount);

// Wait for all downloads to complete (fails fast on first exception)
await TaskHelpers.WhenAllOrFirstExceptionAsync(downloadTasks, cancellationToken).ConfigureAwait(false);
Copy link
Contributor Author

@GarrettBeatty GarrettBeatty Nov 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

apparently there is a bug here. i was doing performance testing with a 30Gb file. theres a deadlock situation here where if the buffer for the parts is full, it blocks the the consumer from reading, so we need to start this as a background task so we can return immediately and allow the consumer to read. working on a fix for it still

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 0f82dfe @normj FYI wanted to make sure you saw this change too

Copy link
Member

@normj normj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just have one minor nit comment that I think you should do but I don't need to review again after that change.

/// - Each signals _partAvailable
/// - Consumer waiting for part 1 wakes up when part 1 is added
/// </remarks>
public Task AddDataSourceAsync(IPartDataSource dataSource, CancellationToken cancellationToken)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this method and AddBufferAsync need to be async? I'm guess this is leftover from a previous version and they can be made sync.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated to be sync

stopwatch.Stop();

// Assert - Should return immediately for single-part
Assert.IsTrue(stopwatch.ElapsedMilliseconds < 100,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note to self, remove this line since its flaky

@GarrettBeatty GarrettBeatty merged commit b359696 into feature/transfermanager Nov 28, 2025
1 check passed
@dscpinheiro dscpinheiro deleted the gcbeatty/mpdstream branch November 29, 2025 02:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants