diff --git a/kafka-client/src/main/external-protos/rs3 b/kafka-client/src/main/external-protos/rs3 index 55c8b0abe..92841abf7 160000 --- a/kafka-client/src/main/external-protos/rs3 +++ b/kafka-client/src/main/external-protos/rs3 @@ -1 +1 @@ -Subproject commit 55c8b0abe6708b3238c2ffbaf335ae366da6adf0 +Subproject commit 92841abf7f807ad22cd8336ec36879049ac170a0 diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java index e71ac8274..09a6e75a0 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java @@ -112,7 +112,7 @@ public KeyValueIterator range( } @Override - public List listStores() { + public List listStores() { return delegate.listStores(); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java index 760e167fc..a9b7e7614 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java @@ -56,7 +56,7 @@ KeyValueIterator range( Range range ); - List listStores(); + List listStores(); CreateStoreResult createStore( String storeId, diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Store.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Store.java deleted file mode 100644 index 2fd3b41de..000000000 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Store.java +++ /dev/null @@ -1,66 +0,0 @@ -package dev.responsive.kafka.internal.db.rs3.client; - -import java.util.List; -import java.util.UUID; - -public class Store { - - private final String storeName; - private final UUID storeId; - private final List pssIds; - - public Store(final String storeName, final UUID storeId, final List pssIds) { - this.storeName = storeName; - this.storeId = storeId; - this.pssIds = List.copyOf(pssIds); - } - - public String storeName() { - return storeName; - } - - public UUID storeId() { - return storeId; - } - - public List pssIds() { - return pssIds; - } - - @Override - public String toString() { - return "Store{" - + "storeName='" + storeName + '\'' - + ", storeId=" + storeId - + ", pssIds=" + pssIds - + '}'; - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - final Store store = (Store) o; - - if (!storeName.equals(store.storeName)) { - return false; - } - if (!storeId.equals(store.storeId)) { - return false; - } - return pssIds.equals(store.pssIds); - } - - @Override - public int hashCode() { - int result = storeName.hashCode(); - result = 31 * result + storeId.hashCode(); - result = 31 * result + pssIds.hashCode(); - return result; - } -} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/StoreInfo.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/StoreInfo.java new file mode 100644 index 000000000..634d63529 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/StoreInfo.java @@ -0,0 +1,113 @@ +package dev.responsive.kafka.internal.db.rs3.client; + +import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.StoreType; +import java.util.List; +import java.util.UUID; + +public class StoreInfo { + + public enum Status { + CREATING, + READY, + DELETED + } + + private final String storeName; + private final UUID storeId; + private final StoreType storeType; + private final Status status; + private final List pssIds; + private final String createOptions; + + public StoreInfo( + final String storeName, + final UUID storeId, + final StoreType storeType, + final Status status, + final List pssIds, + final String createOptions + ) { + this.storeName = storeName; + this.storeId = storeId; + this.storeType = storeType; + this.status = status; + this.pssIds = List.copyOf(pssIds); + this.createOptions = createOptions; + } + + public String storeName() { + return storeName; + } + + public UUID storeId() { + return storeId; + } + + public StoreType storeType() { + return storeType; + } + + public Status status() { + return status; + } + + public List pssIds() { + return pssIds; + } + + public String createOptions() { + return createOptions; + } + + @Override + public String toString() { + return "StoreInfo{" + + "storeName='" + storeName + '\'' + + ", storeId=" + storeId + + ", storeType=" + storeType + + ", status=" + status + + ", pssIds=" + pssIds + + ", createOptions=" + createOptions + + '}'; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final StoreInfo storeInfo = (StoreInfo) o; + + if (!storeName.equals(storeInfo.storeName)) { + return false; + } + if (!storeId.equals(storeInfo.storeId)) { + return false; + } + if (!storeType.equals(storeInfo.storeType)) { + return false; + } + if (!status.equals(storeInfo.status)) { + return false; + } + if (!pssIds.equals(storeInfo.pssIds)) { + return false; + } + return createOptions.equals(storeInfo.createOptions); + } + + @Override + public int hashCode() { + int result = storeName.hashCode(); + result = 31 * result + storeId.hashCode(); + result = 31 * result + storeType.hashCode(); + result = 31 * result + status.hashCode(); + result = 31 * result + pssIds.hashCode(); + result = 31 * result + createOptions.hashCode(); + return result; + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java index 577a471b6..111bd0003 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java @@ -15,6 +15,8 @@ import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicDeleteProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicKeyProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicPutProto; +import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.storeStatusFromProto; +import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.storeTypeFromProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.walOffsetFromProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.walOffsetProto; import static dev.responsive.kafka.internal.utils.Utils.lssIdProto; @@ -34,7 +36,7 @@ import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException; import dev.responsive.kafka.internal.db.rs3.client.Range; import dev.responsive.kafka.internal.db.rs3.client.RangeBound; -import dev.responsive.kafka.internal.db.rs3.client.Store; +import dev.responsive.kafka.internal.db.rs3.client.StoreInfo; import dev.responsive.kafka.internal.db.rs3.client.StreamSenderMessageReceiver; import dev.responsive.kafka.internal.db.rs3.client.WalEntry; import dev.responsive.rs3.RS3Grpc; @@ -272,7 +274,7 @@ public Optional get( } @Override - public List listStores() { + public List listStores() { final var request = Rs3.ListStoresRequest.newBuilder().build(); final RS3Grpc.RS3BlockingStub stub = stubs.globalStubs().syncStub(); @@ -283,7 +285,14 @@ public List listStores() { return result.getStoresList() .stream() - .map(t -> new Store(t.getStoreName(), uuidFromProto(t.getStoreId()), t.getPssIdsList())) + .map(t -> new StoreInfo( + t.getStoreName(), + uuidFromProto(t.getStoreId()), + storeTypeFromProto(t.getStoreType()), + storeStatusFromProto(t.getStatus()), + t.getPssIdsList(), + t.getOptions()) + ) .collect(Collectors.toList()); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java index e46f9be4c..39ab5143d 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java @@ -18,6 +18,7 @@ import dev.responsive.kafka.internal.db.rs3.client.Put; import dev.responsive.kafka.internal.db.rs3.client.RS3Exception; import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException; +import dev.responsive.kafka.internal.db.rs3.client.StoreInfo; import dev.responsive.rs3.Rs3; import io.grpc.Status; import io.grpc.StatusException; @@ -102,7 +103,7 @@ public static Rs3.WriteWALSegmentRequest.Delete basicDeleteProto(final Delete de .build(); } - public static Rs3.StoreType storeTypeProto(final CreateStoreTypes.StoreType storeType) { + public static Rs3.StoreType storeTypeToProto(final CreateStoreTypes.StoreType storeType) { switch (storeType) { case BASIC: return Rs3.StoreType.BASIC; @@ -113,12 +114,36 @@ public static Rs3.StoreType storeTypeProto(final CreateStoreTypes.StoreType stor } } + public static CreateStoreTypes.StoreType storeTypeFromProto(final Rs3.StoreType storeType) { + switch (storeType) { + case BASIC: + return CreateStoreTypes.StoreType.BASIC; + case WINDOW: + return CreateStoreTypes.StoreType.WINDOW; + default: + throw new IllegalArgumentException("Unknown store type: " + storeType); + } + } + + public static StoreInfo.Status storeStatusFromProto(final Rs3.StoreInfo.Status status) { + switch (status) { + case CREATING: + return StoreInfo.Status.CREATING; + case READY: + return StoreInfo.Status.READY; + case DELETING: + return StoreInfo.Status.DELETED; + default: + throw new IllegalArgumentException("Unknown store status: " + status); + } + } + public static Rs3.CreateStoreOptions createStoreOptionsProto( CreateStoreTypes.CreateStoreOptions options ) { final var builder = Rs3.CreateStoreOptions.newBuilder(); builder.setLogicalShards(options.logicalShards()); - builder.setStoreType(storeTypeProto(options.storeType())); + builder.setStoreType(storeTypeToProto(options.storeType())); options.clockType().ifPresent( type -> builder.setClockType(Rs3.ClockType.forNumber(type.ordinal())) ); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java index 94fcfa131..b449e69ed 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java @@ -36,17 +36,18 @@ import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.ClockType; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions; +import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.StoreType; import dev.responsive.kafka.internal.db.rs3.client.LssId; import dev.responsive.kafka.internal.db.rs3.client.Put; import dev.responsive.kafka.internal.db.rs3.client.RS3Exception; import dev.responsive.kafka.internal.db.rs3.client.RS3TimeoutException; import dev.responsive.kafka.internal.db.rs3.client.Range; +import dev.responsive.kafka.internal.db.rs3.client.StoreInfo; import dev.responsive.kafka.internal.db.rs3.client.WalEntry; import dev.responsive.rs3.RS3Grpc; import dev.responsive.rs3.Rs3; import dev.responsive.rs3.Rs3.CreateStoreResult; import dev.responsive.rs3.Rs3.ListStoresResult; -import dev.responsive.rs3.Rs3.StoreInfo; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; @@ -879,11 +880,15 @@ public void shouldTimeoutAfterObserverOnError() { @Test public void shouldListStores() { // given: + final String createOptions = "opts-stub"; when(stub.listStores(any())).thenReturn( ListStoresResult.newBuilder() - .addStores(StoreInfo.newBuilder() + .addStores(Rs3.StoreInfo.newBuilder() .setStoreName(STORE_NAME) .setStoreId(uuidToProto(STORE_ID)) + .setStatus(Rs3.StoreInfo.Status.READY) + .setStoreType(Rs3.StoreType.BASIC) + .setOptions(createOptions) .addAllPssIds(List.of(PSS_ID, PSS_ID_2)) .build() ).build() @@ -893,9 +898,9 @@ public void shouldListStores() { final var result = client.listStores(); // then: - final var expected = new dev.responsive.kafka.internal.db.rs3.client.Store( - STORE_NAME, STORE_ID, List.of(PSS_ID, PSS_ID_2) - ); + final var expected = new StoreInfo( + STORE_NAME, STORE_ID, StoreType.BASIC, StoreInfo.Status.READY, List.of(PSS_ID, PSS_ID_2), + createOptions); assertThat(result.size(), is(1)); assertThat(result.get(0), equalTo(expected)); @@ -919,15 +924,30 @@ public void shouldHandleEmptyStoresList() { @Test public void shouldRetryListStores() { // given: + final String createOptions = "opts-stub"; when(stub.listStores(any())) .thenThrow(new StatusRuntimeException(Status.UNAVAILABLE)) - .thenReturn(ListStoresResult.newBuilder().build()); + .thenReturn(ListStoresResult.newBuilder() + .addStores(Rs3.StoreInfo.newBuilder() + .setStoreName(STORE_NAME) + .setStoreId(uuidToProto(STORE_ID)) + .setStatus(Rs3.StoreInfo.Status.READY) + .setStoreType(Rs3.StoreType.BASIC) + .setOptions(createOptions) + .addAllPssIds(List.of(PSS_ID, PSS_ID_2)) + .build() + ).build()); // when: final var result = client.listStores(); // then: - assertThat(result.size(), is(0)); + final var expected = new StoreInfo( + STORE_NAME, STORE_ID, StoreType.BASIC, StoreInfo.Status.READY, List.of(PSS_ID, PSS_ID_2), + createOptions); + + assertThat(result.size(), is(1)); + assertThat(result.get(0), equalTo(expected)); verify(stub, times(2)).listStores(Rs3.ListStoresRequest.newBuilder().build()); }