diff --git a/flink-filesystems/flink-gs-fs-hadoop/pom.xml b/flink-filesystems/flink-gs-fs-hadoop/pom.xml
index e0d6eb5132545..29b23d0b1f6b5 100644
--- a/flink-filesystems/flink-gs-fs-hadoop/pom.xml
+++ b/flink-filesystems/flink-gs-fs-hadoop/pom.xml
@@ -36,6 +36,7 @@ under the License.
<fs.gs.sdk.version>2.29.1</fs.gs.sdk.version>
<fs.gs.gs-hadoop.version>hadoop3-2.2.18</fs.gs.gs-hadoop.version>
+ <fs.gs.cloud.nio.version>0.128.7</fs.gs.cloud.nio.version>
<fs.gs.grpc.version>1.59.1</fs.gs.grpc.version>
@@ -149,6 +150,13 @@ under the License.
+ <dependency>
+ <groupId>com.google.cloud</groupId>
+ <artifactId>google-cloud-nio</artifactId>
+ <version>${fs.gs.cloud.nio.version}</version>
+ <scope>test</scope>
+ </dependency>
+
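Note on the new test-scoped dependency: google-cloud-nio ships LocalStorageHelper, an in-memory fake of the Storage service, which lets GSBlobStorageImpl be unit tested without credentials or a real GCS bucket (the new GSBlobStorageImplTest below builds its Storage instance this way). A minimal sketch of that usage, assuming only the dependency added above; the class and bucket names are illustrative:

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;

import java.nio.charset.StandardCharsets;

class LocalStorageSketch {
    public static void main(String[] args) {
        // In-memory Storage service backed by google-cloud-nio's testing fake;
        // no network access or credentials are needed.
        Storage storage = LocalStorageHelper.getOptions().getService();
        BlobId id = BlobId.of("test-bucket", "foo/bar");
        storage.create(BlobInfo.newBuilder(id).build(), "hello".getBytes(StandardCharsets.UTF_8));
        // Read back the bytes that were just written to the fake bucket.
        System.out.println(new String(storage.readAllBytes(id), StandardCharsets.UTF_8));
    }
}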
diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/GSBlobStorageImpl.java b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/GSBlobStorageImpl.java
index a254d9946b6cb..09c95c1357b4d 100644
--- a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/GSBlobStorageImpl.java
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/GSBlobStorageImpl.java
@@ -18,6 +18,7 @@
package org.apache.flink.fs.gs.storage;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.fs.gs.utils.BlobUtils;
import org.apache.flink.util.Preconditions;
@@ -60,8 +61,7 @@ public GSBlobStorage.WriteChannel writeBlob(GSBlobIdentifier blobIdentifier) {
LOGGER.trace("Creating writable blob for identifier {}", blobIdentifier);
Preconditions.checkNotNull(blobIdentifier);
- BlobInfo blobInfo = BlobInfo.newBuilder(blobIdentifier.getBlobId()).build();
- com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo);
+ com.google.cloud.WriteChannel writeChannel = createWriteChannel(blobIdentifier);
return new WriteChannel(blobIdentifier, writeChannel);
}
@@ -75,12 +75,23 @@ public GSBlobStorage.WriteChannel writeBlob(
Preconditions.checkNotNull(blobIdentifier);
Preconditions.checkArgument(chunkSize.getBytes() > 0);
- BlobInfo blobInfo = BlobInfo.newBuilder(blobIdentifier.getBlobId()).build();
- com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo);
+ com.google.cloud.WriteChannel writeChannel = createWriteChannel(blobIdentifier);
writeChannel.setChunkSize((int) chunkSize.getBytes());
return new WriteChannel(blobIdentifier, writeChannel);
}
+ /**
+ * Creates a write channel for the given blob identifier with appropriate preconditions.
+ *
+ * @param blobIdentifier The blob identifier to create the write channel for
+ * @return The write channel with appropriate write options
+ */
+ private com.google.cloud.WriteChannel createWriteChannel(GSBlobIdentifier blobIdentifier) {
+ BlobInfo existingBlob = storage.get(blobIdentifier.bucketName, blobIdentifier.objectName);
+ BlobInfo blobInfo = BlobInfo.newBuilder(blobIdentifier.getBlobId()).build();
+ return storage.writer(blobInfo, getBlobWriteOption(existingBlob));
+ }
+
@Override
public void createBlob(GSBlobIdentifier blobIdentifier) {
LOGGER.trace("Creating empty blob {}", blobIdentifier);
@@ -153,11 +164,74 @@ public void compose(
for (GSBlobIdentifier blobIdentifier : sourceBlobIdentifiers) {
builder.addSource(blobIdentifier.objectName);
}
- Storage.ComposeRequest request = builder.build();
+ BlobInfo existingTargetBlob =
+ storage.get(targetBlobIdentifier.bucketName, targetBlobIdentifier.objectName);
+ Storage.BlobTargetOption precondition = getBlobTargetOption(existingTargetBlob);
+ Storage.ComposeRequest request = builder.setTargetOptions(precondition).build();
storage.compose(request);
}
+ /**
+ * Generic helper to create blob options with appropriate preconditions. This ensures that the
+ * operations become idempotent or atomic, allowing the GCS client to safely retry 503
+ * (Service Unavailable) errors.
+ *
+ * <p>For a target object that does not yet exist, sets the DoesNotExist precondition. This will
+ * cause the request to fail if the object is created before the request runs.
+ *
+ * <p>If the destination already exists, sets a generation-match precondition. This will cause
+ * the request to fail if the existing object's generation changes before the request runs.
+ *
+ * @param blobInfo The blob info to create the option for, or null if the blob does not exist
+ * @param doesNotExistSupplier Supplier for the doesNotExist option
+ * @param generationMatchFunction Function to create generationMatch option from generation
+ * number
+ * @param <T> The type of the blob option (BlobTargetOption or BlobWriteOption)
+ * @return The appropriate option for the blob
+ */
+ @VisibleForTesting
+ <T> T getBlobOption(
+ BlobInfo blobInfo,
+ java.util.function.Supplier<T> doesNotExistSupplier,
+ java.util.function.Function<Long, T> generationMatchFunction) {
+ if (blobInfo == null) {
+ return doesNotExistSupplier.get();
+ } else {
+ return generationMatchFunction.apply(blobInfo.getGeneration());
+ }
+ }
+
+ /**
+ * Creates the appropriate BlobTargetOption for the given blob info.
+ *
+ * @param blobInfo The blob info to create the target option for, or null if the blob does not
+ * exist
+ * @return The appropriate target option for the blob
+ */
+ @VisibleForTesting
+ Storage.BlobTargetOption getBlobTargetOption(BlobInfo blobInfo) {
+ return getBlobOption(
+ blobInfo,
+ Storage.BlobTargetOption::doesNotExist,
+ Storage.BlobTargetOption::generationMatch);
+ }
+
+ /**
+ * Creates the appropriate BlobWriteOption for the given blob info.
+ *
+ * @param blobInfo The blob info to create the write option for, or null if the blob does not
+ * exist
+ * @return The appropriate write option for the blob
+ */
+ @VisibleForTesting
+ Storage.BlobWriteOption getBlobWriteOption(BlobInfo blobInfo) {
+ return getBlobOption(
+ blobInfo,
+ Storage.BlobWriteOption::doesNotExist,
+ Storage.BlobWriteOption::generationMatch);
+ }
+
@Override
public List<Boolean> delete(Iterable<GSBlobIdentifier> blobIdentifiers) {
LOGGER.trace("Deleting blobs {}", blobIdentifiers);
diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/storage/GSBlobStorageImplTest.java b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/storage/GSBlobStorageImplTest.java
new file mode 100644
index 0000000000000..264d6e5935372
--- /dev/null
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/storage/GSBlobStorageImplTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.storage;
+
+import org.apache.flink.configuration.MemorySize;
+
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test {@link GSBlobStorageImpl}. */
+class GSBlobStorageImplTest {
+
+ private static final String TEST_BUCKET = "test-bucket";
+ private GSBlobStorageImpl blobStorage;
+ private Storage storage;
+
+ @BeforeEach
+ void setUp() {
+ storage = LocalStorageHelper.getOptions().getService();
+ blobStorage = new GSBlobStorageImpl(storage);
+ }
+
+ @ParameterizedTest(name = "{0} with null BlobInfo")
+ @MethodSource("provideOptionTypes")
+ void testGetBlobOptionWithNullBlobInfo(
+ String optionType,
+ Supplier