diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVTable.java index a0c59b562..f648b7306 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVTable.java @@ -18,10 +18,14 @@ import dev.responsive.kafka.internal.db.rs3.client.MeteredRS3Client; import dev.responsive.kafka.internal.db.rs3.client.Put; import dev.responsive.kafka.internal.db.rs3.client.RS3Client; +import dev.responsive.kafka.internal.db.rs3.client.RangeBound; import dev.responsive.kafka.internal.db.rs3.client.WalEntry; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistration; +import dev.responsive.kafka.internal.utils.MergeKeyValueIterator; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.UUID; @@ -133,7 +137,21 @@ public KeyValueIterator range( final Bytes to, final long streamTimeMs ) { - throw new UnsupportedOperationException(); + final RangeBound fromBound = RangeBound.inclusive(from.get()); + final RangeBound toBound = RangeBound.exclusive(to.get()); + final List> pssIters = new ArrayList<>(); + + for (int pssId : pssPartitioner.pssForLss(this.lssId)) { + pssIters.add(rs3Client.range( + storeId, + lssId, + pssId, + flushManager.writtenOffset(pssId), + fromBound, + toBound + )); + } + return new MergeKeyValueIterator<>(pssIters); } @Override diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java index 7e02208a9..c2755b595 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java @@ -9,6 +9,8 @@ import java.util.UUID; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.state.KeyValueIterator; public class MeteredRS3Client implements RS3Client { public static final String GROUP_NAME = "rs3-table-metrics"; @@ -90,6 +92,25 @@ public Optional get( return result; } + @Override + public KeyValueIterator range( + final UUID storeId, + final LssId lssId, + final int pssId, + final Optional expectedWrittenOffset, + final RangeBound from, + final RangeBound to + ) { + return delegate.range( + storeId, + lssId, + pssId, + expectedWrittenOffset, + from, + to + ); + } + @Override public List listStores() { final Instant start = Instant.now(); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java index 9baa2b191..4aa1586a1 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java @@ -15,6 +15,8 @@ import java.util.List; import java.util.Optional; import java.util.UUID; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.state.KeyValueIterator; public interface RS3Client { CurrentOffsets getCurrentOffsets(UUID storeId, LssId lssId, int pssId); @@ -44,6 +46,15 @@ Optional get( byte[] key ); + KeyValueIterator range( + UUID storeId, + LssId lssId, + int pssId, + Optional expectedWrittenOffset, + RangeBound from, + RangeBound to + ); + List listStores(); void close(); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3TransientException.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3TransientException.java index 2d8070ad7..439657b0e 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3TransientException.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3TransientException.java @@ -18,4 +18,8 @@ public class RS3TransientException extends RS3Exception { public RS3TransientException(final Throwable cause) { super(cause); } + + public RS3TransientException(final String message) { + super(message); + } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Range.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Range.java new file mode 100644 index 000000000..2f0b21191 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Range.java @@ -0,0 +1,64 @@ +package dev.responsive.kafka.internal.db.rs3.client; + +import java.util.Arrays; + +public class Range { + private final RangeBound start; + private final RangeBound end; + + public Range(RangeBound start, RangeBound end) { + this.start = start; + this.end = end; + } + + public RangeBound start() { + return start; + } + + public RangeBound end() { + return end; + } + + public boolean contains(byte[] key) { + return greaterThanStartBound(key) && lessThanEndBound(key); + } + + public boolean greaterThanStartBound(byte[] key) { + return start.map(new RangeBound.Mapper<>() { + @Override + public Boolean map(final RangeBound.InclusiveBound b) { + return Arrays.compare(b.key(), key) <= 0; + } + + @Override + public Boolean map(final RangeBound.ExclusiveBound b) { + return Arrays.compare(b.key(), key) < 0; + } + + @Override + public Boolean map(final RangeBound.Unbounded b) { + return true; + } + }); + } + + public boolean lessThanEndBound(byte[] key) { + return end.map(new RangeBound.Mapper<>() { + @Override + public Boolean map(final RangeBound.InclusiveBound b) { + return Arrays.compare(b.key(), key) >= 0; + } + + @Override + public Boolean map(final RangeBound.ExclusiveBound b) { + return Arrays.compare(b.key(), key) > 0; + } + + @Override + public Boolean map(final RangeBound.Unbounded b) { + return true; + } + }); + } + +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RangeBound.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RangeBound.java new file mode 100644 index 000000000..fc8b6a3e0 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RangeBound.java @@ -0,0 +1,121 @@ +/* + * Copyright 2025 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + +package dev.responsive.kafka.internal.db.rs3.client; + +import java.util.Arrays; +import java.util.Objects; + +public interface RangeBound { + + T map(Mapper mapper); + + static Unbounded unbounded() { + return Unbounded.INSTANCE; + } + + static InclusiveBound inclusive(byte[] key) { + return new InclusiveBound(key); + } + + static ExclusiveBound exclusive(byte[] key) { + return new ExclusiveBound(key); + } + + class InclusiveBound implements RangeBound { + private final byte[] key; + + public InclusiveBound(final byte[] key) { + this.key = key; + } + + public byte[] key() { + return key; + } + + @Override + public T map(final Mapper mapper) { + return mapper.map(this); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final InclusiveBound that = (InclusiveBound) o; + return Objects.deepEquals(key, that.key); + } + + @Override + public int hashCode() { + return Arrays.hashCode(key); + } + } + + class ExclusiveBound implements RangeBound { + private final byte[] key; + + public ExclusiveBound(final byte[] key) { + this.key = key; + } + + public byte[] key() { + return key; + } + + @Override + public T map(final Mapper mapper) { + return mapper.map(this); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ExclusiveBound that = (ExclusiveBound) o; + return Objects.deepEquals(key, that.key); + } + + @Override + public int hashCode() { + return Arrays.hashCode(key); + } + } + + class Unbounded implements RangeBound { + private static final Unbounded INSTANCE = new Unbounded(); + + private Unbounded() {} + + @Override + public T map(final Mapper mapper) { + return mapper.map(this); + } + } + + interface Mapper { + T map(InclusiveBound b); + + T map(ExclusiveBound b); + + T map(Unbounded b); + } + +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcKeyValueIterator.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcKeyValueIterator.java new file mode 100644 index 000000000..ccc2d65bc --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcKeyValueIterator.java @@ -0,0 +1,218 @@ +/* + * Copyright 2025 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + +package dev.responsive.kafka.internal.db.rs3.client.grpc; + +import com.google.protobuf.ByteString; +import dev.responsive.kafka.internal.db.rs3.client.RS3Exception; +import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException; +import dev.responsive.kafka.internal.db.rs3.client.RangeBound; +import dev.responsive.rs3.Rs3; +import io.grpc.stub.StreamObserver; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Internal iterator implementation which supports retries using RS3's asynchronous + * Range API. + */ +public class GrpcKeyValueIterator implements KeyValueIterator { + private static final Logger LOG = LoggerFactory.getLogger(GrpcKeyValueIterator.class); + + private final GrpcRangeRequestProxy requestProxy; + private final GrpcMessageQueue queue; + private RangeBound startBound; + private RangeResultObserver resultObserver; + + public GrpcKeyValueIterator( + RangeBound initialStartBound, + GrpcRangeRequestProxy requestProxy + ) { + this.requestProxy = requestProxy; + this.queue = new GrpcMessageQueue<>(); + this.startBound = initialStartBound; + sendRangeRequest(); + } + + private void sendRangeRequest() { + // Note that backoff on retry is handled internally by the request proxy + resultObserver = new RangeResultObserver(); + requestProxy.send(startBound, resultObserver); + } + + @Override + public boolean hasNext() { + return peekNextKeyValue().isPresent(); + } + + @Override + public KeyValue next() { + final var nextKeyValue = peekNextKeyValue(); + if (nextKeyValue.isPresent()) { + queue.poll(); + final var keyValue = nextKeyValue.get(); + this.startBound = RangeBound.exclusive(keyValue.key.get()); + return keyValue; + } else { + throw new NoSuchElementException(); + } + } + + @Override + public void close() { + if (resultObserver != null) { + resultObserver.cancel(); + } + } + + Optional> peekNextKeyValue() { + while (true) { + try { + final var message = queue.peek(); + return tryUnwrapKeyValue(message); + } catch (RS3TransientException e) { + queue.poll(); + sendRangeRequest(); + } catch (RuntimeException e) { + // Leave unexpected errors in the queue so that they will continue + // to be propagated. + throw e; + } + } + } + + private Optional> tryUnwrapKeyValue(final Message message) { + return message.map(new Mapper<>() { + @Override + public Optional> map(final EndOfStream endOfStream) { + return Optional.empty(); + } + + @Override + public Optional> map(final StreamError error) { + throw GrpcRs3Util.wrapThrowable(error.exception); + } + + @Override + public Optional> map(final Result result) { + final var key = Bytes.wrap(result.key.toByteArray()); + final var value = result.value.toByteArray(); + return Optional.of(new KeyValue<>(key, value)); + } + }); + } + + @Override + public Bytes peekNextKey() { + return peekNextKeyValue() + .map(bytesKeyValue -> bytesKeyValue.key) + .orElse(null); + } + + private class RangeResultObserver implements StreamObserver { + private final AtomicReference error = new AtomicReference<>(); + + @Override + public void onNext(final Rs3.RangeResult rangeResult) { + if (error.get() != null) { + LOG.debug("Failed to send range result since the observer has already failed"); + } else if (rangeResult.getType() == Rs3.RangeResult.Type.END_OF_STREAM) { + queue.put(new EndOfStream()); + } else { + final var result = rangeResult.getResult(); + queue.put(new Result(result.getKey(), result.getValue())); + } + } + + @Override + public void onError(final Throwable throwable) { + if (this.error.compareAndSet(null, throwable)) { + queue.put(new StreamError(throwable)); + } else { + LOG.debug("Failed to record error since the observer has already failed", throwable); + } + } + + @Override + public void onCompleted() { + // We treat this as a transient error because we are looking for the explicit + // END_OF_STREAM result that is sent by the server. + onError(new StreamCompletedException("onCompleted fired")); + } + + public void cancel() { + this.error.compareAndSet(null, new RS3Exception("Range result observer cancelled")); + } + } + + private interface Message { + T map(Mapper mapper); + } + + private static class EndOfStream implements Message { + @Override + public T map(final Mapper mapper) { + return mapper.map(this); + } + } + + private static class Result implements Message { + private final ByteString key; + private final ByteString value; + + private Result(final ByteString key, final ByteString value) { + this.key = key; + this.value = value; + } + + @Override + public T map(final Mapper mapper) { + return mapper.map(this); + } + } + + private static class StreamError implements Message { + private final Throwable exception; + + StreamError(final Throwable error) { + this.exception = error; + } + + @Override + public T map(final Mapper mapper) { + return mapper.map(this); + } + } + + private static class StreamCompletedException extends RS3TransientException { + private static final long serialVersionUID = 0L; + + public StreamCompletedException(final String message) { + super(message); + } + } + + private interface Mapper { + T map(EndOfStream endOfStream); + + T map(StreamError error); + + T map(Result result); + } + +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcMessageQueue.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcMessageQueue.java new file mode 100644 index 000000000..e03400cdb --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcMessageQueue.java @@ -0,0 +1,59 @@ +/* + * Copyright 2025 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + +package dev.responsive.kafka.internal.db.rs3.client.grpc; + +import dev.responsive.kafka.internal.db.rs3.client.RS3Exception; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +public class GrpcMessageQueue { + private final BlockingQueue queue = new LinkedBlockingQueue<>(); + private T next = null; + + void put(T message) { + try { + queue.put(message); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RS3Exception(e); + } + } + + T poll() { + var next = this.next; + if (next == null) { + next = takeFromQueue(); + } + this.next = null; + return next; + } + + T peek() { + var next = this.next; + if (next == null) { + next = takeFromQueue(); + } + this.next = next; + return next; + } + + private T takeFromQueue() { + try { + return queue.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RS3Exception(e); + } + } + +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java index 08f6d9e95..a1ebc2523 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java @@ -25,11 +25,13 @@ import dev.responsive.kafka.internal.db.rs3.client.RS3Client; import dev.responsive.kafka.internal.db.rs3.client.RS3TimeoutException; import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException; +import dev.responsive.kafka.internal.db.rs3.client.RangeBound; import dev.responsive.kafka.internal.db.rs3.client.Store; import dev.responsive.kafka.internal.db.rs3.client.StreamSenderMessageReceiver; import dev.responsive.kafka.internal.db.rs3.client.WalEntry; import dev.responsive.rs3.RS3Grpc; import dev.responsive.rs3.Rs3; +import io.grpc.stub.StreamObserver; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -37,8 +39,10 @@ import java.util.concurrent.ExecutionException; import java.util.function.Supplier; import java.util.stream.Collectors; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.ExponentialBackoff; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.state.KeyValueIterator; public class GrpcRS3Client implements RS3Client { public static final long WAL_OFFSET_NONE = Long.MAX_VALUE; @@ -47,6 +51,10 @@ public class GrpcRS3Client implements RS3Client { private final Time time; private final long retryTimeoutMs; + // Using Kafka default backoff settings initially. We can pull them up + // if there is ever strong reason. + private final ExponentialBackoff backoff = new ExponentialBackoff(50, 2, 1000, 0.2); + @VisibleForTesting GrpcRS3Client(final PssStubsProvider stubs, final Time time, final long retryTimeoutMs) { this.stubs = Objects.requireNonNull(stubs); @@ -81,6 +89,14 @@ public CurrentOffsets getCurrentOffsets(final UUID storeId, final LssId lssId, f ); } + private void withRetry(Runnable grpcOperation, Supplier opDescription) { + final Supplier voidSupplier = () -> { + grpcOperation.run(); + return null; + }; + withRetry(voidSupplier, opDescription); + } + private T withRetry(Supplier grpcOperation, Supplier opDescription) { final var deadlineMs = time.milliseconds() + retryTimeoutMs; return withRetryDeadline(grpcOperation, deadlineMs, opDescription); @@ -91,10 +107,6 @@ private T withRetryDeadline( long deadlineMs, Supplier opDescription ) { - // Using Kafka default backoff settings initially. We can pull them up - // if there is ever strong reason. - final var backoff = new ExponentialBackoff(50, 2, 1000, 0.2); - var retries = 0; long currentTimeMs; @@ -206,6 +218,28 @@ private Optional tryWriteWalSegment( return result; } + @Override + public KeyValueIterator range( + final UUID storeId, + final LssId lssId, + final int pssId, + final Optional expectedWrittenOffset, + RangeBound from, + RangeBound to + ) { + final var requestBuilder = Rs3.RangeRequest.newBuilder() + .setStoreId(uuidToUuidProto(storeId)) + .setLssId(lssIdProto(lssId)) + .setPssId(pssId) + .setTo(protoBound(to)); + expectedWrittenOffset.ifPresent(requestBuilder::setExpectedWrittenOffset); + final Supplier rangeDescription = + () -> "Range(storeId=" + storeId + ", lssId=" + lssId + ", pssId=" + pssId + ")"; + final var asyncStub = stubs.stubs(storeId, pssId).asyncStub(); + final var rangeProxy = new RangeProxy(requestBuilder, asyncStub, rangeDescription); + return new GrpcKeyValueIterator(from, rangeProxy); + } + @Override public Optional get( final UUID storeId, @@ -273,6 +307,83 @@ private void checkField(final Supplier check, final String field) { } } + private Rs3.Bound protoBound(RangeBound bound) { + return bound.map(new RangeBound.Mapper<>() { + @Override + public Rs3.Bound map(final RangeBound.InclusiveBound b) { + return Rs3.Bound.newBuilder() + .setType(Rs3.Bound.Type.INCLUSIVE) + .setKey(ByteString.copyFrom(b.key())) + .build(); + } + + @Override + public Rs3.Bound map(final RangeBound.ExclusiveBound b) { + return Rs3.Bound.newBuilder() + .setType(Rs3.Bound.Type.EXCLUSIVE) + .setKey(ByteString.copyFrom(b.key())) + .build(); + } + + @Override + public Rs3.Bound map(final RangeBound.Unbounded b) { + return Rs3.Bound.newBuilder() + .setType(Rs3.Bound.Type.UNBOUNDED) + .build(); + } + }); + } + + private class RangeProxy implements GrpcRangeRequestProxy { + private final Rs3.RangeRequest.Builder requestBuilder; + private final RS3Grpc.RS3Stub stub; + private final Supplier opDescription; + private int attempts = 0; + private long deadlineMs = Long.MAX_VALUE; // Set upon the first retry + + private RangeProxy( + final Rs3.RangeRequest.Builder requestBuilder, + final RS3Grpc.RS3Stub stub, + final Supplier opDescription + ) { + this.requestBuilder = requestBuilder; + this.stub = stub; + this.opDescription = opDescription; + } + + @Override + public void send(final RangeBound start, final StreamObserver resultObserver) { + requestBuilder.setFrom(protoBound(start)); + + while (true) { + try { + final var currentTimeMs = time.milliseconds(); + if (currentTimeMs >= deadlineMs) { + throw new RS3TimeoutException("Timeout while attempting operation " + + opDescription.get()); + } + + if (attempts > 0) { + deadlineMs = Math.min(deadlineMs, currentTimeMs + retryTimeoutMs); + time.sleep(Math.min( + backoff.backoff(attempts), + Math.max(0, deadlineMs - currentTimeMs)) + ); + } + + attempts += 1; + stub.range(requestBuilder.build(), resultObserver); + return; + } catch (final Throwable t) { + var wrappedException = GrpcRs3Util.wrapThrowable(t); + if (!(wrappedException instanceof RS3TransientException)) { + throw wrappedException; + } + } + } + } + } + public static class Connector { private final Time time; private final String host; diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRangeRequestProxy.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRangeRequestProxy.java new file mode 100644 index 000000000..0b3c41aaf --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRangeRequestProxy.java @@ -0,0 +1,37 @@ +/* + * Copyright 2025 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + + + +package dev.responsive.kafka.internal.db.rs3.client.grpc; + +import dev.responsive.kafka.internal.db.rs3.client.RangeBound; +import dev.responsive.rs3.Rs3; +import io.grpc.stub.StreamObserver; + +/** + * Helper class for sending and retrying Range requests to RS3. As new key-values + * are observed through the `StreamObserver`, the start of the bound should be + * updated. If the observer encounters an error, then it can retry with the updated + * start bound. + */ +public interface GrpcRangeRequestProxy { + /** + * Send a range request with an updated start bound. The results will be passed + * through to result observer. If a transient error is encountered through the + * observer, the caller can retry this operation with an updated `startBound`. + * + * @param start The updated start bound based on key-values seen with `resultObserver` + * @param resultObserver An observer for key-value results and the end of stream marker + */ + void send(RangeBound start, StreamObserver resultObserver); +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java index 7af377c28..669881f63 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java @@ -25,12 +25,16 @@ public static RuntimeException wrapThrowable(Throwable t) { final var statusOpt = getGrpcStatus(t); if (statusOpt.isPresent()) { final var status = statusOpt.get(); - if (status.getCode() == Status.Code.UNAVAILABLE - || status.getCode() == Status.Code.RESOURCE_EXHAUSTED) { + if (isRetriable(status)) { return new RS3TransientException(t); - } else { - return new RS3Exception(t); + } else if (status.getCode() == Status.Code.CANCELLED) { + final var cause = t.getCause(); + final var causeStatus = getGrpcStatus(cause); + if (causeStatus.map(GrpcRs3Util::isRetriable).orElse(false)) { + return new RS3TransientException(t); + } } + return new RS3Exception(t); } else if (t instanceof RuntimeException) { return (RuntimeException) t; } else { @@ -38,6 +42,11 @@ public static RuntimeException wrapThrowable(Throwable t) { } } + private static boolean isRetriable(Status status) { + return status.getCode() == Status.Code.UNAVAILABLE + || status.getCode() == Status.Code.RESOURCE_EXHAUSTED; + } + private static Optional getGrpcStatus(Throwable t) { if (t instanceof StatusException) { return Optional.of(((StatusException) t).getStatus()); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/Iterators.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/Iterators.java index af200c13c..cd457073d 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/Iterators.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/Iterators.java @@ -108,7 +108,7 @@ public static KeyValueIterator, byte[]> windowedKey( public static , V> KeyValueIterator wrapped( final List> delegates ) { - return new MultiPartitionRangeIterator<>(delegates); + return new MergeKeyValueIterator<>(delegates); } public static KeyValueIterator mapKeys( diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/MultiPartitionRangeIterator.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/MergeKeyValueIterator.java similarity index 93% rename from kafka-client/src/main/java/dev/responsive/kafka/internal/utils/MultiPartitionRangeIterator.java rename to kafka-client/src/main/java/dev/responsive/kafka/internal/utils/MergeKeyValueIterator.java index 628af2244..4618887f5 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/MultiPartitionRangeIterator.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/MergeKeyValueIterator.java @@ -21,12 +21,12 @@ // basically the same exact thing but with one remote partition iterator and // one local commit buffer iterator, assuming we can extract all the commit buffer // specific implementation details (which should probably be done anyways) -public class MultiPartitionRangeIterator, V> +public class MergeKeyValueIterator, V> implements KeyValueIterator { private final PriorityQueue nextResults = new PriorityQueue<>(); - public MultiPartitionRangeIterator(final List> delegates) { + public MergeKeyValueIterator(final List> delegates) { for (final var iter : delegates) { if (iter.hasNext()) { final NextResult next = new NextResult(iter.next(), iter); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcKeyValueIteratorTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcKeyValueIteratorTest.java new file mode 100644 index 000000000..e2ec20da3 --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcKeyValueIteratorTest.java @@ -0,0 +1,152 @@ +/* + * Copyright 2025 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + +package dev.responsive.kafka.internal.db.rs3.client.grpc; + +import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpsRs3TestUtil.newEndOfStreamResult; +import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpsRs3TestUtil.newKeyValueResult; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; + +import dev.responsive.kafka.internal.db.rs3.client.RS3Exception; +import dev.responsive.kafka.internal.db.rs3.client.RangeBound; +import dev.responsive.rs3.Rs3; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.nio.charset.StandardCharsets; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class GrpcKeyValueIteratorTest { + + @Mock + private GrpcRangeRequestProxy requestProxy; + + @Test + @SuppressWarnings("unchecked") + public void shouldIterateKeyValueResults() { + final var startBound = RangeBound.inclusive("a".getBytes(StandardCharsets.UTF_8)); + Mockito.doAnswer(invocation -> { + StreamObserver observer = invocation.getArgument(1, StreamObserver.class); + observer.onNext(newKeyValueResult("a")); + observer.onNext(newKeyValueResult("b")); + observer.onNext(newKeyValueResult("c")); + observer.onNext(newEndOfStreamResult()); + observer.onCompleted(); + return null; + }).when(requestProxy).send(eq(startBound), any()); + + try (final var iter = new GrpcKeyValueIterator(startBound, requestProxy)) { + assertNextKey(iter, "a"); + assertNextKey(iter, "b"); + assertNextKey(iter, "c"); + assertThat(iter.hasNext(), is(false)); + } + } + + @Test + @SuppressWarnings("unchecked") + public void shouldRetryRangeRequestAfterTransientFailure() { + final var startBound = RangeBound.inclusive("a".getBytes(StandardCharsets.UTF_8)); + Mockito.doAnswer(invocation -> { + StreamObserver observer = invocation.getArgument(1, StreamObserver.class); + observer.onNext(newKeyValueResult("a")); + observer.onError(new StatusRuntimeException(Status.UNAVAILABLE)); + return null; + }).when(requestProxy).send(eq(startBound), any()); + + final var retryStartBound = RangeBound.exclusive("a".getBytes(StandardCharsets.UTF_8)); + Mockito.doAnswer(invocation -> { + StreamObserver observer = invocation.getArgument(1, StreamObserver.class); + observer.onNext(newKeyValueResult("b")); + observer.onNext(newKeyValueResult("c")); + observer.onNext(newEndOfStreamResult()); + observer.onCompleted(); + return null; + }).when(requestProxy).send(eq(retryStartBound), any()); + + try (final var iter = new GrpcKeyValueIterator(startBound, requestProxy)) { + assertNextKey(iter, "a"); + assertNextKey(iter, "b"); + assertNextKey(iter, "c"); + assertThat(iter.hasNext(), is(false)); + } + } + + @Test + @SuppressWarnings("unchecked") + public void shouldRetryAfterUnexpectedStreamCompletion() { + final var startBound = RangeBound.inclusive("a".getBytes(StandardCharsets.UTF_8)); + Mockito.doAnswer(invocation -> { + StreamObserver observer = invocation.getArgument(1, StreamObserver.class); + observer.onNext(newKeyValueResult("a")); + observer.onCompleted(); + return null; + }).when(requestProxy).send(eq(startBound), any()); + + final var retryStartBound = RangeBound.exclusive("a".getBytes(StandardCharsets.UTF_8)); + Mockito.doAnswer(invocation -> { + StreamObserver observer = invocation.getArgument(1, StreamObserver.class); + observer.onNext(newKeyValueResult("b")); + observer.onNext(newKeyValueResult("c")); + observer.onNext(newEndOfStreamResult()); + observer.onCompleted(); + return null; + }).when(requestProxy).send(eq(retryStartBound), any()); + + try (final var iter = new GrpcKeyValueIterator(startBound, requestProxy)) { + assertNextKey(iter, "a"); + assertNextKey(iter, "b"); + assertNextKey(iter, "c"); + assertThat(iter.hasNext(), is(false)); + } + } + + @Test + @SuppressWarnings("unchecked") + public void shouldPropagateUnexpectedFailures() { + final var startBound = RangeBound.inclusive("a".getBytes(StandardCharsets.UTF_8)); + Mockito.doAnswer(invocation -> { + StreamObserver observer = invocation.getArgument(1, StreamObserver.class); + observer.onNext(newKeyValueResult("a")); + observer.onError(new StatusRuntimeException(Status.UNKNOWN)); + return null; + }).when(requestProxy).send(eq(startBound), any()); + + try (final var iter = new GrpcKeyValueIterator(startBound, requestProxy)) { + assertNextKey(iter, "a"); + final var rs3Exception = assertThrows(RS3Exception.class, iter::next); + assertThat(rs3Exception.getCause(), instanceOf(StatusRuntimeException.class)); + final var statusException = (StatusRuntimeException) rs3Exception.getCause(); + assertThat(statusException.getStatus().getCode(), is(Status.Code.UNKNOWN)); + } + } + + private void assertNextKey(GrpcKeyValueIterator iter, String key) { + assertThat(iter.hasNext(), is(true)); + final var keyValue = iter.next(); + final var keyBytes = keyValue.key.get(); + final var keyString = new String(keyBytes, StandardCharsets.UTF_8); + assertThat(keyString, is(key)); + } + + +} \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcMessageQueueTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcMessageQueueTest.java new file mode 100644 index 000000000..a55329000 --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcMessageQueueTest.java @@ -0,0 +1,43 @@ +package dev.responsive.kafka.internal.db.rs3.client.grpc; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import java.util.concurrent.Executors; +import org.junit.jupiter.api.Test; + +class GrpcMessageQueueTest { + + @Test + public void shouldPropagateAllValuesInOrder() throws Exception { + final var queue = new GrpcMessageQueue(); + final var executor = Executors.newFixedThreadPool(2); + final var producer = executor.submit(() -> { + for (int i = 0; i < 100; i++) { + queue.put(i); + } + return true; + }); + final var consumer = executor.submit(() -> { + for (int i = 0; i < 100; i++) { + assertThat(queue.peek(), is(i)); + assertThat(queue.poll(), is(i)); + } + return true; + }); + executor.shutdown(); + assertThat(producer.get(), is(true)); + assertThat(consumer.get(), is(true)); + } + + @Test + public void shouldBlockPollWhileWaitingValue() throws Exception { + final var queue = new GrpcMessageQueue(); + final var executor = Executors.newFixedThreadPool(1); + final var consumer = executor.submit(queue::poll); + assertThat(consumer.isDone(), is(false)); + queue.put(5); + assertThat(consumer.get(), is(5)); + } + +} \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientEndToEndTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientEndToEndTest.java index f004e0120..ddf14fda1 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientEndToEndTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientEndToEndTest.java @@ -9,6 +9,7 @@ import com.google.protobuf.ByteString; import dev.responsive.kafka.internal.db.rs3.client.LssId; import dev.responsive.kafka.internal.db.rs3.client.Put; +import dev.responsive.kafka.internal.db.rs3.client.RangeBound; import dev.responsive.rs3.RS3Grpc; import dev.responsive.rs3.Rs3; import io.grpc.ClientCall; @@ -20,13 +21,17 @@ import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicLong; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.state.KeyValueIterator; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -102,6 +107,109 @@ public void shouldPutAndGet() { assertThat(resultValue, equalTo(value)); } + @Test + public void shouldScanAllKeyValues() { + writeWalSegment(5L, Arrays.asList( + buildPut("a", "foo"), + buildPut("b", "bar"), + buildPut("c", "baz") + )); + + final var iter = client.range( + STORE_ID, + LSS_ID, + PSS_ID, + Optional.of(5L), + RangeBound.unbounded(), + RangeBound.unbounded() + ); + + assertNext(iter, "a", "foo"); + assertNext(iter, "b", "bar"); + assertNext(iter, "c", "baz"); + assertThat(iter.hasNext(), is(false)); + } + + private void writeWalSegment(long endOffset, List puts) { + final var sendRecv = client.writeWalSegmentAsync( + STORE_ID, + LSS_ID, + PSS_ID, + Optional.empty(), + endOffset + ); + + puts.forEach(put -> sendRecv.sender().sendNext(put)); + sendRecv.sender().finish(); + + final var flushedOffset = sendRecv + .completion() + .toCompletableFuture() + .join(); + assertThat(flushedOffset, is(Optional.of(endOffset))); + } + + @Test + public void shouldScanKeyValuesInBoundedRange() { + writeWalSegment(10L, Arrays.asList( + buildPut("a", "foo"), + buildPut("b", "bar"), + buildPut("c", "baz"), + buildPut("d", "raz"), + buildPut("e", "taz") + )); + + final var iter = client.range( + STORE_ID, + LSS_ID, + PSS_ID, + Optional.of(10L), + RangeBound.inclusive("b".getBytes(StandardCharsets.UTF_8)), + RangeBound.exclusive("e".getBytes(StandardCharsets.UTF_8)) + ); + + assertNext(iter, "b", "bar"); + assertNext(iter, "c", "baz"); + assertNext(iter, "d", "raz"); + assertThat(iter.hasNext(), is(false)); + } + + @Test + public void shouldRetryRangeWithNetworkInterruption() { + writeWalSegment(5L, Arrays.asList( + buildPut("a", "foo"), + buildPut("b", "bar"), + buildPut("c", "baz") + )); + + Mockito.doAnswer(invocation -> { + @SuppressWarnings("unchecked") + final var call = (ClientCall) + invocation.callRealMethod(); + final var callSpy = Mockito.spy(call); + Mockito.doThrow(new StatusRuntimeException(Status.UNAVAILABLE)) + .when(callSpy) + .sendMessage(any()); + return callSpy; + }).doCallRealMethod() + .when(channel) + .newCall(eq(RS3Grpc.getRangeMethod()), any()); + + final var iter = client.range( + STORE_ID, + LSS_ID, + PSS_ID, + Optional.of(5L), + RangeBound.unbounded(), + RangeBound.unbounded() + ); + + assertNext(iter, "a", "foo"); + assertNext(iter, "b", "bar"); + assertNext(iter, "c", "baz"); + assertThat(iter.hasNext(), is(false)); + } + @Test public void shouldRetryPutWithNetworkInterruption() { Mockito.doAnswer(invocation -> { @@ -129,9 +237,31 @@ public void shouldRetryPutWithNetworkInterruption() { assertThat(flushedOffset, is(Optional.of(5L))); } + private void assertNext( + KeyValueIterator iter, + String key, + String value + ) { + assertThat(iter.hasNext(), is(true)); + final var keyValue = iter.next(); + assertThat(keyValue.key, is(utf8Bytes(key))); + assertThat(Bytes.wrap(keyValue.value), is(utf8Bytes(value))); + } + + private Bytes utf8Bytes(String s) { + final var bytes = s.getBytes(StandardCharsets.UTF_8); + return Bytes.wrap(bytes); + } + + private Put buildPut(String key, String value) { + final var keyBytes = utf8Bytes(key).get(); + final var valueBytes = value == null ? null : utf8Bytes(value).get(); + return new Put(keyBytes, valueBytes); + } + static class TestRs3Service extends RS3Grpc.RS3ImplBase { private final AtomicLong offset = new AtomicLong(0); - private final ConcurrentMap table = new ConcurrentHashMap<>(); + private final ConcurrentSkipListMap table = new ConcurrentSkipListMap<>(); @Override public void getOffsets( @@ -183,8 +313,13 @@ public void get( final var keyValueResult = Rs3.KeyValue .newBuilder() - .setKey(req.getKey()) - .setValue(table.get(req.getKey())); + .setKey(req.getKey()); + final var key = Bytes.wrap(req.getKey().toByteArray()); + final var value = table.get(key); + if (value != null) { + keyValueResult.setValue(ByteString.copyFrom(value.get())); + } + final var result = Rs3.GetResult .newBuilder() .setResult(keyValueResult) @@ -193,6 +328,55 @@ public void get( responseObserver.onCompleted(); } + @Override + public void range( + final Rs3.RangeRequest req, + final StreamObserver responseObserver + ) { + final var storeId = new UUID( + req.getStoreId().getHigh(), + req.getStoreId().getLow() + ); + if (req.getPssId() != PSS_ID + || req.getLssId().getId() != LSS_ID.id() + || !storeId.equals(STORE_ID)) { + responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT)); + return; + } + + if (req.getExpectedWrittenOffset() != GrpcRS3Client.WAL_OFFSET_NONE) { + if (offset.get() < req.getExpectedWrittenOffset()) { + responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT)); + return; + } + } + + final var range = GrpsRs3TestUtil.newRangeFromProto(req); + for (final var keyValueEntry : table.entrySet()) { + if (!range.contains(keyValueEntry.getKey().get())) { + continue; + } + + final var keyValue = Rs3.KeyValue + .newBuilder() + .setKey(ByteString.copyFrom(keyValueEntry.getKey().get())) + .setValue(ByteString.copyFrom(keyValueEntry.getValue().get())); + + final var keyValueResult = Rs3.RangeResult.newBuilder() + .setType(Rs3.RangeResult.Type.RESULT) + .setResult(keyValue) + .build(); + + responseObserver.onNext(keyValueResult); + } + + final var endOfStream = Rs3.RangeResult.newBuilder() + .setType(Rs3.RangeResult.Type.END_OF_STREAM) + .build(); + responseObserver.onNext(endOfStream); + responseObserver.onCompleted(); + } + @Override public StreamObserver writeWALSegmentStream( final StreamObserver responseObserver @@ -221,10 +405,13 @@ public void onNext(final Rs3.WriteWALSegmentRequest req) { current -> Math.max(current, req.getEndOffset()) ); final var put = req.getPut(); + final var keyBytes = Bytes.wrap(put.getKey().toByteArray()); + if (put.hasValue()) { - table.put(put.getKey(), put.getValue()); + final var valueBytes = Bytes.wrap(put.getValue().toByteArray()); + table.put(keyBytes, valueBytes); } else { - table.remove(put.getKey()); + table.remove(keyBytes); } } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java index a1ff5ee7d..0ebce797c 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java @@ -12,15 +12,18 @@ package dev.responsive.kafka.internal.db.rs3.client.grpc; +import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpsRs3TestUtil.newEndOfStreamResult; import static dev.responsive.kafka.internal.utils.Utils.lssIdProto; import static dev.responsive.kafka.internal.utils.Utils.uuidToUuidProto; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.reset; @@ -33,6 +36,7 @@ import dev.responsive.kafka.internal.db.rs3.client.Put; import dev.responsive.kafka.internal.db.rs3.client.RS3Exception; import dev.responsive.kafka.internal.db.rs3.client.RS3TimeoutException; +import dev.responsive.kafka.internal.db.rs3.client.RangeBound; import dev.responsive.kafka.internal.db.rs3.client.WalEntry; import dev.responsive.rs3.RS3Grpc; import dev.responsive.rs3.Rs3; @@ -47,6 +51,7 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.common.utils.MockTime; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -741,6 +746,126 @@ public void shouldTimeoutGet() { assertThat(endTimeMs - startTimeMs, is(retryTimeoutMs)); } + @Test + @SuppressWarnings("unchecked") + public void shouldRetryRangeRequest() { + final var attemptCount = new AtomicInteger(0); + doAnswer(invocation -> { + StreamObserver observer = invocation.getArgument(1, StreamObserver.class); + if (attemptCount.incrementAndGet() < 3) { + throw new StatusRuntimeException(Status.UNAVAILABLE); + } else { + observer.onNext(newEndOfStreamResult()); + } + return null; + }).when(asyncStub).range(any(), any()); + + try (final var iter = client.range( + STORE_ID, + LSS_ID, + PSS_ID, + Optional.of(123L), + RangeBound.unbounded(), + RangeBound.unbounded() + )) { + assertThat(iter.hasNext(), is(false)); + } + } + + @Test + @SuppressWarnings("unchecked") + public void shouldRetryAfterObserverOnError() { + final var attemptCount = new AtomicInteger(0); + doAnswer(invocation -> { + StreamObserver observer = invocation.getArgument(1, StreamObserver.class); + if (attemptCount.getAndIncrement() == 0) { + observer.onError(new StatusRuntimeException(Status.UNAVAILABLE)); + } else { + observer.onNext(newEndOfStreamResult()); + } + return null; + }).when(asyncStub).range(any(), any()); + + final var startTimeMs = time.milliseconds(); + try (final var iter = client.range( + STORE_ID, + LSS_ID, + PSS_ID, + Optional.of(123L), + RangeBound.unbounded(), + RangeBound.unbounded() + )) { + assertThat(iter.hasNext(), is(false)); + } + // Expect some backoff after the retry. + assertThat(time.milliseconds(), greaterThan(startTimeMs)); + } + + @Test + public void shouldTimeoutRangeRequest() { + doThrow(new StatusRuntimeException(Status.UNAVAILABLE)) + .when(asyncStub) + .range(any(), any()); + + final var startTimeMs = time.milliseconds(); + assertThrows(RS3TimeoutException.class, () -> client.range( + STORE_ID, + LSS_ID, + PSS_ID, + Optional.of(123L), + RangeBound.unbounded(), + RangeBound.unbounded() + )); + var endTimeMs = time.milliseconds(); + assertThat(endTimeMs - startTimeMs, is(retryTimeoutMs)); + } + + @Test + public void shouldPropagateUnexpectedErrorInRangeRequest() { + doThrow(new StatusRuntimeException(Status.UNKNOWN)) + .when(asyncStub) + .range(any(), any()); + + final var rs3Exception = assertThrows(RS3Exception.class, () -> client.range( + STORE_ID, + LSS_ID, + PSS_ID, + Optional.of(123L), + RangeBound.unbounded(), + RangeBound.unbounded() + )); + + assertThat(rs3Exception.getCause(), is(instanceOf(StatusRuntimeException.class))); + assertThat( + ((StatusRuntimeException) rs3Exception.getCause()).getStatus().getCode(), + is(Status.Code.UNKNOWN) + ); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldTimeoutAfterObserverOnError() { + doAnswer(invocation -> { + StreamObserver observer = invocation.getArgument(1, StreamObserver.class); + observer.onError(new StatusRuntimeException(Status.UNAVAILABLE)); + return null; + }).when(asyncStub).range(any(), any()); + + final var startTimeMs = time.milliseconds(); + try ( + final var iter = client.range( + STORE_ID, + LSS_ID, + PSS_ID, + Optional.of(123L), + RangeBound.unbounded(), + RangeBound.unbounded() + ) + ) { + assertThrows(RS3TimeoutException.class, iter::hasNext); + } + } + @Test public void shouldListStores() { // given: @@ -826,6 +951,32 @@ public void shouldTimeoutListStores() { assertThat(endTimeMs - startTimeMs, is(retryTimeoutMs)); } + @Test + @SuppressWarnings("unchecked") + public void shouldPropagateUnexpectedExceptionFromObserverOnError() { + doAnswer(invocation -> { + StreamObserver observer = invocation.getArgument(1, StreamObserver.class); + observer.onError(new StatusRuntimeException(Status.UNKNOWN)); + return null; + }).when(asyncStub).range(any(), any()); + + try (final var iter = client.range( + STORE_ID, + LSS_ID, + PSS_ID, + Optional.of(123L), + RangeBound.unbounded(), + RangeBound.unbounded() + )) { + final var rs3Exception = assertThrows(RS3Exception.class, iter::hasNext); + assertThat(rs3Exception.getCause(), is(instanceOf(StatusRuntimeException.class))); + assertThat( + ((StatusRuntimeException) rs3Exception.getCause()).getStatus().getCode(), + is(Status.Code.UNKNOWN) + ); + } + } + private StreamObserver verifyWalSegmentResultObserver() { verify(asyncStub).writeWALSegmentStream(writeWALSegmentResultObserverCaptor.capture()); return writeWALSegmentResultObserverCaptor.getValue(); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpsRs3TestUtil.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpsRs3TestUtil.java new file mode 100644 index 000000000..3d8730ffa --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpsRs3TestUtil.java @@ -0,0 +1,59 @@ +/* + * Copyright 2025 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + +package dev.responsive.kafka.internal.db.rs3.client.grpc; + +import com.google.protobuf.ByteString; +import dev.responsive.kafka.internal.db.rs3.client.Range; +import dev.responsive.kafka.internal.db.rs3.client.RangeBound; +import dev.responsive.rs3.Rs3; + +public class GrpsRs3TestUtil { + + public static Rs3.RangeResult newKeyValueResult(String key) { + final var keyValue = Rs3.KeyValue.newBuilder() + .setKey(ByteString.copyFromUtf8(key)) + .setValue(ByteString.copyFromUtf8("dummy")) + .build(); + return Rs3.RangeResult.newBuilder() + .setType(Rs3.RangeResult.Type.RESULT) + .setResult(keyValue) + .build(); + } + + + public static Rs3.RangeResult newEndOfStreamResult() { + return Rs3.RangeResult.newBuilder() + .setType(Rs3.RangeResult.Type.END_OF_STREAM) + .build(); + } + + public static Range newRangeFromProto(Rs3.RangeRequest req) { + final var startBound = newRangeBoundFromProto(req.getFrom()); + final var endBound = newRangeBoundFromProto(req.getTo()); + return new Range(startBound, endBound); + } + + private static RangeBound newRangeBoundFromProto(Rs3.Bound bound) { + switch (bound.getType()) { + case EXCLUSIVE: + return RangeBound.exclusive(bound.getKey().toByteArray()); + case INCLUSIVE: + return RangeBound.inclusive(bound.getKey().toByteArray()); + case UNBOUNDED: + return RangeBound.unbounded(); + default: + throw new IllegalArgumentException(String.format("Unknown range type %s", bound.getType())); + } + } + +}