diff --git a/kafka-client/src/main/external-protos/rs3 b/kafka-client/src/main/external-protos/rs3 index da6498b98..fb2d8f80c 160000 --- a/kafka-client/src/main/external-protos/rs3 +++ b/kafka-client/src/main/external-protos/rs3 @@ -1 +1 @@ -Subproject commit da6498b9847edec32c0890fdbb0db9ed709abc81 +Subproject commit fb2d8f80c9a976f7999a131a75c74d9b82bdcc4c diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java index 223732b2c..c0eb12e19 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java @@ -260,7 +260,7 @@ private > KeyValueIterator sendRange( .setStoreId(uuidToProto(storeId)) .setLssId(lssIdProto(lssId)) .setPssId(pssId) - .setExpectedWrittenOffset(walOffsetProto(expectedWrittenOffset)); + .setExpectedMinWrittenOffset(walOffsetProto(expectedWrittenOffset)); final Supplier rangeDescription = () -> "Range(storeId=" + storeId + ", lssId=" + lssId + ", pssId=" + pssId + ")"; final var asyncStub = stubs.stubs(storeId, pssId).asyncStub(); @@ -335,7 +335,7 @@ private Optional sendGet( requestBuilder.setStoreId(uuidToProto(storeId)) .setLssId(lssIdProto(lssId)) .setPssId(pssId) - .setExpectedWrittenOffset(walOffsetProto(expectedWrittenOffset)); + .setExpectedMinWrittenOffset(walOffsetProto(expectedWrittenOffset)); final var request = requestBuilder.build(); final RS3Grpc.RS3BlockingStub stub = stubs.stubs(storeId, pssId).syncStub(); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java index a7ba9bfdd..a021fd07c 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java @@ -666,7 +666,7 @@ public void shouldGetWithExpectedWrittenOffset() { .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) .setStoreId(uuidToProto(STORE_ID)) - .setExpectedWrittenOffset(GrpcRs3Util.walOffsetProto(123L)) + .setExpectedMinWrittenOffset(GrpcRs3Util.walOffsetProto(123L)) .setKey(Rs3.Key.newBuilder().setBasicKey( GrpcRs3Util.basicKeyProto("foo".getBytes(StandardCharsets.UTF_8)) )) @@ -706,7 +706,7 @@ public void shouldGet() { .setKey(Rs3.Key.newBuilder().setBasicKey( GrpcRs3Util.basicKeyProto("foo".getBytes(StandardCharsets.UTF_8)) )) - .setExpectedWrittenOffset(GrpcRs3Util.UNWRITTEN_WAL_OFFSET) + .setExpectedMinWrittenOffset(GrpcRs3Util.UNWRITTEN_WAL_OFFSET) .build() ); } @@ -825,7 +825,7 @@ public void shouldWindowedGet() { verify(stub).get(Rs3.GetRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setExpectedWrittenOffset(GrpcRs3Util.UNWRITTEN_WAL_OFFSET) + .setExpectedMinWrittenOffset(GrpcRs3Util.UNWRITTEN_WAL_OFFSET) .setStoreId(uuidToProto(STORE_ID)) .setKey(Rs3.Key.newBuilder().setWindowKey(keyProto)) .build() @@ -866,7 +866,7 @@ public void shouldRetryWindowedGet() { .get(Rs3.GetRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setExpectedWrittenOffset(GrpcRs3Util.UNWRITTEN_WAL_OFFSET) + .setExpectedMinWrittenOffset(GrpcRs3Util.UNWRITTEN_WAL_OFFSET) .setStoreId(uuidToProto(STORE_ID)) .setKey(Rs3.Key.newBuilder().setWindowKey(keyProto)) .build() diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/TestGrpcRs3Service.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/TestGrpcRs3Service.java index 5404eee46..fe724bc99 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/TestGrpcRs3Service.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/TestGrpcRs3Service.java @@ -83,8 +83,8 @@ public void get( return; } - if (req.getExpectedWrittenOffset().getIsWritten()) { - if (offset.get() < req.getExpectedWrittenOffset().getOffset()) { + if (req.getExpectedMinWrittenOffset().getIsWritten()) { + if (offset.get() < req.getExpectedMinWrittenOffset().getOffset()) { responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT)); return; } @@ -113,8 +113,8 @@ public void range( return; } - if (req.getExpectedWrittenOffset().getIsWritten()) { - if (offset.get() < req.getExpectedWrittenOffset().getOffset()) { + if (req.getExpectedMinWrittenOffset().getIsWritten()) { + if (offset.get() < req.getExpectedMinWrittenOffset().getOffset()) { responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT)); return; }