Skip to content

Conversation

jterapin
Copy link
Contributor

@jterapin jterapin commented Oct 7, 2025

This PR introduces a lightweight DefaultExecutor to replace the legacy :thread_count approach, laying the foundation for directory upload/download feature. Additionally, we support the following:

  • Custom executor support - Customers are able to pass in their own executor implementation (must implement DefaultExecutor interface)
  • :thread_count configuration is still respected - Existing :thread_count option continues to work by
    automatically creating a DefaultExecutor with the specified thread count

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

  1. To make sure we include your contribution in the release notes, please make sure to add description entry for your changes in the "unreleased changes" section of the CHANGELOG.md file (at corresponding gem). For the description entry, please make sure it lives in one line and starts with Feature or Issue in the correct format.

  2. For generated code changes, please checkout below instructions first:
    https://github.com/aws/aws-sdk-ruby/blob/version-3/CONTRIBUTING.md

Thank you for your contribution!

Copy link

github-actions bot commented Oct 7, 2025

Detected 1 possible performance regressions:

  • aws-sdk-s3.get_object_small_allocated_kb - z-score regression: 64.25 -> 64.39. Z-score: Infinity

@jterapin jterapin marked this pull request as ready for review October 9, 2025 18:11
Copy link
Contributor

@mullermp mullermp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice so far!


module Aws
module S3
# @api private
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this private or public?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You know - I was super torn whether I should make this private or not. It is definitely an inner detail but at the same time, it's an interface that customer will have to abide when implementing their own executor. Thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's private since thread count option is passed through. However if we haven't already, make clear how to provide a custom one.

errors = []

if (callback = options[:progress_callback])
progress = MultipartProgress.new(pending, callback)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can simplify the usages of these I think by always making a multipart progress object and short circuiting its callback if it doesn't exist.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know if the revision makes it better

# * track transfer progress by using progress listener
#
class TransferManager
# @example Using default executor (automatic creation and shutdown)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can consider using at_exit to handle shutdown automatically for all cases, rather than for only the default and having a customer managed executor. We can just require it to implement a shutdown method and we always call it. https://docs.ruby-lang.org/en/3.4/Kernel.html#method-i-at_exit

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a pretty neat. I'm still a bit worried about custom executors though, especially if customers are reusing that executor elsewhere in their app and have their own at_exit hooks (not sure if this is a valid case). We might run into exit ordering issues that we can't control.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I follow. If they are using a custom executor then we would only shut it down when the program exits. If they shut it down before us in the program or at their own exit hook, our shutdown call happens but becomes no op. I do think you should investigate this option more.

executor = @executor || DefaultExecutor.new(max_threads: download_opts.delete(:thread_count))
downloader = FileDownloader.new(client: @client, executor: executor)
downloader.download(destination, options.merge(bucket: bucket, key: key))
executor.shutdown unless @options[:executor]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could just inspect @executor here and not store all of the options.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kind of how I know whether if @executor is custom or not - so that I don't shut down custom executors.

expect(block).to receive(:call).with('hello')

subject.post('hello') { |arg| block.call(arg) }
sleep 0.1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer tests without actual sleeping somehow - these make tests slower.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair. I shorten the timing so maybe this will help.
Alternatively, I was thinking of using Thread.pass but I'm bit worried about the global effect of it:

describe '#post' do
  it 'executes a block with arguments' do
    result = nil
    subject.post('hello') { |a| result = a }

    Thread.pass until result
    expect(result).to eq('hello')
  end

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect these kinds of tests will be flakey. I think you should find a way to do it with expectations rather than blocking with sleep.

@jterapin jterapin requested a review from mullermp October 10, 2025 19:17
# @see Client#head_object
def download_file(destination, options = {})
downloader = FileDownloader.new(client: client)
download_options = options.dup
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As an optimization, rather than dup, you can just merge bucket and key which creates new hash then you can delete off keys.

@params = options
validate!
validate_destination!(destination)
opts = build_download_opts(destination, options.dup)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we care about dup here either because we should be given a new hash already at this point.

ensure
# Ensure the pipe is closed to avoid https://github.com/jruby/jruby/issues/6111
write_pipe.close
upload_thread = Thread.new do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well if I set thread count to 10 but 11 threads are used, that's maybe undesirable. Using post maybe makes sense so that it's at most 10 for the entire operation but that's up to you.

# * track transfer progress by using progress listener
#
class TransferManager
# @example Using default executor (automatic creation and shutdown)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I follow. If they are using a custom executor then we would only shut it down when the program exits. If they shut it down before us in the program or at their own exit hook, our shutdown call happens but becomes no op. I do think you should investigate this option more.

# @see Client#head_object
def download_file(destination, bucket:, key:, **options)
downloader = FileDownloader.new(client: @client)
download_opts = options.dup
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto about dup comments (applies to other places)

expect(block).to receive(:call).with('hello')

subject.post('hello') { |arg| block.call(arg) }
sleep 0.1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect these kinds of tests will be flakey. I think you should find a way to do it with expectations rather than blocking with sleep.

end

it 'kills threads after timeout' do
result = nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you have asserted result is done at some point?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants