diff --git a/kafka-client/src/main/external-protos/rs3 b/kafka-client/src/main/external-protos/rs3 index 59f088e12..55c8b0abe 160000 --- a/kafka-client/src/main/external-protos/rs3 +++ b/kafka-client/src/main/external-protos/rs3 @@ -1 +1 @@ -Subproject commit 59f088e12b2afe75441b06735be75c3a9178664a +Subproject commit 55c8b0abe6708b3238c2ffbaf335ae366da6adf0 diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java index f5eba6c16..577a471b6 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java @@ -15,6 +15,8 @@ import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicDeleteProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicKeyProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicPutProto; +import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.walOffsetFromProto; +import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.walOffsetProto; import static dev.responsive.kafka.internal.utils.Utils.lssIdProto; import static dev.responsive.kafka.internal.utils.Utils.uuidFromProto; import static dev.responsive.kafka.internal.utils.Utils.uuidToProto; @@ -51,8 +53,6 @@ import org.apache.kafka.streams.state.KeyValueIterator; public class GrpcRS3Client implements RS3Client { - public static final long WAL_OFFSET_NONE = Long.MAX_VALUE; - private final PssStubsProvider stubs; private final Time time; private final long retryTimeoutMs; @@ -88,10 +88,8 @@ public CurrentOffsets getCurrentOffsets(final UUID storeId, final LssId lssId, f checkField(result::hasWrittenOffset, "writtenOffset"); checkField(result::hasFlushedOffset, "flushedOffset"); return new CurrentOffsets( - result.getWrittenOffset() == WAL_OFFSET_NONE - ? Optional.empty() : Optional.of(result.getWrittenOffset()), - result.getFlushedOffset() == WAL_OFFSET_NONE - ? Optional.empty() : Optional.of(result.getFlushedOffset()) + walOffsetFromProto(result.getWrittenOffset()), + walOffsetFromProto(result.getFlushedOffset()) ); } @@ -158,7 +156,7 @@ public StreamSenderMessageReceiver> writeWalSegmentAsyn .setLssId(lssIdProto(lssId)) .setPssId(pssId) .setEndOffset(endOffset) - .setExpectedWrittenOffset(expectedWrittenOffset.orElse(WAL_OFFSET_NONE)); + .setExpectedWrittenOffset(walOffsetProto(expectedWrittenOffset)); addWalEntryToSegment(entry, entryBuilder); return entryBuilder.build(); }, @@ -169,8 +167,7 @@ public StreamSenderMessageReceiver> writeWalSegmentAsyn resultObserver.message() .thenApply(r -> { checkField(r::hasFlushedOffset, "flushedOffset"); - return r.getFlushedOffset() == WAL_OFFSET_NONE - ? Optional.empty() : Optional.of(r.getFlushedOffset()); + return walOffsetFromProto(r.getFlushedOffset()); }) ); } @@ -235,8 +232,8 @@ public KeyValueIterator range( final var requestBuilder = Rs3.RangeRequest.newBuilder() .setStoreId(uuidToProto(storeId)) .setLssId(lssIdProto(lssId)) - .setPssId(pssId); - expectedWrittenOffset.ifPresent(requestBuilder::setExpectedWrittenOffset); + .setPssId(pssId) + .setExpectedWrittenOffset(walOffsetProto(expectedWrittenOffset)); final Supplier rangeDescription = () -> "Range(storeId=" + storeId + ", lssId=" + lssId + ", pssId=" + pssId + ")"; final var asyncStub = stubs.stubs(storeId, pssId).asyncStub(); @@ -256,8 +253,8 @@ public Optional get( .setStoreId(uuidToProto(storeId)) .setLssId(lssIdProto(lssId)) .setPssId(pssId) - .setKey(Rs3.Key.newBuilder().setBasicKey(basicKeyProto(key))); - expectedWrittenOffset.ifPresent(requestBuilder::setExpectedWrittenOffset); + .setKey(Rs3.Key.newBuilder().setBasicKey(basicKeyProto(key))) + .setExpectedWrittenOffset(walOffsetProto(expectedWrittenOffset)); final var request = requestBuilder.build(); final RS3Grpc.RS3BlockingStub stub = stubs.stubs(storeId, pssId).syncStub(); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRangeRequestProxy.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRangeRequestProxy.java index 644bf48c6..6b12fd288 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRangeRequestProxy.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRangeRequestProxy.java @@ -30,7 +30,7 @@ public interface GrpcRangeRequestProxy { * through to result observer. If a transient error is encountered through the * observer, the caller can retry this operation with an updated `startBound`. * - * @param start The updated start bound based on key-values seen with `resultObserver` + * @param range The updated range based on key-values seen with `resultObserver` * @param resultObserver An observer for key-value results and the end of stream marker */ void send(Range range, StreamObserver resultObserver); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java index 135538e27..e46f9be4c 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java @@ -25,6 +25,9 @@ import java.util.Optional; public class GrpcRs3Util { + public static final Rs3.WALOffset UNWRITTEN_WAL_OFFSET = Rs3.WALOffset.newBuilder() + .setIsWritten(false) + .build(); public static RuntimeException wrapThrowable(Throwable t) { final var statusOpt = getGrpcStatus(t); @@ -117,7 +120,7 @@ public static Rs3.CreateStoreOptions createStoreOptionsProto( builder.setLogicalShards(options.logicalShards()); builder.setStoreType(storeTypeProto(options.storeType())); options.clockType().ifPresent( - type -> builder.setClockType(Rs3.CreateStoreOptions.ClockType.forNumber(type.ordinal())) + type -> builder.setClockType(Rs3.ClockType.forNumber(type.ordinal())) ); options.defaultTtl().ifPresent(builder::setDefaultTtl); options.slateDbOptions().ifPresent(slateDbOptions -> { @@ -127,4 +130,31 @@ public static Rs3.CreateStoreOptions createStoreOptionsProto( }); return builder.build(); } + + public static Rs3.WALOffset walOffsetProto(final long offset) { + return Rs3.WALOffset.newBuilder() + .setIsWritten(true) + .setOffset(offset) + .build(); + } + + public static Rs3.WALOffset walOffsetProto(final Optional offset) { + return offset + .map(GrpcRs3Util::walOffsetProto) + .orElse(UNWRITTEN_WAL_OFFSET); + } + + public static Optional walOffsetFromProto(final Rs3.WALOffset walOffset) { + if (!walOffset.hasIsWritten()) { + throw new RS3Exception("illegal wal offset: is_written must be set"); + } + if (walOffset.getIsWritten()) { + if (!walOffset.hasOffset()) { + throw new RS3Exception("illegal wal offset: offset must be set"); + } + return Optional.of(walOffset.getOffset()); + } else { + return Optional.empty(); + } + } } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientEndToEndTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientEndToEndTest.java index a0779171a..4e4bfeb93 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientEndToEndTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientEndToEndTest.java @@ -288,8 +288,8 @@ public void getOffsets( final var currentOffset = offset.get(); final var result = Rs3.GetOffsetsResult .newBuilder() - .setFlushedOffset(currentOffset) - .setWrittenOffset(currentOffset) + .setFlushedOffset(GrpcRs3Util.walOffsetProto(currentOffset)) + .setWrittenOffset(GrpcRs3Util.walOffsetProto(currentOffset)) .build(); responseObserver.onNext(result); responseObserver.onCompleted(); @@ -311,8 +311,8 @@ public void get( return; } - if (req.getExpectedWrittenOffset() != GrpcRS3Client.WAL_OFFSET_NONE) { - if (offset.get() < req.getExpectedWrittenOffset()) { + if (req.getExpectedWrittenOffset().getIsWritten()) { + if (offset.get() < req.getExpectedWrittenOffset().getOffset()) { responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT)); return; } @@ -353,8 +353,8 @@ public void range( return; } - if (req.getExpectedWrittenOffset() != GrpcRS3Client.WAL_OFFSET_NONE) { - if (offset.get() < req.getExpectedWrittenOffset()) { + if (req.getExpectedWrittenOffset().getIsWritten()) { + if (offset.get() < req.getExpectedWrittenOffset().getOffset()) { responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT)); return; } @@ -403,8 +403,8 @@ public void onNext(final Rs3.WriteWALSegmentRequest req) { responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT)); } - if (req.getExpectedWrittenOffset() != GrpcRS3Client.WAL_OFFSET_NONE) { - if (offset.get() < req.getExpectedWrittenOffset()) { + if (req.getExpectedWrittenOffset().getIsWritten()) { + if (offset.get() < req.getExpectedWrittenOffset().getOffset()) { responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT)); return; } @@ -434,7 +434,7 @@ public void onError(final Throwable throwable) { public void onCompleted() { final var result = Rs3.WriteWALSegmentResult .newBuilder() - .setFlushedOffset(offset.get()) + .setFlushedOffset(GrpcRs3Util.walOffsetProto(offset.get())) .build(); responseObserver.onNext(result); responseObserver.onCompleted(); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java index d72807793..94fcfa131 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java @@ -46,7 +46,7 @@ import dev.responsive.rs3.Rs3; import dev.responsive.rs3.Rs3.CreateStoreResult; import dev.responsive.rs3.Rs3.ListStoresResult; -import dev.responsive.rs3.Rs3.Store; +import dev.responsive.rs3.Rs3.StoreInfo; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; @@ -111,7 +111,7 @@ public void shouldReturnErrorOnGetOffsetsWithMissingFlushedOffset() { // given: when(stub.getOffsets(any())).thenReturn( Rs3.GetOffsetsResult.newBuilder() - .setWrittenOffset(123) + .setWrittenOffset(GrpcRs3Util.walOffsetProto(123L)) .build() ); @@ -124,7 +124,7 @@ public void shouldReturnErrorOnGetOffsetsWithMissingWrittenOffset() { // given: when(stub.getOffsets(any())).thenReturn( Rs3.GetOffsetsResult.newBuilder() - .setFlushedOffset(123) + .setFlushedOffset(GrpcRs3Util.walOffsetProto(123L)) .build() ); @@ -137,8 +137,8 @@ public void shouldReturnEmptyWrittenOffsetOnGetOffsetsWithWrittenOffsetNone() { // given: when(stub.getOffsets(any())).thenReturn( Rs3.GetOffsetsResult.newBuilder() - .setWrittenOffset(GrpcRS3Client.WAL_OFFSET_NONE) - .setFlushedOffset(123) + .setWrittenOffset(GrpcRs3Util.UNWRITTEN_WAL_OFFSET) + .setFlushedOffset(GrpcRs3Util.walOffsetProto(123L)) .build() ); @@ -155,8 +155,8 @@ public void shouldReturnEmptyFlushedOffsetOnGetOffsetsWithFlushedOffsetNone() { // given: when(stub.getOffsets(any())).thenReturn( Rs3.GetOffsetsResult.newBuilder() - .setWrittenOffset(123) - .setFlushedOffset(GrpcRS3Client.WAL_OFFSET_NONE) + .setWrittenOffset(GrpcRs3Util.walOffsetProto(123L)) + .setFlushedOffset(GrpcRs3Util.UNWRITTEN_WAL_OFFSET) .build() ); @@ -173,8 +173,8 @@ public void shouldGetOffsets() { // given: when(stub.getOffsets(any())).thenReturn( Rs3.GetOffsetsResult.newBuilder() - .setWrittenOffset(13) - .setFlushedOffset(3) + .setWrittenOffset(GrpcRs3Util.walOffsetProto(13)) + .setFlushedOffset(GrpcRs3Util.walOffsetProto(3)) .build() ); @@ -194,8 +194,8 @@ public void shouldRetryGetOffsets() { .thenThrow(new StatusRuntimeException(Status.UNAVAILABLE)) .thenReturn( Rs3.GetOffsetsResult.newBuilder() - .setWrittenOffset(13) - .setFlushedOffset(3) + .setWrittenOffset(GrpcRs3Util.walOffsetProto(13)) + .setFlushedOffset(GrpcRs3Util.walOffsetProto(3)) .build()); // when: @@ -265,7 +265,7 @@ public void shouldWriteWalEntriesWithExpectedFields() { .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) .setStoreId(uuidToProto(STORE_ID)) - .setExpectedWrittenOffset(15L) + .setExpectedWrittenOffset(GrpcRs3Util.walOffsetProto(15)) .setEndOffset(20) .setPut(basicPutProto(put1)) .build() @@ -274,7 +274,7 @@ public void shouldWriteWalEntriesWithExpectedFields() { .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) .setStoreId(uuidToProto(STORE_ID)) - .setExpectedWrittenOffset(15L) + .setExpectedWrittenOffset(GrpcRs3Util.walOffsetProto(15)) .setEndOffset(20) .setPut(basicPutProto(put2)) .build() @@ -304,7 +304,7 @@ public void shouldWriteWalEntriesWithExpectedWrittenOffsetNone() { .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) .setStoreId(uuidToProto(STORE_ID)) - .setExpectedWrittenOffset(GrpcRS3Client.WAL_OFFSET_NONE) + .setExpectedWrittenOffset(GrpcRs3Util.UNWRITTEN_WAL_OFFSET) .setEndOffset(20) .setPut(basicPutProto(put1)) .build() @@ -326,7 +326,7 @@ public void shouldHandleWriteWalSegmentResponse() final var receiveFuture = senderReceiver.completion().toCompletableFuture(); final var observer = verifyWalSegmentResultObserver(); observer.onNext(Rs3.WriteWALSegmentResult.newBuilder() - .setFlushedOffset(123) + .setFlushedOffset(GrpcRs3Util.walOffsetProto(123)) .build()); observer.onCompleted(); @@ -373,10 +373,10 @@ public void shouldThrowOnMultipleWriteWalSegmentResponseMessages() { final var receiveFuture = senderReceiver.completion().toCompletableFuture(); final var observer = verifyWalSegmentResultObserver(); observer.onNext(Rs3.WriteWALSegmentResult.newBuilder() - .setFlushedOffset(123) + .setFlushedOffset(GrpcRs3Util.walOffsetProto(123)) .build()); observer.onNext(Rs3.WriteWALSegmentResult.newBuilder() - .setFlushedOffset(456) + .setFlushedOffset(GrpcRs3Util.walOffsetProto(456)) .build()); observer.onCompleted(); @@ -400,7 +400,7 @@ public void shouldWriteWalSegmentSync() { StreamObserver responseObserver = invocation.getArgument(0); responseObserver.onNext(Rs3.WriteWALSegmentResult.newBuilder() - .setFlushedOffset(flushedOffset) + .setFlushedOffset(GrpcRs3Util.walOffsetProto(flushedOffset)) .build()); responseObserver.onCompleted(); return writeWALSegmentRequestObserver; @@ -422,7 +422,7 @@ public void shouldWriteWalSegmentSync() { .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) .setStoreId(uuidToProto(STORE_ID)) - .setExpectedWrittenOffset(15L) + .setExpectedWrittenOffset(GrpcRs3Util.walOffsetProto(15L)) .setEndOffset(20) .setPut(basicPutProto((Put) entries.get(0))) .build() @@ -432,7 +432,7 @@ public void shouldWriteWalSegmentSync() { .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) .setStoreId(uuidToProto(STORE_ID)) - .setExpectedWrittenOffset(15L) + .setExpectedWrittenOffset(GrpcRs3Util.walOffsetProto(15L)) .setEndOffset(20) .setPut(basicPutProto((Put) entries.get(1))) .build() @@ -461,7 +461,7 @@ public void shouldRetryWriteWalSegmentSync() { StreamObserver responseObserver = invocation.getArgument(0); responseObserver.onNext(Rs3.WriteWALSegmentResult.newBuilder() - .setFlushedOffset(flushedOffset) + .setFlushedOffset(GrpcRs3Util.walOffsetProto(flushedOffset)) .build()); responseObserver.onCompleted(); reset(writeWALSegmentRequestObserver); @@ -485,7 +485,7 @@ public void shouldRetryWriteWalSegmentSync() { .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) .setStoreId(uuidToProto(STORE_ID)) - .setExpectedWrittenOffset(15L) + .setExpectedWrittenOffset(GrpcRs3Util.walOffsetProto(15L)) .setEndOffset(20) .setPut(basicPutProto((Put) entries.get(0))) .build() @@ -495,7 +495,7 @@ public void shouldRetryWriteWalSegmentSync() { .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) .setStoreId(uuidToProto(STORE_ID)) - .setExpectedWrittenOffset(15L) + .setExpectedWrittenOffset(GrpcRs3Util.walOffsetProto(15L)) .setEndOffset(20) .setPut(basicPutProto((Put) entries.get(1))) .build() @@ -522,7 +522,7 @@ public void shouldRetryWriteWalSegmentSyncWithErrorInOnNext() { StreamObserver responseObserver = invocation.getArgument(0); responseObserver.onNext(Rs3.WriteWALSegmentResult.newBuilder() - .setFlushedOffset(flushedOffset) + .setFlushedOffset(GrpcRs3Util.walOffsetProto(flushedOffset)) .build()); responseObserver.onCompleted(); reset(writeWALSegmentRequestObserver); @@ -546,7 +546,7 @@ public void shouldRetryWriteWalSegmentSyncWithErrorInOnNext() { .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) .setStoreId(uuidToProto(STORE_ID)) - .setExpectedWrittenOffset(15L) + .setExpectedWrittenOffset(GrpcRs3Util.walOffsetProto(15L)) .setEndOffset(20) .setPut(basicPutProto((Put) entries.get(0))) .build() @@ -573,7 +573,7 @@ public void shouldRetryWriteWalSegmentSyncWithErrorInFinish() { StreamObserver responseObserver = invocation.getArgument(0); responseObserver.onNext(Rs3.WriteWALSegmentResult.newBuilder() - .setFlushedOffset(flushedOffset) + .setFlushedOffset(GrpcRs3Util.walOffsetProto(flushedOffset)) .build()); responseObserver.onCompleted(); reset(writeWALSegmentRequestObserver); @@ -596,7 +596,7 @@ public void shouldRetryWriteWalSegmentSyncWithErrorInFinish() { .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) .setStoreId(uuidToProto(STORE_ID)) - .setExpectedWrittenOffset(15L) + .setExpectedWrittenOffset(GrpcRs3Util.walOffsetProto(15L)) .setEndOffset(20) .setPut(basicPutProto((Put) entries.get(0))) .build() @@ -657,7 +657,7 @@ public void shouldGetWithExpectedWrittenOffset() { .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) .setStoreId(uuidToProto(STORE_ID)) - .setExpectedWrittenOffset(123L) + .setExpectedWrittenOffset(GrpcRs3Util.walOffsetProto(123L)) .setKey(Rs3.Key.newBuilder().setBasicKey( GrpcRs3Util.basicKeyProto("foo".getBytes(StandardCharsets.UTF_8)) )) @@ -691,6 +691,7 @@ public void shouldGet() { .setKey(Rs3.Key.newBuilder().setBasicKey( GrpcRs3Util.basicKeyProto("foo".getBytes(StandardCharsets.UTF_8)) )) + .setExpectedWrittenOffset(GrpcRs3Util.UNWRITTEN_WAL_OFFSET) .build() ); } @@ -880,7 +881,7 @@ public void shouldListStores() { // given: when(stub.listStores(any())).thenReturn( ListStoresResult.newBuilder() - .addStores(Store.newBuilder() + .addStores(StoreInfo.newBuilder() .setStoreName(STORE_NAME) .setStoreId(uuidToProto(STORE_ID)) .addAllPssIds(List.of(PSS_ID, PSS_ID_2))