Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
8c7ca45
Add executor support
jterapin Oct 7, 2025
c21969a
Add changelog entry
jterapin Oct 7, 2025
39ecf0a
Update TM with executor changes
jterapin Oct 7, 2025
a3f2b9f
Remove thread count support from MPU
jterapin Oct 7, 2025
3156f7c
Update Object usage of executor
jterapin Oct 7, 2025
84c9966
Add documentation/remove unused methods from DefaultExecutor
jterapin Oct 8, 2025
8e16a3b
Add Default Executor specs
jterapin Oct 8, 2025
db1cb62
Update TM docs and impl
jterapin Oct 8, 2025
f907c3b
Update streaming MPU to use executor
jterapin Oct 9, 2025
7cb940a
More MP Stream updates
jterapin Oct 9, 2025
4003536
Update specs
jterapin Oct 9, 2025
7dddda9
Update interfaces
jterapin Oct 9, 2025
481f198
Update specs
jterapin Oct 9, 2025
88bf44a
Update changelog
jterapin Oct 9, 2025
c1a25cd
Minor updates
jterapin Oct 9, 2025
7522a16
Fix failing specs
jterapin Oct 9, 2025
89cffe7
Merge branch 'version-3' into s3-executor-support
jterapin Oct 10, 2025
9eea233
Feedback - address sleep in specs
jterapin Oct 10, 2025
75b0d96
Feedback - update method name for cleanup_team_file
jterapin Oct 10, 2025
ad943ee
Feedback - wrap checksum callback
jterapin Oct 10, 2025
f1fc86a
Feedback - update method name in MPU
jterapin Oct 10, 2025
09eae68
Feedback - streamline handling of progress callbacks
jterapin Oct 10, 2025
e824de0
Feedback - streamline docs
jterapin Oct 10, 2025
c073349
Merge branch 'version-3' into s3-executor-support
jterapin Oct 13, 2025
cd91eb7
Feedback - streamline opts
jterapin Oct 13, 2025
abf78d6
Feedback - remove sleep from specs when possible
jterapin Oct 13, 2025
04a287f
Feedback - update to use 10 threads only
jterapin Oct 13, 2025
f7056a3
Merge branch 'version-3' into s3-executor-support
jterapin Oct 14, 2025
e921654
Using at_exit hooks for default executor
jterapin Oct 14, 2025
78af1fd
Refine documentation
jterapin Oct 14, 2025
e5c8b32
Revert "Using at_exit hooks for default executor"
jterapin Oct 14, 2025
163f1c4
Update docs
jterapin Oct 14, 2025
dce166e
Update changelog entry
jterapin Oct 14, 2025
4b1efdd
Revert "Feedback - update to use 10 threads only"
jterapin Oct 14, 2025
083d9a5
Update TM
jterapin Oct 14, 2025
b58b849
Add extra docs
jterapin Oct 14, 2025
6214de8
Feedback - update shutdown method with mutex
jterapin Oct 15, 2025
353da43
Update docs
jterapin Oct 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions gems/aws-sdk-s3/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
Unreleased Changes
------------------

* Feature - Add lightweight thread pool executor for multipart `download_file`, `upload_file` and `upload_stream`.

* Feature - Add custom executor support for `Aws::S3::TransferManager`.

1.199.1 (2025-09-25)
------------------

Expand Down
1 change: 1 addition & 0 deletions gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module S3
autoload :Encryption, 'aws-sdk-s3/encryption'
autoload :EncryptionV2, 'aws-sdk-s3/encryption_v2'
autoload :FilePart, 'aws-sdk-s3/file_part'
autoload :DefaultExecutor, 'aws-sdk-s3/default_executor'
autoload :FileUploader, 'aws-sdk-s3/file_uploader'
autoload :FileDownloader, 'aws-sdk-s3/file_downloader'
autoload :LegacySigner, 'aws-sdk-s3/legacy_signer'
Expand Down
42 changes: 23 additions & 19 deletions gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,8 @@ def public_url(options = {})
# {Client#complete_multipart_upload},
# and {Client#upload_part} can be provided.
#
# @option options [Integer] :thread_count (10) The number of parallel
# multipart uploads
# @option options [Integer] :thread_count (10) The number of parallel multipart uploads.
# An additional thread is used internally for task coordination.
#
# @option options [Boolean] :tempfile (false) Normally read data is stored
# in memory when building the parts in order to complete the underlying
Expand All @@ -383,19 +383,18 @@ def public_url(options = {})
# @see Client#complete_multipart_upload
# @see Client#upload_part
def upload_stream(options = {}, &block)
uploading_options = options.dup
upload_opts = options.merge(bucket: bucket_name, key: key)
executor = DefaultExecutor.new(max_threads: upload_opts.delete(:thread_count))
uploader = MultipartStreamUploader.new(
client: client,
thread_count: uploading_options.delete(:thread_count),
tempfile: uploading_options.delete(:tempfile),
part_size: uploading_options.delete(:part_size)
executor: executor,
tempfile: upload_opts.delete(:tempfile),
part_size: upload_opts.delete(:part_size)
)
Aws::Plugins::UserAgent.metric('RESOURCE_MODEL') do
uploader.upload(
uploading_options.merge(bucket: bucket_name, key: key),
&block
)
uploader.upload(upload_opts, &block)
end
executor.shutdown
true
end
deprecated(:upload_stream, use: 'Aws::S3::TransferManager#upload_stream', version: 'next major version')
Expand Down Expand Up @@ -458,12 +457,18 @@ def upload_stream(options = {}, &block)
# @see Client#complete_multipart_upload
# @see Client#upload_part
def upload_file(source, options = {})
uploading_options = options.dup
uploader = FileUploader.new(multipart_threshold: uploading_options.delete(:multipart_threshold), client: client)
upload_opts = options.merge(bucket: bucket_name, key: key)
executor = DefaultExecutor.new(max_threads: upload_opts.delete(:thread_count))
uploader = FileUploader.new(
client: client,
executor: executor,
multipart_threshold: upload_opts.delete(:multipart_threshold)
)
response = Aws::Plugins::UserAgent.metric('RESOURCE_MODEL') do
uploader.upload(source, uploading_options.merge(bucket: bucket_name, key: key))
uploader.upload(source, upload_opts)
end
yield response if block_given?
executor.shutdown
true
end
deprecated(:upload_file, use: 'Aws::S3::TransferManager#upload_file', version: 'next major version')
Expand Down Expand Up @@ -512,10 +517,6 @@ def upload_file(source, options = {})
#
# @option options [Integer] :thread_count (10) Customize threads used in the multipart download.
#
# @option options [String] :version_id The object version id used to retrieve the object.
#
# @see https://docs.aws.amazon.com/AmazonS3/latest/dev/ObjectVersioning.html ObjectVersioning
#
# @option options [String] :checksum_mode ("ENABLED")
# When `"ENABLED"` and the object has a stored checksum, it will be used to validate the download and will
# raise an `Aws::Errors::ChecksumError` if checksum validation fails. You may provide a `on_checksum_validated`
Expand All @@ -539,10 +540,13 @@ def upload_file(source, options = {})
# @see Client#get_object
# @see Client#head_object
def download_file(destination, options = {})
downloader = FileDownloader.new(client: client)
download_opts = options.merge(bucket: bucket_name, key: key)
executor = DefaultExecutor.new(max_threads: download_opts.delete([:thread_count]))
downloader = FileDownloader.new(client: client, executor: executor)
Aws::Plugins::UserAgent.metric('RESOURCE_MODEL') do
downloader.download(destination, options.merge(bucket: bucket_name, key: key))
downloader.download(destination, download_opts)
end
executor.shutdown
true
end
deprecated(:download_file, use: 'Aws::S3::TransferManager#download_file', version: 'next major version')
Expand Down
103 changes: 103 additions & 0 deletions gems/aws-sdk-s3/lib/aws-sdk-s3/default_executor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# frozen_string_literal: true

module Aws
module S3
# @api private
class DefaultExecutor
DEFAULT_MAX_THREADS = 10
RUNNING = :running
SHUTTING_DOWN = :shutting_down
SHUTDOWN = :shutdown

def initialize(options = {})
@max_threads = options[:max_threads] || DEFAULT_MAX_THREADS
@state = RUNNING
@queue = Queue.new
@pool = []
@mutex = Mutex.new
end

# Submits a task for execution.
# @param [Object] args Variable number of arguments to pass to the block
# @param [Proc] block The block to be executed
# @return [Boolean] Returns true if the task was submitted successfully
def post(*args, &block)
@mutex.synchronize do
raise 'Executor has been shutdown and is no longer accepting tasks' unless @state == RUNNING

@queue << [args, block]
ensure_worker_available
end
true
end

# Immediately terminates all worker threads and clears pending tasks.
# This is a forceful shutdown that doesn't wait for running tasks to complete.
#
# @return [Boolean] true when termination is complete
def kill
@mutex.synchronize do
@state = SHUTDOWN
@pool.each(&:kill)
@pool.clear
@queue.clear
end
true
end

# Gracefully shuts down the executor, optionally with a timeout.
# Stops accepting new tasks and waits for running tasks to complete.
#
# @param timeout [Numeric, nil] Maximum time in seconds to wait for shutdown.
# If nil, waits indefinitely. If timeout expires, remaining threads are killed.
# @return [Boolean] true when shutdown is complete
def shutdown(timeout = nil)
@mutex.synchronize do
return true if @state == SHUTDOWN

@state = SHUTTING_DOWN
@pool.size.times { @queue << :shutdown }
end

if timeout
deadline = Time.now + timeout
@pool.each do |thread|
remaining = deadline - Time.now
break if remaining <= 0

thread.join([remaining, 0].max)
end
@pool.select(&:alive?).each(&:kill)
else
@pool.each(&:join)
end

@mutex.synchronize do
@pool.clear
@state = SHUTDOWN
end
true
end

private

def ensure_worker_available
return unless @state == RUNNING

@pool.select!(&:alive?)
@pool << spawn_worker if @pool.size < @max_threads
end

def spawn_worker
Thread.new do
while (job = @queue.shift)
break if job == :shutdown

args, block = job
block.call(*args)
end
end
end
end
end
end
Loading