diff --git a/kafka-client/src/main/external-protos/rs3 b/kafka-client/src/main/external-protos/rs3 index fb2d8f80c..90c72a24d 160000 --- a/kafka-client/src/main/external-protos/rs3 +++ b/kafka-client/src/main/external-protos/rs3 @@ -1 +1 @@ -Subproject commit fb2d8f80c9a976f7999a131a75c74d9b82bdcc4c +Subproject commit 90c72a24d22bbe7059fd8759fe1a1442c1e1ef41 diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/CreateCheckpointResult.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/CreateCheckpointResult.java new file mode 100644 index 000000000..945547c6b --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/CreateCheckpointResult.java @@ -0,0 +1,121 @@ +/* + * Copyright 2025 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + +package dev.responsive.kafka.internal.db.rs3.client; + +import java.util.Objects; +import java.util.UUID; + +public class CreateCheckpointResult { + + private final StorageCheckpoint checkpoint; + + public CreateCheckpointResult(StorageCheckpoint checkpoint) { + this.checkpoint = checkpoint; + } + + public StorageCheckpoint checkpoint() { + return checkpoint; + } + + @Override + public String toString() { + return "CreateCheckpointResult{" + + "checkpoint=" + checkpoint + + '}'; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final CreateCheckpointResult that = (CreateCheckpointResult) o; + return Objects.equals(checkpoint, that.checkpoint); + } + + @Override + public int hashCode() { + return Objects.hashCode(checkpoint); + } + + public interface StorageCheckpoint { + + T map(CheckpointMapper mapper); + + void visit(CheckpointVisitor visitor); + } + + public interface CheckpointVisitor { + void visit(SlatedbCheckpoint checkpoint); + } + + public interface CheckpointMapper { + T map(SlatedbCheckpoint checkpoint); + } + + public static class SlatedbCheckpoint implements StorageCheckpoint { + private final UUID id; + private final String path; + + public SlatedbCheckpoint(final UUID id, final String path) { + this.id = id; + this.path = path; + } + + public UUID id() { + return id; + } + + public String path() { + return path; + } + + @Override + public String toString() { + return "SlatedbCheckpoint{" + + "id=" + id + + ", path='" + path + '\'' + + '}'; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final SlatedbCheckpoint that = (SlatedbCheckpoint) o; + return Objects.equals(id, that.id) && Objects.equals(path, that.path); + } + + @Override + public int hashCode() { + return Objects.hash(id, path); + } + + @Override + public T map(final CheckpointMapper mapper) { + return mapper.map(this); + } + + @Override + public void visit(final CheckpointVisitor visitor) { + visitor.visit(this); + } + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java index 5452eb4a3..31cb81a47 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java @@ -168,6 +168,16 @@ public CreateStoreResult createStore( return delegate.createStore(storeName, options); } + @Override + public CreateCheckpointResult createCheckpoint( + final UUID storeId, + final LssId lssId, + final int pssId, + final long expectedWrittenOffset + ) { + return delegate.createCheckpoint(storeId, lssId, pssId, expectedWrittenOffset); + } + public void close() { this.metrics.removeSensor(GET_SENSOR_NAME); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java index 4ca0eedca..82055f1d5 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java @@ -80,5 +80,12 @@ CreateStoreResult createStore( CreateStoreOptions options ); + CreateCheckpointResult createCheckpoint( + UUID storeId, + LssId lssId, + int pssId, + long expectedWrittenOffset + ); + void close(); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java index c0eb12e19..b816be157 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java @@ -24,11 +24,13 @@ import com.google.common.annotations.VisibleForTesting; import dev.responsive.kafka.api.config.ResponsiveConfig; +import dev.responsive.kafka.internal.db.rs3.client.CreateCheckpointResult; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreResult; import dev.responsive.kafka.internal.db.rs3.client.CurrentOffsets; import dev.responsive.kafka.internal.db.rs3.client.LssId; import dev.responsive.kafka.internal.db.rs3.client.RS3Client; +import dev.responsive.kafka.internal.db.rs3.client.RS3Exception; import dev.responsive.kafka.internal.db.rs3.client.RS3TimeoutException; import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException; import dev.responsive.kafka.internal.db.rs3.client.Range; @@ -391,6 +393,41 @@ public CreateStoreResult createStore( return new CreateStoreResult(uuidFromProto(result.getStoreId()), result.getPssIdsList()); } + @Override + public CreateCheckpointResult createCheckpoint( + final UUID storeId, + final LssId lssId, + final int pssId, + final long expectedWrittenOffset + ) { + final var request = Rs3.CreateCheckpointRequest.newBuilder() + .setLssId(lssIdProto(lssId)) + .setExpectedWrittenOffset(walOffsetProto(expectedWrittenOffset)) + .build(); + final var stub = stubs.stubs(storeId, pssId).syncStub(); + + final var result = withRetry( + () -> stub.createCheckpoint(request), + () -> "CreateCheckpoint(storeId=" + storeId + + ", lssId=" + lssId + + ", pssId=" + pssId + + ", expectedWrittenOffset=" + expectedWrittenOffset + + ")" + ); + + final var checkpoint = result.getCheckpoint(); + if (!checkpoint.hasSlatedbStorageCheckpoint()) { + throw new RS3Exception("Unexpected checkpoint type in CreateCheckpoint result"); + } + + final Rs3.SlateDbStorageCheckpoint slatedbCheckpoint = + checkpoint.getSlatedbStorageCheckpoint(); + return new CreateCheckpointResult(new CreateCheckpointResult.SlatedbCheckpoint( + uuidFromProto(slatedbCheckpoint.getCheckpointId()), + slatedbCheckpoint.getPath() + )); + } + private void checkField(final Supplier check, final String field) { if (!check.get()) { throw new RuntimeException("rs3 resp proto missing field " + field); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java index a021fd07c..04eac2233 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java @@ -14,6 +14,7 @@ import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicPutProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.createStoreOptionsProto; +import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.walOffsetProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpsRs3TestUtil.newEndOfStreamResult; import static dev.responsive.kafka.internal.utils.Utils.lssIdProto; import static dev.responsive.kafka.internal.utils.Utils.uuidToProto; @@ -33,6 +34,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import dev.responsive.kafka.internal.db.rs3.client.CreateCheckpointResult; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.ClockType; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions; @@ -1252,6 +1254,42 @@ public void shouldPropagateUnexpectedExceptionFromObserverOnError() { } } + @Test + public void shouldCreateCheckpoint() { + final var checkpointId = UUID.randomUUID(); + final var checkpointPath = "/some/checkpoint/path"; + final var storageCheckpoint = Rs3.StorageCheckpoint.newBuilder().setSlatedbStorageCheckpoint( + Rs3.SlateDbStorageCheckpoint + .newBuilder() + .setCheckpointId(uuidToProto(checkpointId)) + .setPath(checkpointPath) + .build() + ); + when(stub.createCheckpoint(any())) + .thenReturn(Rs3.CreateCheckpointResult + .newBuilder() + .setCheckpoint(storageCheckpoint) + .build()); + + final var checkpoint = client.createCheckpoint( + STORE_ID, + LSS_ID, + PSS_ID, + 10 + ).checkpoint(); + + assertThat(checkpoint.map(CreateCheckpointResult.SlatedbCheckpoint::id), is(checkpointId)); + assertThat(checkpoint.map(CreateCheckpointResult.SlatedbCheckpoint::path), is(checkpointPath)); + + verify(stub).createCheckpoint( + Rs3.CreateCheckpointRequest + .newBuilder() + .setLssId(lssIdProto(LSS_ID)) + .setExpectedWrittenOffset(walOffsetProto(10)) + .build() + ); + } + private StreamObserver verifyWalSegmentResultObserver() { verify(asyncStub).writeWALSegmentStream(writeWALSegmentResultObserverCaptor.capture()); return writeWALSegmentResultObserverCaptor.getValue();