diff --git a/kafka-client/.jqwik-database b/kafka-client/.jqwik-database new file mode 100644 index 000000000..711006c3d Binary files /dev/null and b/kafka-client/.jqwik-database differ diff --git a/kafka-client/build.gradle.kts b/kafka-client/build.gradle.kts index 124d550e9..148cc5af8 100644 --- a/kafka-client/build.gradle.kts +++ b/kafka-client/build.gradle.kts @@ -154,4 +154,5 @@ dependencies { testImplementation("io.grpc:grpc-inprocess:${libs.versions.grpc.orNull}") testImplementation("software.amazon.awssdk:kms:2.20.0") testImplementation("software.amazon.awssdk:sso:2.20.0") + testImplementation("net.jqwik:jqwik:1.9.2") } diff --git a/kafka-client/src/main/external-protos/rs3 b/kafka-client/src/main/external-protos/rs3 index 92841abf7..da6498b98 160000 --- a/kafka-client/src/main/external-protos/rs3 +++ b/kafka-client/src/main/external-protos/rs3 @@ -1 +1 @@ -Subproject commit 92841abf7f807ad22cd8336ec36879049ac170a0 +Subproject commit da6498b9847edec32c0890fdbb0db9ed709abc81 diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveWindowBytesStoreSupplier.java b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveWindowBytesStoreSupplier.java index 0335092fc..2004a56f9 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveWindowBytesStoreSupplier.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveWindowBytesStoreSupplier.java @@ -27,7 +27,10 @@ public class ResponsiveWindowBytesStoreSupplier implements WindowBytesStoreSuppl public ResponsiveWindowBytesStoreSupplier(final ResponsiveWindowParams params) { this.params = params; - this.segmentIntervalMs = computeSegmentInterval(params.retentionPeriod(), params.numSegments()); + this.segmentIntervalMs = computeSegmentInterval( + params.retentionPeriodMs(), + params.numSegments() + ); } @Override @@ -57,7 +60,7 @@ public boolean retainDuplicates() { @Override public long retentionPeriod() { - return params.retentionPeriod(); + return params.retentionPeriodMs(); } @Override diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveWindowParams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveWindowParams.java index 825df67c5..d009ba3ca 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveWindowParams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveWindowParams.java @@ -106,7 +106,7 @@ public long windowSize() { return windowSizeMs; } - public long retentionPeriod() { + public long retentionPeriodMs() { return retentionPeriodMs; } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/PssTablePartitioner.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/PssTablePartitioner.java index 241d4e492..2ce1cc619 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/PssTablePartitioner.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/PssTablePartitioner.java @@ -15,18 +15,20 @@ import dev.responsive.kafka.internal.db.partitioning.TablePartitioner; import dev.responsive.kafka.internal.db.rs3.client.LssId; import java.util.Objects; -import org.apache.kafka.common.utils.Bytes; -public class PssTablePartitioner implements TablePartitioner { +public abstract class PssTablePartitioner implements TablePartitioner { private final PssPartitioner pssPartitioner; public PssTablePartitioner(final PssPartitioner pssPartitioner) { this.pssPartitioner = Objects.requireNonNull(pssPartitioner); } + public abstract byte[] serialize(K key); + @Override - public Integer tablePartition(int kafkaPartition, Bytes key) { - return pssPartitioner.pss(key.get(), new LssId(kafkaPartition)); + public Integer tablePartition(int kafkaPartition, K key) { + final var serializedKey = serialize(key); + return pssPartitioner.pss(serializedKey, new LssId(kafkaPartition)); } @Override diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVFlushManager.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVFlushManager.java index aea6a20e6..c504d397e 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVFlushManager.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVFlushManager.java @@ -17,33 +17,23 @@ import dev.responsive.kafka.internal.db.partitioning.TablePartitioner; import dev.responsive.kafka.internal.db.rs3.client.LssId; import dev.responsive.kafka.internal.db.rs3.client.RS3Client; -import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException; -import dev.responsive.kafka.internal.db.rs3.client.StreamSender; -import dev.responsive.kafka.internal.db.rs3.client.StreamSenderMessageReceiver; import dev.responsive.kafka.internal.db.rs3.client.WalEntry; import dev.responsive.kafka.internal.stores.RemoteWriteResult; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; -import java.util.function.Consumer; import org.apache.kafka.common.utils.Bytes; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; class RS3KVFlushManager extends KVFlushManager { - private static final Logger LOG = LoggerFactory.getLogger(RS3KVFlushManager.class); - private final UUID storeId; private final RS3Client rs3Client; private final LssId lssId; private final RS3KVTable table; - private final HashMap> writtenOffsets; + private final Map> writtenOffsets; private final int kafkaPartition; private final PssPartitioner pssPartitioner; private final HashMap writers = new HashMap<>(); @@ -53,7 +43,7 @@ public RS3KVFlushManager( final RS3Client rs3Client, final LssId lssId, final RS3KVTable table, - final HashMap> writtenOffsets, + final Map> writtenOffsets, final int kafkaPartition, final PssPartitioner pssPartitioner ) { @@ -73,7 +63,12 @@ public String tableName() { @Override public TablePartitioner partitioner() { - return new PssTablePartitioner(pssPartitioner); + return new PssTablePartitioner<>(pssPartitioner) { + @Override + public byte[] serialize(final Bytes key) { + return key.get(); + } + }; } @Override @@ -132,8 +127,10 @@ public RemoteWriteResult updateOffset(final long consumedOffset) { @Override public String failedFlushInfo(final long batchOffset, final Integer failedTablePartition) { - // TODO: fill me in with info about last written offsets - return ""; + return String.format(">", + failedTablePartition, batchOffset, table.lastWrittenOffset(kafkaPartition), + storeId, lssId.id()); } @Override @@ -162,136 +159,43 @@ public CompletionStage> flush() { } } - private static final class RS3StreamFactory { - private final UUID storeId; - private final RS3Client rs3Client; - private final int pssId; - private final LssId lssId; - private final long endOffset; - private final Optional expectedWrittenOffset; - - private RS3StreamFactory( - final UUID storeId, - final RS3Client rs3Client, - final int pssId, - final LssId lssId, - final long endOffset, - final Optional expectedWrittenOffset - ) { - this.storeId = storeId; - this.rs3Client = rs3Client; - this.pssId = pssId; - this.lssId = lssId; - this.endOffset = endOffset; - this.expectedWrittenOffset = expectedWrittenOffset; - } - - StreamSenderMessageReceiver> writeWalSegmentAsync() { - return rs3Client.writeWalSegmentAsync( - storeId, - lssId, - pssId, - expectedWrittenOffset, - endOffset - ); - } - - Optional writeWalSegmentSync(List entries) { - return rs3Client.writeWalSegment( - storeId, - lssId, - pssId, - expectedWrittenOffset, - endOffset, - entries - ); - } - - } - - private static final class RS3KVWriter implements RemoteWriter { - private final RS3StreamFactory streamFactory; + private static final class RS3KVWriter extends RS3Writer { private final RS3KVTable table; - private final int kafkaPartition; - private final List retryBuffer = new ArrayList<>(); - private final StreamSenderMessageReceiver> sendRecv; private RS3KVWriter( final UUID storeId, final RS3Client rs3Client, - final RS3KVTable table, + final RS3KVTable rs3Table, final int pssId, final LssId lssId, final long endOffset, final Optional expectedWrittenOffset, final int kafkaPartition ) { - this.table = Objects.requireNonNull(table); - this.streamFactory = new RS3StreamFactory( - storeId, - rs3Client, - pssId, - lssId, - endOffset, - expectedWrittenOffset - ); - this.kafkaPartition = kafkaPartition; - this.sendRecv = streamFactory.writeWalSegmentAsync(); - } - - long endOffset() { - return streamFactory.endOffset; + super(storeId, rs3Client, pssId, lssId, endOffset, expectedWrittenOffset, kafkaPartition); + this.table = rs3Table; } @Override - public void insert(final Bytes key, final byte[] value, final long timestampMs) { - maybeSendNext(table.insert(kafkaPartition, key, value, timestampMs)); - } - - @Override - public void delete(final Bytes key) { - maybeSendNext(table.delete(kafkaPartition, key)); - } - - private void maybeSendNext(WalEntry entry) { - retryBuffer.add(entry); - ifActiveStream(sender -> sender.sendNext(entry)); - } - - private void ifActiveStream(Consumer> streamConsumer) { - if (sendRecv.isActive()) { - try { - streamConsumer.accept(sendRecv.sender()); - } catch (final RS3TransientException e) { - // Retry the stream in flush() - } - } + protected WalEntry createInsert( + final Bytes key, + final byte[] value, + final long timestampMs + ) { + return table.insert( + kafkaPartition(), + key, + value, + timestampMs + ); } @Override - public CompletionStage> flush() { - ifActiveStream(StreamSender::finish); - - return sendRecv.completion().handle((result, throwable) -> { - Optional flushedOffset = result; - - var cause = throwable; - if (throwable instanceof CompletionException) { - cause = throwable.getCause(); - } - - if (cause instanceof RS3TransientException) { - flushedOffset = streamFactory.writeWalSegmentSync(retryBuffer); - } else if (cause instanceof RuntimeException) { - throw (RuntimeException) throwable; - } else if (cause != null) { - throw new RuntimeException(throwable); - } - - LOG.debug("last flushed offset for pss/lss {}/{} is {}", - streamFactory.pssId, streamFactory.lssId, flushedOffset); - return RemoteWriteResult.success(kafkaPartition); - }); + protected WalEntry createDelete(final Bytes key) { + return table.delete( + kafkaPartition(), + key + ); } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVTable.java index cc2dcd429..b71635a7f 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVTable.java @@ -14,10 +14,13 @@ import dev.responsive.kafka.internal.db.KVFlushManager; import dev.responsive.kafka.internal.db.RemoteKVTable; +import dev.responsive.kafka.internal.db.rs3.client.Delete; import dev.responsive.kafka.internal.db.rs3.client.LssId; +import dev.responsive.kafka.internal.db.rs3.client.LssMetadata; import dev.responsive.kafka.internal.db.rs3.client.MeteredRS3Client; import dev.responsive.kafka.internal.db.rs3.client.Put; import dev.responsive.kafka.internal.db.rs3.client.RS3Client; +import dev.responsive.kafka.internal.db.rs3.client.RS3ClientUtil; import dev.responsive.kafka.internal.db.rs3.client.Range; import dev.responsive.kafka.internal.db.rs3.client.RangeBound; import dev.responsive.kafka.internal.db.rs3.client.WalEntry; @@ -25,12 +28,9 @@ import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistration; import dev.responsive.kafka.internal.utils.MergeKeyValueIterator; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Objects; -import java.util.Optional; import java.util.UUID; -import java.util.stream.Collectors; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.state.KeyValueIterator; @@ -43,6 +43,7 @@ public class RS3KVTable implements RemoteKVTable { private final String name; private final UUID storeId; private final RS3Client rs3Client; + private final RS3ClientUtil rs3ClientUtil; private final PssPartitioner pssPartitioner; private LssId lssId; private Long fetchOffset = ResponsiveStoreRegistration.NO_COMMITTED_OFFSET; @@ -63,6 +64,7 @@ public RS3KVTable( Objects.requireNonNull(responsiveMetrics), Objects.requireNonNull(scopeBuilder) ); + this.rs3ClientUtil = new RS3ClientUtil(storeId, rs3Client, pssPartitioner); this.pssPartitioner = Objects.requireNonNull(pssPartitioner); } @@ -79,40 +81,14 @@ public KVFlushManager init(final int kafkaPartition) { this.lssId = new LssId(kafkaPartition); - // TODO: we should write an empty segment periodically to any PSS that we haven't - // written to to bump the written offset - final HashMap> lastWrittenOffset = new HashMap<>(); - for (final int pss : pssPartitioner.pssForLss(this.lssId)) { - final var offsets = rs3Client.getCurrentOffsets(storeId, lssId, pss); - lastWrittenOffset.put(pss, offsets.writtenOffset()); - } - final var fetchOffsetOrMinusOne = lastWrittenOffset.values().stream() - .map(v -> v.orElse(-1L)) - .min(Long::compare) - .orElse(-1L); - if (fetchOffsetOrMinusOne == -1) { - this.fetchOffset = ResponsiveStoreRegistration.NO_COMMITTED_OFFSET; - } else { - this.fetchOffset = fetchOffsetOrMinusOne; - } - - final var writtenOffsetsStr = lastWrittenOffset.entrySet().stream() - .map(e -> String.format("%s -> %s", - e.getKey(), - e.getValue().map(Object::toString).orElse("none"))) - .collect(Collectors.joining(",")); - LOG.info("restore rs3 kv table from offset {} for {}. recorded written offsets: {}", - fetchOffset, - kafkaPartition, - writtenOffsetsStr - ); - - flushManager = new RS3KVFlushManager( + LssMetadata lssMetadata = rs3ClientUtil.fetchLssMetadata(lssId); + this.fetchOffset = lssMetadata.lastWrittenOffset(); + this.flushManager = new RS3KVFlushManager( storeId, rs3Client, lssId, this, - lastWrittenOffset, + lssMetadata.writtenOffsets(), kafkaPartition, pssPartitioner ); @@ -127,7 +103,7 @@ public byte[] get(final int kafkaPartition, final Bytes key, final long minValid lssId, pssId, flushManager.writtenOffset(pssId), - key.get() + key ).orElse(null); } @@ -138,7 +114,7 @@ public KeyValueIterator range( final Bytes to, final long streamTimeMs ) { - final var range = new Range(RangeBound.inclusive(from.get()), RangeBound.exclusive(to.get())); + final var range = new Range<>(RangeBound.inclusive(from), RangeBound.exclusive(to)); final List> pssIters = new ArrayList<>(); for (int pssId : pssPartitioner.pssForLss(this.lssId)) { @@ -179,7 +155,7 @@ public String name() { return name; } - public UUID storedId() { + public UUID storeId() { return storeId; } @@ -198,10 +174,7 @@ public WalEntry insert( @Override public WalEntry delete(final int kafkaPartition, final Bytes key) { - return new Put( - key.get(), - null - ); + return new Delete(key.get()); } @Override diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactory.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactory.java index bfc927387..6bc592782 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactory.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactory.java @@ -13,6 +13,7 @@ package dev.responsive.kafka.internal.db.rs3; import dev.responsive.kafka.internal.db.RemoteKVTable; +import dev.responsive.kafka.internal.db.RemoteWindowTable; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.ClockType; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions; @@ -21,6 +22,7 @@ import dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRS3Client; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; import dev.responsive.kafka.internal.stores.TtlResolver; +import java.time.Duration; import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -47,10 +49,24 @@ public RemoteKVTable kvTable( final ResponsiveMetrics.MetricScopeBuilder scopeBuilder, final Supplier computeNumKafkaPartitions ) { + + final Optional defaultTtl = ttlResolver.isPresent() + && ttlResolver.get().defaultTtl().isFinite() + ? Optional.of(ttlResolver.get().defaultTtl().duration()) + : Optional.empty(); + final Optional clockType = ttlResolver.isPresent() + ? Optional.of(ClockType.WALL_CLOCK) + : Optional.empty(); + final var rs3Client = connector.connect(); final UUID storeId = createdStores.computeIfAbsent(storeName, n -> createStore( - storeName, ttlResolver, computeNumKafkaPartitions.get(), rs3Client + storeName, + CreateStoreTypes.StoreType.BASIC, + clockType, + defaultTtl, + computeNumKafkaPartitions.get(), + rs3Client )); final PssPartitioner pssPartitioner = new PssDirectPartitioner(); @@ -64,23 +80,47 @@ public RemoteKVTable kvTable( ); } + public RemoteWindowTable windowTable( + final String storeName, + final Duration defaultTtl, + final ResponsiveMetrics responsiveMetrics, + final ResponsiveMetrics.MetricScopeBuilder scopeBuilder, + final Supplier computeNumKafkaPartitions + ) { + final var rs3Client = connector.connect(); + final UUID storeId = createdStores.computeIfAbsent(storeName, n -> createStore( + storeName, + CreateStoreTypes.StoreType.WINDOW, + Optional.of(ClockType.WALL_CLOCK), + Optional.of(defaultTtl), + computeNumKafkaPartitions.get(), + rs3Client + )); + + final var pssPartitioner = new PssDirectPartitioner(); + return new RS3WindowTable( + storeName, + storeId, + rs3Client, + pssPartitioner, + responsiveMetrics, + scopeBuilder + ); + } + public static UUID createStore( final String storeName, - final Optional> ttlResolver, + final CreateStoreTypes.StoreType storeType, + final Optional clockType, + final Optional defaultTtl, final int numKafkaPartitions, final RS3Client rs3Client ) { - - final Optional defaultTtl = - ttlResolver.isPresent() && ttlResolver.get().defaultTtl().isFinite() - ? Optional.of(ttlResolver.get().defaultTtl().duration().toMillis()) - : Optional.empty(); - final var options = new CreateStoreOptions( numKafkaPartitions, - CreateStoreTypes.StoreType.BASIC, - ttlResolver.isPresent() ? Optional.of(ClockType.WALL_CLOCK) : Optional.empty(), - defaultTtl, + storeType, + clockType, + defaultTtl.map(Duration::toMillis), Optional.empty() ); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3WindowFlushManager.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3WindowFlushManager.java new file mode 100644 index 000000000..14d1e2b95 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3WindowFlushManager.java @@ -0,0 +1,177 @@ +/* + * Copyright 2025 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + +package dev.responsive.kafka.internal.db.rs3; + +import dev.responsive.kafka.internal.db.RemoteWriter; +import dev.responsive.kafka.internal.db.WindowFlushManager; +import dev.responsive.kafka.internal.db.partitioning.TablePartitioner; +import dev.responsive.kafka.internal.db.rs3.client.LssId; +import dev.responsive.kafka.internal.db.rs3.client.RS3Client; +import dev.responsive.kafka.internal.db.rs3.client.WalEntry; +import dev.responsive.kafka.internal.stores.RemoteWriteResult; +import dev.responsive.kafka.internal.utils.WindowedKey; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +class RS3WindowFlushManager implements WindowFlushManager { + private final UUID storeId; + private final RS3Client rs3Client; + private final LssId lssId; + private final RS3WindowTable table; + private final int kafkaPartition; + private final PssPartitioner pssPartitioner; + private final Map> writtenOffsets; + + private long streamTime; + private final Map writers = new HashMap<>(); + + public RS3WindowFlushManager( + final UUID storeId, + final RS3Client rs3Client, + final LssId lssId, + final RS3WindowTable table, + final int kafkaPartition, + final PssPartitioner pssPartitioner, + final Map> writtenOffsets, + final long initialStreamTime + ) { + this.storeId = storeId; + this.rs3Client = rs3Client; + this.lssId = lssId; + this.table = table; + this.kafkaPartition = kafkaPartition; + this.pssPartitioner = pssPartitioner; + this.writtenOffsets = writtenOffsets; + this.streamTime = initialStreamTime; + } + + @Override + public String tableName() { + return table.name(); + } + + @Override + public TablePartitioner partitioner() { + return new PssTablePartitioner<>(pssPartitioner) { + @Override + public byte[] serialize(final WindowedKey key) { + return key.key.get(); + } + }; + } + + @Override + public RemoteWriter createWriter( + final Integer pssId, + final long consumedOffset + ) { + final var writer = new Rs3WindowWriter( + storeId, + rs3Client, + table, + pssId, + lssId, + consumedOffset, + writtenOffsets.get(pssId), + kafkaPartition + ); + writers.put(pssId, writer); + return writer; + } + + @Override + public void writeAdded(final WindowedKey key) { + streamTime = Long.max(streamTime, key.windowStartMs); + } + + @Override + public RemoteWriteResult preFlush() { + return RemoteWriteResult.success(null); + } + + @Override + public RemoteWriteResult postFlush(final long consumedOffset) { + for (final var entry : writers.entrySet()) { + writtenOffsets.put(entry.getKey(), Optional.of(entry.getValue().endOffset())); + } + writers.clear(); + return RemoteWriteResult.success(null); + } + + @Override + public String failedFlushInfo( + final long batchOffset, + final Integer failedTablePartition + ) { + return String.format(">", + failedTablePartition, batchOffset, table.lastWrittenOffset(kafkaPartition), + storeId, lssId.id()); + } + + @Override + public String logPrefix() { + return tableName() + ".rs3.flushmanager"; + } + + @Override + public long streamTime() { + return streamTime; + } + + public Optional writtenOffset(final int pssId) { + return writtenOffsets.get(pssId); + } + + private static class Rs3WindowWriter extends RS3Writer { + private final RS3WindowTable table; + + protected Rs3WindowWriter( + final UUID storeId, + final RS3Client rs3Client, + final RS3WindowTable table, + final int pssId, + final LssId lssId, + final long endOffset, + final Optional expectedWrittenOffset, + final int kafkaPartition + ) { + super(storeId, rs3Client, pssId, lssId, endOffset, expectedWrittenOffset, kafkaPartition); + this.table = table; + } + + @Override + protected WalEntry createInsert( + final WindowedKey key, + final byte[] value, + final long timestampMs + ) { + return table.insert( + kafkaPartition(), + key, + value, + timestampMs + ); + } + + @Override + protected WalEntry createDelete(final WindowedKey key) { + return table.delete( + kafkaPartition(), + key + ); + } + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3WindowTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3WindowTable.java new file mode 100644 index 000000000..6bfc822d4 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3WindowTable.java @@ -0,0 +1,350 @@ +/* + * Copyright 2025 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + +package dev.responsive.kafka.internal.db.rs3; + +import dev.responsive.kafka.internal.db.RemoteWindowTable; +import dev.responsive.kafka.internal.db.WindowFlushManager; +import dev.responsive.kafka.internal.db.rs3.client.LssId; +import dev.responsive.kafka.internal.db.rs3.client.LssMetadata; +import dev.responsive.kafka.internal.db.rs3.client.MeteredRS3Client; +import dev.responsive.kafka.internal.db.rs3.client.RS3Client; +import dev.responsive.kafka.internal.db.rs3.client.RS3ClientUtil; +import dev.responsive.kafka.internal.db.rs3.client.Range; +import dev.responsive.kafka.internal.db.rs3.client.RangeBound; +import dev.responsive.kafka.internal.db.rs3.client.WalEntry; +import dev.responsive.kafka.internal.db.rs3.client.WindowedDelete; +import dev.responsive.kafka.internal.db.rs3.client.WindowedPut; +import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; +import dev.responsive.kafka.internal.utils.MergeKeyValueIterator; +import dev.responsive.kafka.internal.utils.WindowedKey; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.UUID; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RS3WindowTable implements RemoteWindowTable { + private static final Logger LOG = LoggerFactory.getLogger(RS3WindowTable.class); + + private final String name; + private final UUID storeId; + private final RS3Client rs3Client; + private final RS3ClientUtil rs3ClientUtil; + private final PssPartitioner pssPartitioner; + + // Initialized in `init()` + private LssId lssId; + private Long fetchOffset; + private RS3WindowFlushManager flushManager; + + public RS3WindowTable( + final String name, + final UUID storeId, + final RS3Client rs3Client, + final PssPartitioner pssPartitioner, + final ResponsiveMetrics responsiveMetrics, + final ResponsiveMetrics.MetricScopeBuilder scopeBuilder + ) { + this( + name, + storeId, + new MeteredRS3Client( + Objects.requireNonNull(rs3Client), + Objects.requireNonNull(responsiveMetrics), + Objects.requireNonNull(scopeBuilder) + ), + pssPartitioner + ); + } + + // Visible for testing + RS3WindowTable( + final String name, + final UUID storeId, + final RS3Client rs3Client, + final PssPartitioner pssPartitioner + ) { + this.name = Objects.requireNonNull(name); + this.storeId = Objects.requireNonNull(storeId); + this.rs3Client = Objects.requireNonNull(rs3Client); + this.rs3ClientUtil = new RS3ClientUtil(storeId, rs3Client, pssPartitioner); + this.pssPartitioner = Objects.requireNonNull(pssPartitioner); + } + + public UUID storeId() { + return storeId; + } + + @Override + public WindowFlushManager init(final int kafkaPartition) { + if (flushManager != null) { + LOG.error("already initialized for store {}:{}", name, kafkaPartition); + throw new IllegalStateException(String.format( + "already initialized for store %s:%d", + name, + kafkaPartition + )); + } + + this.lssId = new LssId(kafkaPartition); + LssMetadata lssMetadata = rs3ClientUtil.fetchLssMetadata(lssId); + this.fetchOffset = lssMetadata.lastWrittenOffset(); + + final var initialStreamTime = -1L; // TODO: Initialize from RS3 metadata? + this.flushManager = new RS3WindowFlushManager( + storeId, + rs3Client, + lssId, + this, + kafkaPartition, + pssPartitioner, + lssMetadata.writtenOffsets(), + initialStreamTime + ); + return flushManager; + } + + private void throwIfPartitionNotInitialized(final int kafkaPartition) { + if (flushManager == null) { + throw new IllegalStateException(String.format( + "Cannot complete operation on store %s is not yet initialized", + name + )); + } else if (lssId.id() != kafkaPartition) { + throw new IllegalStateException(String.format( + "Cannot complete operation on store %s for kafka partition %d since the store " + + "was initialized for a separate partition %d", + name, + kafkaPartition, + lssId.id() + )); + } + } + + @Override + public byte[] fetch( + final int kafkaPartition, + final Bytes key, + final long windowStart + ) { + throwIfPartitionNotInitialized(kafkaPartition); + final int pssId = pssPartitioner.pss(key.get(), this.lssId); + final var windowKey = new WindowedKey(key, windowStart); + return rs3Client.windowedGet( + storeId, + lssId, + pssId, + flushManager.writtenOffset(pssId), + windowKey + ).orElse(null); + } + + @Override + public KeyValueIterator fetch( + final int kafkaPartition, + final Bytes key, + final long timeFrom, + final long timeTo + ) { + throwIfPartitionNotInitialized(kafkaPartition); + final int pssId = pssPartitioner.pss(key.get(), this.lssId); + final var windowRange = new Range<>( + RangeBound.inclusive(new WindowedKey(key, timeFrom)), + RangeBound.exclusive(new WindowedKey(key, timeTo)) + ); + return rs3Client.windowedRange( + storeId, + lssId, + pssId, + flushManager.writtenOffset(pssId), + windowRange + ); + } + + @Override + public KeyValueIterator backFetch( + final int kafkaPartition, + final Bytes key, + final long timeFrom, + final long timeTo + ) { + throwIfPartitionNotInitialized(kafkaPartition); + throw new UnsupportedOperationException(); + } + + @Override + public KeyValueIterator fetchRange( + final int kafkaPartition, + final Bytes fromKey, + final Bytes toKey, + final long timeFrom, + final long timeTo + ) { + throwIfPartitionNotInitialized(kafkaPartition); + final List> pssIters = new ArrayList<>(); + final var windowRange = new Range<>( + RangeBound.inclusive(new WindowedKey(fromKey, timeFrom)), + RangeBound.exclusive(new WindowedKey(toKey, timeTo)) + ); + + for (int pssId : pssPartitioner.pssForLss(this.lssId)) { + pssIters.add(rs3Client.windowedRange( + storeId, + lssId, + pssId, + flushManager.writtenOffset(pssId), + windowRange + )); + } + return new MergeKeyValueIterator<>(pssIters); + } + + @Override + public KeyValueIterator backFetchRange( + final int kafkaPartition, + final Bytes fromKey, + final Bytes toKey, + final long timeFrom, + final long timeTo + ) { + throwIfPartitionNotInitialized(kafkaPartition); + throw new UnsupportedOperationException(); + } + + @Override + public KeyValueIterator fetchAll( + final int kafkaPartition, + final long timeFrom, + final long timeTo + ) { + throwIfPartitionNotInitialized(kafkaPartition); + final List> pssIters = new ArrayList<>(); + + // TODO: the types break down here. We want to tell the server that we are + // interested in all keys within a given time range, but the schema does + // not support a partially specified bound. For now, we fetch everything and + // filter here. + final var timeRange = new Range<>( + RangeBound.inclusive(timeFrom), + RangeBound.exclusive(timeTo) + ); + + for (int pssId : pssPartitioner.pssForLss(this.lssId)) { + final var rangeIter = rs3Client.windowedRange( + storeId, + lssId, + pssId, + flushManager.writtenOffset(pssId), + Range.unbounded() + ); + pssIters.add(new TimeRangeFilter(timeRange, rangeIter)); + } + return new MergeKeyValueIterator<>(pssIters); + } + + @Override + public KeyValueIterator backFetchAll( + final int kafkaPartition, + final long timeFrom, + final long timeTo + ) { + throwIfPartitionNotInitialized(kafkaPartition); + throw new UnsupportedOperationException(); + } + + @Override + public String name() { + return name; + } + + @Override + public WalEntry insert( + final int kafkaPartition, + final WindowedKey key, + final byte[] value, + final long timestampMs + ) { + return new WindowedPut( + key.key.get(), + value, + timestampMs, + key.windowStartMs + ); + } + + @Override + public WalEntry delete(final int kafkaPartition, final WindowedKey key) { + return new WindowedDelete( + key.key.get(), + key.windowStartMs + ); + } + + @Override + public long lastWrittenOffset(final int kafkaPartition) { + return fetchOffset; + } + + private static class TimeRangeFilter implements KeyValueIterator { + private final Range timeRange; + private final KeyValueIterator delegate; + + private TimeRangeFilter( + final Range timeRange, + final KeyValueIterator delegate + ) { + this.timeRange = timeRange; + this.delegate = delegate; + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public WindowedKey peekNextKey() { + skipToNextKeyInRange(); + return delegate.peekNextKey(); + } + + private void skipToNextKeyInRange() { + while (true) { + final var nextKey = delegate.peekNextKey(); + if (nextKey == null) { + break; + } else if (timeRange.contains(nextKey.windowStartMs)) { + return; + } else { + delegate.next(); + } + } + } + + @Override + public boolean hasNext() { + return peekNextKey() != null; + } + + @Override + public KeyValue next() { + skipToNextKeyInRange(); + return delegate.next(); + } + } + +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3Writer.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3Writer.java new file mode 100644 index 000000000..ddc215ed2 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3Writer.java @@ -0,0 +1,144 @@ +package dev.responsive.kafka.internal.db.rs3; + +import dev.responsive.kafka.internal.db.RemoteWriter; +import dev.responsive.kafka.internal.db.rs3.client.LssId; +import dev.responsive.kafka.internal.db.rs3.client.RS3Client; +import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException; +import dev.responsive.kafka.internal.db.rs3.client.StreamSender; +import dev.responsive.kafka.internal.db.rs3.client.StreamSenderMessageReceiver; +import dev.responsive.kafka.internal.db.rs3.client.WalEntry; +import dev.responsive.kafka.internal.stores.RemoteWriteResult; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +abstract class RS3Writer implements RemoteWriter { + private static final Logger LOG = LoggerFactory.getLogger(RS3Writer.class); + + private final UUID storeId; + private final RS3Client rs3Client; + private final int pssId; + private final LssId lssId; + private final long endOffset; + private final Optional expectedWrittenOffset; + private final int kafkaPartition; + private final List retryBuffer = new ArrayList<>(); + private final StreamSenderMessageReceiver> sendRecv; + + protected RS3Writer( + final UUID storeId, + final RS3Client rs3Client, + final int pssId, + final LssId lssId, + final long endOffset, + final Optional expectedWrittenOffset, + final int kafkaPartition + ) { + this.storeId = storeId; + this.rs3Client = rs3Client; + this.pssId = pssId; + this.lssId = lssId; + this.endOffset = endOffset; + this.expectedWrittenOffset = expectedWrittenOffset; + this.kafkaPartition = kafkaPartition; + this.sendRecv = writeWalSegmentAsync(); + } + + protected abstract WalEntry createInsert( + K key, + byte[] value, + long timestampMs + ); + + protected abstract WalEntry createDelete( + K key + ); + + public long endOffset() { + return endOffset; + } + + public int kafkaPartition() { + return kafkaPartition; + } + + @Override + public void insert(final K key, final byte[] value, final long timestampMs) { + final var insertEntry = createInsert(key, value, timestampMs); + maybeSendNext(insertEntry); + } + + @Override + public void delete(final K key) { + final var deleteEntry = createDelete(key); + maybeSendNext(deleteEntry); + } + + private void maybeSendNext(WalEntry entry) { + retryBuffer.add(entry); + ifActiveStream(sender -> sender.sendNext(entry)); + } + + private void ifActiveStream(Consumer> streamConsumer) { + if (sendRecv.isActive()) { + try { + streamConsumer.accept(sendRecv.sender()); + } catch (final RS3TransientException e) { + // Retry the stream in flush() + } + } + } + + @Override + public CompletionStage> flush() { + ifActiveStream(StreamSender::finish); + + return sendRecv.completion().handle((result, throwable) -> { + Optional flushedOffset = result; + + var cause = throwable; + if (throwable instanceof CompletionException) { + cause = throwable.getCause(); + } + + if (cause instanceof RS3TransientException) { + flushedOffset = writeWalSegmentSync(retryBuffer); + } else if (cause instanceof RuntimeException) { + throw (RuntimeException) throwable; + } else if (cause != null) { + throw new RuntimeException(throwable); + } + + LOG.debug("last flushed offset for pss/lss {}/{} is {}", pssId, lssId, flushedOffset); + return RemoteWriteResult.success(pssId); + }); + } + + StreamSenderMessageReceiver> writeWalSegmentAsync() { + return rs3Client.writeWalSegmentAsync( + storeId, + lssId, + pssId, + expectedWrittenOffset, + endOffset + ); + } + + Optional writeWalSegmentSync(List entries) { + return rs3Client.writeWalSegment( + storeId, + lssId, + pssId, + expectedWrittenOffset, + endOffset, + entries + ); + } + +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Delete.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Delete.java index eb9851866..10500e936 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Delete.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Delete.java @@ -1,3 +1,15 @@ +/* + * Copyright 2025 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + package dev.responsive.kafka.internal.db.rs3.client; import java.util.Arrays; @@ -14,6 +26,11 @@ public byte[] key() { return key; } + @Override + public void visit(final Visitor visitor) { + visitor.visit(this); + } + @Override public boolean equals(final Object o) { if (this == o) { diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/LssMetadata.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/LssMetadata.java new file mode 100644 index 000000000..c1d7c8145 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/LssMetadata.java @@ -0,0 +1,36 @@ +package dev.responsive.kafka.internal.db.rs3.client; + +import java.util.Map; +import java.util.Optional; + +public class LssMetadata { + private final long lastWrittenOffset; + private final Map> writtenOffsets; + + public LssMetadata(long lastWrittenOffset, Map> writtenOffsets) { + this.lastWrittenOffset = lastWrittenOffset; + this.writtenOffsets = writtenOffsets; + } + + /** + * Get the last written offset for the LSS. + * + * @return The last written offset or + * {@link dev.responsive.kafka.internal.stores.ResponsiveStoreRegistration#NO_COMMITTED_OFFSET} + * if there is none + */ + public long lastWrittenOffset() { + return lastWrittenOffset; + } + + /** + * Get the last written offset for each PSS mapped to the LSS. It may return + * `Optional.empty()` if the PSS has no written offsets yet. + * + * @return a map of the last written offsets for each PSS ID + */ + public Map> writtenOffsets() { + return writtenOffsets; + } + +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java index 09a6e75a0..5452eb4a3 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java @@ -3,6 +3,7 @@ import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreResult; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; +import dev.responsive.kafka.internal.utils.WindowedKey; import java.time.Duration; import java.time.Instant; import java.util.List; @@ -86,10 +87,16 @@ public Optional get( final LssId lssId, final int pssId, final Optional expectedWrittenOffset, - final byte[] key + final Bytes key ) { final Instant start = Instant.now(); - final Optional result = delegate.get(storeId, lssId, pssId, expectedWrittenOffset, key); + final Optional result = delegate.get( + storeId, + lssId, + pssId, + expectedWrittenOffset, + key + ); getSensor.record(Duration.between(start, Instant.now()).toNanos()); return result; } @@ -100,7 +107,7 @@ public KeyValueIterator range( final LssId lssId, final int pssId, final Optional expectedWrittenOffset, - final Range range + final Range range ) { return delegate.range( storeId, @@ -111,6 +118,43 @@ public KeyValueIterator range( ); } + @Override + public Optional windowedGet( + final UUID storeId, + final LssId lssId, + final int pssId, + final Optional expectedWrittenOffset, + final WindowedKey key + ) { + final Instant start = Instant.now(); + final Optional result = delegate.windowedGet( + storeId, + lssId, + pssId, + expectedWrittenOffset, + key + ); + getSensor.record(Duration.between(start, Instant.now()).toNanos()); + return result; + } + + @Override + public KeyValueIterator windowedRange( + final UUID storeId, + final LssId lssId, + final int pssId, + final Optional expectedWrittenOffset, + final Range range + ) { + return delegate.windowedRange( + storeId, + lssId, + pssId, + expectedWrittenOffset, + range + ); + } + @Override public List listStores() { return delegate.listStores(); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Put.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Put.java index b1adf95c2..a99608d3b 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Put.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Put.java @@ -32,6 +32,11 @@ public byte[] value() { return value; } + @Override + public void visit(final Visitor visitor) { + visitor.visit(this); + } + @Override public boolean equals(final Object o) { if (this == o) { @@ -51,4 +56,5 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash(Arrays.hashCode(key), Arrays.hashCode(value)); } + } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java index a9b7e7614..4ca0eedca 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java @@ -14,6 +14,7 @@ import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreResult; +import dev.responsive.kafka.internal.utils.WindowedKey; import java.util.List; import java.util.Optional; import java.util.UUID; @@ -45,7 +46,15 @@ Optional get( LssId lssId, int pssId, Optional expectedWrittenOffset, - byte[] key + Bytes key + ); + + Optional windowedGet( + UUID storeId, + LssId lssId, + int pssId, + Optional expectedWrittenOffset, + WindowedKey key ); KeyValueIterator range( @@ -53,7 +62,15 @@ KeyValueIterator range( LssId lssId, int pssId, Optional expectedWrittenOffset, - Range range + Range range + ); + + KeyValueIterator windowedRange( + UUID storeId, + LssId lssId, + int pssId, + Optional expectedWrittenOffset, + Range range ); List listStores(); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3ClientUtil.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3ClientUtil.java new file mode 100644 index 000000000..f6862439a --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3ClientUtil.java @@ -0,0 +1,74 @@ +/* + * Copyright 2025 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + +package dev.responsive.kafka.internal.db.rs3.client; + +import dev.responsive.kafka.internal.db.rs3.PssPartitioner; +import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistration; +import java.util.HashMap; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RS3ClientUtil { + private static final Logger LOG = LoggerFactory.getLogger(RS3ClientUtil.class); + + private final UUID storeId; + private final PssPartitioner partitioner; + private final RS3Client client; + + public RS3ClientUtil( + final UUID storeId, + final RS3Client client, + final PssPartitioner partitioner + ) { + this.storeId = storeId; + this.partitioner = partitioner; + this.client = client; + } + + public LssMetadata fetchLssMetadata(LssId lssId) { + // TODO: we should write an empty segment periodically to any PSS that we haven't + // written to to bump the written offset + final HashMap> writtenOffsets = new HashMap<>(); + var lastWrittenOffset = ResponsiveStoreRegistration.NO_COMMITTED_OFFSET; + for (final int pss : partitioner.pssForLss(lssId)) { + final var offsets = client.getCurrentOffsets(storeId, lssId, pss); + final var writtenOffset = offsets.writtenOffset(); + + if (writtenOffset.isPresent()) { + final var offset = writtenOffset.get(); + if (offset > lastWrittenOffset) { + lastWrittenOffset = writtenOffset.get(); + } + } + + writtenOffsets.put(pss, offsets.writtenOffset()); + } + + final var writtenOffsetsStr = writtenOffsets.entrySet().stream() + .map(e -> String.format("%s -> %s", + e.getKey(), + e.getValue().map(Object::toString).orElse("none"))) + .collect(Collectors.joining(",")); + LOG.info("Restore RS3 table from offset {} for {}. recorded written offsets: {}", + lastWrittenOffset, + lssId, + writtenOffsetsStr + ); + + return new LssMetadata(lastWrittenOffset, writtenOffsets); + } + +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Range.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Range.java index 0480f676f..ad3894253 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Range.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Range.java @@ -1,69 +1,80 @@ +/* + * Copyright 2025 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + package dev.responsive.kafka.internal.db.rs3.client; -import java.util.Arrays; import java.util.Objects; -public class Range { - private final RangeBound start; - private final RangeBound end; +public class Range> { + private final RangeBound start; + private final RangeBound end; - public Range(RangeBound start, RangeBound end) { + public Range(RangeBound start, RangeBound end) { this.start = start; this.end = end; } - public RangeBound start() { + public RangeBound start() { return start; } - public RangeBound end() { + public RangeBound end() { return end; } - public boolean contains(byte[] key) { + public boolean contains(K key) { return greaterThanStartBound(key) && lessThanEndBound(key); } - public boolean greaterThanStartBound(byte[] key) { + public boolean greaterThanStartBound(K key) { return start.map(new RangeBound.Mapper<>() { @Override - public Boolean map(final RangeBound.InclusiveBound b) { - return Arrays.compare(b.key(), key) <= 0; + public Boolean map(final RangeBound.InclusiveBound b) { + return b.key().compareTo(key) <= 0; } @Override - public Boolean map(final RangeBound.ExclusiveBound b) { - return Arrays.compare(b.key(), key) < 0; + public Boolean map(final RangeBound.ExclusiveBound b) { + return b.key().compareTo(key) < 0; } @Override - public Boolean map(final RangeBound.Unbounded b) { + public Boolean map(final RangeBound.Unbounded b) { return true; } }); } - public boolean lessThanEndBound(byte[] key) { + public boolean lessThanEndBound(K key) { return end.map(new RangeBound.Mapper<>() { @Override - public Boolean map(final RangeBound.InclusiveBound b) { - return Arrays.compare(b.key(), key) >= 0; + public Boolean map(final RangeBound.InclusiveBound b) { + return b.key().compareTo(key) >= 0; } @Override - public Boolean map(final RangeBound.ExclusiveBound b) { - return Arrays.compare(b.key(), key) > 0; + public Boolean map(final RangeBound.ExclusiveBound b) { + return b.key().compareTo(key) > 0; } @Override - public Boolean map(final RangeBound.Unbounded b) { + public Boolean map(final RangeBound.Unbounded b) { return true; } }); } - public static Range unbounded() { - return new Range(RangeBound.unbounded(), RangeBound.unbounded()); + public static > Range unbounded() { + return new Range(RangeBound.unbounded(), RangeBound.unbounded()); } @Override @@ -74,7 +85,7 @@ public boolean equals(final Object o) { if (o == null || getClass() != o.getClass()) { return false; } - final Range range = (Range) o; + final Range range = (Range) o; return Objects.equals(start, range.start) && Objects.equals(end, range.end); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RangeBound.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RangeBound.java index fc8b6a3e0..8299c7f89 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RangeBound.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RangeBound.java @@ -12,38 +12,38 @@ package dev.responsive.kafka.internal.db.rs3.client; -import java.util.Arrays; import java.util.Objects; -public interface RangeBound { +public interface RangeBound { - T map(Mapper mapper); + T map(Mapper mapper); - static Unbounded unbounded() { - return Unbounded.INSTANCE; + @SuppressWarnings("unchecked") + static Unbounded unbounded() { + return (Unbounded) Unbounded.INSTANCE; } - static InclusiveBound inclusive(byte[] key) { - return new InclusiveBound(key); + static InclusiveBound inclusive(K key) { + return new InclusiveBound<>(key); } - static ExclusiveBound exclusive(byte[] key) { - return new ExclusiveBound(key); + static ExclusiveBound exclusive(K key) { + return new ExclusiveBound<>(key); } - class InclusiveBound implements RangeBound { - private final byte[] key; + class InclusiveBound implements RangeBound { + private final K key; - public InclusiveBound(final byte[] key) { + public InclusiveBound(final K key) { this.key = key; } - public byte[] key() { + public K key() { return key; } @Override - public T map(final Mapper mapper) { + public T map(final Mapper mapper) { return mapper.map(this); } @@ -55,29 +55,29 @@ public boolean equals(final Object o) { if (o == null || getClass() != o.getClass()) { return false; } - final InclusiveBound that = (InclusiveBound) o; - return Objects.deepEquals(key, that.key); + final InclusiveBound that = (InclusiveBound) o; + return Objects.equals(key, that.key); } @Override public int hashCode() { - return Arrays.hashCode(key); + return Objects.hashCode(key); } } - class ExclusiveBound implements RangeBound { - private final byte[] key; + class ExclusiveBound implements RangeBound { + private final K key; - public ExclusiveBound(final byte[] key) { + public ExclusiveBound(final K key) { this.key = key; } - public byte[] key() { + public K key() { return key; } @Override - public T map(final Mapper mapper) { + public T map(final Mapper mapper) { return mapper.map(this); } @@ -89,33 +89,33 @@ public boolean equals(final Object o) { if (o == null || getClass() != o.getClass()) { return false; } - final ExclusiveBound that = (ExclusiveBound) o; - return Objects.deepEquals(key, that.key); + final ExclusiveBound that = (ExclusiveBound) o; + return Objects.equals(key, that.key); } @Override public int hashCode() { - return Arrays.hashCode(key); + return Objects.hashCode(key); } } - class Unbounded implements RangeBound { - private static final Unbounded INSTANCE = new Unbounded(); + class Unbounded implements RangeBound { + private static Unbounded INSTANCE = new Unbounded<>(); private Unbounded() {} @Override - public T map(final Mapper mapper) { + public T map(final Mapper mapper) { return mapper.map(this); } } - interface Mapper { - T map(InclusiveBound b); + interface Mapper { + T map(InclusiveBound b); - T map(ExclusiveBound b); + T map(ExclusiveBound b); - T map(Unbounded b); + T map(Unbounded b); } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/WalEntry.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/WalEntry.java index 9ab0a8e8b..48a9229f3 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/WalEntry.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/WalEntry.java @@ -13,4 +13,15 @@ package dev.responsive.kafka.internal.db.rs3.client; public abstract class WalEntry { + public abstract void visit(Visitor visitor); + + public interface Visitor { + void visit(Put put); + + void visit(Delete delete); + + void visit(WindowedDelete windowedDelete); + + void visit(WindowedPut windowedPut); + } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/WindowedDelete.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/WindowedDelete.java new file mode 100644 index 000000000..4d4fa4758 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/WindowedDelete.java @@ -0,0 +1,57 @@ +/* + * Copyright 2025 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + +package dev.responsive.kafka.internal.db.rs3.client; + +import java.util.Objects; + +public class WindowedDelete extends Delete { + private final long windowTimestamp; + + public WindowedDelete( + final byte[] key, + final long windowTimestamp + ) { + super(key); + this.windowTimestamp = windowTimestamp; + } + + public long windowTimestamp() { + return windowTimestamp; + } + + @Override + public void visit(final Visitor visitor) { + visitor.visit(this); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + final WindowedDelete that = (WindowedDelete) o; + return windowTimestamp == that.windowTimestamp; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), windowTimestamp); + } + +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/WindowedPut.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/WindowedPut.java new file mode 100644 index 000000000..b732222bc --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/WindowedPut.java @@ -0,0 +1,65 @@ +/* + * Copyright 2025 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + +package dev.responsive.kafka.internal.db.rs3.client; + +import java.util.Objects; + +public class WindowedPut extends Put { + private final long timestamp; + private final long windowTimestamp; + + public WindowedPut( + final byte[] key, + final byte[] value, + final long timestamp, + final long windowTimestamp + ) { + super(key, value); + this.timestamp = timestamp; + this.windowTimestamp = windowTimestamp; + } + + public long timestamp() { + return timestamp; + } + + public long windowTimestamp() { + return windowTimestamp; + } + + @Override + public void visit(final Visitor visitor) { + visitor.visit(this); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + final WindowedPut that = (WindowedPut) o; + return timestamp == that.timestamp && windowTimestamp == that.windowTimestamp; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), timestamp, windowTimestamp); + } + +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcKeyValueIterator.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcKeyValueIterator.java index 6bd674806..81999f417 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcKeyValueIterator.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcKeyValueIterator.java @@ -12,11 +12,11 @@ package dev.responsive.kafka.internal.db.rs3.client.grpc; -import com.google.protobuf.ByteString; import dev.responsive.kafka.internal.db.rs3.client.RS3Exception; import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException; import dev.responsive.kafka.internal.db.rs3.client.Range; import dev.responsive.kafka.internal.db.rs3.client.RangeBound; +import dev.responsive.kafka.internal.utils.WindowedKey; import dev.responsive.rs3.Rs3; import io.grpc.stub.StreamObserver; import java.util.NoSuchElementException; @@ -32,27 +32,55 @@ * Internal iterator implementation which supports retries using RS3's asynchronous * Range API. */ -public class GrpcKeyValueIterator implements KeyValueIterator { +public class GrpcKeyValueIterator> implements KeyValueIterator { private static final Logger LOG = LoggerFactory.getLogger(GrpcKeyValueIterator.class); - private final GrpcRangeRequestProxy requestProxy; + private final GrpcRangeRequestProxy requestProxy; + private final GrpcRangeKeyCodec keyCodec; private final GrpcMessageQueue queue; - private Range range; + private final RangeBound end; + private RangeBound start; private RangeResultObserver resultObserver; public GrpcKeyValueIterator( - Range range, - GrpcRangeRequestProxy requestProxy + Range range, + GrpcRangeRequestProxy requestProxy, + GrpcRangeKeyCodec keyCodec ) { this.requestProxy = requestProxy; + this.keyCodec = keyCodec; this.queue = new GrpcMessageQueue<>(); - this.range = range; + this.start = range.start(); + this.end = range.end(); sendRangeRequest(); } + static GrpcKeyValueIterator standard( + Range range, + GrpcRangeRequestProxy requestProxy + ) { + return new GrpcKeyValueIterator<>( + range, + requestProxy, + GrpcRangeKeyCodec.STANDARD_CODEC + ); + } + + static GrpcKeyValueIterator windowed( + Range range, + GrpcRangeRequestProxy requestProxy + ) { + return new GrpcKeyValueIterator<>( + range, + requestProxy, + GrpcRangeKeyCodec.WINDOW_CODEC + ); + } + private void sendRangeRequest() { // Note that backoff on retry is handled internally by the request proxy resultObserver = new RangeResultObserver(); + final var range = new Range<>(start, end); requestProxy.send(range, resultObserver); } @@ -62,14 +90,12 @@ public boolean hasNext() { } @Override - public KeyValue next() { + public KeyValue next() { final var nextKeyValue = peekNextKeyValue(); if (nextKeyValue.isPresent()) { queue.poll(); final var keyValue = nextKeyValue.get(); - final var newStartRange = RangeBound.exclusive(keyValue.key.get()); - final var newEndRange = this.range.end(); - this.range = new Range(newStartRange, newEndRange); + this.start = RangeBound.exclusive(keyValue.key); return keyValue; } else { throw new NoSuchElementException(); @@ -83,7 +109,7 @@ public void close() { } } - Optional> peekNextKeyValue() { + Optional> peekNextKeyValue() { while (true) { try { final var message = queue.peek(); @@ -99,29 +125,27 @@ Optional> peekNextKeyValue() { } } - private Optional> tryUnwrapKeyValue(final Message message) { + private Optional> tryUnwrapKeyValue(final Message message) { return message.map(new Mapper<>() { @Override - public Optional> map(final EndOfStream endOfStream) { + public Optional> map(final EndOfStream endOfStream) { return Optional.empty(); } @Override - public Optional> map(final StreamError error) { + public Optional> map(final StreamError error) { throw GrpcRs3Util.wrapThrowable(error.exception); } @Override - public Optional> map(final Result result) { - final var key = Bytes.wrap(result.key.toByteArray()); - final var value = result.value.toByteArray(); - return Optional.of(new KeyValue<>(key, value)); + public Optional> map(final Result result) { + return Optional.of(keyCodec.decodeKeyValue(result.keyValue)); } }); } @Override - public Bytes peekNextKey() { + public K peekNextKey() { return peekNextKeyValue() .map(bytesKeyValue -> bytesKeyValue.key) .orElse(null); @@ -137,10 +161,7 @@ public void onNext(final Rs3.RangeResult rangeResult) { } else if (rangeResult.getType() == Rs3.RangeResult.Type.END_OF_STREAM) { queue.put(new EndOfStream()); } else { - final var kv = rangeResult.getResult().getBasicKv(); - final var key = kv.getKey().getKey(); - final var value = kv.getValue().getValue(); - queue.put(new Result(key, value)); + queue.put(new Result(rangeResult.getResult())); } } @@ -177,12 +198,12 @@ public T map(final Mapper mapper) { } private static class Result implements Message { - private final ByteString key; - private final ByteString value; + private final Rs3.KeyValue keyValue; - private Result(final ByteString key, final ByteString value) { - this.key = key; - this.value = value; + private Result( + final Rs3.KeyValue keyValue + ) { + this.keyValue = keyValue; } @Override diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java index 111bd0003..223732b2c 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java @@ -12,13 +12,12 @@ package dev.responsive.kafka.internal.db.rs3.client.grpc; -import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicDeleteProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicKeyProto; -import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicPutProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.storeStatusFromProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.storeTypeFromProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.walOffsetFromProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.walOffsetProto; +import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.windowKeyProto; import static dev.responsive.kafka.internal.utils.Utils.lssIdProto; import static dev.responsive.kafka.internal.utils.Utils.uuidFromProto; import static dev.responsive.kafka.internal.utils.Utils.uuidToProto; @@ -28,17 +27,15 @@ import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreResult; import dev.responsive.kafka.internal.db.rs3.client.CurrentOffsets; -import dev.responsive.kafka.internal.db.rs3.client.Delete; import dev.responsive.kafka.internal.db.rs3.client.LssId; -import dev.responsive.kafka.internal.db.rs3.client.Put; import dev.responsive.kafka.internal.db.rs3.client.RS3Client; import dev.responsive.kafka.internal.db.rs3.client.RS3TimeoutException; import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException; import dev.responsive.kafka.internal.db.rs3.client.Range; -import dev.responsive.kafka.internal.db.rs3.client.RangeBound; import dev.responsive.kafka.internal.db.rs3.client.StoreInfo; import dev.responsive.kafka.internal.db.rs3.client.StreamSenderMessageReceiver; import dev.responsive.kafka.internal.db.rs3.client.WalEntry; +import dev.responsive.kafka.internal.utils.WindowedKey; import dev.responsive.rs3.RS3Grpc; import dev.responsive.rs3.Rs3; import io.grpc.stub.StreamObserver; @@ -95,14 +92,6 @@ public CurrentOffsets getCurrentOffsets(final UUID storeId, final LssId lssId, f ); } - private void withRetry(Runnable grpcOperation, Supplier opDescription) { - final Supplier voidSupplier = () -> { - grpcOperation.run(); - return null; - }; - withRetry(voidSupplier, opDescription); - } - private T withRetry(Supplier grpcOperation, Supplier opDescription) { final var deadlineMs = time.milliseconds() + retryTimeoutMs; return withRetryDeadline(grpcOperation, deadlineMs, opDescription); @@ -159,7 +148,7 @@ public StreamSenderMessageReceiver> writeWalSegmentAsyn .setPssId(pssId) .setEndOffset(endOffset) .setExpectedWrittenOffset(walOffsetProto(expectedWrittenOffset)); - addWalEntryToSegment(entry, entryBuilder); + entry.visit(new WalEntryPutWriter(entryBuilder)); return entryBuilder.build(); }, streamObserver @@ -229,7 +218,43 @@ public KeyValueIterator range( final LssId lssId, final int pssId, final Optional expectedWrittenOffset, - Range range + Range range + ) { + return sendRange( + storeId, + lssId, + pssId, + expectedWrittenOffset, + range, + GrpcRangeKeyCodec.STANDARD_CODEC + ); + } + + @Override + public KeyValueIterator windowedRange( + final UUID storeId, + final LssId lssId, + final int pssId, + final Optional expectedWrittenOffset, + final Range range + ) { + return sendRange( + storeId, + lssId, + pssId, + expectedWrittenOffset, + range, + GrpcRangeKeyCodec.WINDOW_CODEC + ); + } + + private > KeyValueIterator sendRange( + final UUID storeId, + final LssId lssId, + final int pssId, + final Optional expectedWrittenOffset, + final Range range, + final GrpcRangeKeyCodec keyCodec ) { final var requestBuilder = Rs3.RangeRequest.newBuilder() .setStoreId(uuidToProto(storeId)) @@ -239,8 +264,13 @@ public KeyValueIterator range( final Supplier rangeDescription = () -> "Range(storeId=" + storeId + ", lssId=" + lssId + ", pssId=" + pssId + ")"; final var asyncStub = stubs.stubs(storeId, pssId).asyncStub(); - final var rangeProxy = new RangeProxy(requestBuilder, asyncStub, rangeDescription); - return new GrpcKeyValueIterator(range, rangeProxy); + final var rangeProxy = new RangeProxy<>( + requestBuilder, + asyncStub, + keyCodec, + rangeDescription + ); + return new GrpcKeyValueIterator<>(range, rangeProxy, keyCodec); } @Override @@ -249,17 +279,66 @@ public Optional get( final LssId lssId, final int pssId, final Optional expectedWrittenOffset, - final byte[] key + final Bytes key ) { + final var keyProto = Rs3.Key.newBuilder() + .setBasicKey(basicKeyProto(key.get())); final var requestBuilder = Rs3.GetRequest.newBuilder() - .setStoreId(uuidToProto(storeId)) + .setKey(keyProto); + final var kvOpt = sendGet( + requestBuilder, + storeId, + lssId, + pssId, + expectedWrittenOffset + ); + return kvOpt.map(kv -> { + checkField(kv::hasBasicKv, "value"); + final var value = kv.getBasicKv().getValue().getValue(); + return value.toByteArray(); + }); + } + + @Override + public Optional windowedGet( + final UUID storeId, + final LssId lssId, + final int pssId, + final Optional expectedWrittenOffset, + final WindowedKey key + ) { + final var keyProto = Rs3.Key.newBuilder() + .setWindowKey(windowKeyProto(key)); + final var requestBuilder = Rs3.GetRequest.newBuilder() + .setKey(keyProto); + final var kvOpt = sendGet( + requestBuilder, + storeId, + lssId, + pssId, + expectedWrittenOffset + ); + return kvOpt.map(kv -> { + checkField(kv::hasWindowKv, "value"); + final var value = kv.getWindowKv().getValue().getValue(); + return value.toByteArray(); + }); + } + + private Optional sendGet( + final Rs3.GetRequest.Builder requestBuilder, + final UUID storeId, + final LssId lssId, + final int pssId, + final Optional expectedWrittenOffset + ) { + requestBuilder.setStoreId(uuidToProto(storeId)) .setLssId(lssIdProto(lssId)) .setPssId(pssId) - .setKey(Rs3.Key.newBuilder().setBasicKey(basicKeyProto(key))) .setExpectedWrittenOffset(walOffsetProto(expectedWrittenOffset)); + final var request = requestBuilder.build(); final RS3Grpc.RS3BlockingStub stub = stubs.stubs(storeId, pssId).syncStub(); - final Rs3.GetResult result = withRetry( () -> stub.get(request), () -> "Get(storeId=" + storeId + ", lssId=" + lssId + ", pssId=" + pssId + ")" @@ -267,10 +346,7 @@ public Optional get( if (!result.hasResult()) { return Optional.empty(); } - final Rs3.KeyValue keyValue = result.getResult(); - checkField(keyValue::hasBasicKv, "value"); - final var value = keyValue.getBasicKv().getValue().getValue(); - return Optional.of(value.toByteArray()); + return Optional.of(result.getResult()); } @Override @@ -315,79 +391,38 @@ public CreateStoreResult createStore( return new CreateStoreResult(uuidFromProto(result.getStoreId()), result.getPssIdsList()); } - private void addWalEntryToSegment( - final WalEntry entry, - final Rs3.WriteWALSegmentRequest.Builder builder - ) { - if (entry instanceof Put) { - builder.setPut(basicPutProto((Put) entry)); - } else if (entry instanceof Delete) { - builder.setDelete(basicDeleteProto((Delete) entry)); - } - } - private void checkField(final Supplier check, final String field) { if (!check.get()) { throw new RuntimeException("rs3 resp proto missing field " + field); } } - private Rs3.Range protoRange(Range range) { - final var protoRange = Rs3.BasicRange.newBuilder() - .setFrom(protoBound(range.start())) - .setTo(protoBound(range.end())); - return Rs3.Range.newBuilder() - .setBasicRange(protoRange) - .build(); - } - - private Rs3.BasicBound protoBound(RangeBound bound) { - return bound.map(new RangeBound.Mapper<>() { - @Override - public Rs3.BasicBound map(final RangeBound.InclusiveBound b) { - return Rs3.BasicBound.newBuilder() - .setType(Rs3.BoundType.INCLUSIVE) - .setKey(basicKeyProto(b.key())) - .build(); - } - - @Override - public Rs3.BasicBound map(final RangeBound.ExclusiveBound b) { - return Rs3.BasicBound.newBuilder() - .setType(Rs3.BoundType.EXCLUSIVE) - .setKey(basicKeyProto(b.key())) - .build(); - } - - @Override - public Rs3.BasicBound map(final RangeBound.Unbounded b) { - return Rs3.BasicBound.newBuilder() - .setType(Rs3.BoundType.UNBOUNDED) - .build(); - } - }); - } - - private class RangeProxy implements GrpcRangeRequestProxy { + private class RangeProxy> implements GrpcRangeRequestProxy { private final Rs3.RangeRequest.Builder requestBuilder; private final RS3Grpc.RS3Stub stub; private final Supplier opDescription; + private final GrpcRangeKeyCodec keyCodec; private int attempts = 0; private long deadlineMs = Long.MAX_VALUE; // Set upon the first retry private RangeProxy( final Rs3.RangeRequest.Builder requestBuilder, final RS3Grpc.RS3Stub stub, + final GrpcRangeKeyCodec keyCodec, final Supplier opDescription ) { this.requestBuilder = requestBuilder; this.stub = stub; + this.keyCodec = keyCodec; this.opDescription = opDescription; } @Override - public void send(final Range range, final StreamObserver resultObserver) { - requestBuilder.setRange(protoRange(range)); + public void send( + final Range range, + final StreamObserver resultObserver + ) { + requestBuilder.setRange(keyCodec.encodeRange(range)); while (true) { try { diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRangeKeyCodec.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRangeKeyCodec.java new file mode 100644 index 000000000..4d47ae0a3 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRangeKeyCodec.java @@ -0,0 +1,136 @@ +/* + * Copyright 2025 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + +package dev.responsive.kafka.internal.db.rs3.client.grpc; + +import com.google.protobuf.ByteString; +import dev.responsive.kafka.internal.db.rs3.client.Range; +import dev.responsive.kafka.internal.db.rs3.client.RangeBound; +import dev.responsive.kafka.internal.utils.WindowedKey; +import dev.responsive.rs3.Rs3; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; + +public interface GrpcRangeKeyCodec> { + WindowedKeyCodec WINDOW_CODEC = new WindowedKeyCodec(); + BasicKeyCodec STANDARD_CODEC = new BasicKeyCodec(); + + KeyValue decodeKeyValue(Rs3.KeyValue keyValue); + + Rs3.Range encodeRange(Range range); + + class WindowedKeyCodec implements GrpcRangeKeyCodec { + @Override + public KeyValue decodeKeyValue(final Rs3.KeyValue keyValue) { + final var kvProto = keyValue.getWindowKv(); + final var keyProto = kvProto.getKey(); + + final var key = new WindowedKey( + Bytes.wrap(keyProto.getKey().toByteArray()), + keyProto.getWindowTimestamp() + ); + final var value = kvProto.getValue().getValue().toByteArray(); + return new KeyValue<>(key, value); + } + + @Override + public Rs3.Range encodeRange(final Range range) { + final var rangeProto = Rs3.WindowRange.newBuilder(); + rangeProto.setFrom(boundProto(range.start())); + rangeProto.setTo(boundProto(range.end())); + return Rs3.Range.newBuilder() + .setWindowRange(rangeProto) + .build(); + } + + private Rs3.WindowBound boundProto(RangeBound bound) { + final var boundProto = Rs3.WindowBound.newBuilder(); + return bound.map(new RangeBound.Mapper<>() { + @Override + public Rs3.WindowBound map(final RangeBound.InclusiveBound b) { + final var key = Rs3.WindowKey.newBuilder() + .setWindowTimestamp(b.key().windowStartMs) + .setKey(ByteString.copyFrom(b.key().key.get())); + return boundProto.setType(Rs3.BoundType.INCLUSIVE) + .setKey(key) + .build(); + } + + @Override + public Rs3.WindowBound map(final RangeBound.ExclusiveBound b) { + final var key = Rs3.WindowKey.newBuilder() + .setWindowTimestamp(b.key().windowStartMs) + .setKey(ByteString.copyFrom(b.key().key.get())); + return boundProto.setType(Rs3.BoundType.EXCLUSIVE) + .setKey(key) + .build(); + } + + @Override + public Rs3.WindowBound map(final RangeBound.Unbounded b) { + return boundProto.setType(Rs3.BoundType.UNBOUNDED) + .build(); + } + }); + } + } + + class BasicKeyCodec implements GrpcRangeKeyCodec { + @Override + public KeyValue decodeKeyValue(final Rs3.KeyValue keyValue) { + final var kvProto = keyValue.getBasicKv(); + final var key = Bytes.wrap(kvProto.getKey().getKey().toByteArray()); + final var value = kvProto.getValue().getValue().toByteArray(); + return new KeyValue<>(key, value); + } + + @Override + public Rs3.Range encodeRange(final Range range) { + final var rangeProto = Rs3.BasicRange.newBuilder(); + rangeProto.setFrom(boundProto(range.start())); + rangeProto.setTo(boundProto(range.end())); + return Rs3.Range.newBuilder() + .setBasicRange(rangeProto) + .build(); + } + + private Rs3.BasicBound boundProto(RangeBound bound) { + final var boundProto = Rs3.BasicBound.newBuilder(); + return bound.map(new RangeBound.Mapper<>() { + @Override + public Rs3.BasicBound map(final RangeBound.InclusiveBound b) { + final var key = Rs3.BasicKey.newBuilder() + .setKey(ByteString.copyFrom(b.key().get())); + return boundProto.setType(Rs3.BoundType.INCLUSIVE) + .setKey(key) + .build(); + } + + @Override + public Rs3.BasicBound map(final RangeBound.ExclusiveBound b) { + final var key = Rs3.BasicKey.newBuilder() + .setKey(ByteString.copyFrom(b.key().get())); + return boundProto.setType(Rs3.BoundType.EXCLUSIVE) + .setKey(key) + .build(); + } + + @Override + public Rs3.BasicBound map(final RangeBound.Unbounded b) { + return boundProto.setType(Rs3.BoundType.UNBOUNDED) + .build(); + } + }); + } + } + +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRangeRequestProxy.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRangeRequestProxy.java index 6b12fd288..e7ef3f66f 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRangeRequestProxy.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRangeRequestProxy.java @@ -24,7 +24,7 @@ * updated. If the observer encounters an error, then it can retry with the updated * start bound. */ -public interface GrpcRangeRequestProxy { +public interface GrpcRangeRequestProxy> { /** * Send a range request with an updated start bound. The results will be passed * through to result observer. If a transient error is encountered through the @@ -33,5 +33,6 @@ public interface GrpcRangeRequestProxy { * @param range The updated range based on key-values seen with `resultObserver` * @param resultObserver An observer for key-value results and the end of stream marker */ - void send(Range range, StreamObserver resultObserver); + void send(Range range, StreamObserver resultObserver); + } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java index 39ab5143d..b11e306a5 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java @@ -19,6 +19,7 @@ import dev.responsive.kafka.internal.db.rs3.client.RS3Exception; import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException; import dev.responsive.kafka.internal.db.rs3.client.StoreInfo; +import dev.responsive.kafka.internal.utils.WindowedKey; import dev.responsive.rs3.Rs3; import io.grpc.Status; import io.grpc.StatusException; @@ -72,18 +73,36 @@ public static Rs3.BasicKey basicKeyProto(final byte[] key) { .build(); } + public static Rs3.WindowKey windowKeyProto(final WindowedKey key) { + return Rs3.WindowKey.newBuilder() + .setKey(ByteString.copyFrom(key.key.get())) + .setWindowTimestamp(key.windowStartMs) + .build(); + } + public static Rs3.BasicKeyValue basicKeyValueProto( final byte[] key, final byte[] value ) { - final var keyBldr = Rs3.BasicKey.newBuilder(); - keyBldr.setKey(ByteString.copyFrom(key)); - + final var keyProto = basicKeyProto(key); final var valueBldr = Rs3.BasicValue.newBuilder(); valueBldr.setValue(ByteString.copyFrom(value)); return Rs3.BasicKeyValue.newBuilder() - .setKey(keyBldr) + .setKey(keyProto) + .setValue(valueBldr) + .build(); + } + + public static Rs3.WindowKeyValue windowKeyValueProto( + final WindowedKey key, + final byte[] value + ) { + final var keyProto = windowKeyProto(key); + final var valueBldr = Rs3.WindowValue.newBuilder(); + valueBldr.setValue(ByteString.copyFrom(value)); + return Rs3.WindowKeyValue.newBuilder() + .setKey(keyProto) .setValue(valueBldr) .build(); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/WalEntryPutWriter.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/WalEntryPutWriter.java new file mode 100644 index 000000000..6bf05c277 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/WalEntryPutWriter.java @@ -0,0 +1,63 @@ +package dev.responsive.kafka.internal.db.rs3.client.grpc; + +import com.google.protobuf.ByteString; +import dev.responsive.kafka.internal.db.rs3.client.Delete; +import dev.responsive.kafka.internal.db.rs3.client.Put; +import dev.responsive.kafka.internal.db.rs3.client.WalEntry; +import dev.responsive.kafka.internal.db.rs3.client.WindowedDelete; +import dev.responsive.kafka.internal.db.rs3.client.WindowedPut; +import dev.responsive.rs3.Rs3; + +public class WalEntryPutWriter implements WalEntry.Visitor { + private final Rs3.WriteWALSegmentRequest.Builder builder; + + public WalEntryPutWriter(final Rs3.WriteWALSegmentRequest.Builder builder) { + this.builder = builder; + } + + @Override + public void visit(final Put put) { + final var kvProto = Rs3.BasicKeyValue.newBuilder() + .setKey(Rs3.BasicKey.newBuilder().setKey(ByteString.copyFrom(put.key()))) + .setValue(Rs3.BasicValue.newBuilder().setValue(ByteString.copyFrom(put.value()))); + final var putProto = Rs3.WriteWALSegmentRequest.Put.newBuilder() + .setKv(Rs3.KeyValue.newBuilder().setBasicKv(kvProto)) + .setTtl(Rs3.Ttl.newBuilder().setTtlType(Rs3.Ttl.TtlType.DEFAULT)); + builder.setPut(putProto); + } + + @Override + public void visit(final Delete delete) { + final var keyProto = Rs3.BasicKey.newBuilder() + .setKey(ByteString.copyFrom(delete.key())); + final var deleteProto = Rs3.WriteWALSegmentRequest.Delete.newBuilder() + .setKey(Rs3.Key.newBuilder().setBasicKey(keyProto)); + builder.setDelete(deleteProto); + } + + @Override + public void visit(final WindowedDelete windowedDelete) { + final var keyProto = Rs3.WindowKey.newBuilder() + .setKey(ByteString.copyFrom(windowedDelete.key())) + .setWindowTimestamp(windowedDelete.windowTimestamp()); + final var deleteProto = Rs3.WriteWALSegmentRequest.Delete.newBuilder() + .setKey(Rs3.Key.newBuilder().setWindowKey(keyProto)); + builder.setDelete(deleteProto); + } + + @Override + public void visit(final WindowedPut windowedPut) { + final var keyProto = Rs3.WindowKey.newBuilder() + .setKey(ByteString.copyFrom(windowedPut.key())) + .setWindowTimestamp(windowedPut.windowTimestamp()); + final var valueProto = Rs3.WindowValue.newBuilder() + .setValue(ByteString.copyFrom(windowedPut.value())); + final var kvProto = Rs3.WindowKeyValue.newBuilder() + .setKey(keyProto) + .setValue(valueProto); + final var putProto = Rs3.WriteWALSegmentRequest.Put.newBuilder() + .setKv(Rs3.KeyValue.newBuilder().setWindowKv(kvProto)) + .setTtl(Rs3.Ttl.newBuilder().setTtlType(Rs3.Ttl.TtlType.DEFAULT)); + builder.setPut(putProto); + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/RemoteWindowOperations.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/RemoteWindowOperations.java index 93926bb7d..54cc40110 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/RemoteWindowOperations.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/RemoteWindowOperations.java @@ -13,9 +13,11 @@ package dev.responsive.kafka.internal.stores; import static dev.responsive.kafka.api.config.ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG; +import static dev.responsive.kafka.internal.config.InternalSessionConfigs.loadMetrics; import static dev.responsive.kafka.internal.config.InternalSessionConfigs.loadSessionClients; import static dev.responsive.kafka.internal.config.InternalSessionConfigs.loadStoreRegistry; import static dev.responsive.kafka.internal.stores.ResponsiveStoreRegistration.NO_COMMITTED_OFFSET; +import static dev.responsive.kafka.internal.utils.StoreUtil.numPartitionsForKafkaTopic; import static dev.responsive.kafka.internal.utils.StoreUtil.streamThreadId; import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext; import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.changelogFor; @@ -30,13 +32,16 @@ import dev.responsive.kafka.internal.db.WindowedKeySpec; import dev.responsive.kafka.internal.db.mongo.ResponsiveMongoClient; import dev.responsive.kafka.internal.db.partitioning.WindowSegmentPartitioner; +import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; import dev.responsive.kafka.internal.metrics.ResponsiveRestoreListener; import dev.responsive.kafka.internal.utils.Iterators; import dev.responsive.kafka.internal.utils.Result; import dev.responsive.kafka.internal.utils.SessionClients; import dev.responsive.kafka.internal.utils.StoreUtil; import dev.responsive.kafka.internal.utils.TableName; +import dev.responsive.kafka.internal.utils.Utils; import dev.responsive.kafka.internal.utils.WindowedKey; +import java.time.Duration; import java.util.Collection; import java.util.Map; import java.util.OptionalLong; @@ -93,8 +98,8 @@ public static RemoteWindowOperations create( ); final WindowSegmentPartitioner partitioner = new WindowSegmentPartitioner( - params.retentionPeriod(), - StoreUtil.computeSegmentInterval(params.retentionPeriod(), params.numSegments()), + params.retentionPeriodMs(), + StoreUtil.computeSegmentInterval(params.retentionPeriodMs(), params.numSegments()), params.retainDuplicates() ); @@ -106,6 +111,22 @@ public static RemoteWindowOperations create( case MONGO_DB: table = createMongo(params, sessionClients, partitioner, responsiveConfig); break; + case RS3: + final var responsiveMetrics = loadMetrics(appConfigs); + final var scopeBuilder = responsiveMetrics.storeLevelMetricScopeBuilder( + Utils.extractThreadIdFromThreadName(Thread.currentThread().getName()), + changelog, + params.name().tableName() + ); + + table = createRs3( + params, + sessionClients, + changelog.topic(), + responsiveMetrics, + scopeBuilder + ); + break; case NONE: log.error("Must configure a storage backend type using the config {}", STORAGE_BACKEND_TYPE_CONFIG); @@ -213,6 +234,27 @@ private static RemoteWindowTable createMongo( } } + private static RemoteWindowTable createRs3( + final ResponsiveWindowParams params, + final SessionClients sessionClients, + final String changelogTopicName, + final ResponsiveMetrics responsiveMetrics, + final ResponsiveMetrics.MetricScopeBuilder scopeBuilder + ) { + if (params.schemaType() == SchemaTypes.WindowSchema.STREAM || params.retainDuplicates()) { + throw new UnsupportedOperationException("Duplicate retention is not yet supported in RS3"); + } + + final var defaultTtl = Duration.ofMillis(params.retentionPeriodMs()); + return sessionClients.rs3TableFactory().windowTable( + params.name().tableName(), + defaultTtl, + responsiveMetrics, + scopeBuilder, + () -> numPartitionsForKafkaTopic(sessionClients.admin(), changelogTopicName) + ); + } + @SuppressWarnings("rawtypes") public RemoteWindowOperations( final Logger log, diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveWindowStore.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveWindowStore.java index 2f0883e92..0f1c05d86 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveWindowStore.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveWindowStore.java @@ -68,7 +68,7 @@ public class ResponsiveWindowStore public ResponsiveWindowStore(final ResponsiveWindowParams params) { this.params = params; this.name = params.name(); - this.retentionPeriod = params.retentionPeriod(); + this.retentionPeriod = params.retentionPeriodMs(); this.position = Position.emptyPosition(); this.log = new LogContext( String.format("window-store [%s] ", name.kafkaName()) diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3KVTableIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3KVTableIntegrationTest.java index f288c1fa1..d142723c4 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3KVTableIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3KVTableIntegrationTest.java @@ -152,7 +152,7 @@ public void shouldWriteToStore() throws InterruptedException, ExecutionException new LssId(PARTITION_ID), pss, Optional.of(10L), - key.get() + key ); assertThat(result.get(), is("bar".getBytes())); } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactoryTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactoryTest.java index a5617d564..12e6caf63 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactoryTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactoryTest.java @@ -28,6 +28,7 @@ import dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRS3Client.Connector; import dev.responsive.kafka.internal.metrics.ClientVersionMetadata; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Optional; @@ -69,7 +70,7 @@ public void setup() { } @Test - public void testTableMapping() { + public void testBasicTableMapping() { final UUID storeId = new UUID(100, 200); final String tableName = "test-table"; final int partitions = 5; @@ -86,7 +87,7 @@ public void testTableMapping() { () -> partitions ); assertEquals(tableName, rs3Table.name()); - assertEquals(storeId, rs3Table.storedId()); + assertEquals(storeId, rs3Table.storeId()); final var expectedOptions = new CreateStoreOptions( partitions, @@ -98,6 +99,37 @@ public void testTableMapping() { verify(client).createStore(tableName, expectedOptions); } + @Test + public void testWindowTableMapping() { + final UUID storeId = new UUID(100, 200); + final String tableName = "test-table"; + final int partitions = 5; + + when(client.createStore(anyString(), any(CreateStoreOptions.class))) + .thenReturn(new CreateStoreResult(storeId, List.of(1, 2, 3, 4, 5))); + + final var factory = newTestFactory(); + final var defaultTtl = Duration.ofMinutes(10); + final RS3WindowTable rs3Table = (RS3WindowTable) factory.windowTable( + tableName, + defaultTtl, + metrics, + scopeBuilder, + () -> partitions + ); + assertEquals(tableName, rs3Table.name()); + assertEquals(storeId, rs3Table.storeId()); + + final var expectedOptions = new CreateStoreOptions( + partitions, + CreateStoreTypes.StoreType.WINDOW, + Optional.of(CreateStoreTypes.ClockType.WALL_CLOCK), + Optional.of(defaultTtl.toMillis()), + Optional.empty() + ); + verify(client).createStore(tableName, expectedOptions); + } + private RS3TableFactory newTestFactory() { final var connector = mock(Connector.class); lenient().when(connector.connect()).thenReturn(client); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3WindowTableTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3WindowTableTest.java new file mode 100644 index 000000000..045e6f631 --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3WindowTableTest.java @@ -0,0 +1,149 @@ +package dev.responsive.kafka.internal.db.rs3; + +import static java.util.Collections.singletonList; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; + +import dev.responsive.kafka.internal.db.rs3.client.CurrentOffsets; +import dev.responsive.kafka.internal.db.rs3.client.LssId; +import dev.responsive.kafka.internal.db.rs3.client.RS3Client; +import dev.responsive.kafka.internal.db.rs3.client.StreamSender; +import dev.responsive.kafka.internal.db.rs3.client.StreamSenderMessageReceiver; +import dev.responsive.kafka.internal.db.rs3.client.WalEntry; +import dev.responsive.kafka.internal.utils.WindowedKey; +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.kafka.common.utils.Bytes; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class RS3WindowTableTest { + + @Mock + private RS3Client client; + + @Mock + private PssPartitioner partitioner; + + @Mock + private StreamSenderMessageReceiver> sendRecv; + + @Mock + private StreamSender streamSender; + + @Test + public void shouldReturnValueFromFetchIfKeyFound() { + // Given: + final var storeId = UUID.randomUUID(); + final var kafkaPartition = 5; + final var lssId = new LssId(kafkaPartition); + final var table = new RS3WindowTable("table", storeId, client, partitioner); + final var key = Bytes.wrap("foo".getBytes(StandardCharsets.UTF_8)); + final var timestamp = 300L; + + // When: + final var pssId = 1; + final byte[] value = "bar".getBytes(StandardCharsets.UTF_8); + when(partitioner.pssForLss(lssId)).thenReturn(singletonList(pssId)); + when(partitioner.pss(any(), eq(lssId))).thenReturn(pssId); + when(client.getCurrentOffsets(storeId, lssId, pssId)) + .thenReturn(new CurrentOffsets(Optional.of(5L), Optional.of(5L))); + when(client.windowedGet( + storeId, + lssId, + pssId, + Optional.of(5L), + new WindowedKey(key, timestamp) + )).thenReturn(Optional.of(value)); + + // Then: + table.init(kafkaPartition); + assertThat(table.fetch(kafkaPartition, key, timestamp), is(value)); + } + + @Test + public void shouldReturnNullFromFetchIfKeyNotFound() { + // Given: + final var storeId = UUID.randomUUID(); + final var kafkaPartition = 5; + final var lssId = new LssId(kafkaPartition); + final var table = new RS3WindowTable("table", storeId, client, partitioner); + final var key = Bytes.wrap("foo".getBytes(StandardCharsets.UTF_8)); + final var timestamp = 300L; + + // When: + final var pssId = 1; + when(partitioner.pssForLss(lssId)).thenReturn(singletonList(pssId)); + when(partitioner.pss(any(), eq(lssId))).thenReturn(pssId); + when(client.getCurrentOffsets(storeId, lssId, pssId)) + .thenReturn(new CurrentOffsets(Optional.of(5L), Optional.of(5L))); + when(client.windowedGet( + storeId, + lssId, + pssId, + Optional.of(5L), + new WindowedKey(key, timestamp)) + ).thenReturn(Optional.empty()); + + // Then: + table.init(kafkaPartition); + assertThat(table.fetch(kafkaPartition, key, timestamp), is(nullValue())); + } + + @Test + public void shouldWriteWindowedKeyValues() throws Exception { + // Given: + final var storeId = UUID.randomUUID(); + final var kafkaPartition = 5; + final var lssId = new LssId(kafkaPartition); + final var table = new RS3WindowTable("table", storeId, client, partitioner); + final var pssId = 0; + final var consumedOffset = 4L; + + // When: + final var sendRecvCompletion = new CompletableFuture>(); + when(sendRecv.completion()).thenReturn(sendRecvCompletion); + when(sendRecv.sender()).thenReturn(streamSender); + when(sendRecv.isActive()).thenAnswer(invocation -> !sendRecvCompletion.isDone()); + when(partitioner.pssForLss(lssId)).thenReturn(singletonList(pssId)); + when(client.getCurrentOffsets(storeId, lssId, pssId)) + .thenReturn(new CurrentOffsets(Optional.empty(), Optional.empty())); + when(client.writeWalSegmentAsync(storeId, lssId, pssId, Optional.empty(), consumedOffset)) + .thenReturn(sendRecv); + + // Then: + final var flushManager = table.init(kafkaPartition); + final var writer = flushManager.createWriter(pssId, consumedOffset); + writer.insert( + new WindowedKey(utf8Bytes("super"), 0L), + utf8Bytes("mario"), + 5L + ); + writer.insert( + new WindowedKey(utf8Bytes("super"), 10L), + utf8Bytes("mario"), + 15L + ); + sendRecvCompletion.complete(Optional.of(consumedOffset)); + + final var completion = writer.flush(); + assertThat(completion.toCompletableFuture().isDone(), is(true)); + final var result = completion.toCompletableFuture().get(); + assertThat(result.wasApplied(), is(true)); + assertThat(result.tablePartition(), is(pssId)); + } + + private static byte[] utf8Bytes(String s) { + return s.getBytes(StandardCharsets.UTF_8); + } + +} \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/RS3ClientUtilTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/RS3ClientUtilTest.java new file mode 100644 index 000000000..35636d77a --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/RS3ClientUtilTest.java @@ -0,0 +1,87 @@ +/* + * Copyright 2025 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + +package dev.responsive.kafka.internal.db.rs3.client; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.when; + +import dev.responsive.kafka.internal.db.rs3.PssPartitioner; +import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistration; +import java.util.Arrays; +import java.util.Optional; +import java.util.UUID; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class RS3ClientUtilTest { + + @Mock + private RS3Client client; + + @Mock + private PssPartitioner partitioner; + + + @Test + public void shouldReturnLastWrittenOffset() { + // Given: + final var storeId = UUID.randomUUID(); + final var lssId = new LssId(0); + when(partitioner.pssForLss(lssId)) + .thenReturn(Arrays.asList(0, 1, 2)); + + // When: + when(client.getCurrentOffsets(storeId, lssId, 0)) + .thenReturn(new CurrentOffsets(Optional.of(15L), Optional.of(10L))); + when(client.getCurrentOffsets(storeId, lssId, 1)) + .thenReturn(new CurrentOffsets(Optional.of(20L), Optional.of(5L))); + when(client.getCurrentOffsets(storeId, lssId, 2)) + .thenReturn(new CurrentOffsets(Optional.empty(), Optional.empty())); + + // Then: + final var clientUtil = new RS3ClientUtil(storeId, client, partitioner); + final var lssMetadata = clientUtil.fetchLssMetadata(lssId); + assertThat(lssMetadata.lastWrittenOffset(), is(20L)); + assertThat(lssMetadata.writtenOffsets().get(0), is(Optional.of(15L))); + assertThat(lssMetadata.writtenOffsets().get(1), is(Optional.of(20L))); + assertThat(lssMetadata.writtenOffsets().get(2), is(Optional.empty())); + } + + @Test + public void shouldReturnMinusOneForLastWrittenOffset() { + // Given: + final var storeId = UUID.randomUUID(); + final var lssId = new LssId(0); + when(partitioner.pssForLss(lssId)) + .thenReturn(Arrays.asList(0, 1)); + + // When: + when(client.getCurrentOffsets(storeId, lssId, 0)) + .thenReturn(new CurrentOffsets(Optional.empty(), Optional.empty())); + when(client.getCurrentOffsets(storeId, lssId, 1)) + .thenReturn(new CurrentOffsets(Optional.empty(), Optional.empty())); + + // Then: + final var clientUtil = new RS3ClientUtil(storeId, client, partitioner); + final var lssMetadata = clientUtil.fetchLssMetadata(lssId); + assertThat( + lssMetadata.lastWrittenOffset(), + is(ResponsiveStoreRegistration.NO_COMMITTED_OFFSET) + ); + } + +} \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcKeyValueIteratorTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcKeyValueIteratorTest.java index 06dfa9571..994e7fdc4 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcKeyValueIteratorTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcKeyValueIteratorTest.java @@ -29,6 +29,7 @@ import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import java.nio.charset.StandardCharsets; +import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -39,13 +40,13 @@ class GrpcKeyValueIteratorTest { @Mock - private GrpcRangeRequestProxy requestProxy; + private GrpcRangeRequestProxy requestProxy; @Test @SuppressWarnings("unchecked") public void shouldIterateKeyValueResults() { - final var range = new Range( - RangeBound.inclusive("a".getBytes(StandardCharsets.UTF_8)), + final var range = new Range<>( + RangeBound.inclusive(Bytes.wrap("a".getBytes(StandardCharsets.UTF_8))), RangeBound.unbounded() ); Mockito.doAnswer(invocation -> { @@ -58,7 +59,7 @@ public void shouldIterateKeyValueResults() { return null; }).when(requestProxy).send(eq(range), any()); - try (final var iter = new GrpcKeyValueIterator(range, requestProxy)) { + try (final var iter = GrpcKeyValueIterator.standard(range, requestProxy)) { assertNextKey(iter, "a"); assertNextKey(iter, "b"); assertNextKey(iter, "c"); @@ -69,8 +70,8 @@ public void shouldIterateKeyValueResults() { @Test @SuppressWarnings("unchecked") public void shouldRetryRangeRequestAfterTransientFailure() { - final var range = new Range( - RangeBound.inclusive("a".getBytes(StandardCharsets.UTF_8)), + final var range = new Range<>( + RangeBound.inclusive(Bytes.wrap("a".getBytes(StandardCharsets.UTF_8))), RangeBound.unbounded() ); Mockito.doAnswer(invocation -> { @@ -80,8 +81,8 @@ public void shouldRetryRangeRequestAfterTransientFailure() { return null; }).when(requestProxy).send(eq(range), any()); - final var retryRange = new Range( - RangeBound.exclusive("a".getBytes(StandardCharsets.UTF_8)), + final var retryRange = new Range<>( + RangeBound.exclusive(Bytes.wrap("a".getBytes(StandardCharsets.UTF_8))), RangeBound.unbounded() ); Mockito.doAnswer(invocation -> { @@ -93,7 +94,7 @@ public void shouldRetryRangeRequestAfterTransientFailure() { return null; }).when(requestProxy).send(eq(retryRange), any()); - try (final var iter = new GrpcKeyValueIterator(range, requestProxy)) { + try (final var iter = GrpcKeyValueIterator.standard(range, requestProxy)) { assertNextKey(iter, "a"); assertNextKey(iter, "b"); assertNextKey(iter, "c"); @@ -104,8 +105,8 @@ public void shouldRetryRangeRequestAfterTransientFailure() { @Test @SuppressWarnings("unchecked") public void shouldRetryAfterUnexpectedStreamCompletion() { - final var range = new Range( - RangeBound.inclusive("a".getBytes(StandardCharsets.UTF_8)), + final var range = new Range<>( + RangeBound.inclusive(Bytes.wrap("a".getBytes(StandardCharsets.UTF_8))), RangeBound.unbounded() ); Mockito.doAnswer(invocation -> { @@ -115,8 +116,8 @@ public void shouldRetryAfterUnexpectedStreamCompletion() { return null; }).when(requestProxy).send(eq(range), any()); - final var retryRange = new Range( - RangeBound.exclusive("a".getBytes(StandardCharsets.UTF_8)), + final var retryRange = new Range<>( + RangeBound.exclusive(Bytes.wrap("a".getBytes(StandardCharsets.UTF_8))), RangeBound.unbounded() ); Mockito.doAnswer(invocation -> { @@ -128,7 +129,7 @@ public void shouldRetryAfterUnexpectedStreamCompletion() { return null; }).when(requestProxy).send(eq(retryRange), any()); - try (final var iter = new GrpcKeyValueIterator(range, requestProxy)) { + try (final var iter = GrpcKeyValueIterator.standard(range, requestProxy)) { assertNextKey(iter, "a"); assertNextKey(iter, "b"); assertNextKey(iter, "c"); @@ -139,8 +140,8 @@ public void shouldRetryAfterUnexpectedStreamCompletion() { @Test @SuppressWarnings("unchecked") public void shouldPropagateUnexpectedFailures() { - final var range = new Range( - RangeBound.inclusive("a".getBytes(StandardCharsets.UTF_8)), + final var range = new Range<>( + RangeBound.inclusive(Bytes.wrap("a".getBytes(StandardCharsets.UTF_8))), RangeBound.unbounded() ); Mockito.doAnswer(invocation -> { @@ -150,7 +151,7 @@ public void shouldPropagateUnexpectedFailures() { return null; }).when(requestProxy).send(eq(range), any()); - try (final var iter = new GrpcKeyValueIterator(range, requestProxy)) { + try (final var iter = GrpcKeyValueIterator.standard(range, requestProxy)) { assertNextKey(iter, "a"); final var rs3Exception = assertThrows(RS3Exception.class, iter::next); assertThat(rs3Exception.getCause(), instanceOf(StatusRuntimeException.class)); @@ -159,7 +160,7 @@ public void shouldPropagateUnexpectedFailures() { } } - private void assertNextKey(GrpcKeyValueIterator iter, String key) { + private void assertNextKey(GrpcKeyValueIterator iter, String key) { assertThat(iter.hasNext(), is(true)); final var keyValue = iter.next(); final var keyBytes = keyValue.key.get(); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientEndToEndTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientBasicEndToEndTest.java similarity index 54% rename from kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientEndToEndTest.java rename to kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientBasicEndToEndTest.java index 4e4bfeb93..15b6dfb5a 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientEndToEndTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientBasicEndToEndTest.java @@ -6,11 +6,11 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import com.google.protobuf.ByteString; import dev.responsive.kafka.internal.db.rs3.client.LssId; import dev.responsive.kafka.internal.db.rs3.client.Put; import dev.responsive.kafka.internal.db.rs3.client.Range; import dev.responsive.kafka.internal.db.rs3.client.RangeBound; +import dev.responsive.kafka.internal.db.rs3.client.WalEntry; import dev.responsive.rs3.RS3Grpc; import dev.responsive.rs3.Rs3; import io.grpc.ClientCall; @@ -20,7 +20,6 @@ import io.grpc.StatusRuntimeException; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; -import io.grpc.stub.StreamObserver; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; @@ -29,7 +28,7 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.state.KeyValueIterator; @@ -38,7 +37,7 @@ import org.junit.jupiter.api.Test; import org.mockito.Mockito; -class GrpcRS3ClientEndToEndTest { +class GrpcRS3ClientBasicEndToEndTest { private static final String SERVER_NAME = "localhost"; private static final long RETRY_TIMEOUT_MS = 10000; @@ -52,9 +51,15 @@ class GrpcRS3ClientEndToEndTest { @BeforeEach public void setUp() throws IOException { + final var service = new TestGrpcRs3Service( + STORE_ID, + LSS_ID, + PSS_ID, + new BasicKeyValueStore() + ); this.server = InProcessServerBuilder .forName(SERVER_NAME) - .addService(new TestRs3Service()) + .addService(service) .build() .start(); this.channel = Mockito.spy(InProcessChannelBuilder @@ -101,7 +106,7 @@ public void shouldPutAndGet() { LSS_ID, PSS_ID, Optional.of(5L), - key + Bytes.wrap(key) ); assertThat(getResult.isPresent(), is(true)); final var resultValue = getResult.get(); @@ -121,7 +126,7 @@ public void shouldScanAllKeyValues() { LSS_ID, PSS_ID, Optional.of(5L), - new Range( + new Range( RangeBound.unbounded(), RangeBound.unbounded() ) @@ -133,7 +138,7 @@ public void shouldScanAllKeyValues() { assertThat(iter.hasNext(), is(false)); } - private void writeWalSegment(long endOffset, List puts) { + private void writeWalSegment(long endOffset, List puts) { final var sendRecv = client.writeWalSegmentAsync( STORE_ID, LSS_ID, @@ -167,9 +172,9 @@ public void shouldScanKeyValuesInBoundedRange() { LSS_ID, PSS_ID, Optional.of(10L), - new Range( - RangeBound.inclusive("b".getBytes(StandardCharsets.UTF_8)), - RangeBound.exclusive("e".getBytes(StandardCharsets.UTF_8)) + new Range<>( + RangeBound.inclusive(Bytes.wrap("b".getBytes(StandardCharsets.UTF_8))), + RangeBound.exclusive(Bytes.wrap("e".getBytes(StandardCharsets.UTF_8))) ) ); @@ -205,7 +210,7 @@ public void shouldRetryRangeWithNetworkInterruption() { LSS_ID, PSS_ID, Optional.of(5L), - new Range( + new Range( RangeBound.unbounded(), RangeBound.unbounded() ) @@ -266,181 +271,82 @@ private Put buildPut(String key, String value) { return new Put(keyBytes, valueBytes); } - static class TestRs3Service extends RS3Grpc.RS3ImplBase { - private final AtomicLong offset = new AtomicLong(0); + static class BasicKeyValueStore implements TestGrpcRs3Service.KeyValueStore { private final ConcurrentSkipListMap table = new ConcurrentSkipListMap<>(); @Override - public void getOffsets( - final Rs3.GetOffsetsRequest req, - final StreamObserver responseObserver - ) { - final var storeId = new UUID( - req.getStoreId().getHigh(), - req.getStoreId().getLow() - ); - if (req.getPssId() != PSS_ID - || req.getLssId().getId() != LSS_ID.id() - || !storeId.equals(STORE_ID)) { - responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT)); + public void put(final Rs3.KeyValue kvProto) { + if (!kvProto.hasBasicKv()) { + throw new UnsupportedOperationException("Unsupported kv type"); } - - final var currentOffset = offset.get(); - final var result = Rs3.GetOffsetsResult - .newBuilder() - .setFlushedOffset(GrpcRs3Util.walOffsetProto(currentOffset)) - .setWrittenOffset(GrpcRs3Util.walOffsetProto(currentOffset)) - .build(); - responseObserver.onNext(result); - responseObserver.onCompleted(); + final var kv = kvProto.getBasicKv(); + final var keyBytes = Bytes.wrap(kv.getKey().getKey().toByteArray()); + final var valueBytes = Bytes.wrap(kv.getValue().getValue().toByteArray()); + table.put(keyBytes, valueBytes); } @Override - public void get( - final Rs3.GetRequest req, - final StreamObserver responseObserver - ) { - final var storeId = new UUID( - req.getStoreId().getHigh(), - req.getStoreId().getLow() - ); - if (req.getPssId() != PSS_ID - || req.getLssId().getId() != LSS_ID.id() - || !storeId.equals(STORE_ID)) { - responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT)); - return; + public void delete(final Rs3.Key keyProto) { + if (!keyProto.hasBasicKey()) { + throw new UnsupportedOperationException("Unsupported kv type"); } + final var keyBytes = Bytes.wrap(keyProto.getBasicKey().getKey().toByteArray()); + table.remove(keyBytes); + } - if (req.getExpectedWrittenOffset().getIsWritten()) { - if (offset.get() < req.getExpectedWrittenOffset().getOffset()) { - responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT)); - return; - } + @Override + public Optional get(final Rs3.Key keyProto) { + if (!keyProto.hasBasicKey()) { + throw new UnsupportedOperationException("Unsupported kv type"); } - - final var key = req.getKey().getBasicKey(); - final var kvBuilder = Rs3.BasicKeyValue.newBuilder().setKey(key); - - final var keyBytes = Bytes.wrap(key.getKey().toByteArray()); + final var keyBytes = Bytes.wrap(keyProto.getBasicKey().getKey().toByteArray()); final var valueBytes = table.get(keyBytes); - if (valueBytes != null) { - final var value = Rs3.BasicValue.newBuilder() - .setValue(ByteString.copyFrom(valueBytes.get())); - kvBuilder.setValue(value); + if (valueBytes == null) { + return Optional.empty(); + } else { + return Optional.of( + Rs3.KeyValue.newBuilder() + .setBasicKv(GrpcRs3Util.basicKeyValueProto(keyBytes.get(), valueBytes.get())) + .build() + ); } - - final var result = Rs3.GetResult - .newBuilder() - .setResult(Rs3.KeyValue.newBuilder().setBasicKv(kvBuilder)) - .build(); - responseObserver.onNext(result); - responseObserver.onCompleted(); } @Override - public void range( - final Rs3.RangeRequest req, - final StreamObserver responseObserver - ) { - final var storeId = new UUID( - req.getStoreId().getHigh(), - req.getStoreId().getLow() - ); - if (req.getPssId() != PSS_ID - || req.getLssId().getId() != LSS_ID.id() - || !storeId.equals(STORE_ID)) { - responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT)); - return; + public Stream range(final Rs3.Range rangeProto) { + if (!rangeProto.hasBasicRange()) { + throw new UnsupportedOperationException("Unsupported kv type"); } - if (req.getExpectedWrittenOffset().getIsWritten()) { - if (offset.get() < req.getExpectedWrittenOffset().getOffset()) { - responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT)); - return; - } - } - - final var range = GrpsRs3TestUtil.newRangeFromProto(req); - for (final var keyValueEntry : table.entrySet()) { - if (!range.contains(keyValueEntry.getKey().get())) { - continue; - } - - final var keyValue = GrpcRs3Util.basicKeyValueProto( - keyValueEntry.getKey().get(), - keyValueEntry.getValue().get() - ); - - final var keyValueResult = Rs3.RangeResult.newBuilder() - .setType(Rs3.RangeResult.Type.RESULT) - .setResult(Rs3.KeyValue.newBuilder().setBasicKv(keyValue)) - .build(); - - responseObserver.onNext(keyValueResult); - } + final var basicRange = rangeProto.getBasicRange(); + final var range = new Range<>( + decodeBound(basicRange.getFrom()), + decodeBound(basicRange.getTo()) + ); - final var endOfStream = Rs3.RangeResult.newBuilder() - .setType(Rs3.RangeResult.Type.END_OF_STREAM) - .build(); - responseObserver.onNext(endOfStream); - responseObserver.onCompleted(); + return table.entrySet().stream() + .filter(entry -> range.contains(entry.getKey())) + .map(entry -> Rs3.KeyValue.newBuilder().setBasicKv( + GrpcRs3Util.basicKeyValueProto(entry.getKey().get(), entry.getValue().get()) + ).build()); } - @Override - public StreamObserver writeWALSegmentStream( - final StreamObserver responseObserver - ) { - return new StreamObserver<>() { - @Override - public void onNext(final Rs3.WriteWALSegmentRequest req) { - final var storeId = new UUID( - req.getStoreId().getHigh(), - req.getStoreId().getLow() - ); - if (req.getPssId() != PSS_ID - || req.getLssId().getId() != LSS_ID.id() - || !storeId.equals(STORE_ID)) { - responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT)); - } - - if (req.getExpectedWrittenOffset().getIsWritten()) { - if (offset.get() < req.getExpectedWrittenOffset().getOffset()) { - responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT)); - return; - } - } - - TestRs3Service.this.offset.getAndUpdate( - current -> Math.max(current, req.getEndOffset()) - ); - if (req.hasPut()) { - final var kv = req.getPut().getKv().getBasicKv(); - final var keyBytes = Bytes.wrap(kv.getKey().getKey().toByteArray()); - final var valueBytes = Bytes.wrap(kv.getValue().getValue().toByteArray()); - table.put(keyBytes, valueBytes); - } else if (req.hasDelete()) { - final var key = req.getDelete().getKey().getBasicKey(); - final var keyBytes = Bytes.wrap(key.getKey().toByteArray()); - table.remove(keyBytes); - } - } - - @Override - public void onError(final Throwable throwable) { - responseObserver.onError(throwable); - } - - @Override - public void onCompleted() { - final var result = Rs3.WriteWALSegmentResult - .newBuilder() - .setFlushedOffset(GrpcRs3Util.walOffsetProto(offset.get())) - .build(); - responseObserver.onNext(result); - responseObserver.onCompleted(); + private RangeBound decodeBound(Rs3.BasicBound bound) { + if (bound.getType() == Rs3.BoundType.UNBOUNDED) { + return RangeBound.unbounded(); + } else { + final var key = Bytes.wrap(bound.getKey().getKey().toByteArray()); + if (bound.getType() == Rs3.BoundType.INCLUSIVE) { + return RangeBound.inclusive(key); + } else if (bound.getType() == Rs3.BoundType.EXCLUSIVE) { + return RangeBound.exclusive(key); + } else { + throw new UnsupportedOperationException("Unsupported bound type"); } - }; + } } } + + } \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java index b449e69ed..a7ba9bfdd 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java @@ -44,6 +44,7 @@ import dev.responsive.kafka.internal.db.rs3.client.Range; import dev.responsive.kafka.internal.db.rs3.client.StoreInfo; import dev.responsive.kafka.internal.db.rs3.client.WalEntry; +import dev.responsive.kafka.internal.utils.WindowedKey; import dev.responsive.rs3.RS3Grpc; import dev.responsive.rs3.Rs3; import dev.responsive.rs3.Rs3.CreateStoreResult; @@ -61,6 +62,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.MockTime; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -650,7 +652,13 @@ public void shouldGetWithExpectedWrittenOffset() { .build()); // when: - final var result = client.get(STORE_ID, LSS_ID, PSS_ID, Optional.of(123L), "foo".getBytes()); + final var result = client.get( + STORE_ID, + LSS_ID, + PSS_ID, + Optional.of(123L), + Bytes.wrap("foo".getBytes()) + ); // then: assertThat(result.get(), is("bar".getBytes())); @@ -681,7 +689,13 @@ public void shouldGet() { ); // when: - final var result = client.get(STORE_ID, LSS_ID, PSS_ID, Optional.empty(), "foo".getBytes()); + final var result = client.get( + STORE_ID, + LSS_ID, + PSS_ID, + Optional.empty(), + Bytes.wrap("foo".getBytes()) + ); // then: assertThat(result.get(), is("bar".getBytes())); @@ -705,7 +719,13 @@ public void shouldHandleNegativeGet() { ); // when: - final var result = client.get(STORE_ID, LSS_ID, PSS_ID, Optional.of(123L), "foo".getBytes()); + final var result = client.get( + STORE_ID, + LSS_ID, + PSS_ID, + Optional.of(123L), + Bytes.wrap("foo".getBytes()) + ); // then: assertThat(result.isEmpty(), is(true)); @@ -719,7 +739,13 @@ public void shouldRetryGet() { .thenReturn(Rs3.GetResult.newBuilder().build()); // when: - final var result = client.get(STORE_ID, LSS_ID, PSS_ID, Optional.of(123L), "foo".getBytes()); + final var result = client.get( + STORE_ID, + LSS_ID, + PSS_ID, + Optional.of(123L), + Bytes.wrap("foo".getBytes()) + ); // then: assertThat(result.isEmpty(), is(true)); @@ -732,10 +758,13 @@ public void shouldPropagateUnexpectedExceptionsFromGet() { .thenThrow(new StatusRuntimeException(Status.UNKNOWN)); // when: - final RS3Exception exception = assertThrows( - RS3Exception.class, - () -> client.get(STORE_ID, LSS_ID, PSS_ID, Optional.of(123L), "foo".getBytes()) - ); + final RS3Exception exception = assertThrows(RS3Exception.class, () -> client.get( + STORE_ID, + LSS_ID, + PSS_ID, + Optional.of(123L), + Bytes.wrap("foo".getBytes()) + )); // then: assertThat(exception.getCause(), instanceOf(StatusRuntimeException.class)); @@ -755,7 +784,7 @@ public void shouldTimeoutGet() { LSS_ID, PSS_ID, Optional.of(123L), - "foo".getBytes() + Bytes.wrap("foo".getBytes()) )); // then: @@ -763,6 +792,87 @@ public void shouldTimeoutGet() { assertThat(endTimeMs - startTimeMs, is(retryTimeoutMs)); } + @Test + public void shouldWindowedGet() { + final var windowTimestamp = 500L; + final var key = "foo".getBytes(); + + // given: + + final var kvProto = GrpcRs3Util.windowKeyValueProto( + new WindowedKey("foo".getBytes(StandardCharsets.UTF_8), windowTimestamp), + "bar".getBytes(StandardCharsets.UTF_8) + ); + when(stub.get(any())).thenReturn( + Rs3.GetResult.newBuilder() + .setResult(Rs3.KeyValue.newBuilder().setWindowKv(kvProto)) + .build()); + + // when: + final var result = client.windowedGet( + STORE_ID, + LSS_ID, + PSS_ID, + Optional.empty(), + new WindowedKey(Bytes.wrap(key), windowTimestamp) + ); + + // then: + assertThat(result.get(), is("bar".getBytes())); + final var keyProto = GrpcRs3Util.windowKeyProto( + new WindowedKey("foo".getBytes(StandardCharsets.UTF_8), windowTimestamp) + ); + verify(stub).get(Rs3.GetRequest.newBuilder() + .setLssId(lssIdProto(LSS_ID)) + .setPssId(PSS_ID) + .setExpectedWrittenOffset(GrpcRs3Util.UNWRITTEN_WAL_OFFSET) + .setStoreId(uuidToProto(STORE_ID)) + .setKey(Rs3.Key.newBuilder().setWindowKey(keyProto)) + .build() + ); + } + + @Test + public void shouldRetryWindowedGet() { + final var windowTimestamp = 500L; + final var key = "foo".getBytes(); + + // given: + when(stub.get(any())) + .thenThrow(new StatusRuntimeException(Status.UNAVAILABLE)) + .thenReturn(Rs3.GetResult.newBuilder().setResult( + Rs3.KeyValue.newBuilder() + .setWindowKv(GrpcRs3Util.windowKeyValueProto( + new WindowedKey("foo".getBytes(StandardCharsets.UTF_8), windowTimestamp), + "bar".getBytes(StandardCharsets.UTF_8) + )) + ).build()); + + // when: + final var result = client.windowedGet( + STORE_ID, + LSS_ID, + PSS_ID, + Optional.empty(), + new WindowedKey(Bytes.wrap(key), windowTimestamp) + ); + + // then: + assertThat(result.get(), is("bar".getBytes())); + final var keyProto = GrpcRs3Util.windowKeyProto( + new WindowedKey("foo".getBytes(StandardCharsets.UTF_8), windowTimestamp) + ); + verify(stub, times(2)) + .get(Rs3.GetRequest.newBuilder() + .setLssId(lssIdProto(LSS_ID)) + .setPssId(PSS_ID) + .setExpectedWrittenOffset(GrpcRs3Util.UNWRITTEN_WAL_OFFSET) + .setStoreId(uuidToProto(STORE_ID)) + .setKey(Rs3.Key.newBuilder().setWindowKey(keyProto)) + .build() + ); + } + @Test @SuppressWarnings("unchecked") public void shouldRetryRangeRequest() { diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientWindowEndToEndTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientWindowEndToEndTest.java new file mode 100644 index 000000000..e264da41c --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientWindowEndToEndTest.java @@ -0,0 +1,308 @@ +package dev.responsive.kafka.internal.db.rs3.client.grpc; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +import dev.responsive.kafka.internal.db.rs3.client.LssId; +import dev.responsive.kafka.internal.db.rs3.client.Range; +import dev.responsive.kafka.internal.db.rs3.client.RangeBound; +import dev.responsive.kafka.internal.db.rs3.client.WalEntry; +import dev.responsive.kafka.internal.db.rs3.client.WindowedDelete; +import dev.responsive.kafka.internal.db.rs3.client.WindowedPut; +import dev.responsive.kafka.internal.utils.WindowedKey; +import dev.responsive.rs3.Rs3; +import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.stream.Stream; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class GrpcRS3ClientWindowEndToEndTest { + private static final String SERVER_NAME = "localhost"; + private static final long RETRY_TIMEOUT_MS = 10000; + private static final UUID STORE_ID = UUID.randomUUID(); + private static final int PSS_ID = 0; + private static final LssId LSS_ID = new LssId(PSS_ID); + + private Server server; + private ManagedChannel channel; + private GrpcRS3Client client; + + @BeforeEach + public void setUp() throws IOException { + final var service = new TestGrpcRs3Service( + STORE_ID, + LSS_ID, + PSS_ID, + new WindowKeyValueStore() + ); + this.server = InProcessServerBuilder + .forName(SERVER_NAME) + .addService(service) + .build() + .start(); + this.channel = Mockito.spy(InProcessChannelBuilder + .forName(SERVER_NAME) + .directExecutor() + .build()); + this.client = new GrpcRS3Client( + new PssStubsProvider(this.channel), + Time.SYSTEM, + RETRY_TIMEOUT_MS + ); + } + + @AfterEach + public void tearDown() { + this.channel.shutdownNow(); + this.server.shutdownNow(); + this.client.close(); + } + + @Test + public void shouldPutAndGet() { + final var key = windowedKey("foo", 100L); + final var value = "bar"; + writeWalSegment( + 10L, + Collections.singletonList(windowedPut(key, value)) + ); + + final var getResult = client.windowedGet( + STORE_ID, + LSS_ID, + PSS_ID, + Optional.of(5L), + key + ); + assertThat(getResult.isPresent(), is(true)); + final var resultValue = getResult.get(); + assertThat(new String(resultValue, StandardCharsets.UTF_8), equalTo(value)); + } + + + @Test + public void shouldDelete() { + final var key = windowedKey("foo", 100L); + writeWalSegment( + 10L, + Collections.singletonList(windowedPut(key, "bar")) + ); + + assertThat(client.windowedGet( + STORE_ID, + LSS_ID, + PSS_ID, + Optional.of(5L), + key + ).isPresent(), is(true)); + + writeWalSegment( + 10L, + Collections.singletonList(windowedDelete(key)) + ); + + assertThat(client.windowedGet( + STORE_ID, + LSS_ID, + PSS_ID, + Optional.of(5L), + key + ).isPresent(), is(false)); + } + + @Test + public void shouldScanValuesInTimeWindowRange() { + writeWalSegment( + 10L, + Arrays.asList( + windowedPut(windowedKey("a", 100L), "1"), + windowedPut(windowedKey("b", 100L), "2"), + windowedPut(windowedKey("a", 200L), "3"), + windowedPut(windowedKey("b", 150L), "4"), + windowedPut(windowedKey("c", 200L), "5"), + windowedPut(windowedKey("a", 300L), "6") + ) + ); + + final var range = new Range<>( + RangeBound.inclusive(windowedKey("a", 100L)), + RangeBound.exclusive(windowedKey("a", 300L)) + ); + + try (final var iter = client.windowedRange( + STORE_ID, + LSS_ID, + PSS_ID, + Optional.empty(), + range + )) { + assertNext(iter, windowedKey("a", 100L), "1"); + assertNext(iter, windowedKey("a", 200L), "3"); + assertThat(iter.hasNext(), is(false)); + } + } + + private void assertNext( + KeyValueIterator iter, + WindowedKey key, + String value + ) { + assertThat(iter.hasNext(), is(true)); + final var keyValue = iter.next(); + assertThat(keyValue.key, is(key)); + assertThat(Bytes.wrap(keyValue.value), is(Bytes.wrap(value.getBytes(StandardCharsets.UTF_8)))); + } + + private static WindowedKey windowedKey(String key, long windowTimestamp) { + return new WindowedKey( + key.getBytes(StandardCharsets.UTF_8), + windowTimestamp + ); + } + + private static WindowedPut windowedPut( + WindowedKey key, + String value + ) { + return new WindowedPut( + key.key.get(), + value.getBytes(StandardCharsets.UTF_8), + 0L, + key.windowStartMs + ); + } + + private static WindowedDelete windowedDelete( + WindowedKey key + ) { + return new WindowedDelete( + key.key.get(), + key.windowStartMs + ); + } + + private void writeWalSegment(long endOffset, List entries) { + final var sendRecv = client.writeWalSegmentAsync( + STORE_ID, + LSS_ID, + PSS_ID, + Optional.empty(), + endOffset + ); + + entries.forEach(entry -> sendRecv.sender().sendNext(entry)); + sendRecv.sender().finish(); + + final var flushedOffset = sendRecv + .completion() + .toCompletableFuture() + .join(); + assertThat(flushedOffset, is(Optional.of(endOffset))); + } + + static class WindowKeyValueStore implements TestGrpcRs3Service.KeyValueStore { + private final ConcurrentSkipListMap table = new ConcurrentSkipListMap<>(); + + @Override + public void put(final Rs3.KeyValue kvProto) { + if (!kvProto.hasWindowKv()) { + throw new UnsupportedOperationException("Unsupported kv type"); + } + final var windowKv = kvProto.getWindowKv(); + final var windowKey = new WindowedKey( + Bytes.wrap(windowKv.getKey().getKey().toByteArray()), + windowKv.getKey().getWindowTimestamp() + ); + final var valueBytes = Bytes.wrap(windowKv.getValue().getValue().toByteArray()); + table.put(windowKey, valueBytes); + } + + @Override + public void delete(final Rs3.Key keyProto) { + if (!keyProto.hasWindowKey()) { + throw new UnsupportedOperationException("Unsupported kv type"); + } + final var windowKey = new WindowedKey( + Bytes.wrap(keyProto.getWindowKey().getKey().toByteArray()), + keyProto.getWindowKey().getWindowTimestamp() + ); + table.remove(windowKey); + } + + @Override + public Optional get(final Rs3.Key keyProto) { + if (!keyProto.hasWindowKey()) { + throw new UnsupportedOperationException("Unsupported kv type"); + } + final var windowKey = new WindowedKey( + Bytes.wrap(keyProto.getWindowKey().getKey().toByteArray()), + keyProto.getWindowKey().getWindowTimestamp() + ); + + final var valueBytes = table.get(windowKey); + if (valueBytes == null) { + return Optional.empty(); + } else { + return Optional.of( + Rs3.KeyValue.newBuilder() + .setWindowKv(GrpcRs3Util.windowKeyValueProto(windowKey, valueBytes.get())) + .build() + ); + } + } + + @Override + public Stream range(final Rs3.Range rangeProto) { + if (!rangeProto.hasWindowRange()) { + throw new UnsupportedOperationException("Unsupported kv type"); + } + + final var windowRange = rangeProto.getWindowRange(); + final var range = new Range<>( + decodeBound(windowRange.getFrom()), + decodeBound(windowRange.getTo()) + ); + + return table.entrySet().stream() + .filter(entry -> range.contains(entry.getKey())) + .map(entry -> Rs3.KeyValue.newBuilder().setWindowKv( + GrpcRs3Util.windowKeyValueProto(entry.getKey(), entry.getValue().get()) + ).build()); + } + + private RangeBound decodeBound(Rs3.WindowBound bound) { + if (bound.getType() == Rs3.BoundType.UNBOUNDED) { + return RangeBound.unbounded(); + } else { + final var windowKey = new WindowedKey( + Bytes.wrap(bound.getKey().getKey().toByteArray()), + bound.getKey().getWindowTimestamp() + ); + if (bound.getType() == Rs3.BoundType.INCLUSIVE) { + return RangeBound.inclusive(windowKey); + } else if (bound.getType() == Rs3.BoundType.EXCLUSIVE) { + return RangeBound.exclusive(windowKey); + } else { + throw new UnsupportedOperationException("Unsupported bound type"); + } + } + } + } +} diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpsRs3TestUtil.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpsRs3TestUtil.java index 11ca07b11..c2794a2d5 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpsRs3TestUtil.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpsRs3TestUtil.java @@ -12,8 +12,6 @@ package dev.responsive.kafka.internal.db.rs3.client.grpc; -import dev.responsive.kafka.internal.db.rs3.client.Range; -import dev.responsive.kafka.internal.db.rs3.client.RangeBound; import dev.responsive.rs3.Rs3; import java.nio.charset.StandardCharsets; @@ -30,31 +28,10 @@ public static Rs3.RangeResult newKeyValueResult(String key) { .build(); } - public static Rs3.RangeResult newEndOfStreamResult() { return Rs3.RangeResult.newBuilder() .setType(Rs3.RangeResult.Type.END_OF_STREAM) .build(); } - public static Range newRangeFromProto(Rs3.RangeRequest req) { - final var range = req.getRange().getBasicRange(); - final var startBound = newRangeBoundFromProto(range.getFrom()); - final var endBound = newRangeBoundFromProto(range.getTo()); - return new Range(startBound, endBound); - } - - private static RangeBound newRangeBoundFromProto(Rs3.BasicBound bound) { - switch (bound.getType()) { - case EXCLUSIVE: - return RangeBound.exclusive(bound.getKey().getKey().toByteArray()); - case INCLUSIVE: - return RangeBound.inclusive(bound.getKey().getKey().toByteArray()); - case UNBOUNDED: - return RangeBound.unbounded(); - default: - throw new IllegalArgumentException(String.format("Unknown range type %s", bound.getType())); - } - } - } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/TestGrpcRs3Service.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/TestGrpcRs3Service.java new file mode 100644 index 000000000..5404eee46 --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/TestGrpcRs3Service.java @@ -0,0 +1,188 @@ +package dev.responsive.kafka.internal.db.rs3.client.grpc; + +import dev.responsive.kafka.internal.db.rs3.client.LssId; +import dev.responsive.rs3.RS3Grpc; +import dev.responsive.rs3.Rs3; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; + +class TestGrpcRs3Service extends RS3Grpc.RS3ImplBase { + + interface KeyValueStore { + void put(Rs3.KeyValue kv); + + void delete(Rs3.Key key); + + Optional get(Rs3.Key key); + + Stream range(Rs3.Range range); + } + + private final UUID storeId; + private final LssId lssId; + private final int pssId; + + private final AtomicLong offset = new AtomicLong(0); + private final KeyValueStore store; + + public TestGrpcRs3Service( + final UUID storeId, + final LssId lssId, + final int pssId, + final KeyValueStore store + ) { + this.storeId = storeId; + this.lssId = lssId; + this.pssId = pssId; + this.store = store; + } + + @Override + public void getOffsets( + final Rs3.GetOffsetsRequest req, + final StreamObserver responseObserver + ) { + final var storeId = new UUID( + req.getStoreId().getHigh(), + req.getStoreId().getLow() + ); + if (req.getPssId() != pssId + || req.getLssId().getId() != lssId.id() + || !storeId.equals(this.storeId)) { + responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT)); + } + + final var currentOffset = offset.get(); + final var result = Rs3.GetOffsetsResult + .newBuilder() + .setFlushedOffset(GrpcRs3Util.walOffsetProto(currentOffset)) + .setWrittenOffset(GrpcRs3Util.walOffsetProto(currentOffset)) + .build(); + responseObserver.onNext(result); + responseObserver.onCompleted(); + } + + @Override + public void get( + final Rs3.GetRequest req, + final StreamObserver responseObserver + ) { + final var storeId = new UUID( + req.getStoreId().getHigh(), + req.getStoreId().getLow() + ); + if (req.getPssId() != pssId + || req.getLssId().getId() != lssId.id() + || !storeId.equals(this.storeId)) { + responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT)); + return; + } + + if (req.getExpectedWrittenOffset().getIsWritten()) { + if (offset.get() < req.getExpectedWrittenOffset().getOffset()) { + responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT)); + return; + } + } + + final var resultBldr = Rs3.GetResult.newBuilder(); + final var kv = store.get(req.getKey()); + kv.ifPresent(resultBldr::setResult); + responseObserver.onNext(resultBldr.build()); + responseObserver.onCompleted(); + } + + @Override + public void range( + final Rs3.RangeRequest req, + final StreamObserver responseObserver + ) { + final var storeId = new UUID( + req.getStoreId().getHigh(), + req.getStoreId().getLow() + ); + if (req.getPssId() != pssId + || req.getLssId().getId() != lssId.id() + || !storeId.equals(this.storeId)) { + responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT)); + return; + } + + if (req.getExpectedWrittenOffset().getIsWritten()) { + if (offset.get() < req.getExpectedWrittenOffset().getOffset()) { + responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT)); + return; + } + } + + store.range(req.getRange()).forEach(kv -> { + final var keyValueResult = Rs3.RangeResult.newBuilder() + .setType(Rs3.RangeResult.Type.RESULT) + .setResult(kv) + .build(); + responseObserver.onNext(keyValueResult); + }); + + final var endOfStream = Rs3.RangeResult.newBuilder() + .setType(Rs3.RangeResult.Type.END_OF_STREAM) + .build(); + responseObserver.onNext(endOfStream); + responseObserver.onCompleted(); + } + + @Override + public StreamObserver writeWALSegmentStream( + final StreamObserver responseObserver + ) { + return new StreamObserver<>() { + @Override + public void onNext(final Rs3.WriteWALSegmentRequest req) { + final var storeId = new UUID( + req.getStoreId().getHigh(), + req.getStoreId().getLow() + ); + if (req.getPssId() != pssId + || req.getLssId().getId() != lssId.id() + || !storeId.equals(TestGrpcRs3Service.this.storeId)) { + responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT)); + } + + if (req.getExpectedWrittenOffset().getIsWritten()) { + if (offset.get() < req.getExpectedWrittenOffset().getOffset()) { + responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT)); + return; + } + } + + TestGrpcRs3Service.this.offset.getAndUpdate( + current -> Math.max(current, req.getEndOffset()) + ); + if (req.hasPut()) { + store.put(req.getPut().getKv()); + } else if (req.hasDelete()) { + store.delete(req.getDelete().getKey()); + } + } + + @Override + public void onError(final Throwable throwable) { + responseObserver.onError(throwable); + } + + @Override + public void onCompleted() { + final var result = Rs3.WriteWALSegmentResult + .newBuilder() + .setFlushedOffset(GrpcRs3Util.walOffsetProto(offset.get())) + .build(); + responseObserver.onNext(result); + responseObserver.onCompleted(); + } + }; + } +}