feat: s3 transfer manager v2 #3079

Open
wants to merge 51 commits into
base: master
Changes from 14 commits
51 commits
3093732
feat: s3 transfer manager v2
yenfryherrerafeliz Feb 2, 2025
5950c31
chore: add tests cases and refactor
yenfryherrerafeliz Feb 6, 2025
1c82ab5
chore: add multipart download listener tests
yenfryherrerafeliz Feb 6, 2025
237fc7b
chore: refactor multipart downloaders and add tests
yenfryherrerafeliz Feb 6, 2025
c28c165
feat: add download directory and refactor code
yenfryherrerafeliz Feb 13, 2025
34321b2
chore: add upload and refactor code
yenfryherrerafeliz Feb 14, 2025
5289f7c
feat: add upload directory feature
yenfryherrerafeliz Feb 14, 2025
c6c0780
feat: multipart upload and some refactor
yenfryherrerafeliz Feb 17, 2025
034b50d
chore: short namespace
yenfryherrerafeliz Feb 17, 2025
bc15ac3
chore: refactor and address feedback
yenfryherrerafeliz Feb 22, 2025
e681d10
chore: fix test cases
yenfryherrerafeliz Feb 24, 2025
1f094cd
chore: remove unused implementation
yenfryherrerafeliz Feb 24, 2025
464b498
chore: remove invalid test
yenfryherrerafeliz Feb 24, 2025
09e493f
fix: add nullable type
yenfryherrerafeliz Feb 24, 2025
f10522b
chore: add more tests
yenfryherrerafeliz Feb 26, 2025
a55e1b3
chore: add upload unit tests and refactor
yenfryherrerafeliz Mar 13, 2025
b271897
chore: address naming feedback and test failures
yenfryherrerafeliz Mar 13, 2025
f4f1c88
chore: address minor styling issues
yenfryherrerafeliz Mar 13, 2025
d987aff
chore: add download tests
yenfryherrerafeliz Mar 17, 2025
6d000f1
chore: add integ tests
yenfryherrerafeliz May 19, 2025
27570d0
chore: add integ test
yenfryherrerafeliz May 20, 2025
1dde7fc
chore: address PR feedback
yenfryherrerafeliz May 26, 2025
060e0e1
chore: fix and refactor
yenfryherrerafeliz May 27, 2025
ee0fefb
chore: fix TransferListener import
yenfryherrerafeliz May 27, 2025
9f639f1
chore: add test case
yenfryherrerafeliz May 29, 2025
bdce369
chore: address PR feedback
yenfryherrerafeliz Jun 4, 2025
cd2133a
fix: prevent calling twice downloadFailed
yenfryherrerafeliz Jun 5, 2025
0426e11
chore: fix exception throwing
yenfryherrerafeliz Jun 5, 2025
a548ce2
feat: consider checksum mode from command
yenfryherrerafeliz Jun 17, 2025
5a25ad8
chore: tests and minor fixes
yenfryherrerafeliz Jun 18, 2025
64fc3f6
tests: Add integ test for abort
yenfryherrerafeliz Jun 18, 2025
908b4a0
chore: update integ test
yenfryherrerafeliz Jun 18, 2025
1e643e5
feat: update to use modeled inputs
yenfryherrerafeliz Jul 10, 2025
48a2822
chore: multipart download updates
yenfryherrerafeliz Jul 15, 2025
02b43f0
chore: s3 transfer manager updates
yenfryherrerafeliz Jul 17, 2025
153227b
chore: minor update
yenfryherrerafeliz Jul 17, 2025
31dbd19
chore: add empty lines at the end
yenfryherrerafeliz Jul 17, 2025
aeed758
chore: remove unused implementations
yenfryherrerafeliz Jul 17, 2025
26d2469
fix: object key should be normalized
yenfryherrerafeliz Jul 17, 2025
10928ca
fix: minor logic and test fix
yenfryherrerafeliz Jul 18, 2025
632ece9
fix: fix s3 delimiter test
yenfryherrerafeliz Jul 18, 2025
6efcc0a
fix: wrong data provider name used
yenfryherrerafeliz Jul 18, 2025
b0c5f75
chore: addressed some styling
yenfryherrerafeliz Jul 18, 2025
44f6ff4
chore: update argument name
yenfryherrerafeliz Jul 18, 2025
83ccd9b
chore: make config optional
yenfryherrerafeliz Jul 22, 2025
039a67b
chore: minor refactor and fix
yenfryherrerafeliz Jul 23, 2025
50715d1
chore: make parameter optional
yenfryherrerafeliz Jul 23, 2025
3034ad8
chore: make model classes final
yenfryherrerafeliz Jul 28, 2025
cfe4ab5
chore: make classes final and refactor tests
yenfryherrerafeliz Jul 29, 2025
f4c42e0
chore: fix and reformat integ test
yenfryherrerafeliz Jul 30, 2025
9fb03f1
chore: address some styling suggestions
yenfryherrerafeliz Jul 31, 2025
23 changes: 23 additions & 0 deletions src/S3/S3Transfer/DownloadResponse.php
@@ -0,0 +1,23 @@
<?php

namespace Aws\S3\S3Transfer;

use Psr\Http\Message\StreamInterface;

class DownloadResponse
{
public function __construct(
private readonly StreamInterface $content,
private readonly array $metadata = []
) {}

public function getContent(): StreamInterface
{
return $this->content;
}

public function getMetadata(): array
{
return $this->metadata;
}
}
7 changes: 7 additions & 0 deletions src/S3/S3Transfer/Exceptions/S3TransferException.php
@@ -0,0 +1,7 @@
<?php

namespace Aws\S3\S3Transfer\Exceptions;

class S3TransferException extends \RuntimeException
{
}
359 changes: 359 additions & 0 deletions src/S3/S3Transfer/MultipartDownloader.php
@@ -0,0 +1,359 @@
<?php

namespace Aws\S3\S3Transfer;

use Aws\CommandInterface;
use Aws\ResultInterface;
use Aws\S3\S3ClientInterface;
use Aws\S3\S3Transfer\Progress\TransferProgressSnapshot;
use GuzzleHttp\Promise\Coroutine;
use GuzzleHttp\Promise\Create;
use GuzzleHttp\Promise\PromiseInterface;
use GuzzleHttp\Promise\PromisorInterface;
use GuzzleHttp\Psr7\Utils;
use Psr\Http\Message\StreamInterface;

abstract class MultipartDownloader implements PromisorInterface
{
public const GET_OBJECT_COMMAND = "GetObject";
public const PART_GET_MULTIPART_DOWNLOADER = "partGet";
public const RANGE_GET_MULTIPART_DOWNLOADER = "rangeGet";

/** @var array */
protected array $requestArgs;

/** @var int */
protected int $currentPartNo;

/** @var int */
protected int $objectPartsCount;

/** @var int */
protected int $objectSizeInBytes;

/** @var string */
protected string $eTag;

/** @var StreamInterface */
private StreamInterface $stream;

/** @var TransferListenerNotifier|null */
private readonly ?TransferListenerNotifier $listenerNotifier;

/** @var TransferProgressSnapshot|null Latest progress snapshot of the transfer. */
private ?TransferProgressSnapshot $currentSnapshot;

/**
* @param S3ClientInterface $s3Client
* @param array $requestArgs
* @param array $config
* - minimum_part_size: The minimum part size for a multipart download
* using range get. This option MUST be set when using range get.
* @param int $currentPartNo
* @param int $objectPartsCount
* @param int $objectSizeInBytes
* @param string $eTag
* @param StreamInterface|null $stream
* @param TransferProgressSnapshot|null $currentSnapshot
* @param TransferListenerNotifier|null $listenerNotifier
*/
public function __construct(
protected readonly S3ClientInterface $s3Client,
array $requestArgs,
protected readonly array $config = [],
int $currentPartNo = 0,
int $objectPartsCount = 0,
int $objectSizeInBytes = 0,
string $eTag = "",
?StreamInterface $stream = null,
?TransferProgressSnapshot $currentSnapshot = null,
?TransferListenerNotifier $listenerNotifier = null,
) {
$this->requestArgs = $requestArgs;
$this->currentPartNo = $currentPartNo;
$this->objectPartsCount = $objectPartsCount;
$this->objectSizeInBytes = $objectSizeInBytes;
$this->eTag = $eTag;
if ($stream === null) {
$this->stream = Utils::streamFor(
fopen('php://temp', 'w+')
);
} else {
$this->stream = $stream;
}
$this->currentSnapshot = $currentSnapshot;
$this->listenerNotifier = $listenerNotifier;
}

/**
* @return int
*/
public function getCurrentPartNo(): int
{
return $this->currentPartNo;
}

/**
* @return int
*/
public function getObjectPartsCount(): int
{
return $this->objectPartsCount;
}

/**
* @return int
*/
public function getObjectSizeInBytes(): int
{
return $this->objectSizeInBytes;
}

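/**
* Returns null until the download has been initiated, unless a snapshot
* was provided through the constructor.
*
* @return TransferProgressSnapshot|null
*/
public function getCurrentSnapshot(): ?TransferProgressSnapshot
{
return $this->currentSnapshot;
}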

/**
* Returns a promise that resolves to the result of the multipart
* download operation, or is rejected if any part fails.
*
* @return PromiseInterface
*/
public function promise(): PromiseInterface
{
return Coroutine::of(function () {
$this->downloadInitiated($this->requestArgs);
try {
yield $this->s3Client->executeAsync($this->nextCommand())
->then(function (ResultInterface $result) {
// Calculate object size and parts count.
$this->computeObjectDimensions($result);
// Trigger first part completed
$this->partDownloadCompleted($result);
})->otherwise(function ($reason) {
// Rethrow only: the catch block below reports the failure,
// so listeners are notified once instead of twice.
throw $reason;
});
} catch (\Throwable $e) {
$this->downloadFailed($e);
// TODO: yield transfer exception modeled with a transfer failed response.
yield Create::rejectionFor($e);
}

while ($this->currentPartNo < $this->objectPartsCount) {
try {
yield $this->s3Client->executeAsync($this->nextCommand())
->then(function ($result) {
$this->partDownloadCompleted($result);

return $result;
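})->otherwise(function ($reason) {
// Rethrow instead of returning the reason: returning it would
// turn the rejection into a fulfilled value and hide the failure.
// The catch block below notifies listeners once.
throw $reason;
});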
} catch (\Throwable $reason) {
$this->downloadFailed($reason);
// TODO: yield transfer exception modeled with a transfer failed response.
yield Create::rejectionFor($reason);
}

}

// Transfer completed
$this->downloadComplete();

// TODO: yield the stream wrapped in a modeled transfer success response.
yield Create::promiseFor(new DownloadResponse(
$this->stream,
[]
));
});
}

/**
* Returns the next command for fetching the next object part.
*
* @return CommandInterface
*/
abstract protected function nextCommand() : CommandInterface;

/**
* Compute the object dimensions, such as size and parts count.
*
* @param ResultInterface $result
*
* @return void
*/
abstract protected function computeObjectDimensions(ResultInterface $result): void;

/**
* Resolves the object size from either an integer value or a
* ContentRange header value such as "bytes 0-1023/4096".
*
* @param int|string|null $sizeSource
*
* @return int
*
* @throws \RuntimeException When the size cannot be parsed from the source.
*/
protected function computeObjectSize($sizeSource): int
{
if (is_int($sizeSource)) {
return $sizeSource;
}

if (empty($sizeSource)) {
return 0;
}

// Extract the total object size from the ContentRange header value.
if (preg_match("/\/(\d+)$/", $sizeSource, $matches)) {
return (int) $matches[1];
}

throw new \RuntimeException('Invalid source size format');
}

/**
* Propagates the download-initiated event to listeners.
* It also initializes the progress snapshot that is maintained
* for the duration of the transfer.
*
* @param array $commandArgs
*
* @return void
*/
private function downloadInitiated(array $commandArgs): void
{
if ($this->currentSnapshot === null) {
$this->currentSnapshot = new TransferProgressSnapshot(
$commandArgs['Key'],
0,
$this->objectSizeInBytes
);
} else {
$this->currentSnapshot = new TransferProgressSnapshot(
$this->currentSnapshot->getIdentifier(),
$this->currentSnapshot->getTransferredBytes(),
$this->currentSnapshot->getTotalBytes(),
$this->currentSnapshot->getResponse()
);
}

$this->listenerNotifier?->transferInitiated([
'request_args' => $commandArgs,
'progress_snapshot' => $this->currentSnapshot,
]);
}

/**
* Propagates download-failed event to listeners.
*
* @param \Throwable $reason
*
* @return void
*/
private function downloadFailed(\Throwable $reason): void
{
$this->listenerNotifier?->transferFail([
'request_args' => $this->requestArgs,
'progress_snapshot' => $this->currentSnapshot,
'reason' => $reason,
]);
}

/**
* Propagates the part-download-completed event to listeners.
* It also appends the part content to the accumulating stream, which
* holds the full object content once the download completes, and
* updates the progress snapshot.
*
* @param ResultInterface $result
*
* @return void
*/
private function partDownloadCompleted(
ResultInterface $result
): void
{
$partDownloadBytes = $result['ContentLength'];
if (isset($result['ETag'])) {
$this->eTag = $result['ETag'];
}
Utils::copyToStream($result['Body'], $this->stream);
$newSnapshot = new TransferProgressSnapshot(
$this->currentSnapshot->getIdentifier(),
$this->currentSnapshot->getTransferredBytes() + $partDownloadBytes,
$this->objectSizeInBytes,
$result->toArray()
);
$this->currentSnapshot = $newSnapshot;
$this->listenerNotifier?->bytesTransferred([
'request_args' => $this->requestArgs,
'progress_snapshot' => $this->currentSnapshot,
]);
}

/**
* Propagates part-download-failed event to listeners.
*
* @param \Throwable $reason
*
* @return void
*/
private function partDownloadFailed(
\Throwable $reason,
): void
{
$this->downloadFailed($reason);
}

/**
* Propagates object-download-completed event to listeners.
* It also resets the pointer of the stream to the first position,
* so that the stream is ready to be consumed once returned.
*
* @return void
*/
private function downloadComplete(): void
{
$this->stream->rewind();
$newSnapshot = new TransferProgressSnapshot(
$this->currentSnapshot->getIdentifier(),
$this->currentSnapshot->getTransferredBytes(),
$this->objectSizeInBytes,
$this->currentSnapshot->getResponse()
);
$this->currentSnapshot = $newSnapshot;
$this->listenerNotifier?->transferComplete([
'request_args' => $this->requestArgs,
'progress_snapshot' => $this->currentSnapshot,
]);
}

/**
* @param string $multipartDownloadType
*
* @return string
*/
public static function chooseDownloaderClassName(
string $multipartDownloadType
): string
{
return match ($multipartDownloadType) {
MultipartDownloader::PART_GET_MULTIPART_DOWNLOADER => 'Aws\S3\S3Transfer\PartGetMultipartDownloader',
MultipartDownloader::RANGE_GET_MULTIPART_DOWNLOADER => 'Aws\S3\S3Transfer\RangeGetMultipartDownloader',
default => throw new \InvalidArgumentException(
"The config value for `multipart_download_type` must be one of:\n"
. "\t* " . MultipartDownloader::PART_GET_MULTIPART_DOWNLOADER
."\n"
. "\t* " . MultipartDownloader::RANGE_GET_MULTIPART_DOWNLOADER
)
};
}
}
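The size extraction in computeObjectSize() can be tried in isolation. The sketch below is a minimal standalone version, assuming PHP 8.0 or later; parseObjectSize is a hypothetical name used only for illustration and is not part of this PR. It shows the three accepted inputs: an integer, an empty value, and a ContentRange-style string whose total size follows the final slash.

```php
<?php

/**
 * Hypothetical standalone helper that mirrors the logic of
 * MultipartDownloader::computeObjectSize(); not part of the PR.
 */
function parseObjectSize(int|string|null $sizeSource): int
{
    // An integer is already a size and is returned unchanged.
    if (is_int($sizeSource)) {
        return $sizeSource;
    }

    // null and the empty string are treated as "no size known yet".
    if (empty($sizeSource)) {
        return 0;
    }

    // A ContentRange value looks like "bytes 0-1023/4096"; the digits
    // after the final slash are the total object size.
    if (preg_match('/\/(\d+)$/', $sizeSource, $matches) === 1) {
        return (int) $matches[1];
    }

    throw new RuntimeException('Invalid source size format');
}

var_dump(parseObjectSize('bytes 0-1023/4096')); // int(4096)
var_dump(parseObjectSize(2048));                // int(2048)
var_dump(parseObjectSize(null));                // int(0)
```

A value with an unknown total, such as "bytes 0-1023/*", does not match the pattern and therefore raises the exception, so callers that may receive it would need to handle that case themselves.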