diff --git a/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java b/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java index 878970eb8056c..c6c1868d5bbc1 100644 --- a/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java +++ b/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java @@ -48,6 +48,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.NoSuchFileException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Base64; import java.util.Collection; import java.util.Collections; @@ -241,7 +242,7 @@ protected String requestUniqueId(final HttpExchange exchange) { private static class AzureHTTPStatsCollectorHandler extends HttpStatsCollectorHandler { private AzureHTTPStatsCollectorHandler(HttpHandler delegate) { - super(delegate); + super(delegate, Arrays.stream(AzureBlobStore.Operation.values()).map(AzureBlobStore.Operation::getKey).toArray(String[]::new)); } @Override diff --git a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java index 0597ab303004e..01466467a2bcd 100644 --- a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java +++ b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java @@ -364,7 +364,7 @@ protected boolean canFailRequest(final HttpExchange exchange) { private static class GoogleCloudStorageStatsCollectorHttpHandler extends HttpStatsCollectorHandler { GoogleCloudStorageStatsCollectorHttpHandler(final HttpHandler delegate) { - super(delegate); + super(delegate, Arrays.stream(StorageOperation.values()).map(StorageOperation::key).toArray(String[]::new)); } @Override diff --git a/modules/repository-s3/qa/third-party/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java b/modules/repository-s3/qa/third-party/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java index 4cebedebfba07..04a743b90efa6 100644 --- a/modules/repository-s3/qa/third-party/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java +++ b/modules/repository-s3/qa/third-party/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java @@ -228,4 +228,56 @@ public void testReadFromPositionLargerThanBlobLength() { e -> asInstanceOf(AmazonS3Exception.class, e.getCause()).getStatusCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus() ); } + + public void testCopy() { + final var sourceBlobName = randomIdentifier(); + final var blobBytes = randomBytesReference(randomIntBetween(100, 2_000)); + final var destinationBlobName = randomIdentifier(); + + final var repository = getRepository(); + + final var targetBytes = executeOnBlobStore(repository, sourceBlobContainer -> { + sourceBlobContainer.writeBlob(randomPurpose(), sourceBlobName, blobBytes, true); + + final var destinationBlobContainer = repository.blobStore().blobContainer(repository.basePath().add("target")); + destinationBlobContainer.copyBlob( + 
randomPurpose(), + sourceBlobContainer, + sourceBlobName, + destinationBlobName, + blobBytes.length() + ); + + return destinationBlobContainer.readBlob(randomPurpose(), destinationBlobName).readAllBytes(); + }); + + assertArrayEquals(BytesReference.toBytes(blobBytes), targetBytes); + } + + public void testMultipartCopy() { + final var sourceBlobName = randomIdentifier(); + // executeMultipart requires a minimum part size of 5 MiB + final var blobBytes = randomBytesReference(randomIntBetween(5 * 1024 * 1024, 10 * 1024 * 1024)); + final var destinationBlobName = randomIdentifier(); + + final var repository = getRepository(); + + final var targetBytes = executeOnBlobStore(repository, sourceBlobContainer -> { + sourceBlobContainer.writeBlob(randomPurpose(), sourceBlobName, blobBytes, true); + + final S3BlobContainer destinationBlobContainer = (S3BlobContainer) repository.blobStore() + .blobContainer(repository.basePath().add("target")); + destinationBlobContainer.executeMultipartCopy( + randomPurpose(), + (S3BlobContainer) sourceBlobContainer, + sourceBlobName, + destinationBlobName, + blobBytes.length() + ); + + return destinationBlobContainer.readBlob(randomPurpose(), destinationBlobName).readAllBytes(); + }); + + assertArrayEquals(BytesReference.toBytes(blobBytes), targetBytes); + } } diff --git a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 51064631cb84f..a30cf9086b96c 100644 --- a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -71,6 +71,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -621,6 +622,12 @@ long getLargeBlobThresholdInBytes() { return ByteSizeUnit.MB.toBytes(1L); } + @Override + long getMaxCopySizeBeforeMultipart() { + // on my laptop 10K exercises this better but larger values should be fine for nightlies + return ByteSizeUnit.MB.toBytes(1L); + } + @Override void ensureMultiPartUploadSize(long blobSize) {} }; @@ -688,7 +695,7 @@ protected class S3StatsCollectorHttpHandler extends HttpStatsCollectorHandler { private final Map metricsCount = ConcurrentCollections.newConcurrentMap(); S3StatsCollectorHttpHandler(final HttpHandler delegate) { - super(delegate); + super(delegate, Arrays.stream(S3BlobStore.Operation.values()).map(S3BlobStore.Operation::getKey).toArray(String[]::new)); } private S3HttpHandler.S3Request parseRequest(HttpExchange exchange) { @@ -736,9 +743,17 @@ public void maybeTrack(HttpExchange exchange) { k -> new AtomicLong() ).incrementAndGet(); } else if (request.isPutObjectRequest()) { - trackRequest("PutObject"); - metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.PUT_OBJECT, purpose), k -> new AtomicLong()) - .incrementAndGet(); + if (exchange.getRequestHeaders().containsKey(S3BlobStore.CUSTOM_QUERY_PARAMETER_COPY_SOURCE)) { + trackRequest("CopyObject"); + metricsCount.computeIfAbsent( + new S3BlobStore.StatsKey(S3BlobStore.Operation.COPY_OBJECT, purpose), + k -> new AtomicLong() + ).incrementAndGet(); + } else { + trackRequest("PutObject"); + metricsCount.computeIfAbsent(new 
S3BlobStore.StatsKey(S3BlobStore.Operation.PUT_OBJECT, purpose), k -> new AtomicLong()) + .incrementAndGet(); + } } else if (request.isMultiObjectDeleteRequest()) { trackRequest("DeleteObjects"); metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.DELETE_OBJECTS, purpose), k -> new AtomicLong()) diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index 6963e329ddc86..326509f998903 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -14,6 +14,8 @@ import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.CopyObjectRequest; +import com.amazonaws.services.s3.model.CopyPartRequest; import com.amazonaws.services.s3.model.GetObjectMetadataRequest; import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; @@ -63,12 +65,14 @@ import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream; import org.elasticsearch.repositories.s3.S3BlobStore.Operation; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.file.NoSuchFileException; import java.time.Instant; import java.util.ArrayList; import java.util.Date; @@ -300,6 +304,11 @@ long getLargeBlobThresholdInBytes() { return blobStore.bufferSizeInBytes(); } + // package private for testing + long getMaxCopySizeBeforeMultipart() { + return blobStore.maxCopySizeBeforeMultipart(); + } + @Override public void writeBlobAtomic( OperationPurpose purpose, @@ -317,6 +326,67 @@ public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesRefe writeBlob(purpose, blobName, bytes, failIfAlreadyExists); } + /** + * Perform server-side copy of a blob from a source container + *

+ * Server-side copy can be done for any size object, but if the object is larger than 5 GB then + * it must be done through a series of part copy operations rather than a single blob copy. + * See CopyObject. + * Note that this operation will overwrite the destination if it already exists. + * @param purpose The purpose of the operation + * @param sourceBlobContainer The blob container to copy the blob into + * @param sourceBlobName The name of the blob to copy from + * @param blobName The name of the blob to copy to + * @param blobSize The size of the source blob in bytes (needed because some object stores use different implementations + * for very large blobs) + * @throws IOException If the operation fails on the server side + */ + @Override + public void copyBlob( + final OperationPurpose purpose, + final BlobContainer sourceBlobContainer, + final String sourceBlobName, + final String blobName, + final long blobSize + ) throws IOException { + assert BlobContainer.assertPurposeConsistency(purpose, sourceBlobName); + assert BlobContainer.assertPurposeConsistency(purpose, blobName); + if (sourceBlobContainer instanceof S3BlobContainer == false) { + throw new IllegalArgumentException("source blob container must be a S3BlobContainer"); + } + + final var s3SourceBlobContainer = (S3BlobContainer) sourceBlobContainer; + + try { + if (blobSize > getMaxCopySizeBeforeMultipart()) { + executeMultipartCopy(purpose, s3SourceBlobContainer, sourceBlobName, blobName, blobSize); + } else { + // metadata is inherited from source, but not canned ACL or storage class + final var blobKey = buildKey(blobName); + final CopyObjectRequest copyRequest = new CopyObjectRequest( + s3SourceBlobContainer.blobStore.bucket(), + s3SourceBlobContainer.buildKey(sourceBlobName), + blobStore.bucket(), + blobKey + ).withCannedAccessControlList(blobStore.getCannedACL()).withStorageClass(blobStore.getStorageClass()); + + S3BlobStore.configureRequestForMetrics(copyRequest, blobStore, Operation.COPY_OBJECT, purpose); + + try (AmazonS3Reference clientReference = blobStore.clientReference()) { + SocketAccess.doPrivilegedVoid(() -> { clientReference.client().copyObject(copyRequest); }); + } + } + } catch (final AmazonClientException e) { + if (e instanceof AmazonS3Exception amazonS3Exception) { + if (amazonS3Exception.getStatusCode() == RestStatus.NOT_FOUND.getStatus()) { + final var sourceKey = s3SourceBlobContainer.buildKey(sourceBlobName); + throw new NoSuchFileException("Copy source [" + sourceKey + "] not found: " + amazonS3Exception.getMessage()); + } + } + throw new IOException("Unable to copy object [" + blobName + "] from [" + sourceBlobContainer + "][" + sourceBlobName + "]", e); + } + } + @Override public DeleteResult delete(OperationPurpose purpose) throws IOException { final AtomicLong deletedBlobs = new AtomicLong(); @@ -475,23 +545,25 @@ void executeSingleUpload( } } - /** - * Uploads a blob using multipart upload requests. 
- */ - void executeMultipartUpload( - OperationPurpose purpose, + private interface PartOperation { + PartETag doPart(String uploadId, int partNum, long partSize, boolean lastPart); + } + + // for copy, blobName and s3BlobStore are the destination + private void executeMultipart( + final OperationPurpose purpose, final S3BlobStore s3BlobStore, final String blobName, - final InputStream input, - final long blobSize + final long partSize, + final long blobSize, + final PartOperation partOperation ) throws IOException { ensureMultiPartUploadSize(blobSize); - final long partSize = s3BlobStore.bufferSizeInBytes(); final Tuple multiparts = numberOfMultiparts(blobSize, partSize); if (multiparts.v1() > Integer.MAX_VALUE) { - throw new IllegalArgumentException("Too many multipart upload requests, maybe try a larger buffer size?"); + throw new IllegalArgumentException("Too many multipart upload requests, maybe try a larger part size?"); } final int nbParts = multiparts.v1().intValue(); @@ -510,7 +582,7 @@ void executeMultipartUpload( ); } if (Strings.isEmpty(uploadId.get())) { - throw new IOException("Failed to initialize multipart upload " + blobName); + throw new IOException("Failed to initialize multipart operation for " + blobName); } final List parts = new ArrayList<>(); @@ -518,28 +590,20 @@ void executeMultipartUpload( long bytesCount = 0; for (int i = 1; i <= nbParts; i++) { final boolean lastPart = i == nbParts; - final UploadPartRequest uploadRequest = createPartUploadRequest( - purpose, - input, - uploadId.get(), - i, - blobName, - lastPart ? lastPartSize : partSize, - lastPart - ); - bytesCount += uploadRequest.getPartSize(); - - try (AmazonS3Reference clientReference = s3BlobStore.clientReference()) { - final UploadPartResult uploadResponse = SocketAccess.doPrivileged( - () -> clientReference.client().uploadPart(uploadRequest) - ); - parts.add(uploadResponse.getPartETag()); - } + final var curPartSize = lastPart ? lastPartSize : partSize; + final var partEtag = partOperation.doPart(uploadId.get(), i, curPartSize, lastPart); + bytesCount += curPartSize; + parts.add(partEtag); } if (bytesCount != blobSize) { throw new IOException( - "Failed to execute multipart upload for [" + blobName + "], expected " + blobSize + "bytes sent but got " + bytesCount + "Failed to execute multipart operation for [" + + blobName + + "], expected " + + blobSize + + "bytes sent but got " + + bytesCount ); } @@ -556,7 +620,7 @@ void executeMultipartUpload( success = true; } catch (final AmazonClientException e) { - throw new IOException("Unable to upload object [" + blobName + "] using multipart upload", e); + throw new IOException("Unable to upload or copy object [" + blobName + "] using multipart upload", e); } finally { if ((success == false) && Strings.hasLength(uploadId.get())) { abortMultiPartUpload(purpose, uploadId.get(), blobName); @@ -564,6 +628,81 @@ void executeMultipartUpload( } } + /** + * Uploads a blob using multipart upload requests. 
+ */ + void executeMultipartUpload( + OperationPurpose purpose, + final S3BlobStore s3BlobStore, + final String blobName, + final InputStream input, + final long blobSize + ) throws IOException { + executeMultipart( + purpose, + s3BlobStore, + blobName, + s3BlobStore.bufferSizeInBytes(), + blobSize, + (uploadId, partNum, partSize, lastPart) -> { + final UploadPartRequest uploadRequest = createPartUploadRequest( + purpose, + input, + uploadId, + partNum, + blobName, + partSize, + lastPart + ); + + try (AmazonS3Reference clientReference = s3BlobStore.clientReference()) { + final UploadPartResult uploadResponse = SocketAccess.doPrivileged( + () -> clientReference.client().uploadPart(uploadRequest) + ); + return uploadResponse.getPartETag(); + } + } + ); + } + + /** + * Copies a blob using multipart + *

+ * This is required when the blob size is larger than MAX_FILE_SIZE. + * It must be called on the destination blob container. + *

+ * It uses MAX_FILE_SIZE as the copy part size, because that minimizes the number of requests needed. + * Smaller part sizes might improve throughput when downloading from multiple parts at once, but we have no measurements + * indicating this would be helpful so we optimize for request count. + */ + void executeMultipartCopy( + OperationPurpose purpose, + final S3BlobContainer sourceContainer, + final String sourceBlobName, + final String destinationBlobName, + final long blobSize + ) throws IOException { + final long copyPartSize = MAX_FILE_SIZE.getBytes(); + final var destinationKey = buildKey(destinationBlobName); + executeMultipart(purpose, blobStore, destinationKey, copyPartSize, blobSize, ((uploadId, partNum, partSize, lastPart) -> { + final long startOffset = (partNum - 1) * copyPartSize; + final var request = new CopyPartRequest().withSourceBucketName(sourceContainer.blobStore.bucket()) + .withSourceKey(sourceContainer.buildKey(sourceBlobName)) + .withDestinationBucketName(blobStore.bucket()) + .withDestinationKey(destinationKey) + .withUploadId(uploadId) + .withPartNumber(partNum) + .withFirstByte(startOffset) + .withLastByte(startOffset + partSize - 1); + S3BlobStore.configureRequestForMetrics(request, blobStore, Operation.COPY_MULTIPART_OBJECT, purpose); + + try (AmazonS3Reference clientReference = blobStore.clientReference()) { + final var result = SocketAccess.doPrivileged(() -> clientReference.client().copyPart(request)); + return result.getPartETag(); + } + })); + } + // non-static, package private for testing void ensureMultiPartUploadSize(final long blobSize) { if (blobSize > MAX_FILE_SIZE_USING_MULTIPART.getBytes()) { diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java index b96a7c009dc7b..129de029daf7a 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java @@ -59,6 +59,7 @@ class S3BlobStore implements BlobStore { + public static final String CUSTOM_QUERY_PARAMETER_COPY_SOURCE = "x-amz-copy-source"; public static final String CUSTOM_QUERY_PARAMETER_PURPOSE = "x-purpose"; /** @@ -79,6 +80,8 @@ class S3BlobStore implements BlobStore { private final ByteSizeValue bufferSize; + private final ByteSizeValue maxCopySizeBeforeMultipart; + private final boolean serverSideEncryption; private final CannedAccessControlList cannedACL; @@ -103,6 +106,7 @@ class S3BlobStore implements BlobStore { String bucket, boolean serverSideEncryption, ByteSizeValue bufferSize, + ByteSizeValue maxCopySizeBeforeMultipart, String cannedACL, String storageClass, RepositoryMetadata repositoryMetadata, @@ -116,6 +120,7 @@ class S3BlobStore implements BlobStore { this.bucket = bucket; this.serverSideEncryption = serverSideEncryption; this.bufferSize = bufferSize; + this.maxCopySizeBeforeMultipart = maxCopySizeBeforeMultipart; this.cannedACL = initCannedACL(cannedACL); this.storageClass = initStorageClass(storageClass); this.repositoryMetadata = repositoryMetadata; @@ -251,10 +256,10 @@ private boolean assertConsistencyBetweenHttpRequestAndOperation(Request reque case GET_OBJECT, LIST_OBJECTS -> { return request.getHttpMethod().name().equals("GET"); } - case PUT_OBJECT -> { + case PUT_OBJECT, COPY_OBJECT -> { return request.getHttpMethod().name().equals("PUT"); } - case PUT_MULTIPART_OBJECT -> { + case PUT_MULTIPART_OBJECT, 
COPY_MULTIPART_OBJECT -> { return request.getHttpMethod().name().equals("PUT") || request.getHttpMethod().name().equals("POST"); } case DELETE_OBJECTS -> { @@ -328,6 +333,10 @@ public long bufferSizeInBytes() { return bufferSize.getBytes(); } + public long maxCopySizeBeforeMultipart() { + return maxCopySizeBeforeMultipart.getBytes(); + } + public RepositoryMetadata getRepositoryMetadata() { return repositoryMetadata; } @@ -551,7 +560,9 @@ enum Operation { PUT_OBJECT("PutObject"), PUT_MULTIPART_OBJECT("PutMultipartObject"), DELETE_OBJECTS("DeleteObjects"), - ABORT_MULTIPART_OBJECT("AbortMultipartObject"); + ABORT_MULTIPART_OBJECT("AbortMultipartObject"), + COPY_OBJECT("CopyObject"), + COPY_MULTIPART_OBJECT("CopyMultipartObject"); private final String key; diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 48e5065f56cd8..0904f37e39743 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -131,6 +131,18 @@ class S3Repository extends MeteredBlobStoreRepository { MAX_PART_SIZE_USING_MULTIPART ); + /** + * Maximum size allowed for copy without multipart. + * Objects larger than this will be copied using multipart copy. S3 enforces a minimum multipart size of 5 MiB and a maximum + * non-multipart copy size of 5 GiB. The default is to use the maximum allowable size in order to minimize request count. + */ + static final Setting MAX_COPY_SIZE_BEFORE_MULTIPART = Setting.byteSizeSetting( + "max_copy_size_before_multipart", + MAX_FILE_SIZE, + MIN_PART_SIZE_USING_MULTIPART, + MAX_FILE_SIZE + ); + /** * Big files can be broken down into chunks during snapshotting if needed. Defaults to 5tb. 
*/ @@ -241,6 +253,8 @@ class S3Repository extends MeteredBlobStoreRepository { private final ByteSizeValue chunkSize; + private final ByteSizeValue maxCopySizeBeforeMultipart; + private final boolean serverSideEncryption; private final String storageClass; @@ -308,6 +322,8 @@ class S3Repository extends MeteredBlobStoreRepository { ); } + this.maxCopySizeBeforeMultipart = MAX_COPY_SIZE_BEFORE_MULTIPART.get(metadata.settings()); + this.serverSideEncryption = SERVER_SIDE_ENCRYPTION_SETTING.get(metadata.settings()); this.storageClass = STORAGE_CLASS_SETTING.get(metadata.settings()); @@ -325,11 +341,13 @@ class S3Repository extends MeteredBlobStoreRepository { coolDown = COOLDOWN_PERIOD.get(metadata.settings()); logger.debug( - "using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], cannedACL [{}], storageClass [{}]", + "using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], " + + "max_copy_size_before_multipart [{}], cannedACL [{}], storageClass [{}]", bucket, chunkSize, serverSideEncryption, bufferSize, + maxCopySizeBeforeMultipart, cannedACL, storageClass ); @@ -454,6 +472,7 @@ protected S3BlobStore createBlobStore() { bucket, serverSideEncryption, bufferSize, + maxCopySizeBeforeMultipart, cannedACL, storageClass, metadata, diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index 629015b40b5ff..d6a1c87911f6a 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -214,6 +214,7 @@ protected BlobContainer createBlobContainer( "bucket", S3Repository.SERVER_SIDE_ENCRYPTION_SETTING.getDefault(Settings.EMPTY), bufferSize == null ? 
S3Repository.BUFFER_SIZE_SETTING.getDefault(Settings.EMPTY) : bufferSize, + S3Repository.MAX_COPY_SIZE_BEFORE_MULTIPART.getDefault(Settings.EMPTY), S3Repository.CANNED_ACL_SETTING.getDefault(Settings.EMPTY), S3Repository.STORAGE_CLASS_SETTING.getDefault(Settings.EMPTY), repositoryMetadata, diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java index 58bb11874fbe6..16ac4b29585f9 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java @@ -15,6 +15,10 @@ import com.amazonaws.services.s3.model.CannedAccessControlList; import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; +import com.amazonaws.services.s3.model.CopyObjectRequest; +import com.amazonaws.services.s3.model.CopyObjectResult; +import com.amazonaws.services.s3.model.CopyPartRequest; +import com.amazonaws.services.s3.model.CopyPartResult; import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; import com.amazonaws.services.s3.model.ObjectMetadata; @@ -159,12 +163,26 @@ public void testExecuteMultipartUploadBlobSizeTooSmall() { } public void testExecuteMultipartUpload() throws IOException { + testExecuteMultipart(false); + } + + public void testExecuteMultipartCopy() throws IOException { + testExecuteMultipart(true); + } + + void testExecuteMultipart(boolean doCopy) throws IOException { final String bucketName = randomAlphaOfLengthBetween(1, 10); final String blobName = randomAlphaOfLengthBetween(1, 10); + final String sourceBucketName = randomAlphaOfLengthBetween(1, 10); + final String sourceBlobName = randomAlphaOfLengthBetween(1, 10); final BlobPath blobPath = BlobPath.EMPTY; if (randomBoolean()) { - IntStream.of(randomIntBetween(1, 5)).forEach(value -> BlobPath.EMPTY.add("path_" + value)); + IntStream.of(randomIntBetween(1, 5)).forEach(value -> blobPath.add("path_" + value)); + } + final var sourceBlobPath = BlobPath.EMPTY; + if (randomBoolean()) { + IntStream.of(randomIntBetween(1, 5)).forEach(value -> sourceBlobPath.add("path_" + value)); } final long blobSize = ByteSizeUnit.GB.toBytes(randomIntBetween(1, 128)); @@ -174,6 +192,9 @@ public void testExecuteMultipartUpload() throws IOException { when(blobStore.bucket()).thenReturn(bucketName); when(blobStore.bufferSizeInBytes()).thenReturn(bufferSize); + final S3BlobStore sourceBlobStore = mock(S3BlobStore.class); + when(sourceBlobStore.bucket()).thenReturn(sourceBucketName); + final boolean serverSideEncryption = randomBoolean(); when(blobStore.serverSideEncryption()).thenReturn(serverSideEncryption); @@ -193,29 +214,45 @@ public void testExecuteMultipartUpload() throws IOException { when(client.initiateMultipartUpload(initArgCaptor.capture())).thenReturn(initResult); final ArgumentCaptor uploadArgCaptor = ArgumentCaptor.forClass(UploadPartRequest.class); + final var copyArgCaptor = ArgumentCaptor.forClass(CopyPartRequest.class); final List expectedEtags = new ArrayList<>(); - final long partSize = Math.min(bufferSize, blobSize); + final long partSize = Math.min(doCopy ? 
ByteSizeUnit.GB.toBytes(5) : bufferSize, blobSize); long totalBytes = 0; do { expectedEtags.add(randomAlphaOfLength(50)); totalBytes += partSize; } while (totalBytes < blobSize); - when(client.uploadPart(uploadArgCaptor.capture())).thenAnswer(invocationOnMock -> { - final UploadPartRequest request = (UploadPartRequest) invocationOnMock.getArguments()[0]; - final UploadPartResult response = new UploadPartResult(); - response.setPartNumber(request.getPartNumber()); - response.setETag(expectedEtags.get(request.getPartNumber() - 1)); - return response; - }); + if (doCopy) { + when(client.copyPart(copyArgCaptor.capture())).thenAnswer(invocationOnMock -> { + final CopyPartRequest request = (CopyPartRequest) invocationOnMock.getArguments()[0]; + final CopyPartResult result = new CopyPartResult(); + result.setETag(expectedEtags.get(request.getPartNumber() - 1)); + return result; + }); + } else { + when(client.uploadPart(uploadArgCaptor.capture())).thenAnswer(invocationOnMock -> { + final UploadPartRequest request = (UploadPartRequest) invocationOnMock.getArguments()[0]; + final UploadPartResult response = new UploadPartResult(); + response.setPartNumber(request.getPartNumber()); + response.setETag(expectedEtags.get(request.getPartNumber() - 1)); + return response; + }); + } final ArgumentCaptor compArgCaptor = ArgumentCaptor.forClass(CompleteMultipartUploadRequest.class); when(client.completeMultipartUpload(compArgCaptor.capture())).thenReturn(new CompleteMultipartUploadResult()); final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[0]); final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); - blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize); + final S3BlobContainer sourceContainer = new S3BlobContainer(sourceBlobPath, sourceBlobStore); + + if (doCopy) { + blobContainer.executeMultipartCopy(randomPurpose(), sourceContainer, sourceBlobName, blobName, blobSize); + } else { + blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize); + } final InitiateMultipartUploadRequest initRequest = initArgCaptor.getValue(); assertEquals(bucketName, initRequest.getBucketName()); @@ -226,26 +263,46 @@ public void testExecuteMultipartUpload() throws IOException { assertEquals(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION, initRequest.getObjectMetadata().getSSEAlgorithm()); } - final Tuple numberOfParts = S3BlobContainer.numberOfMultiparts(blobSize, bufferSize); - - final List uploadRequests = uploadArgCaptor.getAllValues(); - assertEquals(numberOfParts.v1().intValue(), uploadRequests.size()); - - for (int i = 0; i < uploadRequests.size(); i++) { - final UploadPartRequest uploadRequest = uploadRequests.get(i); - - assertEquals(bucketName, uploadRequest.getBucketName()); - assertEquals(blobPath.buildAsString() + blobName, uploadRequest.getKey()); - assertEquals(initResult.getUploadId(), uploadRequest.getUploadId()); - assertEquals(i + 1, uploadRequest.getPartNumber()); - assertEquals(inputStream, uploadRequest.getInputStream()); - - if (i == (uploadRequests.size() - 1)) { - assertTrue(uploadRequest.isLastPart()); - assertEquals(numberOfParts.v2().longValue(), uploadRequest.getPartSize()); - } else { - assertFalse(uploadRequest.isLastPart()); - assertEquals(bufferSize, uploadRequest.getPartSize()); + final Tuple numberOfParts = S3BlobContainer.numberOfMultiparts(blobSize, partSize); + + if (doCopy) { + final var copyRequests = copyArgCaptor.getAllValues(); + 
assertEquals(numberOfParts.v1().intValue(), copyRequests.size()); + + for (int i = 0; i < copyRequests.size(); i++) { + final var request = copyRequests.get(i); + final long startOffset = i * partSize; + final long endOffset = Math.min(startOffset + partSize - 1, blobSize - 1); + + assertEquals(sourceBucketName, request.getSourceBucketName()); + assertEquals(sourceBlobPath.buildAsString() + sourceBlobName, request.getSourceKey()); + assertEquals(bucketName, request.getDestinationBucketName()); + assertEquals(blobPath.buildAsString() + blobName, request.getDestinationKey()); + assertEquals(initResult.getUploadId(), request.getUploadId()); + assertEquals(i + 1, request.getPartNumber()); + assertEquals(Long.valueOf(startOffset), request.getFirstByte()); + assertEquals(Long.valueOf(endOffset), request.getLastByte()); + } + } else { + final List uploadRequests = uploadArgCaptor.getAllValues(); + assertEquals(numberOfParts.v1().intValue(), uploadRequests.size()); + + for (int i = 0; i < uploadRequests.size(); i++) { + final UploadPartRequest uploadRequest = uploadRequests.get(i); + + assertEquals(bucketName, uploadRequest.getBucketName()); + assertEquals(blobPath.buildAsString() + blobName, uploadRequest.getKey()); + assertEquals(initResult.getUploadId(), uploadRequest.getUploadId()); + assertEquals(i + 1, uploadRequest.getPartNumber()); + assertEquals(inputStream, uploadRequest.getInputStream()); + + if (i == (uploadRequests.size() - 1)) { + assertTrue(uploadRequest.isLastPart()); + assertEquals(numberOfParts.v2().longValue(), uploadRequest.getPartSize()); + } else { + assertFalse(uploadRequest.isLastPart()); + assertEquals(bufferSize, uploadRequest.getPartSize()); + } } } @@ -326,7 +383,7 @@ public void testExecuteMultipartUploadAborted() { blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, new ByteArrayInputStream(new byte[0]), blobSize); }); - assertEquals("Unable to upload object [" + blobName + "] using multipart upload", e.getMessage()); + assertEquals("Unable to upload or copy object [" + blobName + "] using multipart upload", e.getMessage()); assertThat(e.getCause(), instanceOf(AmazonClientException.class)); assertEquals(exceptions.get(stage).getMessage(), e.getCause().getMessage()); @@ -358,6 +415,46 @@ public void testExecuteMultipartUploadAborted() { closeMockClient(blobStore); } + public void testCopy() throws Exception { + final var sourceBucketName = randomAlphaOfLengthBetween(1, 10); + final var sourceBlobName = randomAlphaOfLengthBetween(1, 10); + final var blobName = randomAlphaOfLengthBetween(1, 10); + + final StorageClass storageClass = randomFrom(StorageClass.values()); + final CannedAccessControlList cannedAccessControlList = randomBoolean() ? 
randomFrom(CannedAccessControlList.values()) : null; + + final var blobStore = mock(S3BlobStore.class); + when(blobStore.bucket()).thenReturn(sourceBucketName); + when(blobStore.getStorageClass()).thenReturn(storageClass); + if (cannedAccessControlList != null) { + when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList); + } + when(blobStore.maxCopySizeBeforeMultipart()).thenReturn(S3Repository.MIN_PART_SIZE_USING_MULTIPART.getBytes()); + + final var sourceBlobPath = BlobPath.EMPTY.add(randomAlphaOfLengthBetween(1, 10)); + final var sourceBlobContainer = new S3BlobContainer(sourceBlobPath, blobStore); + + final var destinationBlobPath = BlobPath.EMPTY.add(randomAlphaOfLengthBetween(1, 10)); + final var destinationBlobContainer = new S3BlobContainer(destinationBlobPath, blobStore); + + final var client = configureMockClient(blobStore); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(CopyObjectRequest.class); + when(client.copyObject(captor.capture())).thenReturn(new CopyObjectResult()); + + destinationBlobContainer.copyBlob(randomPurpose(), sourceBlobContainer, sourceBlobName, blobName, randomLongBetween(1, 10_000)); + + final CopyObjectRequest request = captor.getValue(); + assertEquals(sourceBucketName, request.getSourceBucketName()); + assertEquals(sourceBlobPath.buildAsString() + sourceBlobName, request.getSourceKey()); + assertEquals(sourceBucketName, request.getDestinationBucketName()); + assertEquals(destinationBlobPath.buildAsString() + blobName, request.getDestinationKey()); + assertEquals(storageClass.toString(), request.getStorageClass()); + assertEquals(cannedAccessControlList, request.getCannedAccessControlList()); + + closeMockClient(blobStore); + } + private static AmazonS3 configureMockClient(S3BlobStore blobStore) { final AmazonS3 client = mock(AmazonS3.class); try (AmazonS3Reference clientReference = new AmazonS3Reference(client)) { diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 00943d04275dd..2ad225417ce80 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -214,6 +214,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_REMOVE_AGGREGATE_TYPE = def(9_045_0_00); public static final TransportVersion ADD_PROJECT_ID_TO_DSL_ERROR_INFO = def(9_046_0_00); public static final TransportVersion SEMANTIC_TEXT_CHUNKING_CONFIG = def(9_047_00_0); + public static final TransportVersion REPO_ANALYSIS_COPY_BLOB = def(9_048_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java index 8f6ee42339e6d..a8ca895480779 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java @@ -177,6 +177,32 @@ default void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesRef writeBlobAtomic(purpose, blobName, bytes.streamInput(), bytes.length(), failIfAlreadyExists); } + /** + * Copy a blob into this container from a source blob container and name. + * If copy is unavailable then throws UnsupportedOperationException. + * It may be unavailable either because the blob container has no copy implementation + * or because the target blob container is not on the same store as the source. 
+ * If the destination blob already exists, this operation will overwrite it. + * + * @param purpose The purpose of the operation + * @param sourceBlobContainer The blob container to copy the blob into + * @param sourceBlobName The name of the blob to copy from + * @param blobName The name of the blob to copy to + * @param blobSize The size of the source blob in bytes (needed because some object stores use different implementations + * for very large blobs) + * @throws NoSuchFileException If the source blob does not exist + * @throws IOException If the operation generates an IO error + */ + default void copyBlob( + OperationPurpose purpose, + BlobContainer sourceBlobContainer, + String sourceBlobName, + String blobName, + long blobSize + ) throws IOException { + throw new UnsupportedOperationException("this blob container does not support copy"); + } + /** * Deletes this container and all its contents from the repository. * diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index 7d40008231292..b6b33f96f3c5c 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -349,6 +349,29 @@ public void writeBlobAtomic(OperationPurpose purpose, final String blobName, Byt } } + @Override + public void copyBlob(OperationPurpose purpose, BlobContainer sourceBlobContainer, String sourceBlobName, String blobName, long blobSize) + throws IOException { + if (sourceBlobContainer instanceof FsBlobContainer == false) { + throw new IllegalArgumentException("source blob container must be a FsBlobContainer"); + } + final FsBlobContainer sourceContainer = (FsBlobContainer) sourceBlobContainer; + final Path sourceBlobPath = sourceContainer.path.resolve(sourceBlobName); + final String tempBlob = tempBlobName(blobName); + final Path tempBlobPath = path.resolve(tempBlob); + Files.copy(sourceBlobPath, tempBlobPath, StandardCopyOption.REPLACE_EXISTING); + try { + moveBlobAtomic(purpose, tempBlob, blobName, false); + } catch (IOException ex) { + try { + deleteBlobsIgnoringIfNotExists(purpose, Iterators.single(tempBlob)); + } catch (IOException e) { + ex.addSuppressed(e); + } + throw ex; + } + } + private static void writeToPath(BytesReference bytes, Path tempBlobPath) throws IOException { try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) { bytes.writeTo(outputStream); diff --git a/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java b/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java index 6cb1c00dab0e9..3fff220e521ef 100644 --- a/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java +++ b/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java @@ -377,6 +377,31 @@ private static void checkAtomicWrite() throws IOException { } } + public void testCopy() throws Exception { + // without this, on CI the test sometimes fails with + // java.nio.file.ProviderMismatchException: mismatch, expected: class org.elasticsearch.common.blobstore.fs.FsBlobContainerTests$1, + // got: class org.elasticsearch.common.blobstore.fs.FsBlobContainerTests$MockFileSystemProvider + // and I haven't figured out why yet. 
+ restoreFileSystem(); + final var path = PathUtils.get(createTempDir().toString()); + final var store = new FsBlobStore(randomIntBetween(1, 8) * 1024, path, false); + final var sourcePath = BlobPath.EMPTY.add("source"); + final var sourceContainer = store.blobContainer(sourcePath); + final var destinationPath = BlobPath.EMPTY.add("destination"); + final var destinationContainer = store.blobContainer(destinationPath); + + final var sourceBlobName = randomAlphaOfLengthBetween(1, 20).toLowerCase(Locale.ROOT); + final var blobName = randomAlphaOfLengthBetween(1, 20).toLowerCase(Locale.ROOT); + final var contents = new BytesArray(randomByteArrayOfLength(randomIntBetween(1, 512))); + sourceContainer.writeBlob(randomPurpose(), sourceBlobName, contents, true); + destinationContainer.copyBlob(randomPurpose(), sourceContainer, sourceBlobName, blobName, contents.length()); + + var sourceContents = Streams.readFully(sourceContainer.readBlob(randomPurpose(), sourceBlobName)); + var targetContents = Streams.readFully(destinationContainer.readBlob(randomPurpose(), blobName)); + assertEquals(sourceContents, targetContents); + assertEquals(contents, targetContents); + } + static class MockFileSystemProvider extends FilterFileSystemProvider { final Consumer onRead; diff --git a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java index 367d75ceca56f..c460662eb67c9 100644 --- a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java +++ b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java @@ -49,6 +49,7 @@ import javax.xml.parsers.DocumentBuilderFactory; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.elasticsearch.test.fixture.HttpHeaderParser.parseRangeHeader; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.w3c.dom.Node.ELEMENT_NODE; @@ -155,10 +156,34 @@ public void handle(final HttpExchange exchange) throws IOException { if (upload == null) { exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); } else { - final Tuple blob = parseRequestBody(exchange); - upload.addPart(blob.v1(), blob.v2()); - exchange.getResponseHeaders().add("ETag", blob.v1()); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); + // CopyPart is UploadPart with an x-amz-copy-source header + final var sourceBlobName = exchange.getRequestHeaders().get("X-amz-copy-source"); + if (sourceBlobName != null) { + var sourceBlob = blobs.get(sourceBlobName.getFirst()); + if (sourceBlob == null) { + exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); + } else { + var range = parsePartRange(exchange); + int start = Math.toIntExact(range.start()); + int len = Math.toIntExact(range.end() - range.start() + 1); + var part = sourceBlob.slice(start, len); + var etag = UUIDs.randomBase64UUID(); + upload.addPart(etag, part); + byte[] response = (""" + + + %s + """.formatted(etag)).getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().add("Content-Type", "application/xml"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); + exchange.getResponseBody().write(response); + } + } else { + final Tuple blob = parseRequestBody(exchange); + upload.addPart(blob.v1(), blob.v2()); + exchange.getResponseHeaders().add("ETag", blob.v1()); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); + } } } else if (request.isCompleteMultipartUploadRequest()) { @@ -201,10 +226,28 @@ public 
void handle(final HttpExchange exchange) throws IOException { exchange.sendResponseHeaders((upload == null ? RestStatus.NOT_FOUND : RestStatus.NO_CONTENT).getStatus(), -1); } else if (request.isPutObjectRequest()) { - final Tuple blob = parseRequestBody(exchange); - blobs.put(request.path(), blob.v2()); - exchange.getResponseHeaders().add("ETag", blob.v1()); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); + // a copy request is a put request with a copy source header + final var sourceBlobName = exchange.getRequestHeaders().get("X-amz-copy-source"); + if (sourceBlobName != null) { + var sourceBlob = blobs.get(sourceBlobName.getFirst()); + if (sourceBlob == null) { + exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); + } else { + blobs.put(request.path(), sourceBlob); + + byte[] response = (""" + + """).getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().add("Content-Type", "application/xml"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); + exchange.getResponseBody().write(response); + } + } else { + final Tuple blob = parseRequestBody(exchange); + blobs.put(request.path(), blob.v2()); + exchange.getResponseHeaders().add("ETag", blob.v1()); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); + } } else if (request.isListObjectsRequest()) { final StringBuilder list = new StringBuilder(); @@ -268,7 +311,7 @@ public void handle(final HttpExchange exchange) throws IOException { // requests with a header value like "Range: bytes=start-end" where both {@code start} and {@code end} are always defined // (sometimes to very high value for {@code end}). It would be too tedious to fully support the RFC so S3HttpHandler only // supports when both {@code start} and {@code end} are defined to match the SDK behavior. 
- final HttpHeaderParser.Range range = HttpHeaderParser.parseRangeHeader(rangeHeader); + final HttpHeaderParser.Range range = parseRangeHeader(rangeHeader); if (range == null) { throw new AssertionError("Bytes range does not match expected pattern: " + rangeHeader); } @@ -467,6 +510,17 @@ static List extractPartEtags(BytesReference completeMultipartUploadBody) } } + private static HttpHeaderParser.Range parsePartRange(final HttpExchange exchange) { + final var sourceRangeHeaders = exchange.getRequestHeaders().get("X-amz-copy-source-range"); + if (sourceRangeHeaders == null) { + throw new IllegalStateException("missing x-amz-copy-source-range header"); + } + if (sourceRangeHeaders.size() != 1) { + throw new IllegalStateException("expected 1 x-amz-copy-source-range header, found " + sourceRangeHeaders.size()); + } + return parseRangeHeader(sourceRangeHeaders.getFirst()); + } + MultipartUpload getUpload(String uploadId) { return uploads.get(uploadId); } diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java index c982f36e5ccb3..8870c79218780 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java @@ -137,17 +137,27 @@ public void testReadNonExistingPath() throws IOException { } } - public void testWriteRead() throws IOException { + public void testWriteMaybeCopyRead() throws IOException { try (BlobStore store = newBlobStore()) { final BlobContainer container = store.blobContainer(BlobPath.EMPTY); byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); - writeBlob(container, "foobar", new BytesArray(data), randomBoolean()); + final String blobName = randomAlphaOfLengthBetween(8, 12); + String readBlobName = blobName; + writeBlob(container, blobName, new BytesArray(data), randomBoolean()); if (randomBoolean()) { // override file, to check if we get latest contents data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); - writeBlob(container, "foobar", new BytesArray(data), false); + writeBlob(container, blobName, new BytesArray(data), false); } - try (InputStream stream = container.readBlob(randomPurpose(), "foobar")) { + if (randomBoolean()) { + // server-side copy if supported + try { + final var destinationBlobName = blobName + "_copy"; + container.copyBlob(randomPurpose(), container, blobName, destinationBlobName, data.length); + readBlobName = destinationBlobName; + } catch (UnsupportedOperationException ignored) {} + } + try (InputStream stream = container.readBlob(randomPurpose(), readBlobName)) { BytesRefBuilder target = new BytesRefBuilder(); while (target.length() < data.length) { byte[] buffer = new byte[scaledRandomIntBetween(1, data.length - target.length())]; diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java index d289533e840a8..49d26b08dde62 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java +++ 
b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java @@ -227,10 +227,8 @@ public void testRequestStats() throws Exception { } }).filter(Objects::nonNull).map(Repository::stats).reduce(RepositoryStats::merge).get(); - // Since no abort request is made, filter it out from the stats (also ensure it is 0) before comparing to the mock counts Map sdkRequestCounts = repositoryStats.actionStats.entrySet() .stream() - .filter(entry -> false == ("AbortMultipartObject".equals(entry.getKey()) && entry.getValue().requests() == 0L)) .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> e.getValue().requests())); final Map mockCalls = getMockRequestCounts(); @@ -355,8 +353,11 @@ public abstract static class HttpStatsCollectorHandler implements DelegatingHttp private final Map operationCount = new HashMap<>(); - public HttpStatsCollectorHandler(HttpHandler delegate) { + public HttpStatsCollectorHandler(HttpHandler delegate, String[] operations) { this.delegate = delegate; + for (String operation : operations) { + operationCount.put(operation, 0L); + } } @Override @@ -369,7 +370,7 @@ synchronized Map getOperationsCount() { } protected synchronized void trackRequest(final String requestType) { - operationCount.put(requestType, operationCount.getOrDefault(requestType, 0L) + 1); + operationCount.put(requestType, operationCount.get(requestType) + 1); } @Override diff --git a/x-pack/plugin/snapshot-repo-test-kit/qa/s3/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/S3RepositoryAnalysisRestIT.java b/x-pack/plugin/snapshot-repo-test-kit/qa/s3/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/S3RepositoryAnalysisRestIT.java index e5930e02375b7..b6bedb428b296 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/qa/s3/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/S3RepositoryAnalysisRestIT.java +++ b/x-pack/plugin/snapshot-repo-test-kit/qa/s3/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/S3RepositoryAnalysisRestIT.java @@ -61,6 +61,7 @@ protected Settings repositorySettings() { .put("base_path", basePath) .put("delete_objects_max_size", between(1, 1000)) .put("buffer_size", ByteSizeValue.ofMb(5)) // so some uploads are multipart ones + .put("max_copy_size_before_multipart", ByteSizeValue.ofMb(5)) .build(); } } diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalysisFailureIT.java b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalysisFailureIT.java index 6a638f53a6330..5594348088a8c 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalysisFailureIT.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalysisFailureIT.java @@ -74,6 +74,7 @@ import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.nullValue; public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase { @@ -172,6 +173,31 @@ public byte[] onRead(byte[] actualContents, long position, long length) { 
assertAnalysisFailureMessage(analyseRepositoryExpectFailure(request).getMessage()); } + public void testFailsOnCopyAfterWrite() { + final RepositoryAnalyzeAction.Request request = new RepositoryAnalyzeAction.Request("test-repo"); + request.maxBlobSize(ByteSizeValue.ofBytes(10L)); + request.abortWritePermitted(false); + + final AtomicBoolean failedCopy = new AtomicBoolean(); + blobStore.setDisruption(new Disruption() { + @Override + public void onCopy() throws IOException { + failedCopy.set(true); + throw new IOException("simulated"); + } + }); + + safeAwait((ActionListener l) -> analyseRepository(request, l.delegateResponse((ll, e) -> { + if (ExceptionsHelper.unwrapCause(e) instanceof RepositoryVerificationException repositoryVerificationException) { + assertAnalysisFailureMessage(repositoryVerificationException.getMessage()); + assertTrue("did not fail a copy operation, so why did the verification fail?", failedCopy.get()); + ll.onResponse(null); + } else { + ll.onFailure(e); + } + }))); + } + public void testFailsOnChecksumMismatch() { final RepositoryAnalyzeAction.Request request = new RepositoryAnalyzeAction.Request("test-repo"); request.maxBlobSize(ByteSizeValue.ofBytes(10L)); @@ -593,6 +619,8 @@ default byte[] onRead(byte[] actualContents, long position, long length) throws default void onWrite() throws IOException {} + default void onCopy() throws IOException {} + default Map onList(Map actualListing) throws IOException { return actualListing; } @@ -751,6 +779,25 @@ private void writeBlobAtomic(String blobName, InputStream inputStream, boolean f blobs.put(blobName, contents); } + @Override + public void copyBlob( + OperationPurpose purpose, + BlobContainer sourceBlobContainer, + String sourceBlobName, + String blobName, + long blobSize + ) throws IOException { + assertThat(sourceBlobContainer, instanceOf(DisruptableBlobContainer.class)); + assertPurpose(purpose); + final var source = (DisruptableBlobContainer) sourceBlobContainer; + final var sourceBlob = source.blobs.get(sourceBlobName); + if (sourceBlob == null) { + throw new FileNotFoundException(sourceBlobName + " not found"); + } + disruption.onCopy(); + blobs.put(blobName, sourceBlob); + } + @Override public DeleteResult delete(OperationPurpose purpose) throws IOException { assertPurpose(purpose); diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalysisSuccessIT.java b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalysisSuccessIT.java index c24a254d34ace..2af5c009d4ea2 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalysisSuccessIT.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalysisSuccessIT.java @@ -70,6 +70,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.startsWith; @@ -469,6 +470,24 @@ private void writeBlobAtomic(String blobName, InputStream inputStream, long blob } } + @Override + public void copyBlob( + OperationPurpose purpose, + BlobContainer 
sourceBlobContainer, + String sourceBlobName, + String blobName, + long blobSize + ) throws IOException { + assertPurpose(purpose); + assertThat(sourceBlobContainer, instanceOf(AssertingBlobContainer.class)); + final var source = (AssertingBlobContainer) sourceBlobContainer; + final var sourceBlob = source.blobs.get(sourceBlobName); + if (sourceBlob == null) { + throw new FileNotFoundException(sourceBlobName + " not found"); + } + blobs.put(blobName, sourceBlob); + } + @Override public DeleteResult delete(OperationPurpose purpose) { assertPurpose(purpose); diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/BlobAnalyzeAction.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/BlobAnalyzeAction.java index 537c87e9c55fe..ab7840dab4ae2 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/BlobAnalyzeAction.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/BlobAnalyzeAction.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionRequest; @@ -47,7 +48,9 @@ import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; +import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -70,6 +73,11 @@ * version of the blob, but again must not yield partial data). Usually, however, we write once and only read after the write completes, and * in this case we insist that the read succeeds. * + * The writer may also attempt to copy the blob, either just before the write completes (which may fail with not found) + * or after (which should not fail). The writer may overwrite the source while the copy is in progress. If a copy is attempted, + * readers will read the copy instead of the original. As above, if the copy succeeds, then readers should see a complete copy. + * If the source is overwritten while the copy is in progress, readers may see either the original blob or the new one but no + * mixture or partial result. * *

  *
@@ -83,6 +91,12 @@
  *      | Write blob with random content     |                                        |
  *      |-----------------------------------→|                                        |
  *      |                                    |                                        |
+ *      | Copy blob during write (rarely)    |                                        |
+ *      |-----------------------------------→|                                        |
+ *      |                                    |                                        |
+ *      |                      Copy complete |                                        |
+ *      |←-----------------------------------|                                        |
+ *      |                                    |                                        |
  *      | Read range during write (rarely)   |                                        |
  *      |----------------------------------------------------------------------------→|
  *      |                                    |                                        |
@@ -106,6 +120,18 @@
  *      |-| Read phase |                     |                                        |
  *      | |------------|                     |                                        |
  *      |                                    |                                        |
+ *      | Copy blob (rarely)                 |                                        |
+ *      |-----------------------------------→|                                        |
+ *      |                                    |                                        |
+ *      | TODO: Overwrite source (rarely)    |                                        |
+ *      |-----------------------------------→|                                        |
+ *      |                                    |                                        |
+ *      |                 Overwrite complete |                                        |
+ *      |←-----------------------------------|                                        |
+ *      |                                    |                                        |
+ *      |                      Copy complete |                                        |
+ *      |←-----------------------------------|                                        |
+ *      |                                    |                                        |
  *      | Read range [a,b)                   |                                        |
  *      |----------------------------------------------------------------------------→|
  *      |                                    |                                        |
@@ -199,6 +225,9 @@ private static class BlobAnalysis {
         private final boolean checksumWholeBlob;
         private final long checksumStart;
         private final long checksumEnd;
+        // If a copy is requested, do exactly one so that the number of blobs created is controlled by RepositoryAnalyzeAction.
+        // Doing the copy in step 1 exercises copying before the write completes; step 2 exercises copying after it (the happy path).
+        private final boolean doEarlyCopy;
         private final List<DiscoveryNode> earlyReadNodes;
         private final List<DiscoveryNode> readNodes;
         private final GroupedActionListener readNodesListener;
@@ -230,6 +259,7 @@ private static class BlobAnalysis {
                 checksumStart = randomLongBetween(0L, request.targetLength);
                 checksumEnd = randomLongBetween(checksumStart + 1, request.targetLength + 1);
             }
+            doEarlyCopy = random.nextBoolean();
 
             final ArrayList<DiscoveryNode> nodes = new ArrayList<>(request.nodes); // copy for shuffling purposes
             if (request.readEarly) {
@@ -368,11 +398,37 @@ public StreamInput streamInput() throws IOException {
         }
 
         private void onLastReadForInitialWrite() {
+            var readBlobName = request.blobName;
+            if (request.copyBlobName != null && doEarlyCopy) {
+                try {
+                    blobContainer.copyBlob(
+                        OperationPurpose.REPOSITORY_ANALYSIS,
+                        blobContainer,
+                        request.blobName,
+                        request.copyBlobName,
+                        request.targetLength
+                    );
+                    readBlobName = request.copyBlobName;
+                } catch (UnsupportedOperationException uoe) {
+                    // not all repositories support copy
+                } catch (NoSuchFileException | FileNotFoundException ignored) {
+                    // assume this is due to copy starting before the source was finished
+                    logger.trace("copy FNF before write completed: {}", request.blobName);
+                } catch (IOException e) {
+                    if (request.getAbortWrite() == false) {
+                        throw new RepositoryVerificationException(
+                            request.getRepositoryName(),
+                            "failed to copy blob before write: [" + request.blobName + "]",
+                            e
+                        );
+                    }
+                }
+            }
             if (earlyReadNodes.isEmpty() == false) {
                 if (logger.isTraceEnabled()) {
                     logger.trace("sending read request to [{}] for [{}] before write complete", earlyReadNodes, request.getDescription());
                 }
-                readOnNodes(earlyReadNodes, true);
+                readOnNodes(earlyReadNodes, readBlobName, true);
             }
             if (request.getAbortWrite()) {
                 throw new BlobWriteAbortedException();
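
The catch blocks above encode a small policy: a missing source during the early copy is expected (the copy may start before the initial write has landed), an UnsupportedOperationException simply means the repository has no copy support, and any other I/O failure should fail verification unless the write was deliberately aborted. A hypothetical helper expressing the same policy, purely for illustration:

    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.nio.file.NoSuchFileException;

    // Hypothetical helper, not part of the change: classifies early-copy failures the same way the
    // catch blocks above do.
    final class EarlyCopyOutcome {
        static boolean isTolerated(IOException e, boolean abortWrite) {
            if (e instanceof NoSuchFileException || e instanceof FileNotFoundException) {
                return true; // the source may simply not exist yet
            }
            return abortWrite; // an aborted write may legitimately leave nothing valid to copy
        }
    }
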
@@ -383,10 +439,36 @@ private void doReadAfterWrite() {
             if (logger.isTraceEnabled()) {
                 logger.trace("sending read request to [{}] for [{}] after write complete", readNodes, request.getDescription());
             }
-            readOnNodes(readNodes, false);
+            var readBlobName = request.blobName;
+            if (request.copyBlobName != null && doEarlyCopy == false && request.getAbortWrite() == false) {
+                try {
+                    blobContainer.copyBlob(
+                        OperationPurpose.REPOSITORY_ANALYSIS,
+                        blobContainer,
+                        request.blobName,
+                        request.copyBlobName,
+                        request.targetLength
+                    );
+                    readBlobName = request.copyBlobName;
+                } catch (UnsupportedOperationException uoe) {
+                    // not all repositories support copy
+                } catch (IOException e) {
+                    for (int i = 0; i < readNodes.size(); i++) {
+                        readNodesListener.onFailure(
+                            new RepositoryVerificationException(
+                                request.getRepositoryName(),
+                                "failed to copy blob after write: [" + request.blobName + "]",
+                                e
+                            )
+                        );
+                    }
+                    return;
+                }
+            }
+            readOnNodes(readNodes, readBlobName, false);
         }
 
-        private void readOnNodes(List<DiscoveryNode> nodes, boolean beforeWriteComplete) {
+        private void readOnNodes(List<DiscoveryNode> nodes, String blobName, boolean beforeWriteComplete) {
             for (DiscoveryNode node : nodes) {
                 if (task.isCancelled()) {
                     // record dummy response since we're already on the path to failure
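
When the post-write copy fails, the code reports the failure once per read node rather than once in total because readNodesListener is sized to expect one notification per reader, and the analysis only completes after every slot has been accounted for. A stand-alone toy model of that counting behaviour (plain Java, not the actual GroupedActionListener API):

    import java.util.concurrent.atomic.AtomicInteger;

    // Toy model: the group only completes once all expected slots have reported, whether with a
    // result or a failure, which is why the copy-failure path above notifies once per read node.
    final class CountingGroup {
        private final AtomicInteger outstanding;
        private final Runnable onComplete;

        CountingGroup(int expectedNotifications, Runnable onComplete) {
            this.outstanding = new AtomicInteger(expectedNotifications);
            this.onComplete = onComplete;
        }

        void onResponse(Object ignoredResult) {
            countDown();
        }

        void onFailure(Exception ignoredFailure) {
            countDown();
        }

        private void countDown() {
            if (outstanding.decrementAndGet() == 0) {
                onComplete.run(); // fires only after every expected notification has arrived
            }
        }
    }
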
@@ -396,7 +478,7 @@ private void readOnNodes(List<DiscoveryNode> nodes, boolean beforeWriteComplete)
                 } else {
                     // no need for extra synchronization after checking if we were cancelled a couple of lines ago -- we haven't notified
                     // the outer listener yet so any bans on the children are still in place
-                    final GetBlobChecksumAction.Request blobChecksumRequest = getBlobChecksumRequest();
+                    final GetBlobChecksumAction.Request blobChecksumRequest = getBlobChecksumRequest(blobName);
                     transportService.sendChildRequest(
                         node,
                         GetBlobChecksumAction.NAME,
@@ -432,11 +514,11 @@ public void onFailure(Exception e) {
             }
         }
 
-        private GetBlobChecksumAction.Request getBlobChecksumRequest() {
+        private GetBlobChecksumAction.Request getBlobChecksumRequest(String blobName) {
             return new GetBlobChecksumAction.Request(
                 request.getRepositoryName(),
                 request.getBlobPath(),
-                request.getBlobName(),
+                blobName,
                 checksumStart,
                 checksumWholeBlob ? 0L : checksumEnd
             );
@@ -650,6 +732,8 @@ static class Request extends ActionRequest {
         private final boolean readEarly;
         private final boolean writeAndOverwrite;
         private final boolean abortWrite;
+        @Nullable
+        private final String copyBlobName;
 
         Request(
             String repositoryName,
@@ -662,7 +746,8 @@ static class Request extends ActionRequest {
             int earlyReadNodeCount,
             boolean readEarly,
             boolean writeAndOverwrite,
-            boolean abortWrite
+            boolean abortWrite,
+            @Nullable String copyBlobName
         ) {
             assert 0 < targetLength;
             assert targetLength <= MAX_ATOMIC_WRITE_SIZE || (readEarly == false && writeAndOverwrite == false) : "oversized atomic write";
@@ -678,6 +763,7 @@ static class Request extends ActionRequest {
             this.readEarly = readEarly;
             this.writeAndOverwrite = writeAndOverwrite;
             this.abortWrite = abortWrite;
+            this.copyBlobName = copyBlobName;
         }
 
         Request(StreamInput in) throws IOException {
@@ -693,6 +779,11 @@ static class Request extends ActionRequest {
             readEarly = in.readBoolean();
             writeAndOverwrite = in.readBoolean();
             abortWrite = in.readBoolean();
+            if (in.getTransportVersion().onOrAfter(TransportVersions.REPO_ANALYSIS_COPY_BLOB)) {
+                copyBlobName = in.readOptionalString();
+            } else {
+                copyBlobName = null;
+            }
         }
 
         @Override
@@ -709,6 +800,14 @@ public void writeTo(StreamOutput out) throws IOException {
             out.writeBoolean(readEarly);
             out.writeBoolean(writeAndOverwrite);
             out.writeBoolean(abortWrite);
+            if (out.getTransportVersion().onOrAfter(TransportVersions.REPO_ANALYSIS_COPY_BLOB)) {
+                out.writeOptionalString(copyBlobName);
+            } else if (copyBlobName != null) {
+                assert false : out.getTransportVersion();
+                throw new IllegalStateException(
+                    "cannot serialize [" + this + "] using transport version [" + out.getTransportVersion() + "]"
+                );
+            }
         }
 
         @Override
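
The wire handling follows the usual pattern for a field added behind a transport version: read it only from peers that are new enough, and refuse to send a populated value to an older peer instead of silently dropping it. A condensed sketch of the same pattern using the REPO_ANALYSIS_COPY_BLOB version introduced by this change; the OptionalFieldWireFormat class itself is hypothetical:

    import org.elasticsearch.TransportVersion;
    import org.elasticsearch.TransportVersions;
    import org.elasticsearch.common.io.stream.StreamInput;
    import org.elasticsearch.common.io.stream.StreamOutput;

    import java.io.IOException;

    // Sketch of the backwards-compatibility pattern used above; not part of the change itself.
    final class OptionalFieldWireFormat {
        private static final TransportVersion GATE = TransportVersions.REPO_ANALYSIS_COPY_BLOB;

        static String read(StreamInput in) throws IOException {
            // older peers never send the field, so fall back to null
            return in.getTransportVersion().onOrAfter(GATE) ? in.readOptionalString() : null;
        }

        static void write(StreamOutput out, String value) throws IOException {
            if (out.getTransportVersion().onOrAfter(GATE)) {
                out.writeOptionalString(value);
            } else if (value != null) {
                // refuse to silently drop a populated field when talking to an older node
                throw new IllegalStateException("cannot serialize value using transport version " + out.getTransportVersion());
            }
        }
    }
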
@@ -734,6 +833,8 @@ public String getDescription() {
                 + writeAndOverwrite
                 + ", abortWrite="
                 + abortWrite
+                + ", copyBlobName="
+                + copyBlobName
                 + "]";
         }
 
diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalyzeAction.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalyzeAction.java
index bc09b1f099970..f776d0a255004 100644
--- a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalyzeAction.java
+++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalyzeAction.java
@@ -517,14 +517,27 @@ public void run() {
             final List<Long> blobSizes = getBlobSizes(request);
             Collections.shuffle(blobSizes, random);
 
-            for (int i = 0; i < request.getBlobCount(); i++) {
+            int blobCount = request.getBlobCount();
+            for (int i = 0; i < blobCount; i++) {
                 final long targetLength = blobSizes.get(i);
                 final boolean smallBlob = targetLength <= MAX_ATOMIC_WRITE_SIZE; // avoid the atomic API for larger blobs
                 final boolean abortWrite = smallBlob && request.isAbortWritePermitted() && rarely(random);
+                final boolean doCopy = minClusterTransportVersion.onOrAfter(TransportVersions.REPO_ANALYSIS_COPY_BLOB)
+                    && rarely(random)
+                    && i > 0;
+                final String blobName = "test-blob-" + i + "-" + UUIDs.randomBase64UUID(random);
+                String copyBlobName = null;
+                if (doCopy) {
+                    copyBlobName = blobName + "-copy";
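+                    // the copy will create one extra blob, so plan one fewer write to keep the total within the requested blob count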
+                    blobCount--;
+                    if (i >= blobCount) {
+                        break;
+                    }
+                }
                 final BlobAnalyzeAction.Request blobAnalyzeRequest = new BlobAnalyzeAction.Request(
                     request.getRepositoryName(),
                     blobPath,
-                    "test-blob-" + i + "-" + UUIDs.randomBase64UUID(random),
+                    blobName,
                     targetLength,
                     random.nextLong(),
                     nodes,
@@ -532,7 +545,8 @@ public void run() {
                     request.getEarlyReadNodeCount(),
                     smallBlob && rarely(random),
                     repository.supportURLRepo() && repository.hasAtomicOverwrites() && smallBlob && rarely(random) && abortWrite == false,
-                    abortWrite
+                    abortWrite,
+                    copyBlobName
                 );
                 final DiscoveryNode node = nodes.get(random.nextInt(nodes.size()));
                 queue.add(ref -> runBlobAnalysis(ref, blobAnalyzeRequest, node));
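
To make the blob-count bookkeeping in this loop easier to see: each copy consumes one unit of the requested blob budget, so the total number of blobs created (writes plus copies) stays within request.getBlobCount(). A stand-alone simulation of just that arithmetic, illustrative only, with rarely(random) approximated by a fixed probability and the transport-version gate omitted:

    import java.util.Random;

    // Simulation of the counting above, not production code.
    final class BlobBudgetSimulation {
        public static void main(String[] args) {
            final Random random = new Random();
            final int requestedBlobCount = 100;

            int blobCount = requestedBlobCount;
            int writes = 0;
            int copies = 0;
            for (int i = 0; i < blobCount; i++) {
                final boolean doCopy = i > 0 && random.nextInt(10) == 0; // stand-in for rarely(random)
                if (doCopy) {
                    blobCount--; // the copy will create an extra blob, so plan one fewer write
                    if (i >= blobCount) {
                        break; // budget exhausted: neither this write nor its copy happens
                    }
                    copies++; // this blob will be written and then copied
                }
                writes++;
            }

            if (writes + copies > requestedBlobCount) {
                throw new AssertionError("blob budget exceeded: " + (writes + copies) + " > " + requestedBlobCount);
            }
            System.out.println("writes=" + writes + ", copies=" + copies + ", budget=" + requestedBlobCount);
        }
    }
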