From 3963ae06b0ebbb14bbc47321aabe8046104f316b Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Wed, 26 Mar 2025 20:26:32 -0700 Subject: [PATCH 1/9] all --- .../db/rs3/client/MeteredRS3Client.java | 8 ++ .../internal/db/rs3/client/RS3Client.java | 2 + .../kafka/internal/db/rs3/client/Table.java | 43 ++++++ .../db/rs3/client/grpc/GrpcRS3Client.java | 35 +++-- .../db/rs3/client/grpc/PssStubsProvider.java | 9 ++ .../kafka/internal/utils/Utils.java | 20 +++ .../db/rs3/client/grpc/GrpcRS3ClientTest.java | 131 +++++++++++++++--- 7 files changed, 212 insertions(+), 36 deletions(-) create mode 100644 kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Table.java diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java index 963321843..6a962de98 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java @@ -90,6 +90,14 @@ public Optional get( return result; } + @Override + public List listTables() { + final Instant start = Instant.now(); + final List
result = delegate.listTables(); + getSensor.record(Duration.between(start, Instant.now()).toNanos()); + return result; + } + public void close() { this.metrics.removeSensor(GET_SENSOR_NAME); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java index f1190adc0..03cdc03d7 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java @@ -44,5 +44,7 @@ Optional get( byte[] key ); + List
listTables(); + void close(); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Table.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Table.java new file mode 100644 index 000000000..fed9697fd --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Table.java @@ -0,0 +1,43 @@ +package dev.responsive.kafka.internal.db.rs3.client; + +import java.util.List; +import java.util.UUID; + +public class Table { + + private final UUID storeId; + private final List pssIds; + + public Table(final UUID storeId, final List pssIds) { + this.storeId = storeId; + this.pssIds = pssIds; + } + + public UUID storeId() { + return storeId; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final Table table = (Table) o; + + if (!storeId.equals(table.storeId)) { + return false; + } + return pssIds.equals(table.pssIds); + } + + @Override + public int hashCode() { + int result = storeId.hashCode(); + result = 31 * result + pssIds.hashCode(); + return result; + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java index 30bc9bf13..b4800b8f1 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java @@ -12,6 +12,10 @@ package dev.responsive.kafka.internal.db.rs3.client.grpc; +import static dev.responsive.kafka.internal.utils.Utils.lssIdProto; +import static dev.responsive.kafka.internal.utils.Utils.uuidProtoToUuid; +import static dev.responsive.kafka.internal.utils.Utils.uuidToUuidProto; + import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; import dev.responsive.kafka.api.config.ResponsiveConfig; @@ -22,6 +26,7 @@ import dev.responsive.kafka.internal.db.rs3.client.RS3TimeoutException; import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException; import dev.responsive.kafka.internal.db.rs3.client.StreamSenderMessageReceiver; +import dev.responsive.kafka.internal.db.rs3.client.Table; import dev.responsive.kafka.internal.db.rs3.client.WalEntry; import dev.responsive.rs3.RS3Grpc; import dev.responsive.rs3.Rs3; @@ -31,6 +36,7 @@ import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.kafka.common.utils.ExponentialBackoff; import org.apache.kafka.common.utils.Time; @@ -57,7 +63,7 @@ public CurrentOffsets getCurrentOffsets(final UUID storeId, final LssId lssId, f final RS3Grpc.RS3BlockingStub stub = stubs.stubs(storeId, pssId).syncStub(); final Rs3.GetOffsetsRequest request = Rs3.GetOffsetsRequest.newBuilder() - .setStoreId(uuidProto(storeId)) + .setStoreId(uuidToUuidProto(storeId)) .setLssId(lssIdProto(lssId)) .setPssId(pssId) .build(); @@ -130,7 +136,7 @@ public StreamSenderMessageReceiver> writeWalSegmentAsyn final var streamSender = new GrpcStreamSender( entry -> { final var entryBuilder = Rs3.WriteWALSegmentRequest.newBuilder() - .setStoreId(uuidProto(storeId)) + .setStoreId(uuidToUuidProto(storeId)) .setLssId(lssIdProto(lssId)) .setPssId(pssId) .setEndOffset(endOffset) @@ -209,7 +215,7 @@ public Optional get( final byte[] key ) { final var requestBuilder = Rs3.GetRequest.newBuilder() - .setStoreId(uuidProto(storeId)) + .setStoreId(uuidToUuidProto(storeId)) .setLssId(lssIdProto(lssId)) .setPssId(pssId) .setKey(ByteString.copyFrom(key)); @@ -229,17 +235,20 @@ public Optional get( return Optional.of(keyValue.getValue().toByteArray()); } - private Rs3.UUID uuidProto(final UUID uuid) { - return Rs3.UUID.newBuilder() - .setHigh(uuid.getMostSignificantBits()) - .setLow(uuid.getLeastSignificantBits()) - .build(); - } + @Override + public List
listTables() { + final var request = Rs3.ListTablesRequest.newBuilder().build(); + final RS3Grpc.RS3BlockingStub stub = stubs.globalStubs().syncStub(); - private Rs3.LSSId lssIdProto(final LssId lssId) { - return Rs3.LSSId.newBuilder() - .setId(lssId.id()) - .build(); + final Rs3.ListTablesResult result = withRetry( + () -> stub.listTables(request), + () -> "ListTables()" + ); + + return result.getTablesList() + .stream() + .map(t -> new Table(uuidProtoToUuid(t.getStoreId()), t.getPssIdsList())) + .collect(Collectors.toList()); } private void addWalEntryToSegment( diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/PssStubsProvider.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/PssStubsProvider.java index 7b9235130..1d1390476 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/PssStubsProvider.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/PssStubsProvider.java @@ -20,11 +20,16 @@ class PssStubsProvider { private static final Logger LOG = LoggerFactory.getLogger(PssStubsProvider.class); private final ManagedChannel channel; + private final Stubs globalStubs; private final ConcurrentMap stubs = new ConcurrentHashMap<>(); @VisibleForTesting PssStubsProvider(final ManagedChannel channel) { this.channel = channel; + this.globalStubs = new Stubs( + RS3Grpc.newBlockingStub(channel), + RS3Grpc.newStub(channel) + ); } static PssStubsProvider connect( @@ -60,6 +65,10 @@ Stubs stubs(final UUID storeId, final int pssId) { }); } + Stubs globalStubs() { + return globalStubs; + } + @VisibleForTesting static class Stubs { private final RS3Grpc.RS3BlockingStub syncStub; diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/Utils.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/Utils.java index 053ed7b33..dc9cff5ef 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/Utils.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/Utils.java @@ -12,6 +12,9 @@ package dev.responsive.kafka.internal.utils; +import dev.responsive.kafka.internal.db.rs3.client.LssId; +import dev.responsive.rs3.Rs3; +import java.util.UUID; import java.util.regex.Pattern; import org.apache.kafka.common.utils.Bytes; import org.slf4j.Logger; @@ -126,4 +129,21 @@ public static Bytes incrementWithoutOverflow(final Bytes input) { return null; } } + + public static UUID uuidProtoToUuid(final Rs3.UUID uuid) { + return new UUID(uuid.getHigh(), uuid.getLow()); + } + + public static Rs3.UUID uuidToUuidProto(final UUID uuid) { + return Rs3.UUID.newBuilder() + .setHigh(uuid.getMostSignificantBits()) + .setLow(uuid.getLeastSignificantBits()) + .build(); + } + + public static Rs3.LSSId lssIdProto(final LssId lssId) { + return Rs3.LSSId.newBuilder() + .setId(lssId.id()) + .build(); + } } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java index b55c0bb57..a3ba447db 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java @@ -12,7 +12,10 @@ package dev.responsive.kafka.internal.db.rs3.client.grpc; +import static dev.responsive.kafka.internal.utils.Utils.lssIdProto; +import static dev.responsive.kafka.internal.utils.Utils.uuidToUuidProto; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -21,6 +24,7 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -32,6 +36,10 @@ import dev.responsive.kafka.internal.db.rs3.client.WalEntry; import dev.responsive.rs3.RS3Grpc; import dev.responsive.rs3.Rs3; +import dev.responsive.rs3.Rs3.KeyValue; +import dev.responsive.rs3.Rs3.ListTablesRequest; +import dev.responsive.rs3.Rs3.ListTablesResult; +import dev.responsive.rs3.Rs3.Table; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; @@ -55,6 +63,7 @@ class GrpcRS3ClientTest { private static final UUID STORE_ID = new UUID(100, 200); private static final LssId LSS_ID = new LssId(10); private static final int PSS_ID = 1; + private static final int PSS_ID_2 = 2; @Mock private RS3Grpc.RS3BlockingStub stub; @@ -78,6 +87,10 @@ public void setup() { stub, asyncStub )); + lenient().when(stubs.globalStubs()).thenReturn(new PssStubsProvider.Stubs( + stub, + asyncStub + )); client = new GrpcRS3Client(stubs, time, retryTimeoutMs); } @@ -239,7 +252,7 @@ public void shouldWriteWalEntriesWithExpectedFields() { verify(writeWALSegmentRequestObserver).onNext(Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidProto(STORE_ID)) + .setStoreId(uuidToUuidProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) .setPut(putProto(put1)) @@ -248,7 +261,7 @@ public void shouldWriteWalEntriesWithExpectedFields() { verify(writeWALSegmentRequestObserver).onNext(Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidProto(STORE_ID)) + .setStoreId(uuidToUuidProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) .setPut(putProto(put2)) @@ -278,7 +291,7 @@ public void shouldWriteWalEntriesWithExpectedWrittenOffsetNone() { verify(writeWALSegmentRequestObserver).onNext(Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidProto(STORE_ID)) + .setStoreId(uuidToUuidProto(STORE_ID)) .setExpectedWrittenOffset(GrpcRS3Client.WAL_OFFSET_NONE) .setEndOffset(20) .setPut(putProto(put1)) @@ -396,7 +409,7 @@ public void shouldWriteWalSegmentSync() { Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidProto(STORE_ID)) + .setStoreId(uuidToUuidProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) .setPut(putProto((Put) entries.get(0))) @@ -406,7 +419,7 @@ public void shouldWriteWalSegmentSync() { Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidProto(STORE_ID)) + .setStoreId(uuidToUuidProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) .setPut(putProto((Put) entries.get(1))) @@ -459,7 +472,7 @@ public void shouldRetryWriteWalSegmentSync() { Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidProto(STORE_ID)) + .setStoreId(uuidToUuidProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) .setPut(putProto((Put) entries.get(0))) @@ -469,7 +482,7 @@ public void shouldRetryWriteWalSegmentSync() { Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidProto(STORE_ID)) + .setStoreId(uuidToUuidProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) .setPut(putProto((Put) entries.get(1))) @@ -520,7 +533,7 @@ public void shouldRetryWriteWalSegmentSyncWithErrorInOnNext() { Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidProto(STORE_ID)) + .setStoreId(uuidToUuidProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) .setPut(putProto((Put) entries.get(0))) @@ -570,7 +583,7 @@ public void shouldRetryWriteWalSegmentSyncWithErrorInFinish() { Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidProto(STORE_ID)) + .setStoreId(uuidToUuidProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) .setPut(putProto((Put) entries.get(0))) @@ -631,7 +644,7 @@ public void shouldGetWithExpectedWrittenOffset() { verify(stub).get(Rs3.GetRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidProto(STORE_ID)) + .setStoreId(uuidToUuidProto(STORE_ID)) .setExpectedWrittenOffset(123L) .setKey(ByteString.copyFromUtf8("foo")) .build() @@ -658,7 +671,7 @@ public void shouldGet() { verify(stub).get(Rs3.GetRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidProto(STORE_ID)) + .setStoreId(uuidToUuidProto(STORE_ID)) .setKey(ByteString.copyFromUtf8("foo")) .build() ); @@ -730,22 +743,94 @@ public void shouldTimeoutGet() { assertThat(endTimeMs - startTimeMs, is(retryTimeoutMs)); } - private StreamObserver verifyWalSegmentResultObserver() { - verify(asyncStub).writeWALSegmentStream(writeWALSegmentResultObserverCaptor.capture()); - return writeWALSegmentResultObserverCaptor.getValue(); + @Test + public void shouldListTables() { + // given: + when(stub.listTables(any())).thenReturn( + ListTablesResult.newBuilder() + .addTables(Table.newBuilder() + .setStoreId(uuidToUuidProto(STORE_ID)) + .addAllPssIds(List.of(PSS_ID, PSS_ID_2)) + .build() + ).build() + ); + + // when: + final var result = client.listTables(); + + // then: + final var expected = new dev.responsive.kafka.internal.db.rs3.client.Table( + STORE_ID, List.of(PSS_ID, PSS_ID_2) + ); + assertThat(result.size(), is(1)); + assertThat(result.get(0), equalTo(expected)); + verify(stub).listTables(Rs3.ListTablesRequest.newBuilder().build()); + } + + @Test + public void shouldHandleEmptyTablesList() { + // given: + when(stub.listTables(any())) + .thenReturn(ListTablesResult.newBuilder().build()); + + // when: + final var result = client.listTables(); + + // then: + assertThat(result.size(), is(0)); + verify(stub).listTables(Rs3.ListTablesRequest.newBuilder().build()); + } + + @Test + public void shouldRetryListTables() { + // given: + when(stub.listTables(any())) + .thenThrow(new StatusRuntimeException(Status.UNAVAILABLE)) + .thenReturn(ListTablesResult.newBuilder().build()); + + // when: + final var result = client.listTables(); + + // then: + assertThat(result.size(), is(0)); + verify(stub, times(2)).listTables(Rs3.ListTablesRequest.newBuilder().build()); } - private Rs3.UUID uuidProto(final UUID uuid) { - return Rs3.UUID.newBuilder() - .setHigh(uuid.getMostSignificantBits()) - .setLow(uuid.getLeastSignificantBits()) - .build(); + @Test + public void shouldPropagateUnexpectedExceptionsFromListTables() { + // given: + when(stub.listTables(any())) + .thenThrow(new StatusRuntimeException(Status.UNKNOWN)); + + // when: + final RS3Exception exception = assertThrows( + RS3Exception.class, + () -> client.listTables() + ); + + // then: + assertThat(exception.getCause(), instanceOf(StatusRuntimeException.class)); + assertThat(((StatusRuntimeException) exception.getCause()).getStatus(), is(Status.UNKNOWN)); } - private Rs3.LSSId lssIdProto(final LssId lssId) { - return Rs3.LSSId.newBuilder() - .setId(lssId.id()) - .build(); + @Test + public void shouldTimeoutListTables() { + // given: + var startTimeMs = time.milliseconds(); + when(stub.listTables(any())) + .thenThrow(new StatusRuntimeException(Status.UNAVAILABLE)); + + // when: + assertThrows(RS3TimeoutException.class, () -> client.listTables()); + + // then: + var endTimeMs = time.milliseconds(); + assertThat(endTimeMs - startTimeMs, is(retryTimeoutMs)); + } + + private StreamObserver verifyWalSegmentResultObserver() { + verify(asyncStub).writeWALSegmentStream(writeWALSegmentResultObserverCaptor.capture()); + return writeWALSegmentResultObserverCaptor.getValue(); } private Rs3.WriteWALSegmentRequest.Put putProto(final Put put) { From a3abc7b46ea4256ac844bb28d9b164aef4d5d8d2 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Wed, 26 Mar 2025 20:29:30 -0700 Subject: [PATCH 2/9] checktyle --- .../kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java index a3ba447db..4c4d4a707 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java @@ -36,8 +36,6 @@ import dev.responsive.kafka.internal.db.rs3.client.WalEntry; import dev.responsive.rs3.RS3Grpc; import dev.responsive.rs3.Rs3; -import dev.responsive.rs3.Rs3.KeyValue; -import dev.responsive.rs3.Rs3.ListTablesRequest; import dev.responsive.rs3.Rs3.ListTablesResult; import dev.responsive.rs3.Rs3.Table; import io.grpc.Status; From db8413cc951eccf3b371e47a4b4467a9f5c8e3c8 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Wed, 26 Mar 2025 21:35:15 -0700 Subject: [PATCH 3/9] flip table to store --- .../internal/db/rs3/client/MeteredRS3Client.java | 4 ++-- .../kafka/internal/db/rs3/client/RS3Client.java | 2 +- .../db/rs3/client/{Table.java => Store.java} | 10 +++++----- .../internal/db/rs3/client/grpc/GrpcRS3Client.java | 6 +++--- .../db/rs3/client/grpc/GrpcRS3ClientTest.java | 13 +++++++------ 5 files changed, 18 insertions(+), 17 deletions(-) rename kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/{Table.java => Store.java} (75%) diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java index 6a962de98..7e02208a9 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java @@ -91,9 +91,9 @@ public Optional get( } @Override - public List
listTables() { + public List listStores() { final Instant start = Instant.now(); - final List
result = delegate.listTables(); + final List result = delegate.listStores(); getSensor.record(Duration.between(start, Instant.now()).toNanos()); return result; } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java index 03cdc03d7..9baa2b191 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java @@ -44,7 +44,7 @@ Optional get( byte[] key ); - List
listTables(); + List listStores(); void close(); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Table.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Store.java similarity index 75% rename from kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Table.java rename to kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Store.java index fed9697fd..01239d6cd 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Table.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Store.java @@ -3,12 +3,12 @@ import java.util.List; import java.util.UUID; -public class Table { +public class Store { private final UUID storeId; private final List pssIds; - public Table(final UUID storeId, final List pssIds) { + public Store(final UUID storeId, final List pssIds) { this.storeId = storeId; this.pssIds = pssIds; } @@ -26,12 +26,12 @@ public boolean equals(final Object o) { return false; } - final Table table = (Table) o; + final Store store = (Store) o; - if (!storeId.equals(table.storeId)) { + if (!storeId.equals(store.storeId)) { return false; } - return pssIds.equals(table.pssIds); + return pssIds.equals(store.pssIds); } @Override diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java index b4800b8f1..41c380658 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java @@ -26,7 +26,7 @@ import dev.responsive.kafka.internal.db.rs3.client.RS3TimeoutException; import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException; import dev.responsive.kafka.internal.db.rs3.client.StreamSenderMessageReceiver; -import dev.responsive.kafka.internal.db.rs3.client.Table; +import dev.responsive.kafka.internal.db.rs3.client.Store; import dev.responsive.kafka.internal.db.rs3.client.WalEntry; import dev.responsive.rs3.RS3Grpc; import dev.responsive.rs3.Rs3; @@ -236,7 +236,7 @@ public Optional get( } @Override - public List
listTables() { + public List listStores() { final var request = Rs3.ListTablesRequest.newBuilder().build(); final RS3Grpc.RS3BlockingStub stub = stubs.globalStubs().syncStub(); @@ -247,7 +247,7 @@ public List
listTables() { return result.getTablesList() .stream() - .map(t -> new Table(uuidProtoToUuid(t.getStoreId()), t.getPssIdsList())) + .map(t -> new Store(uuidProtoToUuid(t.getStoreId()), t.getPssIdsList())) .collect(Collectors.toList()); } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java index 4c4d4a707..34545eb7d 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java @@ -33,6 +33,7 @@ import dev.responsive.kafka.internal.db.rs3.client.Put; import dev.responsive.kafka.internal.db.rs3.client.RS3Exception; import dev.responsive.kafka.internal.db.rs3.client.RS3TimeoutException; +import dev.responsive.kafka.internal.db.rs3.client.Store; import dev.responsive.kafka.internal.db.rs3.client.WalEntry; import dev.responsive.rs3.RS3Grpc; import dev.responsive.rs3.Rs3; @@ -754,10 +755,10 @@ public void shouldListTables() { ); // when: - final var result = client.listTables(); + final var result = client.listStores(); // then: - final var expected = new dev.responsive.kafka.internal.db.rs3.client.Table( + final var expected = new Store( STORE_ID, List.of(PSS_ID, PSS_ID_2) ); assertThat(result.size(), is(1)); @@ -772,7 +773,7 @@ public void shouldHandleEmptyTablesList() { .thenReturn(ListTablesResult.newBuilder().build()); // when: - final var result = client.listTables(); + final var result = client.listStores(); // then: assertThat(result.size(), is(0)); @@ -787,7 +788,7 @@ public void shouldRetryListTables() { .thenReturn(ListTablesResult.newBuilder().build()); // when: - final var result = client.listTables(); + final var result = client.listStores(); // then: assertThat(result.size(), is(0)); @@ -803,7 +804,7 @@ public void shouldPropagateUnexpectedExceptionsFromListTables() { // when: final RS3Exception exception = assertThrows( RS3Exception.class, - () -> client.listTables() + () -> client.listStores() ); // then: @@ -819,7 +820,7 @@ public void shouldTimeoutListTables() { .thenThrow(new StatusRuntimeException(Status.UNAVAILABLE)); // when: - assertThrows(RS3TimeoutException.class, () -> client.listTables()); + assertThrows(RS3TimeoutException.class, () -> client.listStores()); // then: var endTimeMs = time.milliseconds(); From 08d8fa7b8c347b54104f628af964eda7217eb2ed Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Wed, 26 Mar 2025 21:41:11 -0700 Subject: [PATCH 4/9] include protocol changes --- kafka-client/src/main/external-protos/rs3 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-client/src/main/external-protos/rs3 b/kafka-client/src/main/external-protos/rs3 index c03fb0c8a..a1b3ef24f 160000 --- a/kafka-client/src/main/external-protos/rs3 +++ b/kafka-client/src/main/external-protos/rs3 @@ -1 +1 @@ -Subproject commit c03fb0c8a3d3b8521340434bc9a5043f231f3758 +Subproject commit a1b3ef24f3f8a5bbf2fb5320b44d6db49159ef7d From fd5e15d41a96e07338da8efb49357a30bd8d1dcc Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Thu, 27 Mar 2025 14:21:55 -0700 Subject: [PATCH 5/9] fix maybe? --- kafka-client/src/main/external-protos/rs3 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-client/src/main/external-protos/rs3 b/kafka-client/src/main/external-protos/rs3 index a1b3ef24f..58c1dc25f 160000 --- a/kafka-client/src/main/external-protos/rs3 +++ b/kafka-client/src/main/external-protos/rs3 @@ -1 +1 @@ -Subproject commit a1b3ef24f3f8a5bbf2fb5320b44d6db49159ef7d +Subproject commit 58c1dc25f32951b8fc00344d1ac13cf9a2d26f3e From ffda3eb892d9172bd42b4b11c769d0b607e2910c Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Thu, 27 Mar 2025 14:51:32 -0700 Subject: [PATCH 6/9] add proto --- kafka-client/src/main/external-protos/rs3 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-client/src/main/external-protos/rs3 b/kafka-client/src/main/external-protos/rs3 index 58c1dc25f..a677273e7 160000 --- a/kafka-client/src/main/external-protos/rs3 +++ b/kafka-client/src/main/external-protos/rs3 @@ -1 +1 @@ -Subproject commit 58c1dc25f32951b8fc00344d1ac13cf9a2d26f3e +Subproject commit a677273e7181256541c3e3d1e3425135252c830a From 26b4147dce1170e55e6a5877b91183f013b60d23 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Thu, 27 Mar 2025 14:54:32 -0700 Subject: [PATCH 7/9] proto again --- kafka-client/src/main/external-protos/rs3 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-client/src/main/external-protos/rs3 b/kafka-client/src/main/external-protos/rs3 index a677273e7..7f8cb9a48 160000 --- a/kafka-client/src/main/external-protos/rs3 +++ b/kafka-client/src/main/external-protos/rs3 @@ -1 +1 @@ -Subproject commit a677273e7181256541c3e3d1e3425135252c830a +Subproject commit 7f8cb9a48adf8f812c17cb57997464761cbd0a1e From 494c84206cd1792584f33964b9968628d73d81f6 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Thu, 27 Mar 2025 15:45:01 -0700 Subject: [PATCH 8/9] rs3 merged --- kafka-client/src/main/external-protos/rs3 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-client/src/main/external-protos/rs3 b/kafka-client/src/main/external-protos/rs3 index 7f8cb9a48..eab53946a 160000 --- a/kafka-client/src/main/external-protos/rs3 +++ b/kafka-client/src/main/external-protos/rs3 @@ -1 +1 @@ -Subproject commit 7f8cb9a48adf8f812c17cb57997464761cbd0a1e +Subproject commit eab53946afd1c8e6ef704435338854df3d949e7f From ac22c8a529d0f38ebad3aa5b0669e53c971f3231 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Thu, 27 Mar 2025 15:58:49 -0700 Subject: [PATCH 9/9] table to store more fixes --- .../db/rs3/client/grpc/GrpcRS3Client.java | 12 +++--- .../db/rs3/client/grpc/GrpcRS3ClientTest.java | 41 +++++++++---------- 2 files changed, 26 insertions(+), 27 deletions(-) diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java index 41c380658..08f6d9e95 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java @@ -25,8 +25,8 @@ import dev.responsive.kafka.internal.db.rs3.client.RS3Client; import dev.responsive.kafka.internal.db.rs3.client.RS3TimeoutException; import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException; -import dev.responsive.kafka.internal.db.rs3.client.StreamSenderMessageReceiver; import dev.responsive.kafka.internal.db.rs3.client.Store; +import dev.responsive.kafka.internal.db.rs3.client.StreamSenderMessageReceiver; import dev.responsive.kafka.internal.db.rs3.client.WalEntry; import dev.responsive.rs3.RS3Grpc; import dev.responsive.rs3.Rs3; @@ -237,15 +237,15 @@ public Optional get( @Override public List listStores() { - final var request = Rs3.ListTablesRequest.newBuilder().build(); + final var request = Rs3.ListStoresRequest.newBuilder().build(); final RS3Grpc.RS3BlockingStub stub = stubs.globalStubs().syncStub(); - final Rs3.ListTablesResult result = withRetry( - () -> stub.listTables(request), - () -> "ListTables()" + final Rs3.ListStoresResult result = withRetry( + () -> stub.listStores(request), + () -> "ListStores()" ); - return result.getTablesList() + return result.getStoresList() .stream() .map(t -> new Store(uuidProtoToUuid(t.getStoreId()), t.getPssIdsList())) .collect(Collectors.toList()); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java index 34545eb7d..a1ff5ee7d 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java @@ -33,12 +33,11 @@ import dev.responsive.kafka.internal.db.rs3.client.Put; import dev.responsive.kafka.internal.db.rs3.client.RS3Exception; import dev.responsive.kafka.internal.db.rs3.client.RS3TimeoutException; -import dev.responsive.kafka.internal.db.rs3.client.Store; import dev.responsive.kafka.internal.db.rs3.client.WalEntry; import dev.responsive.rs3.RS3Grpc; import dev.responsive.rs3.Rs3; -import dev.responsive.rs3.Rs3.ListTablesResult; -import dev.responsive.rs3.Rs3.Table; +import dev.responsive.rs3.Rs3.ListStoresResult; +import dev.responsive.rs3.Rs3.Store; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; @@ -743,11 +742,11 @@ public void shouldTimeoutGet() { } @Test - public void shouldListTables() { + public void shouldListStores() { // given: - when(stub.listTables(any())).thenReturn( - ListTablesResult.newBuilder() - .addTables(Table.newBuilder() + when(stub.listStores(any())).thenReturn( + ListStoresResult.newBuilder() + .addStores(Store.newBuilder() .setStoreId(uuidToUuidProto(STORE_ID)) .addAllPssIds(List.of(PSS_ID, PSS_ID_2)) .build() @@ -758,47 +757,47 @@ public void shouldListTables() { final var result = client.listStores(); // then: - final var expected = new Store( + final var expected = new dev.responsive.kafka.internal.db.rs3.client.Store( STORE_ID, List.of(PSS_ID, PSS_ID_2) ); assertThat(result.size(), is(1)); assertThat(result.get(0), equalTo(expected)); - verify(stub).listTables(Rs3.ListTablesRequest.newBuilder().build()); + verify(stub).listStores(Rs3.ListStoresRequest.newBuilder().build()); } @Test - public void shouldHandleEmptyTablesList() { + public void shouldHandleEmptyStoresList() { // given: - when(stub.listTables(any())) - .thenReturn(ListTablesResult.newBuilder().build()); + when(stub.listStores(any())) + .thenReturn(ListStoresResult.newBuilder().build()); // when: final var result = client.listStores(); // then: assertThat(result.size(), is(0)); - verify(stub).listTables(Rs3.ListTablesRequest.newBuilder().build()); + verify(stub).listStores(Rs3.ListStoresRequest.newBuilder().build()); } @Test - public void shouldRetryListTables() { + public void shouldRetryListStores() { // given: - when(stub.listTables(any())) + when(stub.listStores(any())) .thenThrow(new StatusRuntimeException(Status.UNAVAILABLE)) - .thenReturn(ListTablesResult.newBuilder().build()); + .thenReturn(ListStoresResult.newBuilder().build()); // when: final var result = client.listStores(); // then: assertThat(result.size(), is(0)); - verify(stub, times(2)).listTables(Rs3.ListTablesRequest.newBuilder().build()); + verify(stub, times(2)).listStores(Rs3.ListStoresRequest.newBuilder().build()); } @Test - public void shouldPropagateUnexpectedExceptionsFromListTables() { + public void shouldPropagateUnexpectedExceptionsFromListStores() { // given: - when(stub.listTables(any())) + when(stub.listStores(any())) .thenThrow(new StatusRuntimeException(Status.UNKNOWN)); // when: @@ -813,10 +812,10 @@ public void shouldPropagateUnexpectedExceptionsFromListTables() { } @Test - public void shouldTimeoutListTables() { + public void shouldTimeoutListStores() { // given: var startTimeMs = time.milliseconds(); - when(stub.listTables(any())) + when(stub.listStores(any())) .thenThrow(new StatusRuntimeException(Status.UNAVAILABLE)); // when: