diff --git a/kafka-client/src/main/external-protos/rs3 b/kafka-client/src/main/external-protos/rs3 index f385e7902..59f088e12 160000 --- a/kafka-client/src/main/external-protos/rs3 +++ b/kafka-client/src/main/external-protos/rs3 @@ -1 +1 @@ -Subproject commit f385e79026aac033a409794dcd6b755eedbe4429 +Subproject commit 59f088e12b2afe75441b06735be75c3a9178664a diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVTable.java index 14c4a17ab..cc2dcd429 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVTable.java @@ -18,6 +18,7 @@ import dev.responsive.kafka.internal.db.rs3.client.MeteredRS3Client; import dev.responsive.kafka.internal.db.rs3.client.Put; import dev.responsive.kafka.internal.db.rs3.client.RS3Client; +import dev.responsive.kafka.internal.db.rs3.client.Range; import dev.responsive.kafka.internal.db.rs3.client.RangeBound; import dev.responsive.kafka.internal.db.rs3.client.WalEntry; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; @@ -137,8 +138,7 @@ public KeyValueIterator range( final Bytes to, final long streamTimeMs ) { - final RangeBound fromBound = RangeBound.inclusive(from.get()); - final RangeBound toBound = RangeBound.exclusive(to.get()); + final var range = new Range(RangeBound.inclusive(from.get()), RangeBound.exclusive(to.get())); final List> pssIters = new ArrayList<>(); for (int pssId : pssPartitioner.pssForLss(this.lssId)) { @@ -147,8 +147,7 @@ public KeyValueIterator range( lssId, pssId, flushManager.writtenOffset(pssId), - fromBound, - toBound + range )); } return new MergeKeyValueIterator<>(pssIters); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactory.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactory.java index 3c901be7a..bfc927387 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactory.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactory.java @@ -13,6 +13,7 @@ package dev.responsive.kafka.internal.db.rs3; import dev.responsive.kafka.internal.db.RemoteKVTable; +import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.ClockType; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions; import dev.responsive.kafka.internal.db.rs3.client.RS3Client; @@ -76,12 +77,14 @@ public static UUID createStore( : Optional.empty(); final var options = new CreateStoreOptions( + numKafkaPartitions, + CreateStoreTypes.StoreType.BASIC, ttlResolver.isPresent() ? Optional.of(ClockType.WALL_CLOCK) : Optional.empty(), defaultTtl, Optional.empty() ); - final var result = rs3Client.createStore(storeName, numKafkaPartitions, options); + final var result = rs3Client.createStore(storeName, options); LOG.info("Created store {} ({}) with {} logical shards and {} physical shards", storeName, result.storeId(), numKafkaPartitions, result.pssIds().size()); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/CreateStoreTypes.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/CreateStoreTypes.java index 7f5df9785..bf2faa68d 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/CreateStoreTypes.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/CreateStoreTypes.java @@ -1,7 +1,7 @@ package dev.responsive.kafka.internal.db.rs3.client; -import dev.responsive.rs3.Rs3; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.UUID; @@ -12,21 +12,39 @@ public enum ClockType { STREAM_TIME } + public enum StoreType { + BASIC, + WINDOW + } + public static class CreateStoreOptions { + private final int logicalShards; + private final StoreType storeType; private final Optional clockType; private final Optional defaultTtl; - private final Optional filterBitsPerKey; - + private final Optional slateDbOptions; public CreateStoreOptions( + final int logicalShards, + final StoreType storeType, final Optional clockType, final Optional defaultTtl, - final Optional filterBitsPerKey + final Optional slateDbOptions ) { - this.clockType = clockType; - this.defaultTtl = defaultTtl; - this.filterBitsPerKey = filterBitsPerKey; + this.logicalShards = logicalShards; + this.storeType = Objects.requireNonNull(storeType); + this.clockType = Objects.requireNonNull(clockType); + this.defaultTtl = Objects.requireNonNull(defaultTtl); + this.slateDbOptions = Objects.requireNonNull(slateDbOptions); + } + + public int logicalShards() { + return logicalShards; + } + + public StoreType storeType() { + return storeType; } public Optional clockType() { @@ -37,26 +55,18 @@ public Optional defaultTtl() { return defaultTtl; } - public Optional filterBitsPerKey() { - return filterBitsPerKey; - } - - public Rs3.CreateStoreOptions toProto() { - final var builder = Rs3.CreateStoreOptions.newBuilder(); - clockType.ifPresent( - type -> builder.setClockType(Rs3.CreateStoreOptions.ClockType.forNumber(type.ordinal())) - ); - defaultTtl.ifPresent(builder::setDefaultTtl); - filterBitsPerKey.ifPresent(builder::setFilterBitsPerKey); - return builder.build(); + public Optional slateDbOptions() { + return slateDbOptions; } @Override public String toString() { return "CreateStoreOptions{" - + "clockType=" + clockType + + "logicalShards=" + logicalShards + + ", storeType=" + storeType + + ", clockType=" + clockType + ", defaultTtl=" + defaultTtl - + ", filterBitsPerKey=" + filterBitsPerKey + + ", slateDbOptions=" + slateDbOptions + '}'; } @@ -68,24 +78,54 @@ public boolean equals(final Object o) { if (o == null || getClass() != o.getClass()) { return false; } - final CreateStoreOptions that = (CreateStoreOptions) o; + return logicalShards == that.logicalShards && storeType == that.storeType + && Objects.equals(clockType, that.clockType) && Objects.equals( + defaultTtl, + that.defaultTtl + ) && Objects.equals(slateDbOptions, that.slateDbOptions); + } - if (!clockType.equals(that.clockType)) { - return false; + @Override + public int hashCode() { + return Objects.hash(logicalShards, storeType, clockType, defaultTtl, slateDbOptions); + } + } + + public static class SlateDbStorageOptions { + private final Optional filterBitsPerKey; + + public SlateDbStorageOptions( + final Optional filterBitsPerKey) { + this.filterBitsPerKey = Objects.requireNonNull(filterBitsPerKey); + } + + public Optional filterBitsPerKey() { + return filterBitsPerKey; + } + + @Override + public String toString() { + return "SlateDbStorageOptions{" + + "filterBitsPerKey=" + filterBitsPerKey + + '}'; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; } - if (!defaultTtl.equals(that.defaultTtl)) { + if (o == null || getClass() != o.getClass()) { return false; } - return filterBitsPerKey.equals(that.filterBitsPerKey); + final SlateDbStorageOptions that = (SlateDbStorageOptions) o; + return Objects.equals(filterBitsPerKey, that.filterBitsPerKey); } @Override public int hashCode() { - int result = clockType.hashCode(); - result = 31 * result + defaultTtl.hashCode(); - result = 31 * result + filterBitsPerKey.hashCode(); - return result; + return Objects.hashCode(filterBitsPerKey); } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Delete.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Delete.java new file mode 100644 index 000000000..eb9851866 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Delete.java @@ -0,0 +1,33 @@ +package dev.responsive.kafka.internal.db.rs3.client; + +import java.util.Arrays; +import java.util.Objects; + +public class Delete extends WalEntry { + private final byte[] key; + + public Delete(final byte[] key) { + this.key = Objects.requireNonNull(key); + } + + public byte[] key() { + return key; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final Delete delete = (Delete) o; + return Objects.deepEquals(key, delete.key); + } + + @Override + public int hashCode() { + return Arrays.hashCode(key); + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java index 1fdf6cbda..e71ac8274 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java @@ -100,16 +100,14 @@ public KeyValueIterator range( final LssId lssId, final int pssId, final Optional expectedWrittenOffset, - final RangeBound from, - final RangeBound to + final Range range ) { return delegate.range( storeId, lssId, pssId, expectedWrittenOffset, - from, - to + range ); } @@ -121,10 +119,9 @@ public List listStores() { @Override public CreateStoreResult createStore( final String storeName, - final int logicalShards, final CreateStoreOptions options ) { - return delegate.createStore(storeName, logicalShards, options); + return delegate.createStore(storeName, options); } public void close() { diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Put.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Put.java index 4bfd8709c..b1adf95c2 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Put.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Put.java @@ -14,7 +14,6 @@ import java.util.Arrays; import java.util.Objects; -import java.util.Optional; public class Put extends WalEntry { private final byte[] key; @@ -22,27 +21,30 @@ public class Put extends WalEntry { public Put(final byte[] key, final byte[] value) { this.key = Objects.requireNonNull(key); - this.value = value; + this.value = Objects.requireNonNull(value); } public byte[] key() { return key; } - public Optional value() { - return Optional.ofNullable(value); + public byte[] value() { + return value; } @Override - public boolean equals(Object o) { + public boolean equals(final Object o) { if (this == o) { return true; } - if (!(o instanceof Put)) { + if (o == null || getClass() != o.getClass()) { return false; } final Put put = (Put) o; - return Objects.deepEquals(key, put.key) && Objects.deepEquals(value, put.value); + return Objects.deepEquals(key, put.key) && Objects.deepEquals( + value, + put.value + ); } @Override diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java index d4c59c351..760e167fc 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java @@ -53,15 +53,13 @@ KeyValueIterator range( LssId lssId, int pssId, Optional expectedWrittenOffset, - RangeBound from, - RangeBound to + Range range ); List listStores(); CreateStoreResult createStore( String storeId, - int logicalShards, CreateStoreOptions options ); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Range.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Range.java index 2f0b21191..0480f676f 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Range.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Range.java @@ -1,6 +1,7 @@ package dev.responsive.kafka.internal.db.rs3.client; import java.util.Arrays; +import java.util.Objects; public class Range { private final RangeBound start; @@ -61,4 +62,24 @@ public Boolean map(final RangeBound.Unbounded b) { }); } + public static Range unbounded() { + return new Range(RangeBound.unbounded(), RangeBound.unbounded()); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final Range range = (Range) o; + return Objects.equals(start, range.start) && Objects.equals(end, range.end); + } + + @Override + public int hashCode() { + return Objects.hash(start, end); + } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcKeyValueIterator.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcKeyValueIterator.java index ccc2d65bc..6bd674806 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcKeyValueIterator.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcKeyValueIterator.java @@ -15,6 +15,7 @@ import com.google.protobuf.ByteString; import dev.responsive.kafka.internal.db.rs3.client.RS3Exception; import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException; +import dev.responsive.kafka.internal.db.rs3.client.Range; import dev.responsive.kafka.internal.db.rs3.client.RangeBound; import dev.responsive.rs3.Rs3; import io.grpc.stub.StreamObserver; @@ -36,23 +37,23 @@ public class GrpcKeyValueIterator implements KeyValueIterator { private final GrpcRangeRequestProxy requestProxy; private final GrpcMessageQueue queue; - private RangeBound startBound; + private Range range; private RangeResultObserver resultObserver; public GrpcKeyValueIterator( - RangeBound initialStartBound, + Range range, GrpcRangeRequestProxy requestProxy ) { this.requestProxy = requestProxy; this.queue = new GrpcMessageQueue<>(); - this.startBound = initialStartBound; + this.range = range; sendRangeRequest(); } private void sendRangeRequest() { // Note that backoff on retry is handled internally by the request proxy resultObserver = new RangeResultObserver(); - requestProxy.send(startBound, resultObserver); + requestProxy.send(range, resultObserver); } @Override @@ -66,7 +67,9 @@ public KeyValue next() { if (nextKeyValue.isPresent()) { queue.poll(); final var keyValue = nextKeyValue.get(); - this.startBound = RangeBound.exclusive(keyValue.key.get()); + final var newStartRange = RangeBound.exclusive(keyValue.key.get()); + final var newEndRange = this.range.end(); + this.range = new Range(newStartRange, newEndRange); return keyValue; } else { throw new NoSuchElementException(); @@ -134,8 +137,10 @@ public void onNext(final Rs3.RangeResult rangeResult) { } else if (rangeResult.getType() == Rs3.RangeResult.Type.END_OF_STREAM) { queue.put(new EndOfStream()); } else { - final var result = rangeResult.getResult(); - queue.put(new Result(result.getKey(), result.getValue())); + final var kv = rangeResult.getResult().getBasicKv(); + final var key = kv.getKey().getKey(); + final var value = kv.getValue().getValue(); + queue.put(new Result(key, value)); } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java index 82ff880de..f5eba6c16 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java @@ -12,21 +12,25 @@ package dev.responsive.kafka.internal.db.rs3.client.grpc; +import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicDeleteProto; +import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicKeyProto; +import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicPutProto; import static dev.responsive.kafka.internal.utils.Utils.lssIdProto; import static dev.responsive.kafka.internal.utils.Utils.uuidFromProto; import static dev.responsive.kafka.internal.utils.Utils.uuidToProto; import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ByteString; import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreResult; import dev.responsive.kafka.internal.db.rs3.client.CurrentOffsets; +import dev.responsive.kafka.internal.db.rs3.client.Delete; import dev.responsive.kafka.internal.db.rs3.client.LssId; import dev.responsive.kafka.internal.db.rs3.client.Put; import dev.responsive.kafka.internal.db.rs3.client.RS3Client; import dev.responsive.kafka.internal.db.rs3.client.RS3TimeoutException; import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException; +import dev.responsive.kafka.internal.db.rs3.client.Range; import dev.responsive.kafka.internal.db.rs3.client.RangeBound; import dev.responsive.kafka.internal.db.rs3.client.Store; import dev.responsive.kafka.internal.db.rs3.client.StreamSenderMessageReceiver; @@ -226,20 +230,18 @@ public KeyValueIterator range( final LssId lssId, final int pssId, final Optional expectedWrittenOffset, - RangeBound from, - RangeBound to + Range range ) { final var requestBuilder = Rs3.RangeRequest.newBuilder() .setStoreId(uuidToProto(storeId)) .setLssId(lssIdProto(lssId)) - .setPssId(pssId) - .setTo(protoBound(to)); + .setPssId(pssId); expectedWrittenOffset.ifPresent(requestBuilder::setExpectedWrittenOffset); final Supplier rangeDescription = () -> "Range(storeId=" + storeId + ", lssId=" + lssId + ", pssId=" + pssId + ")"; final var asyncStub = stubs.stubs(storeId, pssId).asyncStub(); final var rangeProxy = new RangeProxy(requestBuilder, asyncStub, rangeDescription); - return new GrpcKeyValueIterator(from, rangeProxy); + return new GrpcKeyValueIterator(range, rangeProxy); } @Override @@ -254,7 +256,7 @@ public Optional get( .setStoreId(uuidToProto(storeId)) .setLssId(lssIdProto(lssId)) .setPssId(pssId) - .setKey(ByteString.copyFrom(key)); + .setKey(Rs3.Key.newBuilder().setBasicKey(basicKeyProto(key))); expectedWrittenOffset.ifPresent(requestBuilder::setExpectedWrittenOffset); final var request = requestBuilder.build(); final RS3Grpc.RS3BlockingStub stub = stubs.stubs(storeId, pssId).syncStub(); @@ -267,8 +269,9 @@ public Optional get( return Optional.empty(); } final Rs3.KeyValue keyValue = result.getResult(); - checkField(keyValue::hasValue, "value"); - return Optional.of(keyValue.getValue().toByteArray()); + checkField(keyValue::hasBasicKv, "value"); + final var value = keyValue.getBasicKv().getValue().getValue(); + return Optional.of(value.toByteArray()); } @Override @@ -290,21 +293,17 @@ public List listStores() { @Override public CreateStoreResult createStore( final String storeName, - final int logicalShards, final CreateStoreOptions options ) { final var request = Rs3.CreateStoreRequest.newBuilder() .setStoreName(storeName) - .setLogicalShards(logicalShards) - .setOptions(options.toProto()) + .setOptions(GrpcRs3Util.createStoreOptionsProto(options)) .build(); final RS3Grpc.RS3BlockingStub stub = stubs.globalStubs().syncStub(); final Rs3.CreateStoreResult result = withRetry( () -> stub.createStore(request), - () -> "CreateStore(storeName=" + storeName - + ", logicalShards=" + logicalShards - + ", createStoreOptions=" + options + ")" + () -> "CreateStore(storeName=" + storeName + ", createStoreOptions=" + options + ")" ); return new CreateStoreResult(uuidFromProto(result.getStoreId()), result.getPssIdsList()); @@ -315,14 +314,9 @@ private void addWalEntryToSegment( final Rs3.WriteWALSegmentRequest.Builder builder ) { if (entry instanceof Put) { - final var put = (Put) entry; - final var putBuilder = Rs3.WriteWALSegmentRequest.Put.newBuilder() - .setKey(ByteString.copyFrom(put.key())); - if (put.value().isPresent()) { - putBuilder.setValue(ByteString.copyFrom(put.value().get())); - } - putBuilder.setTtl(Rs3.Ttl.newBuilder().setTtlType(Rs3.Ttl.TtlType.DEFAULT).build()); - builder.setPut(putBuilder.build()); + builder.setPut(basicPutProto((Put) entry)); + } else if (entry instanceof Delete) { + builder.setDelete(basicDeleteProto((Delete) entry)); } } @@ -332,28 +326,37 @@ private void checkField(final Supplier check, final String field) { } } - private Rs3.Bound protoBound(RangeBound bound) { + private Rs3.Range protoRange(Range range) { + final var protoRange = Rs3.BasicRange.newBuilder() + .setFrom(protoBound(range.start())) + .setTo(protoBound(range.end())); + return Rs3.Range.newBuilder() + .setBasicRange(protoRange) + .build(); + } + + private Rs3.BasicBound protoBound(RangeBound bound) { return bound.map(new RangeBound.Mapper<>() { @Override - public Rs3.Bound map(final RangeBound.InclusiveBound b) { - return Rs3.Bound.newBuilder() - .setType(Rs3.Bound.Type.INCLUSIVE) - .setKey(ByteString.copyFrom(b.key())) + public Rs3.BasicBound map(final RangeBound.InclusiveBound b) { + return Rs3.BasicBound.newBuilder() + .setType(Rs3.BoundType.INCLUSIVE) + .setKey(basicKeyProto(b.key())) .build(); } @Override - public Rs3.Bound map(final RangeBound.ExclusiveBound b) { - return Rs3.Bound.newBuilder() - .setType(Rs3.Bound.Type.EXCLUSIVE) - .setKey(ByteString.copyFrom(b.key())) + public Rs3.BasicBound map(final RangeBound.ExclusiveBound b) { + return Rs3.BasicBound.newBuilder() + .setType(Rs3.BoundType.EXCLUSIVE) + .setKey(basicKeyProto(b.key())) .build(); } @Override - public Rs3.Bound map(final RangeBound.Unbounded b) { - return Rs3.Bound.newBuilder() - .setType(Rs3.Bound.Type.UNBOUNDED) + public Rs3.BasicBound map(final RangeBound.Unbounded b) { + return Rs3.BasicBound.newBuilder() + .setType(Rs3.BoundType.UNBOUNDED) .build(); } }); @@ -377,8 +380,8 @@ private RangeProxy( } @Override - public void send(final RangeBound start, final StreamObserver resultObserver) { - requestBuilder.setFrom(protoBound(start)); + public void send(final Range range, final StreamObserver resultObserver) { + requestBuilder.setRange(protoRange(range)); while (true) { try { diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRangeRequestProxy.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRangeRequestProxy.java index 0b3c41aaf..644bf48c6 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRangeRequestProxy.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRangeRequestProxy.java @@ -14,7 +14,7 @@ package dev.responsive.kafka.internal.db.rs3.client.grpc; -import dev.responsive.kafka.internal.db.rs3.client.RangeBound; +import dev.responsive.kafka.internal.db.rs3.client.Range; import dev.responsive.rs3.Rs3; import io.grpc.stub.StreamObserver; @@ -33,5 +33,5 @@ public interface GrpcRangeRequestProxy { * @param start The updated start bound based on key-values seen with `resultObserver` * @param resultObserver An observer for key-value results and the end of stream marker */ - void send(RangeBound start, StreamObserver resultObserver); + void send(Range range, StreamObserver resultObserver); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java index 669881f63..135538e27 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java @@ -12,8 +12,13 @@ package dev.responsive.kafka.internal.db.rs3.client.grpc; +import com.google.protobuf.ByteString; +import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes; +import dev.responsive.kafka.internal.db.rs3.client.Delete; +import dev.responsive.kafka.internal.db.rs3.client.Put; import dev.responsive.kafka.internal.db.rs3.client.RS3Exception; import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException; +import dev.responsive.rs3.Rs3; import io.grpc.Status; import io.grpc.StatusException; import io.grpc.StatusRuntimeException; @@ -56,4 +61,70 @@ private static Optional getGrpcStatus(Throwable t) { return Optional.empty(); } } + + public static Rs3.BasicKey basicKeyProto(final byte[] key) { + return Rs3.BasicKey.newBuilder() + .setKey(ByteString.copyFrom(key)) + .build(); + } + + public static Rs3.BasicKeyValue basicKeyValueProto( + final byte[] key, + final byte[] value + ) { + final var keyBldr = Rs3.BasicKey.newBuilder(); + keyBldr.setKey(ByteString.copyFrom(key)); + + final var valueBldr = Rs3.BasicValue.newBuilder(); + valueBldr.setValue(ByteString.copyFrom(value)); + + return Rs3.BasicKeyValue.newBuilder() + .setKey(keyBldr) + .setValue(valueBldr) + .build(); + } + + public static Rs3.WriteWALSegmentRequest.Put basicPutProto(final Put put) { + final var kv = basicKeyValueProto(put.key(), put.value()); + return Rs3.WriteWALSegmentRequest.Put.newBuilder() + .setKv(Rs3.KeyValue.newBuilder().setBasicKv(kv).build()) + .setTtl(Rs3.Ttl.newBuilder().setTtlType(Rs3.Ttl.TtlType.DEFAULT)) + .build(); + } + + public static Rs3.WriteWALSegmentRequest.Delete basicDeleteProto(final Delete delete) { + final var key = basicKeyProto(delete.key()); + return Rs3.WriteWALSegmentRequest.Delete.newBuilder() + .setKey(Rs3.Key.newBuilder().setBasicKey(key)) + .build(); + } + + public static Rs3.StoreType storeTypeProto(final CreateStoreTypes.StoreType storeType) { + switch (storeType) { + case BASIC: + return Rs3.StoreType.BASIC; + case WINDOW: + return Rs3.StoreType.WINDOW; + default: + throw new IllegalArgumentException("Unknown store type: " + storeType); + } + } + + public static Rs3.CreateStoreOptions createStoreOptionsProto( + CreateStoreTypes.CreateStoreOptions options + ) { + final var builder = Rs3.CreateStoreOptions.newBuilder(); + builder.setLogicalShards(options.logicalShards()); + builder.setStoreType(storeTypeProto(options.storeType())); + options.clockType().ifPresent( + type -> builder.setClockType(Rs3.CreateStoreOptions.ClockType.forNumber(type.ordinal())) + ); + options.defaultTtl().ifPresent(builder::setDefaultTtl); + options.slateDbOptions().ifPresent(slateDbOptions -> { + final var storageOptions = Rs3.SlateDbStorageOptions.newBuilder(); + slateDbOptions.filterBitsPerKey().ifPresent(storageOptions::setFilterBitsPerKey); + builder.setSlatedbStorageOptions(storageOptions); + }); + return builder.build(); + } } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactoryTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactoryTest.java index c185d67e7..a5617d564 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactoryTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactoryTest.java @@ -15,13 +15,13 @@ import static dev.responsive.kafka.internal.stores.TtlResolver.NO_TTL; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreResult; import dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRS3Client; @@ -74,7 +74,7 @@ public void testTableMapping() { final String tableName = "test-table"; final int partitions = 5; - when(client.createStore(anyString(), anyInt(), any(CreateStoreOptions.class))) + when(client.createStore(anyString(), any(CreateStoreOptions.class))) .thenReturn(new CreateStoreResult(storeId, List.of(1, 2, 3, 4, 5))); final RS3TableFactory factory = newTestFactory(); @@ -88,11 +88,14 @@ public void testTableMapping() { assertEquals(tableName, rs3Table.name()); assertEquals(storeId, rs3Table.storedId()); - verify(client).createStore( - tableName, + final var expectedOptions = new CreateStoreOptions( partitions, - new CreateStoreOptions(Optional.empty(), Optional.empty(), Optional.empty()) + CreateStoreTypes.StoreType.BASIC, + Optional.empty(), + Optional.empty(), + Optional.empty() ); + verify(client).createStore(tableName, expectedOptions); } private RS3TableFactory newTestFactory() { diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcKeyValueIteratorTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcKeyValueIteratorTest.java index e2ec20da3..06dfa9571 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcKeyValueIteratorTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcKeyValueIteratorTest.java @@ -22,6 +22,7 @@ import static org.mockito.ArgumentMatchers.eq; import dev.responsive.kafka.internal.db.rs3.client.RS3Exception; +import dev.responsive.kafka.internal.db.rs3.client.Range; import dev.responsive.kafka.internal.db.rs3.client.RangeBound; import dev.responsive.rs3.Rs3; import io.grpc.Status; @@ -43,7 +44,10 @@ class GrpcKeyValueIteratorTest { @Test @SuppressWarnings("unchecked") public void shouldIterateKeyValueResults() { - final var startBound = RangeBound.inclusive("a".getBytes(StandardCharsets.UTF_8)); + final var range = new Range( + RangeBound.inclusive("a".getBytes(StandardCharsets.UTF_8)), + RangeBound.unbounded() + ); Mockito.doAnswer(invocation -> { StreamObserver observer = invocation.getArgument(1, StreamObserver.class); observer.onNext(newKeyValueResult("a")); @@ -52,9 +56,9 @@ public void shouldIterateKeyValueResults() { observer.onNext(newEndOfStreamResult()); observer.onCompleted(); return null; - }).when(requestProxy).send(eq(startBound), any()); + }).when(requestProxy).send(eq(range), any()); - try (final var iter = new GrpcKeyValueIterator(startBound, requestProxy)) { + try (final var iter = new GrpcKeyValueIterator(range, requestProxy)) { assertNextKey(iter, "a"); assertNextKey(iter, "b"); assertNextKey(iter, "c"); @@ -65,15 +69,21 @@ public void shouldIterateKeyValueResults() { @Test @SuppressWarnings("unchecked") public void shouldRetryRangeRequestAfterTransientFailure() { - final var startBound = RangeBound.inclusive("a".getBytes(StandardCharsets.UTF_8)); + final var range = new Range( + RangeBound.inclusive("a".getBytes(StandardCharsets.UTF_8)), + RangeBound.unbounded() + ); Mockito.doAnswer(invocation -> { StreamObserver observer = invocation.getArgument(1, StreamObserver.class); observer.onNext(newKeyValueResult("a")); observer.onError(new StatusRuntimeException(Status.UNAVAILABLE)); return null; - }).when(requestProxy).send(eq(startBound), any()); + }).when(requestProxy).send(eq(range), any()); - final var retryStartBound = RangeBound.exclusive("a".getBytes(StandardCharsets.UTF_8)); + final var retryRange = new Range( + RangeBound.exclusive("a".getBytes(StandardCharsets.UTF_8)), + RangeBound.unbounded() + ); Mockito.doAnswer(invocation -> { StreamObserver observer = invocation.getArgument(1, StreamObserver.class); observer.onNext(newKeyValueResult("b")); @@ -81,9 +91,9 @@ public void shouldRetryRangeRequestAfterTransientFailure() { observer.onNext(newEndOfStreamResult()); observer.onCompleted(); return null; - }).when(requestProxy).send(eq(retryStartBound), any()); + }).when(requestProxy).send(eq(retryRange), any()); - try (final var iter = new GrpcKeyValueIterator(startBound, requestProxy)) { + try (final var iter = new GrpcKeyValueIterator(range, requestProxy)) { assertNextKey(iter, "a"); assertNextKey(iter, "b"); assertNextKey(iter, "c"); @@ -94,15 +104,21 @@ public void shouldRetryRangeRequestAfterTransientFailure() { @Test @SuppressWarnings("unchecked") public void shouldRetryAfterUnexpectedStreamCompletion() { - final var startBound = RangeBound.inclusive("a".getBytes(StandardCharsets.UTF_8)); + final var range = new Range( + RangeBound.inclusive("a".getBytes(StandardCharsets.UTF_8)), + RangeBound.unbounded() + ); Mockito.doAnswer(invocation -> { StreamObserver observer = invocation.getArgument(1, StreamObserver.class); observer.onNext(newKeyValueResult("a")); observer.onCompleted(); return null; - }).when(requestProxy).send(eq(startBound), any()); + }).when(requestProxy).send(eq(range), any()); - final var retryStartBound = RangeBound.exclusive("a".getBytes(StandardCharsets.UTF_8)); + final var retryRange = new Range( + RangeBound.exclusive("a".getBytes(StandardCharsets.UTF_8)), + RangeBound.unbounded() + ); Mockito.doAnswer(invocation -> { StreamObserver observer = invocation.getArgument(1, StreamObserver.class); observer.onNext(newKeyValueResult("b")); @@ -110,9 +126,9 @@ public void shouldRetryAfterUnexpectedStreamCompletion() { observer.onNext(newEndOfStreamResult()); observer.onCompleted(); return null; - }).when(requestProxy).send(eq(retryStartBound), any()); + }).when(requestProxy).send(eq(retryRange), any()); - try (final var iter = new GrpcKeyValueIterator(startBound, requestProxy)) { + try (final var iter = new GrpcKeyValueIterator(range, requestProxy)) { assertNextKey(iter, "a"); assertNextKey(iter, "b"); assertNextKey(iter, "c"); @@ -123,15 +139,18 @@ public void shouldRetryAfterUnexpectedStreamCompletion() { @Test @SuppressWarnings("unchecked") public void shouldPropagateUnexpectedFailures() { - final var startBound = RangeBound.inclusive("a".getBytes(StandardCharsets.UTF_8)); + final var range = new Range( + RangeBound.inclusive("a".getBytes(StandardCharsets.UTF_8)), + RangeBound.unbounded() + ); Mockito.doAnswer(invocation -> { StreamObserver observer = invocation.getArgument(1, StreamObserver.class); observer.onNext(newKeyValueResult("a")); observer.onError(new StatusRuntimeException(Status.UNKNOWN)); return null; - }).when(requestProxy).send(eq(startBound), any()); + }).when(requestProxy).send(eq(range), any()); - try (final var iter = new GrpcKeyValueIterator(startBound, requestProxy)) { + try (final var iter = new GrpcKeyValueIterator(range, requestProxy)) { assertNextKey(iter, "a"); final var rs3Exception = assertThrows(RS3Exception.class, iter::next); assertThat(rs3Exception.getCause(), instanceOf(StatusRuntimeException.class)); @@ -148,5 +167,4 @@ private void assertNextKey(GrpcKeyValueIterator iter, String key) { assertThat(keyString, is(key)); } - } \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientEndToEndTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientEndToEndTest.java index ddf14fda1..a0779171a 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientEndToEndTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientEndToEndTest.java @@ -9,6 +9,7 @@ import com.google.protobuf.ByteString; import dev.responsive.kafka.internal.db.rs3.client.LssId; import dev.responsive.kafka.internal.db.rs3.client.Put; +import dev.responsive.kafka.internal.db.rs3.client.Range; import dev.responsive.kafka.internal.db.rs3.client.RangeBound; import dev.responsive.rs3.RS3Grpc; import dev.responsive.rs3.Rs3; @@ -120,8 +121,10 @@ public void shouldScanAllKeyValues() { LSS_ID, PSS_ID, Optional.of(5L), - RangeBound.unbounded(), - RangeBound.unbounded() + new Range( + RangeBound.unbounded(), + RangeBound.unbounded() + ) ); assertNext(iter, "a", "foo"); @@ -164,8 +167,10 @@ public void shouldScanKeyValuesInBoundedRange() { LSS_ID, PSS_ID, Optional.of(10L), - RangeBound.inclusive("b".getBytes(StandardCharsets.UTF_8)), - RangeBound.exclusive("e".getBytes(StandardCharsets.UTF_8)) + new Range( + RangeBound.inclusive("b".getBytes(StandardCharsets.UTF_8)), + RangeBound.exclusive("e".getBytes(StandardCharsets.UTF_8)) + ) ); assertNext(iter, "b", "bar"); @@ -200,8 +205,10 @@ public void shouldRetryRangeWithNetworkInterruption() { LSS_ID, PSS_ID, Optional.of(5L), - RangeBound.unbounded(), - RangeBound.unbounded() + new Range( + RangeBound.unbounded(), + RangeBound.unbounded() + ) ); assertNext(iter, "a", "foo"); @@ -311,18 +318,20 @@ public void get( } } - final var keyValueResult = Rs3.KeyValue - .newBuilder() - .setKey(req.getKey()); - final var key = Bytes.wrap(req.getKey().toByteArray()); - final var value = table.get(key); - if (value != null) { - keyValueResult.setValue(ByteString.copyFrom(value.get())); + final var key = req.getKey().getBasicKey(); + final var kvBuilder = Rs3.BasicKeyValue.newBuilder().setKey(key); + + final var keyBytes = Bytes.wrap(key.getKey().toByteArray()); + final var valueBytes = table.get(keyBytes); + if (valueBytes != null) { + final var value = Rs3.BasicValue.newBuilder() + .setValue(ByteString.copyFrom(valueBytes.get())); + kvBuilder.setValue(value); } final var result = Rs3.GetResult .newBuilder() - .setResult(keyValueResult) + .setResult(Rs3.KeyValue.newBuilder().setBasicKv(kvBuilder)) .build(); responseObserver.onNext(result); responseObserver.onCompleted(); @@ -357,14 +366,14 @@ public void range( continue; } - final var keyValue = Rs3.KeyValue - .newBuilder() - .setKey(ByteString.copyFrom(keyValueEntry.getKey().get())) - .setValue(ByteString.copyFrom(keyValueEntry.getValue().get())); + final var keyValue = GrpcRs3Util.basicKeyValueProto( + keyValueEntry.getKey().get(), + keyValueEntry.getValue().get() + ); final var keyValueResult = Rs3.RangeResult.newBuilder() .setType(Rs3.RangeResult.Type.RESULT) - .setResult(keyValue) + .setResult(Rs3.KeyValue.newBuilder().setBasicKv(keyValue)) .build(); responseObserver.onNext(keyValueResult); @@ -404,13 +413,14 @@ public void onNext(final Rs3.WriteWALSegmentRequest req) { TestRs3Service.this.offset.getAndUpdate( current -> Math.max(current, req.getEndOffset()) ); - final var put = req.getPut(); - final var keyBytes = Bytes.wrap(put.getKey().toByteArray()); - - if (put.hasValue()) { - final var valueBytes = Bytes.wrap(put.getValue().toByteArray()); + if (req.hasPut()) { + final var kv = req.getPut().getKv().getBasicKv(); + final var keyBytes = Bytes.wrap(kv.getKey().getKey().toByteArray()); + final var valueBytes = Bytes.wrap(kv.getValue().getValue().toByteArray()); table.put(keyBytes, valueBytes); - } else { + } else if (req.hasDelete()) { + final var key = req.getDelete().getKey().getBasicKey(); + final var keyBytes = Bytes.wrap(key.getKey().toByteArray()); table.remove(keyBytes); } } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java index aa1cc1c75..d72807793 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java @@ -12,6 +12,8 @@ package dev.responsive.kafka.internal.db.rs3.client.grpc; +import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicPutProto; +import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.createStoreOptionsProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpsRs3TestUtil.newEndOfStreamResult; import static dev.responsive.kafka.internal.utils.Utils.lssIdProto; import static dev.responsive.kafka.internal.utils.Utils.uuidToProto; @@ -31,7 +33,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.protobuf.ByteString; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.ClockType; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions; @@ -39,7 +40,7 @@ import dev.responsive.kafka.internal.db.rs3.client.Put; import dev.responsive.kafka.internal.db.rs3.client.RS3Exception; import dev.responsive.kafka.internal.db.rs3.client.RS3TimeoutException; -import dev.responsive.kafka.internal.db.rs3.client.RangeBound; +import dev.responsive.kafka.internal.db.rs3.client.Range; import dev.responsive.kafka.internal.db.rs3.client.WalEntry; import dev.responsive.rs3.RS3Grpc; import dev.responsive.rs3.Rs3; @@ -49,6 +50,7 @@ import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -265,7 +267,7 @@ public void shouldWriteWalEntriesWithExpectedFields() { .setStoreId(uuidToProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) - .setPut(putProto(put1)) + .setPut(basicPutProto(put1)) .build() ); verify(writeWALSegmentRequestObserver).onNext(Rs3.WriteWALSegmentRequest.newBuilder() @@ -274,7 +276,7 @@ public void shouldWriteWalEntriesWithExpectedFields() { .setStoreId(uuidToProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) - .setPut(putProto(put2)) + .setPut(basicPutProto(put2)) .build() ); verify(writeWALSegmentRequestObserver).onCompleted(); @@ -304,7 +306,7 @@ public void shouldWriteWalEntriesWithExpectedWrittenOffsetNone() { .setStoreId(uuidToProto(STORE_ID)) .setExpectedWrittenOffset(GrpcRS3Client.WAL_OFFSET_NONE) .setEndOffset(20) - .setPut(putProto(put1)) + .setPut(basicPutProto(put1)) .build() ); } @@ -422,7 +424,7 @@ public void shouldWriteWalSegmentSync() { .setStoreId(uuidToProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) - .setPut(putProto((Put) entries.get(0))) + .setPut(basicPutProto((Put) entries.get(0))) .build() ); verify(writeWALSegmentRequestObserver).onNext( @@ -432,7 +434,7 @@ public void shouldWriteWalSegmentSync() { .setStoreId(uuidToProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) - .setPut(putProto((Put) entries.get(1))) + .setPut(basicPutProto((Put) entries.get(1))) .build() ); @@ -485,7 +487,7 @@ public void shouldRetryWriteWalSegmentSync() { .setStoreId(uuidToProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) - .setPut(putProto((Put) entries.get(0))) + .setPut(basicPutProto((Put) entries.get(0))) .build() ); verify(writeWALSegmentRequestObserver).onNext( @@ -495,7 +497,7 @@ public void shouldRetryWriteWalSegmentSync() { .setStoreId(uuidToProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) - .setPut(putProto((Put) entries.get(1))) + .setPut(basicPutProto((Put) entries.get(1))) .build() ); assertThat(result, is(Optional.of(flushedOffset))); @@ -546,7 +548,7 @@ public void shouldRetryWriteWalSegmentSyncWithErrorInOnNext() { .setStoreId(uuidToProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) - .setPut(putProto((Put) entries.get(0))) + .setPut(basicPutProto((Put) entries.get(0))) .build() ); assertThat(result, is(Optional.of(flushedOffset))); @@ -596,7 +598,7 @@ public void shouldRetryWriteWalSegmentSyncWithErrorInFinish() { .setStoreId(uuidToProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) - .setPut(putProto((Put) entries.get(0))) + .setPut(basicPutProto((Put) entries.get(0))) .build() ); assertThat(result, is(Optional.of(flushedOffset))); @@ -639,12 +641,12 @@ public void shouldGetWithExpectedWrittenOffset() { // given: when(stub.get(any())).thenReturn( Rs3.GetResult.newBuilder() - .setResult(Rs3.KeyValue.newBuilder() - .setKey(ByteString.copyFromUtf8("foo")) - .setValue(ByteString.copyFromUtf8("bar")) - ) - .build() - ); + .setResult(Rs3.KeyValue.newBuilder().setBasicKv( + GrpcRs3Util.basicKeyValueProto( + "foo".getBytes(StandardCharsets.UTF_8), + "bar".getBytes(StandardCharsets.UTF_8)) + )) + .build()); // when: final var result = client.get(STORE_ID, LSS_ID, PSS_ID, Optional.of(123L), "foo".getBytes()); @@ -656,7 +658,9 @@ public void shouldGetWithExpectedWrittenOffset() { .setPssId(PSS_ID) .setStoreId(uuidToProto(STORE_ID)) .setExpectedWrittenOffset(123L) - .setKey(ByteString.copyFromUtf8("foo")) + .setKey(Rs3.Key.newBuilder().setBasicKey( + GrpcRs3Util.basicKeyProto("foo".getBytes(StandardCharsets.UTF_8)) + )) .build() ); } @@ -666,9 +670,11 @@ public void shouldGet() { // given: when(stub.get(any())).thenReturn( Rs3.GetResult.newBuilder() - .setResult(Rs3.KeyValue.newBuilder() - .setKey(ByteString.copyFromUtf8("foo")) - .setValue(ByteString.copyFromUtf8("bar")) + .setResult(Rs3.KeyValue.newBuilder().setBasicKv( + GrpcRs3Util.basicKeyValueProto( + "foo".getBytes(StandardCharsets.UTF_8), + "bar".getBytes(StandardCharsets.UTF_8) + )) ) .build() ); @@ -682,7 +688,9 @@ public void shouldGet() { .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) .setStoreId(uuidToProto(STORE_ID)) - .setKey(ByteString.copyFromUtf8("foo")) + .setKey(Rs3.Key.newBuilder().setBasicKey( + GrpcRs3Util.basicKeyProto("foo".getBytes(StandardCharsets.UTF_8)) + )) .build() ); } @@ -772,8 +780,7 @@ public void shouldRetryRangeRequest() { LSS_ID, PSS_ID, Optional.of(123L), - RangeBound.unbounded(), - RangeBound.unbounded() + Range.unbounded() )) { assertThat(iter.hasNext(), is(false)); } @@ -799,8 +806,7 @@ public void shouldRetryAfterObserverOnError() { LSS_ID, PSS_ID, Optional.of(123L), - RangeBound.unbounded(), - RangeBound.unbounded() + Range.unbounded() )) { assertThat(iter.hasNext(), is(false)); } @@ -820,8 +826,7 @@ public void shouldTimeoutRangeRequest() { LSS_ID, PSS_ID, Optional.of(123L), - RangeBound.unbounded(), - RangeBound.unbounded() + Range.unbounded() )); var endTimeMs = time.milliseconds(); assertThat(endTimeMs - startTimeMs, is(retryTimeoutMs)); @@ -838,8 +843,7 @@ public void shouldPropagateUnexpectedErrorInRangeRequest() { LSS_ID, PSS_ID, Optional.of(123L), - RangeBound.unbounded(), - RangeBound.unbounded() + Range.unbounded() )); assertThat(rs3Exception.getCause(), is(instanceOf(StatusRuntimeException.class))); @@ -858,15 +862,13 @@ public void shouldTimeoutAfterObserverOnError() { return null; }).when(asyncStub).range(any(), any()); - final var startTimeMs = time.milliseconds(); try ( final var iter = client.range( STORE_ID, LSS_ID, PSS_ID, Optional.of(123L), - RangeBound.unbounded(), - RangeBound.unbounded() + Range.unbounded() ) ) { assertThrows(RS3TimeoutException.class, iter::hasNext); @@ -974,20 +976,24 @@ public void shouldCreateStore() { ); final CreateStoreOptions options = new CreateStoreOptions( + logicalShards, + CreateStoreTypes.StoreType.BASIC, Optional.of(ClockType.STREAM_TIME), Optional.of(10_000L), Optional.empty() ); // when: - final var result = client.createStore(STORE_NAME, logicalShards, options); + final var result = client.createStore(STORE_NAME, options); // then: assertThat(result, equalTo(new CreateStoreTypes.CreateStoreResult(STORE_ID, pss_ids))); - verify(stub).createStore(Rs3.CreateStoreRequest.newBuilder() - .setStoreName(STORE_NAME) - .setLogicalShards(logicalShards) - .setOptions(options.toProto()).build()); + verify(stub).createStore( + Rs3.CreateStoreRequest.newBuilder() + .setStoreName(STORE_NAME) + .setOptions(createStoreOptionsProto(options)) + .build() + ); } @Test @@ -1004,21 +1010,29 @@ public void shouldRetryCreateStore() { .build() ); - final CreateStoreOptions options = new CreateStoreOptions( + final var slateDbOptions = new CreateStoreTypes.SlateDbStorageOptions( + Optional.of(20) + ); + + final var options = new CreateStoreOptions( + logicalShards, + CreateStoreTypes.StoreType.BASIC, Optional.empty(), Optional.of(10_000L), - Optional.of(20) + Optional.of(slateDbOptions) ); // when: - final var result = client.createStore(STORE_NAME, logicalShards, options); + final var result = client.createStore(STORE_NAME, options); // then: assertThat(result, equalTo(new CreateStoreTypes.CreateStoreResult(STORE_ID, pss_ids))); - verify(stub, times(2)).createStore(Rs3.CreateStoreRequest.newBuilder() - .setStoreName(STORE_NAME) - .setLogicalShards(logicalShards) - .setOptions(options.toProto()).build()); + verify(stub, times(2)).createStore( + Rs3.CreateStoreRequest.newBuilder() + .setStoreName(STORE_NAME) + .setOptions(createStoreOptionsProto(options)) + .build() + ); } @Test @@ -1028,16 +1042,22 @@ public void shouldPropagateUnexpectedExceptionsFromCreateStore() { .thenThrow(new StatusRuntimeException(Status.UNKNOWN)); final int logicalShards = 5; + final var slateDbOptions = new CreateStoreTypes.SlateDbStorageOptions( + Optional.of(20) + ); + final CreateStoreOptions options = new CreateStoreOptions( + logicalShards, + CreateStoreTypes.StoreType.BASIC, Optional.empty(), Optional.of(10_000L), - Optional.of(20) + Optional.of(slateDbOptions) ); // when: final RS3Exception exception = assertThrows( RS3Exception.class, - () -> client.createStore(STORE_NAME, logicalShards, options) + () -> client.createStore(STORE_NAME, options) ); // then: @@ -1053,16 +1073,22 @@ public void shouldTimeoutCreateStore() { .thenThrow(new StatusRuntimeException(Status.UNAVAILABLE)); final int logicalShards = 5; + + final var slateDbOptions = new CreateStoreTypes.SlateDbStorageOptions( + Optional.of(20) + ); final CreateStoreOptions options = new CreateStoreOptions( + logicalShards, + CreateStoreTypes.StoreType.BASIC, Optional.empty(), Optional.of(10_000L), - Optional.of(20) + Optional.of(slateDbOptions) ); // when: assertThrows( RS3TimeoutException.class, - () -> client.createStore(STORE_NAME, logicalShards, options) + () -> client.createStore(STORE_NAME, options) ); // then: @@ -1084,8 +1110,7 @@ public void shouldPropagateUnexpectedExceptionFromObserverOnError() { LSS_ID, PSS_ID, Optional.of(123L), - RangeBound.unbounded(), - RangeBound.unbounded() + Range.unbounded() )) { final var rs3Exception = assertThrows(RS3Exception.class, iter::hasNext); assertThat(rs3Exception.getCause(), is(instanceOf(StatusRuntimeException.class))); @@ -1101,18 +1126,6 @@ private StreamObserver verifyWalSegmentResultObserver return writeWALSegmentResultObserverCaptor.getValue(); } - private Rs3.WriteWALSegmentRequest.Put putProto(final Put put) { - final var builder = Rs3.WriteWALSegmentRequest.Put.newBuilder() - .setKey(ByteString.copyFrom(put.key())); - if (put.value().isPresent()) { - builder.setValue(ByteString.copyFrom(put.value().get())); - builder.setTtl(Rs3.Ttl.newBuilder() - .setTtlType(Rs3.Ttl.TtlType.DEFAULT) - .build()); - } - return builder.build(); - } - public static class TestException extends RuntimeException { private static final long serialVersionUID = 0L; } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpsRs3TestUtil.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpsRs3TestUtil.java index 3d8730ffa..11ca07b11 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpsRs3TestUtil.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpsRs3TestUtil.java @@ -12,21 +12,21 @@ package dev.responsive.kafka.internal.db.rs3.client.grpc; -import com.google.protobuf.ByteString; import dev.responsive.kafka.internal.db.rs3.client.Range; import dev.responsive.kafka.internal.db.rs3.client.RangeBound; import dev.responsive.rs3.Rs3; +import java.nio.charset.StandardCharsets; public class GrpsRs3TestUtil { public static Rs3.RangeResult newKeyValueResult(String key) { - final var keyValue = Rs3.KeyValue.newBuilder() - .setKey(ByteString.copyFromUtf8(key)) - .setValue(ByteString.copyFromUtf8("dummy")) - .build(); + final var keyValue = GrpcRs3Util.basicKeyValueProto( + key.getBytes(StandardCharsets.UTF_8), + "dummy".getBytes(StandardCharsets.UTF_8) + ); return Rs3.RangeResult.newBuilder() .setType(Rs3.RangeResult.Type.RESULT) - .setResult(keyValue) + .setResult(Rs3.KeyValue.newBuilder().setBasicKv(keyValue)) .build(); } @@ -38,17 +38,18 @@ public static Rs3.RangeResult newEndOfStreamResult() { } public static Range newRangeFromProto(Rs3.RangeRequest req) { - final var startBound = newRangeBoundFromProto(req.getFrom()); - final var endBound = newRangeBoundFromProto(req.getTo()); + final var range = req.getRange().getBasicRange(); + final var startBound = newRangeBoundFromProto(range.getFrom()); + final var endBound = newRangeBoundFromProto(range.getTo()); return new Range(startBound, endBound); } - private static RangeBound newRangeBoundFromProto(Rs3.Bound bound) { + private static RangeBound newRangeBoundFromProto(Rs3.BasicBound bound) { switch (bound.getType()) { case EXCLUSIVE: - return RangeBound.exclusive(bound.getKey().toByteArray()); + return RangeBound.exclusive(bound.getKey().getKey().toByteArray()); case INCLUSIVE: - return RangeBound.inclusive(bound.getKey().toByteArray()); + return RangeBound.inclusive(bound.getKey().getKey().toByteArray()); case UNBOUNDED: return RangeBound.unbounded(); default: