Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka-client/src/main/external-protos/rs3
Submodule rs3 updated from 59f088 to 55c8b0
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicDeleteProto;
import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicKeyProto;
import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicPutProto;
import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.walOffsetProto;
import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.walOffsetFromProto;
import static dev.responsive.kafka.internal.utils.Utils.lssIdProto;
import static dev.responsive.kafka.internal.utils.Utils.uuidFromProto;
import static dev.responsive.kafka.internal.utils.Utils.uuidToProto;
Expand Down Expand Up @@ -51,8 +53,6 @@
import org.apache.kafka.streams.state.KeyValueIterator;

public class GrpcRS3Client implements RS3Client {
public static final long WAL_OFFSET_NONE = Long.MAX_VALUE;

private final PssStubsProvider stubs;
private final Time time;
private final long retryTimeoutMs;
Expand Down Expand Up @@ -88,10 +88,8 @@ public CurrentOffsets getCurrentOffsets(final UUID storeId, final LssId lssId, f
checkField(result::hasWrittenOffset, "writtenOffset");
checkField(result::hasFlushedOffset, "flushedOffset");
return new CurrentOffsets(
result.getWrittenOffset() == WAL_OFFSET_NONE
? Optional.empty() : Optional.of(result.getWrittenOffset()),
result.getFlushedOffset() == WAL_OFFSET_NONE
? Optional.empty() : Optional.of(result.getFlushedOffset())
walOffsetFromProto(result.getWrittenOffset()),
walOffsetFromProto(result.getFlushedOffset())
);
}

Expand Down Expand Up @@ -158,7 +156,7 @@ public StreamSenderMessageReceiver<WalEntry, Optional<Long>> writeWalSegmentAsyn
.setLssId(lssIdProto(lssId))
.setPssId(pssId)
.setEndOffset(endOffset)
.setExpectedWrittenOffset(expectedWrittenOffset.orElse(WAL_OFFSET_NONE));
.setExpectedWrittenOffset(walOffsetProto(expectedWrittenOffset));
addWalEntryToSegment(entry, entryBuilder);
return entryBuilder.build();
},
Expand All @@ -169,8 +167,7 @@ public StreamSenderMessageReceiver<WalEntry, Optional<Long>> writeWalSegmentAsyn
resultObserver.message()
.thenApply(r -> {
checkField(r::hasFlushedOffset, "flushedOffset");
return r.getFlushedOffset() == WAL_OFFSET_NONE
? Optional.empty() : Optional.of(r.getFlushedOffset());
return walOffsetFromProto(r.getFlushedOffset());
})
);
}
Expand Down Expand Up @@ -235,8 +232,8 @@ public KeyValueIterator<Bytes, byte[]> range(
final var requestBuilder = Rs3.RangeRequest.newBuilder()
.setStoreId(uuidToProto(storeId))
.setLssId(lssIdProto(lssId))
.setPssId(pssId);
expectedWrittenOffset.ifPresent(requestBuilder::setExpectedWrittenOffset);
.setPssId(pssId)
.setExpectedWrittenOffset(walOffsetProto(expectedWrittenOffset));
final Supplier<String> rangeDescription =
() -> "Range(storeId=" + storeId + ", lssId=" + lssId + ", pssId=" + pssId + ")";
final var asyncStub = stubs.stubs(storeId, pssId).asyncStub();
Expand All @@ -256,8 +253,8 @@ public Optional<byte[]> get(
.setStoreId(uuidToProto(storeId))
.setLssId(lssIdProto(lssId))
.setPssId(pssId)
.setKey(Rs3.Key.newBuilder().setBasicKey(basicKeyProto(key)));
expectedWrittenOffset.ifPresent(requestBuilder::setExpectedWrittenOffset);
.setKey(Rs3.Key.newBuilder().setBasicKey(basicKeyProto(key)))
.setExpectedWrittenOffset(walOffsetProto(expectedWrittenOffset));
final var request = requestBuilder.build();
final RS3Grpc.RS3BlockingStub stub = stubs.stubs(storeId, pssId).syncStub();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import java.util.Optional;

public class GrpcRs3Util {
public static final Rs3.WALOffset UNWRITTEN_WAL_OFFSET = Rs3.WALOffset.newBuilder()
.setIsWritten(false)
.build();

public static RuntimeException wrapThrowable(Throwable t) {
final var statusOpt = getGrpcStatus(t);
Expand Down Expand Up @@ -117,7 +120,7 @@ public static Rs3.CreateStoreOptions createStoreOptionsProto(
builder.setLogicalShards(options.logicalShards());
builder.setStoreType(storeTypeProto(options.storeType()));
options.clockType().ifPresent(
type -> builder.setClockType(Rs3.CreateStoreOptions.ClockType.forNumber(type.ordinal()))
type -> builder.setClockType(Rs3.ClockType.forNumber(type.ordinal()))
);
options.defaultTtl().ifPresent(builder::setDefaultTtl);
options.slateDbOptions().ifPresent(slateDbOptions -> {
Expand All @@ -127,4 +130,31 @@ public static Rs3.CreateStoreOptions createStoreOptionsProto(
});
return builder.build();
}

public static Rs3.WALOffset walOffsetProto(final long offset) {
return Rs3.WALOffset.newBuilder()
.setIsWritten(true)
.setOffset(offset)
.build();
}

public static Rs3.WALOffset walOffsetProto(final Optional<Long> offset) {
return offset
.map(GrpcRs3Util::walOffsetProto)
.orElse(UNWRITTEN_WAL_OFFSET);
}

public static Optional<Long> walOffsetFromProto(final Rs3.WALOffset walOffset) {
if (!walOffset.hasIsWritten()) {
throw new RS3Exception("illegal wal offset: is_written must be set");
}
if (walOffset.getIsWritten()) {
if (!walOffset.hasOffset()) {
throw new RS3Exception("illegal wal offset: offset must be set");
}
return Optional.of(walOffset.getOffset());
} else {
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,8 @@ public void getOffsets(
final var currentOffset = offset.get();
final var result = Rs3.GetOffsetsResult
.newBuilder()
.setFlushedOffset(currentOffset)
.setWrittenOffset(currentOffset)
.setFlushedOffset(GrpcRs3Util.walOffsetProto(currentOffset))
.setWrittenOffset(GrpcRs3Util.walOffsetProto(currentOffset))
.build();
responseObserver.onNext(result);
responseObserver.onCompleted();
Expand All @@ -311,8 +311,8 @@ public void get(
return;
}

if (req.getExpectedWrittenOffset() != GrpcRS3Client.WAL_OFFSET_NONE) {
if (offset.get() < req.getExpectedWrittenOffset()) {
if (req.getExpectedWrittenOffset().getIsWritten()) {
if (offset.get() < req.getExpectedWrittenOffset().getOffset()) {
responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT));
return;
}
Expand Down Expand Up @@ -353,8 +353,8 @@ public void range(
return;
}

if (req.getExpectedWrittenOffset() != GrpcRS3Client.WAL_OFFSET_NONE) {
if (offset.get() < req.getExpectedWrittenOffset()) {
if (req.getExpectedWrittenOffset().getIsWritten()) {
if (offset.get() < req.getExpectedWrittenOffset().getOffset()) {
responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT));
return;
}
Expand Down Expand Up @@ -403,8 +403,8 @@ public void onNext(final Rs3.WriteWALSegmentRequest req) {
responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT));
}

if (req.getExpectedWrittenOffset() != GrpcRS3Client.WAL_OFFSET_NONE) {
if (offset.get() < req.getExpectedWrittenOffset()) {
if (req.getExpectedWrittenOffset().getIsWritten()) {
if (offset.get() < req.getExpectedWrittenOffset().getOffset()) {
responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT));
return;
}
Expand Down Expand Up @@ -434,7 +434,7 @@ public void onError(final Throwable throwable) {
public void onCompleted() {
final var result = Rs3.WriteWALSegmentResult
.newBuilder()
.setFlushedOffset(offset.get())
.setFlushedOffset(GrpcRs3Util.walOffsetProto(offset.get()))
.build();
responseObserver.onNext(result);
responseObserver.onCompleted();
Expand Down
Loading
Loading