Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka-client/src/main/external-protos/rs3
Submodule rs3 updated from 59f088 to 55c8b0
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicDeleteProto;
import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicKeyProto;
import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicPutProto;
import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.walOffsetFromProto;
import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.walOffsetProto;
import static dev.responsive.kafka.internal.utils.Utils.lssIdProto;
import static dev.responsive.kafka.internal.utils.Utils.uuidFromProto;
import static dev.responsive.kafka.internal.utils.Utils.uuidToProto;
Expand Down Expand Up @@ -51,8 +53,6 @@
import org.apache.kafka.streams.state.KeyValueIterator;

public class GrpcRS3Client implements RS3Client {
public static final long WAL_OFFSET_NONE = Long.MAX_VALUE;

private final PssStubsProvider stubs;
private final Time time;
private final long retryTimeoutMs;
Expand Down Expand Up @@ -88,10 +88,8 @@ public CurrentOffsets getCurrentOffsets(final UUID storeId, final LssId lssId, f
checkField(result::hasWrittenOffset, "writtenOffset");
checkField(result::hasFlushedOffset, "flushedOffset");
return new CurrentOffsets(
result.getWrittenOffset() == WAL_OFFSET_NONE
? Optional.empty() : Optional.of(result.getWrittenOffset()),
result.getFlushedOffset() == WAL_OFFSET_NONE
? Optional.empty() : Optional.of(result.getFlushedOffset())
walOffsetFromProto(result.getWrittenOffset()),
walOffsetFromProto(result.getFlushedOffset())
);
}

Expand Down Expand Up @@ -158,7 +156,7 @@ public StreamSenderMessageReceiver<WalEntry, Optional<Long>> writeWalSegmentAsyn
.setLssId(lssIdProto(lssId))
.setPssId(pssId)
.setEndOffset(endOffset)
.setExpectedWrittenOffset(expectedWrittenOffset.orElse(WAL_OFFSET_NONE));
.setExpectedWrittenOffset(walOffsetProto(expectedWrittenOffset));
addWalEntryToSegment(entry, entryBuilder);
return entryBuilder.build();
},
Expand All @@ -169,8 +167,7 @@ public StreamSenderMessageReceiver<WalEntry, Optional<Long>> writeWalSegmentAsyn
resultObserver.message()
.thenApply(r -> {
checkField(r::hasFlushedOffset, "flushedOffset");
return r.getFlushedOffset() == WAL_OFFSET_NONE
? Optional.empty() : Optional.of(r.getFlushedOffset());
return walOffsetFromProto(r.getFlushedOffset());
})
);
}
Expand Down Expand Up @@ -235,8 +232,8 @@ public KeyValueIterator<Bytes, byte[]> range(
final var requestBuilder = Rs3.RangeRequest.newBuilder()
.setStoreId(uuidToProto(storeId))
.setLssId(lssIdProto(lssId))
.setPssId(pssId);
expectedWrittenOffset.ifPresent(requestBuilder::setExpectedWrittenOffset);
.setPssId(pssId)
.setExpectedWrittenOffset(walOffsetProto(expectedWrittenOffset));
final Supplier<String> rangeDescription =
() -> "Range(storeId=" + storeId + ", lssId=" + lssId + ", pssId=" + pssId + ")";
final var asyncStub = stubs.stubs(storeId, pssId).asyncStub();
Expand All @@ -256,8 +253,8 @@ public Optional<byte[]> get(
.setStoreId(uuidToProto(storeId))
.setLssId(lssIdProto(lssId))
.setPssId(pssId)
.setKey(Rs3.Key.newBuilder().setBasicKey(basicKeyProto(key)));
expectedWrittenOffset.ifPresent(requestBuilder::setExpectedWrittenOffset);
.setKey(Rs3.Key.newBuilder().setBasicKey(basicKeyProto(key)))
.setExpectedWrittenOffset(walOffsetProto(expectedWrittenOffset));
final var request = requestBuilder.build();
final RS3Grpc.RS3BlockingStub stub = stubs.stubs(storeId, pssId).syncStub();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public interface GrpcRangeRequestProxy {
* through to result observer. If a transient error is encountered through the
* observer, the caller can retry this operation with an updated `startBound`.
*
* @param start The updated start bound based on key-values seen with `resultObserver`
* @param range The updated range based on key-values seen with `resultObserver`
* @param resultObserver An observer for key-value results and the end of stream marker
*/
void send(Range range, StreamObserver<Rs3.RangeResult> resultObserver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import java.util.Optional;

public class GrpcRs3Util {
public static final Rs3.WALOffset UNWRITTEN_WAL_OFFSET = Rs3.WALOffset.newBuilder()
.setIsWritten(false)
.build();

public static RuntimeException wrapThrowable(Throwable t) {
final var statusOpt = getGrpcStatus(t);
Expand Down Expand Up @@ -117,7 +120,7 @@ public static Rs3.CreateStoreOptions createStoreOptionsProto(
builder.setLogicalShards(options.logicalShards());
builder.setStoreType(storeTypeProto(options.storeType()));
options.clockType().ifPresent(
type -> builder.setClockType(Rs3.CreateStoreOptions.ClockType.forNumber(type.ordinal()))
type -> builder.setClockType(Rs3.ClockType.forNumber(type.ordinal()))
);
options.defaultTtl().ifPresent(builder::setDefaultTtl);
options.slateDbOptions().ifPresent(slateDbOptions -> {
Expand All @@ -127,4 +130,31 @@ public static Rs3.CreateStoreOptions createStoreOptionsProto(
});
return builder.build();
}

public static Rs3.WALOffset walOffsetProto(final long offset) {
return Rs3.WALOffset.newBuilder()
.setIsWritten(true)
.setOffset(offset)
.build();
}

public static Rs3.WALOffset walOffsetProto(final Optional<Long> offset) {
return offset
.map(GrpcRs3Util::walOffsetProto)
.orElse(UNWRITTEN_WAL_OFFSET);
}

public static Optional<Long> walOffsetFromProto(final Rs3.WALOffset walOffset) {
if (!walOffset.hasIsWritten()) {
throw new RS3Exception("illegal wal offset: is_written must be set");
}
if (walOffset.getIsWritten()) {
if (!walOffset.hasOffset()) {
throw new RS3Exception("illegal wal offset: offset must be set");
}
return Optional.of(walOffset.getOffset());
} else {
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,8 @@ public void getOffsets(
final var currentOffset = offset.get();
final var result = Rs3.GetOffsetsResult
.newBuilder()
.setFlushedOffset(currentOffset)
.setWrittenOffset(currentOffset)
.setFlushedOffset(GrpcRs3Util.walOffsetProto(currentOffset))
.setWrittenOffset(GrpcRs3Util.walOffsetProto(currentOffset))
.build();
responseObserver.onNext(result);
responseObserver.onCompleted();
Expand All @@ -311,8 +311,8 @@ public void get(
return;
}

if (req.getExpectedWrittenOffset() != GrpcRS3Client.WAL_OFFSET_NONE) {
if (offset.get() < req.getExpectedWrittenOffset()) {
if (req.getExpectedWrittenOffset().getIsWritten()) {
if (offset.get() < req.getExpectedWrittenOffset().getOffset()) {
responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT));
return;
}
Expand Down Expand Up @@ -353,8 +353,8 @@ public void range(
return;
}

if (req.getExpectedWrittenOffset() != GrpcRS3Client.WAL_OFFSET_NONE) {
if (offset.get() < req.getExpectedWrittenOffset()) {
if (req.getExpectedWrittenOffset().getIsWritten()) {
if (offset.get() < req.getExpectedWrittenOffset().getOffset()) {
responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT));
return;
}
Expand Down Expand Up @@ -403,8 +403,8 @@ public void onNext(final Rs3.WriteWALSegmentRequest req) {
responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT));
}

if (req.getExpectedWrittenOffset() != GrpcRS3Client.WAL_OFFSET_NONE) {
if (offset.get() < req.getExpectedWrittenOffset()) {
if (req.getExpectedWrittenOffset().getIsWritten()) {
if (offset.get() < req.getExpectedWrittenOffset().getOffset()) {
responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT));
return;
}
Expand Down Expand Up @@ -434,7 +434,7 @@ public void onError(final Throwable throwable) {
public void onCompleted() {
final var result = Rs3.WriteWALSegmentResult
.newBuilder()
.setFlushedOffset(offset.get())
.setFlushedOffset(GrpcRs3Util.walOffsetProto(offset.get()))
.build();
responseObserver.onNext(result);
responseObserver.onCompleted();
Expand Down
Loading
Loading