Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka-client/src/main/external-protos/rs3
Submodule rs3 updated from da6498 to fb2d8f
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ private <K extends Comparable<K>> KeyValueIterator<K, byte[]> sendRange(
.setStoreId(uuidToProto(storeId))
.setLssId(lssIdProto(lssId))
.setPssId(pssId)
.setExpectedWrittenOffset(walOffsetProto(expectedWrittenOffset));
.setExpectedMinWrittenOffset(walOffsetProto(expectedWrittenOffset));
final Supplier<String> rangeDescription =
() -> "Range(storeId=" + storeId + ", lssId=" + lssId + ", pssId=" + pssId + ")";
final var asyncStub = stubs.stubs(storeId, pssId).asyncStub();
Expand Down Expand Up @@ -335,7 +335,7 @@ private Optional<Rs3.KeyValue> sendGet(
requestBuilder.setStoreId(uuidToProto(storeId))
.setLssId(lssIdProto(lssId))
.setPssId(pssId)
.setExpectedWrittenOffset(walOffsetProto(expectedWrittenOffset));
.setExpectedMinWrittenOffset(walOffsetProto(expectedWrittenOffset));

final var request = requestBuilder.build();
final RS3Grpc.RS3BlockingStub stub = stubs.stubs(storeId, pssId).syncStub();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ public void shouldGetWithExpectedWrittenOffset() {
.setLssId(lssIdProto(LSS_ID))
.setPssId(PSS_ID)
.setStoreId(uuidToProto(STORE_ID))
.setExpectedWrittenOffset(GrpcRs3Util.walOffsetProto(123L))
.setExpectedMinWrittenOffset(GrpcRs3Util.walOffsetProto(123L))
.setKey(Rs3.Key.newBuilder().setBasicKey(
GrpcRs3Util.basicKeyProto("foo".getBytes(StandardCharsets.UTF_8))
))
Expand Down Expand Up @@ -706,7 +706,7 @@ public void shouldGet() {
.setKey(Rs3.Key.newBuilder().setBasicKey(
GrpcRs3Util.basicKeyProto("foo".getBytes(StandardCharsets.UTF_8))
))
.setExpectedWrittenOffset(GrpcRs3Util.UNWRITTEN_WAL_OFFSET)
.setExpectedMinWrittenOffset(GrpcRs3Util.UNWRITTEN_WAL_OFFSET)
.build()
);
}
Expand Down Expand Up @@ -825,7 +825,7 @@ public void shouldWindowedGet() {
verify(stub).get(Rs3.GetRequest.newBuilder()
.setLssId(lssIdProto(LSS_ID))
.setPssId(PSS_ID)
.setExpectedWrittenOffset(GrpcRs3Util.UNWRITTEN_WAL_OFFSET)
.setExpectedMinWrittenOffset(GrpcRs3Util.UNWRITTEN_WAL_OFFSET)
.setStoreId(uuidToProto(STORE_ID))
.setKey(Rs3.Key.newBuilder().setWindowKey(keyProto))
.build()
Expand Down Expand Up @@ -866,7 +866,7 @@ public void shouldRetryWindowedGet() {
.get(Rs3.GetRequest.newBuilder()
.setLssId(lssIdProto(LSS_ID))
.setPssId(PSS_ID)
.setExpectedWrittenOffset(GrpcRs3Util.UNWRITTEN_WAL_OFFSET)
.setExpectedMinWrittenOffset(GrpcRs3Util.UNWRITTEN_WAL_OFFSET)
.setStoreId(uuidToProto(STORE_ID))
.setKey(Rs3.Key.newBuilder().setWindowKey(keyProto))
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ public void get(
return;
}

if (req.getExpectedWrittenOffset().getIsWritten()) {
if (offset.get() < req.getExpectedWrittenOffset().getOffset()) {
if (req.getExpectedMinWrittenOffset().getIsWritten()) {
if (offset.get() < req.getExpectedMinWrittenOffset().getOffset()) {
responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT));
return;
}
Expand Down Expand Up @@ -113,8 +113,8 @@ public void range(
return;
}

if (req.getExpectedWrittenOffset().getIsWritten()) {
if (offset.get() < req.getExpectedWrittenOffset().getOffset()) {
if (req.getExpectedMinWrittenOffset().getIsWritten()) {
if (offset.get() < req.getExpectedMinWrittenOffset().getOffset()) {
responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT));
return;
}
Expand Down
Loading