diff --git a/kafka-client/src/main/external-protos/rs3 b/kafka-client/src/main/external-protos/rs3 index c03fb0c8a..eab53946a 160000 --- a/kafka-client/src/main/external-protos/rs3 +++ b/kafka-client/src/main/external-protos/rs3 @@ -1 +1 @@ -Subproject commit c03fb0c8a3d3b8521340434bc9a5043f231f3758 +Subproject commit eab53946afd1c8e6ef704435338854df3d949e7f diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java index 963321843..7e02208a9 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java @@ -90,6 +90,14 @@ public Optional get( return result; } + @Override + public List listStores() { + final Instant start = Instant.now(); + final List result = delegate.listStores(); + getSensor.record(Duration.between(start, Instant.now()).toNanos()); + return result; + } + public void close() { this.metrics.removeSensor(GET_SENSOR_NAME); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java index f1190adc0..9baa2b191 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java @@ -44,5 +44,7 @@ Optional get( byte[] key ); + List listStores(); + void close(); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Store.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Store.java new file mode 100644 index 000000000..01239d6cd --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Store.java @@ -0,0 +1,43 @@ +package dev.responsive.kafka.internal.db.rs3.client; + +import java.util.List; +import java.util.UUID; + +public class Store { + + private final UUID storeId; + private final List pssIds; + + public Store(final UUID storeId, final List pssIds) { + this.storeId = storeId; + this.pssIds = pssIds; + } + + public UUID storeId() { + return storeId; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final Store store = (Store) o; + + if (!storeId.equals(store.storeId)) { + return false; + } + return pssIds.equals(store.pssIds); + } + + @Override + public int hashCode() { + int result = storeId.hashCode(); + result = 31 * result + pssIds.hashCode(); + return result; + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java index 30bc9bf13..08f6d9e95 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java @@ -12,6 +12,10 @@ package dev.responsive.kafka.internal.db.rs3.client.grpc; +import static dev.responsive.kafka.internal.utils.Utils.lssIdProto; +import static dev.responsive.kafka.internal.utils.Utils.uuidProtoToUuid; +import static dev.responsive.kafka.internal.utils.Utils.uuidToUuidProto; + import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; import dev.responsive.kafka.api.config.ResponsiveConfig; @@ -21,6 +25,7 @@ import dev.responsive.kafka.internal.db.rs3.client.RS3Client; import dev.responsive.kafka.internal.db.rs3.client.RS3TimeoutException; import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException; +import dev.responsive.kafka.internal.db.rs3.client.Store; import dev.responsive.kafka.internal.db.rs3.client.StreamSenderMessageReceiver; import dev.responsive.kafka.internal.db.rs3.client.WalEntry; import dev.responsive.rs3.RS3Grpc; @@ -31,6 +36,7 @@ import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.kafka.common.utils.ExponentialBackoff; import org.apache.kafka.common.utils.Time; @@ -57,7 +63,7 @@ public CurrentOffsets getCurrentOffsets(final UUID storeId, final LssId lssId, f final RS3Grpc.RS3BlockingStub stub = stubs.stubs(storeId, pssId).syncStub(); final Rs3.GetOffsetsRequest request = Rs3.GetOffsetsRequest.newBuilder() - .setStoreId(uuidProto(storeId)) + .setStoreId(uuidToUuidProto(storeId)) .setLssId(lssIdProto(lssId)) .setPssId(pssId) .build(); @@ -130,7 +136,7 @@ public StreamSenderMessageReceiver> writeWalSegmentAsyn final var streamSender = new GrpcStreamSender( entry -> { final var entryBuilder = Rs3.WriteWALSegmentRequest.newBuilder() - .setStoreId(uuidProto(storeId)) + .setStoreId(uuidToUuidProto(storeId)) .setLssId(lssIdProto(lssId)) .setPssId(pssId) .setEndOffset(endOffset) @@ -209,7 +215,7 @@ public Optional get( final byte[] key ) { final var requestBuilder = Rs3.GetRequest.newBuilder() - .setStoreId(uuidProto(storeId)) + .setStoreId(uuidToUuidProto(storeId)) .setLssId(lssIdProto(lssId)) .setPssId(pssId) .setKey(ByteString.copyFrom(key)); @@ -229,17 +235,20 @@ public Optional get( return Optional.of(keyValue.getValue().toByteArray()); } - private Rs3.UUID uuidProto(final UUID uuid) { - return Rs3.UUID.newBuilder() - .setHigh(uuid.getMostSignificantBits()) - .setLow(uuid.getLeastSignificantBits()) - .build(); - } + @Override + public List listStores() { + final var request = Rs3.ListStoresRequest.newBuilder().build(); + final RS3Grpc.RS3BlockingStub stub = stubs.globalStubs().syncStub(); - private Rs3.LSSId lssIdProto(final LssId lssId) { - return Rs3.LSSId.newBuilder() - .setId(lssId.id()) - .build(); + final Rs3.ListStoresResult result = withRetry( + () -> stub.listStores(request), + () -> "ListStores()" + ); + + return result.getStoresList() + .stream() + .map(t -> new Store(uuidProtoToUuid(t.getStoreId()), t.getPssIdsList())) + .collect(Collectors.toList()); } private void addWalEntryToSegment( diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/PssStubsProvider.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/PssStubsProvider.java index 7b9235130..1d1390476 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/PssStubsProvider.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/PssStubsProvider.java @@ -20,11 +20,16 @@ class PssStubsProvider { private static final Logger LOG = LoggerFactory.getLogger(PssStubsProvider.class); private final ManagedChannel channel; + private final Stubs globalStubs; private final ConcurrentMap stubs = new ConcurrentHashMap<>(); @VisibleForTesting PssStubsProvider(final ManagedChannel channel) { this.channel = channel; + this.globalStubs = new Stubs( + RS3Grpc.newBlockingStub(channel), + RS3Grpc.newStub(channel) + ); } static PssStubsProvider connect( @@ -60,6 +65,10 @@ Stubs stubs(final UUID storeId, final int pssId) { }); } + Stubs globalStubs() { + return globalStubs; + } + @VisibleForTesting static class Stubs { private final RS3Grpc.RS3BlockingStub syncStub; diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/Utils.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/Utils.java index 053ed7b33..dc9cff5ef 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/Utils.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/Utils.java @@ -12,6 +12,9 @@ package dev.responsive.kafka.internal.utils; +import dev.responsive.kafka.internal.db.rs3.client.LssId; +import dev.responsive.rs3.Rs3; +import java.util.UUID; import java.util.regex.Pattern; import org.apache.kafka.common.utils.Bytes; import org.slf4j.Logger; @@ -126,4 +129,21 @@ public static Bytes incrementWithoutOverflow(final Bytes input) { return null; } } + + public static UUID uuidProtoToUuid(final Rs3.UUID uuid) { + return new UUID(uuid.getHigh(), uuid.getLow()); + } + + public static Rs3.UUID uuidToUuidProto(final UUID uuid) { + return Rs3.UUID.newBuilder() + .setHigh(uuid.getMostSignificantBits()) + .setLow(uuid.getLeastSignificantBits()) + .build(); + } + + public static Rs3.LSSId lssIdProto(final LssId lssId) { + return Rs3.LSSId.newBuilder() + .setId(lssId.id()) + .build(); + } } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java index b55c0bb57..a1ff5ee7d 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java @@ -12,7 +12,10 @@ package dev.responsive.kafka.internal.db.rs3.client.grpc; +import static dev.responsive.kafka.internal.utils.Utils.lssIdProto; +import static dev.responsive.kafka.internal.utils.Utils.uuidToUuidProto; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -21,6 +24,7 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -32,6 +36,8 @@ import dev.responsive.kafka.internal.db.rs3.client.WalEntry; import dev.responsive.rs3.RS3Grpc; import dev.responsive.rs3.Rs3; +import dev.responsive.rs3.Rs3.ListStoresResult; +import dev.responsive.rs3.Rs3.Store; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; @@ -55,6 +61,7 @@ class GrpcRS3ClientTest { private static final UUID STORE_ID = new UUID(100, 200); private static final LssId LSS_ID = new LssId(10); private static final int PSS_ID = 1; + private static final int PSS_ID_2 = 2; @Mock private RS3Grpc.RS3BlockingStub stub; @@ -78,6 +85,10 @@ public void setup() { stub, asyncStub )); + lenient().when(stubs.globalStubs()).thenReturn(new PssStubsProvider.Stubs( + stub, + asyncStub + )); client = new GrpcRS3Client(stubs, time, retryTimeoutMs); } @@ -239,7 +250,7 @@ public void shouldWriteWalEntriesWithExpectedFields() { verify(writeWALSegmentRequestObserver).onNext(Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidProto(STORE_ID)) + .setStoreId(uuidToUuidProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) .setPut(putProto(put1)) @@ -248,7 +259,7 @@ public void shouldWriteWalEntriesWithExpectedFields() { verify(writeWALSegmentRequestObserver).onNext(Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidProto(STORE_ID)) + .setStoreId(uuidToUuidProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) .setPut(putProto(put2)) @@ -278,7 +289,7 @@ public void shouldWriteWalEntriesWithExpectedWrittenOffsetNone() { verify(writeWALSegmentRequestObserver).onNext(Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidProto(STORE_ID)) + .setStoreId(uuidToUuidProto(STORE_ID)) .setExpectedWrittenOffset(GrpcRS3Client.WAL_OFFSET_NONE) .setEndOffset(20) .setPut(putProto(put1)) @@ -396,7 +407,7 @@ public void shouldWriteWalSegmentSync() { Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidProto(STORE_ID)) + .setStoreId(uuidToUuidProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) .setPut(putProto((Put) entries.get(0))) @@ -406,7 +417,7 @@ public void shouldWriteWalSegmentSync() { Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidProto(STORE_ID)) + .setStoreId(uuidToUuidProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) .setPut(putProto((Put) entries.get(1))) @@ -459,7 +470,7 @@ public void shouldRetryWriteWalSegmentSync() { Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidProto(STORE_ID)) + .setStoreId(uuidToUuidProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) .setPut(putProto((Put) entries.get(0))) @@ -469,7 +480,7 @@ public void shouldRetryWriteWalSegmentSync() { Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidProto(STORE_ID)) + .setStoreId(uuidToUuidProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) .setPut(putProto((Put) entries.get(1))) @@ -520,7 +531,7 @@ public void shouldRetryWriteWalSegmentSyncWithErrorInOnNext() { Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidProto(STORE_ID)) + .setStoreId(uuidToUuidProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) .setPut(putProto((Put) entries.get(0))) @@ -570,7 +581,7 @@ public void shouldRetryWriteWalSegmentSyncWithErrorInFinish() { Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidProto(STORE_ID)) + .setStoreId(uuidToUuidProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) .setPut(putProto((Put) entries.get(0))) @@ -631,7 +642,7 @@ public void shouldGetWithExpectedWrittenOffset() { verify(stub).get(Rs3.GetRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidProto(STORE_ID)) + .setStoreId(uuidToUuidProto(STORE_ID)) .setExpectedWrittenOffset(123L) .setKey(ByteString.copyFromUtf8("foo")) .build() @@ -658,7 +669,7 @@ public void shouldGet() { verify(stub).get(Rs3.GetRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidProto(STORE_ID)) + .setStoreId(uuidToUuidProto(STORE_ID)) .setKey(ByteString.copyFromUtf8("foo")) .build() ); @@ -730,22 +741,94 @@ public void shouldTimeoutGet() { assertThat(endTimeMs - startTimeMs, is(retryTimeoutMs)); } - private StreamObserver verifyWalSegmentResultObserver() { - verify(asyncStub).writeWALSegmentStream(writeWALSegmentResultObserverCaptor.capture()); - return writeWALSegmentResultObserverCaptor.getValue(); + @Test + public void shouldListStores() { + // given: + when(stub.listStores(any())).thenReturn( + ListStoresResult.newBuilder() + .addStores(Store.newBuilder() + .setStoreId(uuidToUuidProto(STORE_ID)) + .addAllPssIds(List.of(PSS_ID, PSS_ID_2)) + .build() + ).build() + ); + + // when: + final var result = client.listStores(); + + // then: + final var expected = new dev.responsive.kafka.internal.db.rs3.client.Store( + STORE_ID, List.of(PSS_ID, PSS_ID_2) + ); + assertThat(result.size(), is(1)); + assertThat(result.get(0), equalTo(expected)); + verify(stub).listStores(Rs3.ListStoresRequest.newBuilder().build()); + } + + @Test + public void shouldHandleEmptyStoresList() { + // given: + when(stub.listStores(any())) + .thenReturn(ListStoresResult.newBuilder().build()); + + // when: + final var result = client.listStores(); + + // then: + assertThat(result.size(), is(0)); + verify(stub).listStores(Rs3.ListStoresRequest.newBuilder().build()); + } + + @Test + public void shouldRetryListStores() { + // given: + when(stub.listStores(any())) + .thenThrow(new StatusRuntimeException(Status.UNAVAILABLE)) + .thenReturn(ListStoresResult.newBuilder().build()); + + // when: + final var result = client.listStores(); + + // then: + assertThat(result.size(), is(0)); + verify(stub, times(2)).listStores(Rs3.ListStoresRequest.newBuilder().build()); } - private Rs3.UUID uuidProto(final UUID uuid) { - return Rs3.UUID.newBuilder() - .setHigh(uuid.getMostSignificantBits()) - .setLow(uuid.getLeastSignificantBits()) - .build(); + @Test + public void shouldPropagateUnexpectedExceptionsFromListStores() { + // given: + when(stub.listStores(any())) + .thenThrow(new StatusRuntimeException(Status.UNKNOWN)); + + // when: + final RS3Exception exception = assertThrows( + RS3Exception.class, + () -> client.listStores() + ); + + // then: + assertThat(exception.getCause(), instanceOf(StatusRuntimeException.class)); + assertThat(((StatusRuntimeException) exception.getCause()).getStatus(), is(Status.UNKNOWN)); } - private Rs3.LSSId lssIdProto(final LssId lssId) { - return Rs3.LSSId.newBuilder() - .setId(lssId.id()) - .build(); + @Test + public void shouldTimeoutListStores() { + // given: + var startTimeMs = time.milliseconds(); + when(stub.listStores(any())) + .thenThrow(new StatusRuntimeException(Status.UNAVAILABLE)); + + // when: + assertThrows(RS3TimeoutException.class, () -> client.listStores()); + + // then: + var endTimeMs = time.milliseconds(); + assertThat(endTimeMs - startTimeMs, is(retryTimeoutMs)); + } + + private StreamObserver verifyWalSegmentResultObserver() { + verify(asyncStub).writeWALSegmentStream(writeWALSegmentResultObserverCaptor.capture()); + return writeWALSegmentResultObserverCaptor.getValue(); } private Rs3.WriteWALSegmentRequest.Put putProto(final Put put) {