Skip to content

BlobContainer: add copyBlob method (8.x) #126558

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -200,7 +201,8 @@ private static class AzureHTTPStatsCollectorHandler extends HttpStatsCollectorHa
private final Set<String> seenRequestIds = ConcurrentCollections.newConcurrentSet();

private AzureHTTPStatsCollectorHandler(HttpHandler delegate) {
// Pre-register every Azure operation key with the base stats collector so all
// operations are known up front. NOTE(review): the original comment here was left
// unfinished ("minimal implementation of") — presumably a minimal backport of the
// operation-keyed stats registration; confirm against the referenced backport PRs.
super(delegate, Arrays.stream(AzureBlobStore.Operation.values()).map(AzureBlobStore.Operation::getKey).toArray(String[]::new));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,36 @@ public Map<String, Long> stats() {
return stats.toMap();
}

// partial backport of #113573
// visible for testing
enum Operation {
    GET_BLOB("GetBlob"),
    LIST_BLOBS("ListBlobs"),
    GET_BLOB_PROPERTIES("GetBlobProperties"),
    PUT_BLOB("PutBlob"),
    PUT_BLOCK("PutBlock"),
    PUT_BLOCK_LIST("PutBlockList");

    // Wire-level Azure operation name represented by this constant.
    private final String key;

    Operation(String key) {
        this.key = key;
    }

    /** Returns the Azure operation name backing this constant. */
    public String getKey() {
        return key;
    }

    /**
     * Resolves an Azure operation name to its enum constant.
     *
     * @throws IllegalArgumentException if no constant carries the given key
     */
    public static Operation fromKey(String key) {
        for (final Operation candidate : values()) {
            if (candidate.key.equals(key)) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("No matching key: " + key);
    }
}

private static class Stats {

private final AtomicLong getOperations = new AtomicLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ private static class GoogleCloudStorageStatsCollectorHttpHandler extends HttpSta
public static final Pattern contentRangeMatcher = Pattern.compile("bytes \\d+-(\\d+)/(\\d+)");

GoogleCloudStorageStatsCollectorHttpHandler(final HttpHandler delegate) {
// Pre-register every GCS operation key (see StorageOperation) with the base stats collector.
super(delegate, Arrays.stream(StorageOperation.values()).map(StorageOperation::key).toArray(String[]::new));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.repositories.gcs;

// partial backport of #125452
public enum StorageOperation {
    INSERT("InsertObject"),
    GET("GetObject"),
    LIST("ListObjects");

    // Wire-level GCS operation name represented by this constant.
    final String key;

    StorageOperation(String wireName) {
        this.key = wireName;
    }

    /** Returns the GCS operation name backing this constant. */
    public String key() {
        return key;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,4 +228,56 @@ public void testReadFromPositionLargerThanBlobLength() {
e -> asInstanceOf(AmazonS3Exception.class, e.getCause()).getStatusCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()
);
}

public void testCopy() {
    // Write a blob, copy it into a sibling "target" container via copyBlob, and
    // verify the destination bytes match the source exactly.
    final var srcName = randomIdentifier();
    final var payload = randomBytesReference(randomIntBetween(100, 2_000));
    final var dstName = randomIdentifier();

    final var repository = getRepository();

    final var copiedBytes = executeOnBlobStore(repository, srcContainer -> {
        srcContainer.writeBlob(randomPurpose(), srcName, payload, true);

        final var dstContainer = repository.blobStore().blobContainer(repository.basePath().add("target"));
        dstContainer.copyBlob(randomPurpose(), srcContainer, srcName, dstName, payload.length());

        return dstContainer.readBlob(randomPurpose(), dstName).readAllBytes();
    });

    assertArrayEquals(BytesReference.toBytes(payload), copiedBytes);
}

public void testMultipartCopy() {
    // Write a blob large enough to force the multipart path, copy it via
    // executeMultipartCopy, and verify the destination bytes match the source.
    final var srcName = randomIdentifier();
    // executeMultipart requires a minimum part size of 5 MiB
    final var payload = randomBytesReference(randomIntBetween(5 * 1024 * 1024, 10 * 1024 * 1024));
    final var dstName = randomIdentifier();

    final var repository = getRepository();

    final var copiedBytes = executeOnBlobStore(repository, srcContainer -> {
        srcContainer.writeBlob(randomPurpose(), srcName, payload, true);

        final var dstContainer = (S3BlobContainer) repository.blobStore()
            .blobContainer(repository.basePath().add("target"));
        dstContainer.executeMultipartCopy(randomPurpose(), (S3BlobContainer) srcContainer, srcName, dstName, payload.length());

        return dstContainer.readBlob(randomPurpose(), dstName).readAllBytes();
    });

    assertArrayEquals(BytesReference.toBytes(payload), copiedBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
Expand Down Expand Up @@ -600,6 +601,12 @@ long getLargeBlobThresholdInBytes() {
return ByteSizeUnit.MB.toBytes(1L);
}

@Override
long getMaxCopySizeBeforeMultipart() {
// Test override: use a 1 MiB threshold so copies above 1 MiB take the multipart
// path without requiring huge fixtures. Per the original author, a much smaller
// value (~10 KiB) exercises the path harder, but 1 MiB is fine for nightlies.
return ByteSizeUnit.MB.toBytes(1L);
}

@Override
void ensureMultiPartUploadSize(long blobSize) {}
};
Expand Down Expand Up @@ -667,7 +674,7 @@ protected class S3StatsCollectorHttpHandler extends HttpStatsCollectorHandler {
private final Map<S3BlobStore.StatsKey, AtomicLong> metricsCount = ConcurrentCollections.newConcurrentMap();

S3StatsCollectorHttpHandler(final HttpHandler delegate) {
// Pre-register every S3 operation key with the base stats collector so all
// operations are known up front.
super(delegate, Arrays.stream(S3BlobStore.Operation.values()).map(S3BlobStore.Operation::getKey).toArray(String[]::new));
}

private S3HttpHandler.S3Request parseRequest(HttpExchange exchange) {
Expand Down Expand Up @@ -715,9 +722,17 @@ public void maybeTrack(HttpExchange exchange) {
k -> new AtomicLong()
).incrementAndGet();
} else if (request.isPutObjectRequest()) {
trackRequest("PutObject");
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.PUT_OBJECT, purpose), k -> new AtomicLong())
.incrementAndGet();
if (exchange.getRequestHeaders().containsKey(S3BlobStore.CUSTOM_QUERY_PARAMETER_COPY_SOURCE)) {
trackRequest("CopyObject");
metricsCount.computeIfAbsent(
new S3BlobStore.StatsKey(S3BlobStore.Operation.COPY_OBJECT, purpose),
k -> new AtomicLong()
).incrementAndGet();
} else {
trackRequest("PutObject");
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.PUT_OBJECT, purpose), k -> new AtomicLong())
.incrementAndGet();
}
} else if (request.isMultiObjectDeleteRequest()) {
trackRequest("DeleteObjects");
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.DELETE_OBJECTS, purpose), k -> new AtomicLong())
Expand Down
Loading