diff --git a/kafka-client/src/main/external-protos/rs3 b/kafka-client/src/main/external-protos/rs3 index 5b91fbdfe..f385e7902 160000 --- a/kafka-client/src/main/external-protos/rs3 +++ b/kafka-client/src/main/external-protos/rs3 @@ -1 +1 @@ -Subproject commit 5b91fbdfe66348658cb2948eed4278bf3d9d1ef6 +Subproject commit f385e79026aac033a409794dcd6b755eedbe4429 diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java index 228449594..5e0f70f14 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java @@ -133,9 +133,6 @@ public class ResponsiveConfig extends AbstractConfig { public static final String RS3_PORT_CONFIG = "responsive.rs3.port"; private static final String RS3_PORT_DOC = "The port to use when connecting to RS3."; - public static final String RS3_LOGICAL_STORE_MAPPING_CONFIG = "responsive.rs3.logical.store.mapping"; - public static final String RS3_LOGICAL_STORE_MAPPING_DOC = "Mapping from table name to RS3 logical store ID (e.g. 'table:b1a45157-e2f0-4698-be0e-5bf3a9b8e9d1,...')"; - public static final String RS3_TLS_ENABLED_CONFIG = "responsive.rs3.tls.enabled"; private static final String RS3_TLS_ENABLED_DOC = "Enables/disable tls for rs3 connection"; public static final boolean RS3_TLS_ENABLED_DEFAULT = true; @@ -632,12 +629,6 @@ public class ResponsiveConfig extends AbstractConfig { 50051, Importance.MEDIUM, RS3_PORT_DOC - ).define( - RS3_LOGICAL_STORE_MAPPING_CONFIG, - Type.STRING, - "", - Importance.HIGH, - RS3_LOGICAL_STORE_MAPPING_DOC ).define( RS3_TLS_ENABLED_CONFIG, Type.BOOLEAN, diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVTable.java index 3a531021e..14c4a17ab 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVTable.java @@ -48,14 +48,14 @@ public class RS3KVTable implements RemoteKVTable { private RS3KVFlushManager flushManager; public RS3KVTable( - final String name, + final String storeName, final UUID storeId, final RS3Client rs3Client, final PssPartitioner pssPartitioner, final ResponsiveMetrics responsiveMetrics, final ResponsiveMetrics.MetricScopeBuilder scopeBuilder ) { - this.name = Objects.requireNonNull(name); + this.name = Objects.requireNonNull(storeName); this.storeId = Objects.requireNonNull(storeId); this.rs3Client = new MeteredRS3Client( Objects.requireNonNull(rs3Client), diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactory.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactory.java index 581ed4ad7..3c901be7a 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactory.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactory.java @@ -12,21 +12,19 @@ package dev.responsive.kafka.internal.db.rs3; -import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.internal.db.RemoteKVTable; -import dev.responsive.kafka.internal.db.rs3.client.CreateStoreOptions; -import dev.responsive.kafka.internal.db.rs3.client.CreateStoreOptions.ClockType; +import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.ClockType; +import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions; +import dev.responsive.kafka.internal.db.rs3.client.RS3Client; import dev.responsive.kafka.internal.db.rs3.client.WalEntry; import dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRS3Client; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; import dev.responsive.kafka.internal.stores.TtlResolver; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; -import org.apache.kafka.common.config.ConfigException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +33,7 @@ public class RS3TableFactory { private final GrpcRS3Client.Connector connector; // kafka store names to track which stores we've created in RS3 - private final Set createdStores = new ConcurrentSkipListSet<>(); + private final Map createdStores = new ConcurrentHashMap<>(); public RS3TableFactory(GrpcRS3Client.Connector connector) { this.connector = connector; @@ -43,43 +41,16 @@ public RS3TableFactory(GrpcRS3Client.Connector connector) { public RemoteKVTable kvTable( final String storeName, - final ResponsiveConfig config, final Optional> ttlResolver, final ResponsiveMetrics responsiveMetrics, final ResponsiveMetrics.MetricScopeBuilder scopeBuilder, final Supplier computeNumKafkaPartitions ) { - final Map storeIdMapping = config.getMap( - ResponsiveConfig.RS3_LOGICAL_STORE_MAPPING_CONFIG); - final String storeIdHex = storeIdMapping.get(storeName); - if (storeIdHex == null) { - throw new ConfigException("Failed to find store ID mapping for table " + storeName); - } - - final UUID storeId = UUID.fromString(storeIdHex); - final var rs3Client = connector.connect(); - if (!createdStores.contains(storeName)) { - final int kafkaPartitions = computeNumKafkaPartitions.get(); - - final Optional defaultTtl = - ttlResolver.isPresent() && ttlResolver.get().defaultTtl().isFinite() - ? Optional.of(ttlResolver.get().defaultTtl().duration().toMillis()) - : Optional.empty(); - - final var options = new CreateStoreOptions( - ttlResolver.isPresent() ? Optional.of(ClockType.WALL_CLOCK) : Optional.empty(), - defaultTtl, - Optional.empty() - ); - - final var pss_ids = rs3Client.createStore(storeId, kafkaPartitions, options); - LOG.info("Created store {} with {} logical shards and {} physical shards", - storeName, kafkaPartitions, pss_ids.size()); - - createdStores.add(storeName); - } + final UUID storeId = createdStores.computeIfAbsent(storeName, n -> createStore( + storeName, ttlResolver, computeNumKafkaPartitions.get(), rs3Client + )); final PssPartitioner pssPartitioner = new PssDirectPartitioner(); return new RS3KVTable( @@ -92,6 +63,31 @@ public RemoteKVTable kvTable( ); } + public static UUID createStore( + final String storeName, + final Optional> ttlResolver, + final int numKafkaPartitions, + final RS3Client rs3Client + ) { + + final Optional defaultTtl = + ttlResolver.isPresent() && ttlResolver.get().defaultTtl().isFinite() + ? Optional.of(ttlResolver.get().defaultTtl().duration().toMillis()) + : Optional.empty(); + + final var options = new CreateStoreOptions( + ttlResolver.isPresent() ? Optional.of(ClockType.WALL_CLOCK) : Optional.empty(), + defaultTtl, + Optional.empty() + ); + + final var result = rs3Client.createStore(storeName, numKafkaPartitions, options); + LOG.info("Created store {} ({}) with {} logical shards and {} physical shards", + storeName, result.storeId(), numKafkaPartitions, result.pssIds().size()); + + return result.storeId(); + } + public void close() { } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/CreateStoreOptions.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/CreateStoreOptions.java deleted file mode 100644 index ea9790616..000000000 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/CreateStoreOptions.java +++ /dev/null @@ -1,86 +0,0 @@ -package dev.responsive.kafka.internal.db.rs3.client; - -import dev.responsive.rs3.Rs3; -import java.util.Optional; - -public class CreateStoreOptions { - - public enum ClockType { - WALL_CLOCK, - STREAM_TIME - } - - private final Optional clockType; - private final Optional defaultTtl; - private final Optional filterBitsPerKey; - - - public CreateStoreOptions( - final Optional clockType, - final Optional defaultTtl, - final Optional filterBitsPerKey - ) { - this.clockType = clockType; - this.defaultTtl = defaultTtl; - this.filterBitsPerKey = filterBitsPerKey; - } - - public Optional clockType() { - return clockType; - } - - public Optional defaultTtl() { - return defaultTtl; - } - - public Optional filterBitsPerKey() { - return filterBitsPerKey; - } - - public Rs3.CreateStoreOptions toProto() { - final var builder = Rs3.CreateStoreOptions.newBuilder(); - clockType.ifPresent( - type -> builder.setClockType(Rs3.CreateStoreOptions.ClockType.forNumber(type.ordinal())) - ); - defaultTtl.ifPresent(builder::setDefaultTtl); - filterBitsPerKey.ifPresent(builder::setFilterBitsPerKey); - return builder.build(); - } - - @Override - public String toString() { - return "CreateStoreOptions{" - + "clockType=" + clockType - + ", defaultTtl=" + defaultTtl - + ", filterBitsPerKey=" + filterBitsPerKey - + '}'; - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - final CreateStoreOptions that = (CreateStoreOptions) o; - - if (!clockType.equals(that.clockType)) { - return false; - } - if (!defaultTtl.equals(that.defaultTtl)) { - return false; - } - return filterBitsPerKey.equals(that.filterBitsPerKey); - } - - @Override - public int hashCode() { - int result = clockType.hashCode(); - result = 31 * result + defaultTtl.hashCode(); - result = 31 * result + filterBitsPerKey.hashCode(); - return result; - } -} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/CreateStoreTypes.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/CreateStoreTypes.java new file mode 100644 index 000000000..7f5df9785 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/CreateStoreTypes.java @@ -0,0 +1,134 @@ +package dev.responsive.kafka.internal.db.rs3.client; + +import dev.responsive.rs3.Rs3; +import java.util.List; +import java.util.Optional; +import java.util.UUID; + +public class CreateStoreTypes { + + public enum ClockType { + WALL_CLOCK, + STREAM_TIME + } + + public static class CreateStoreOptions { + + private final Optional clockType; + private final Optional defaultTtl; + private final Optional filterBitsPerKey; + + + public CreateStoreOptions( + final Optional clockType, + final Optional defaultTtl, + final Optional filterBitsPerKey + ) { + this.clockType = clockType; + this.defaultTtl = defaultTtl; + this.filterBitsPerKey = filterBitsPerKey; + } + + public Optional clockType() { + return clockType; + } + + public Optional defaultTtl() { + return defaultTtl; + } + + public Optional filterBitsPerKey() { + return filterBitsPerKey; + } + + public Rs3.CreateStoreOptions toProto() { + final var builder = Rs3.CreateStoreOptions.newBuilder(); + clockType.ifPresent( + type -> builder.setClockType(Rs3.CreateStoreOptions.ClockType.forNumber(type.ordinal())) + ); + defaultTtl.ifPresent(builder::setDefaultTtl); + filterBitsPerKey.ifPresent(builder::setFilterBitsPerKey); + return builder.build(); + } + + @Override + public String toString() { + return "CreateStoreOptions{" + + "clockType=" + clockType + + ", defaultTtl=" + defaultTtl + + ", filterBitsPerKey=" + filterBitsPerKey + + '}'; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final CreateStoreOptions that = (CreateStoreOptions) o; + + if (!clockType.equals(that.clockType)) { + return false; + } + if (!defaultTtl.equals(that.defaultTtl)) { + return false; + } + return filterBitsPerKey.equals(that.filterBitsPerKey); + } + + @Override + public int hashCode() { + int result = clockType.hashCode(); + result = 31 * result + defaultTtl.hashCode(); + result = 31 * result + filterBitsPerKey.hashCode(); + return result; + } + } + + public static class CreateStoreResult { + private final UUID storeId; + private final List pssIds; + + public CreateStoreResult(final UUID storeId, final List pssIds) { + this.storeId = storeId; + this.pssIds = pssIds; + } + + public UUID storeId() { + return storeId; + } + + public List pssIds() { + return pssIds; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final CreateStoreResult that = (CreateStoreResult) o; + + if (!storeId.equals(that.storeId)) { + return false; + } + return pssIds.equals(that.pssIds); + } + + @Override + public int hashCode() { + int result = storeId.hashCode(); + result = 31 * result + pssIds.hashCode(); + return result; + } + } + +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java index 9cc2ca3d7..1fdf6cbda 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java @@ -1,5 +1,7 @@ package dev.responsive.kafka.internal.db.rs3.client; +import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions; +import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreResult; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; import java.time.Duration; import java.time.Instant; @@ -117,12 +119,12 @@ public List listStores() { } @Override - public List createStore( - final UUID storeId, + public CreateStoreResult createStore( + final String storeName, final int logicalShards, final CreateStoreOptions options ) { - return delegate.createStore(storeId, logicalShards, options); + return delegate.createStore(storeName, logicalShards, options); } public void close() { diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java index 63b83cedd..d4c59c351 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java @@ -12,6 +12,8 @@ package dev.responsive.kafka.internal.db.rs3.client; +import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions; +import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreResult; import java.util.List; import java.util.Optional; import java.util.UUID; @@ -57,8 +59,8 @@ KeyValueIterator range( List listStores(); - List createStore( - UUID storeId, + CreateStoreResult createStore( + String storeId, int logicalShards, CreateStoreOptions options ); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Store.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Store.java index e5c475b10..2fd3b41de 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Store.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/Store.java @@ -5,14 +5,20 @@ public class Store { + private final String storeName; private final UUID storeId; private final List pssIds; - public Store(final UUID storeId, final List pssIds) { + public Store(final String storeName, final UUID storeId, final List pssIds) { + this.storeName = storeName; this.storeId = storeId; this.pssIds = List.copyOf(pssIds); } + public String storeName() { + return storeName; + } + public UUID storeId() { return storeId; } @@ -21,6 +27,15 @@ public List pssIds() { return pssIds; } + @Override + public String toString() { + return "Store{" + + "storeName='" + storeName + '\'' + + ", storeId=" + storeId + + ", pssIds=" + pssIds + + '}'; + } + @Override public boolean equals(final Object o) { if (this == o) { @@ -32,6 +47,9 @@ public boolean equals(final Object o) { final Store store = (Store) o; + if (!storeName.equals(store.storeName)) { + return false; + } if (!storeId.equals(store.storeId)) { return false; } @@ -40,7 +58,8 @@ public boolean equals(final Object o) { @Override public int hashCode() { - int result = storeId.hashCode(); + int result = storeName.hashCode(); + result = 31 * result + storeId.hashCode(); result = 31 * result + pssIds.hashCode(); return result; } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java index b33d6b642..82ff880de 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java @@ -13,13 +13,14 @@ package dev.responsive.kafka.internal.db.rs3.client.grpc; import static dev.responsive.kafka.internal.utils.Utils.lssIdProto; -import static dev.responsive.kafka.internal.utils.Utils.uuidProtoToUuid; -import static dev.responsive.kafka.internal.utils.Utils.uuidToUuidProto; +import static dev.responsive.kafka.internal.utils.Utils.uuidFromProto; +import static dev.responsive.kafka.internal.utils.Utils.uuidToProto; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; import dev.responsive.kafka.api.config.ResponsiveConfig; -import dev.responsive.kafka.internal.db.rs3.client.CreateStoreOptions; +import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions; +import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreResult; import dev.responsive.kafka.internal.db.rs3.client.CurrentOffsets; import dev.responsive.kafka.internal.db.rs3.client.LssId; import dev.responsive.kafka.internal.db.rs3.client.Put; @@ -72,7 +73,7 @@ public CurrentOffsets getCurrentOffsets(final UUID storeId, final LssId lssId, f final RS3Grpc.RS3BlockingStub stub = stubs.stubs(storeId, pssId).syncStub(); final Rs3.GetOffsetsRequest request = Rs3.GetOffsetsRequest.newBuilder() - .setStoreId(uuidToUuidProto(storeId)) + .setStoreId(uuidToProto(storeId)) .setLssId(lssIdProto(lssId)) .setPssId(pssId) .build(); @@ -149,7 +150,7 @@ public StreamSenderMessageReceiver> writeWalSegmentAsyn final var streamSender = new GrpcStreamSender( entry -> { final var entryBuilder = Rs3.WriteWALSegmentRequest.newBuilder() - .setStoreId(uuidToUuidProto(storeId)) + .setStoreId(uuidToProto(storeId)) .setLssId(lssIdProto(lssId)) .setPssId(pssId) .setEndOffset(endOffset) @@ -229,7 +230,7 @@ public KeyValueIterator range( RangeBound to ) { final var requestBuilder = Rs3.RangeRequest.newBuilder() - .setStoreId(uuidToUuidProto(storeId)) + .setStoreId(uuidToProto(storeId)) .setLssId(lssIdProto(lssId)) .setPssId(pssId) .setTo(protoBound(to)); @@ -250,7 +251,7 @@ public Optional get( final byte[] key ) { final var requestBuilder = Rs3.GetRequest.newBuilder() - .setStoreId(uuidToUuidProto(storeId)) + .setStoreId(uuidToProto(storeId)) .setLssId(lssIdProto(lssId)) .setPssId(pssId) .setKey(ByteString.copyFrom(key)); @@ -282,18 +283,18 @@ public List listStores() { return result.getStoresList() .stream() - .map(t -> new Store(uuidProtoToUuid(t.getStoreId()), t.getPssIdsList())) + .map(t -> new Store(t.getStoreName(), uuidFromProto(t.getStoreId()), t.getPssIdsList())) .collect(Collectors.toList()); } @Override - public List createStore( - final UUID storeId, + public CreateStoreResult createStore( + final String storeName, final int logicalShards, final CreateStoreOptions options ) { final var request = Rs3.CreateStoreRequest.newBuilder() - .setStoreId(uuidToUuidProto(storeId)) + .setStoreName(storeName) .setLogicalShards(logicalShards) .setOptions(options.toProto()) .build(); @@ -301,12 +302,12 @@ public List createStore( final Rs3.CreateStoreResult result = withRetry( () -> stub.createStore(request), - () -> "CreateStore(storeId=" + storeId + () -> "CreateStore(storeName=" + storeName + ", logicalShards=" + logicalShards + ", createStoreOptions=" + options + ")" ); - return result.getPssIdsList(); + return new CreateStoreResult(uuidFromProto(result.getStoreId()), result.getPssIdsList()); } private void addWalEntryToSegment( diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java index 647b7aad0..604dcbe81 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java @@ -123,7 +123,6 @@ public static PartitionedOperations create( sessionClients, changelog.topic(), ttlResolver, - config, responsiveMetrics, scopeBuilder ); @@ -272,13 +271,11 @@ private static RemoteKVTable createRS3( final SessionClients sessionClients, final String changelogTopicName, final Optional> ttlResolver, - final ResponsiveConfig config, final ResponsiveMetrics responsiveMetrics, final ResponsiveMetrics.MetricScopeBuilder scopeBuilder ) { return sessionClients.rs3TableFactory().kvTable( params.name().tableName(), - config, ttlResolver, responsiveMetrics, scopeBuilder, diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/Utils.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/Utils.java index dc9cff5ef..3c84eadaf 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/Utils.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/Utils.java @@ -130,11 +130,11 @@ public static Bytes incrementWithoutOverflow(final Bytes input) { } } - public static UUID uuidProtoToUuid(final Rs3.UUID uuid) { + public static UUID uuidFromProto(final Rs3.UUID uuid) { return new UUID(uuid.getHigh(), uuid.getLow()); } - public static Rs3.UUID uuidToUuidProto(final UUID uuid) { + public static Rs3.UUID uuidToProto(final UUID uuid) { return Rs3.UUID.newBuilder() .setHigh(uuid.getMostSignificantBits()) .setLow(uuid.getLeastSignificantBits()) diff --git a/kafka-client/src/test/java/dev/responsive/kafka/api/config/ResponsiveConfigTest.java b/kafka-client/src/test/java/dev/responsive/kafka/api/config/ResponsiveConfigTest.java index 79eabb885..30411cbe2 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/api/config/ResponsiveConfigTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/api/config/ResponsiveConfigTest.java @@ -2,29 +2,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; -import java.util.HashMap; -import java.util.Map; import java.util.Properties; -import java.util.UUID; import org.junit.jupiter.api.Test; class ResponsiveConfigTest { - @Test - public void testRs3TableMapping() { - String l1 = UUID.randomUUID().toString(); - String l2 = UUID.randomUUID().toString(); - - Properties props = new Properties(); - props.setProperty(ResponsiveConfig.RS3_LOGICAL_STORE_MAPPING_CONFIG, "t1:" + l1 + ",t2:" + l2); - - final ResponsiveConfig config = ResponsiveConfig.responsiveConfig(props); - Map expectedMapping = new HashMap<>(); - expectedMapping.put("t1", l1); - expectedMapping.put("t2", l2); - assertEquals(expectedMapping, config.getMap(ResponsiveConfig.RS3_LOGICAL_STORE_MAPPING_CONFIG)); - } - @Test public void testRs3RetryTimeoutConfig() { var props = new Properties(); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactoryTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactoryTest.java index a1ae1d977..c185d67e7 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactoryTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactoryTest.java @@ -14,27 +14,25 @@ import static dev.responsive.kafka.internal.stores.TtlResolver.NO_TTL; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import dev.responsive.kafka.api.config.ResponsiveConfig; -import dev.responsive.kafka.internal.db.rs3.client.CreateStoreOptions; +import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions; +import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreResult; import dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRS3Client; import dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRS3Client.Connector; import dev.responsive.kafka.internal.metrics.ClientVersionMetadata; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.metrics.Metrics; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -72,49 +70,31 @@ public void setup() { @Test public void testTableMapping() { - final var table = "test-table"; - final var uuid = "b1a45157-e2f0-4698-be0e-5bf3a9b8e9d1"; + final UUID storeId = new UUID(100, 200); + final String tableName = "test-table"; final int partitions = 5; - final var config = mock(ResponsiveConfig.class); - when(config.getMap(ResponsiveConfig.RS3_LOGICAL_STORE_MAPPING_CONFIG)) - .thenReturn(Collections.singletonMap(table, uuid)); - - when(client.createStore(any(UUID.class), anyInt(), any(CreateStoreOptions.class))) - .thenReturn(List.of(1, 2, 3, 4, 5)); + when(client.createStore(anyString(), anyInt(), any(CreateStoreOptions.class))) + .thenReturn(new CreateStoreResult(storeId, List.of(1, 2, 3, 4, 5))); final RS3TableFactory factory = newTestFactory(); final RS3KVTable rs3Table = (RS3KVTable) factory.kvTable( - table, - config, + tableName, NO_TTL, metrics, scopeBuilder, () -> partitions ); - assertEquals(uuid, rs3Table.storedId().toString()); + assertEquals(tableName, rs3Table.name()); + assertEquals(storeId, rs3Table.storedId()); verify(client).createStore( - UUID.fromString(uuid), + tableName, partitions, new CreateStoreOptions(Optional.empty(), Optional.empty(), Optional.empty()) ); } - @Test - public void testMissingTableMapping() { - final String table = "test-table"; - final ResponsiveConfig config = mock(ResponsiveConfig.class); - when(config.getMap(ResponsiveConfig.RS3_LOGICAL_STORE_MAPPING_CONFIG)) - .thenReturn(Collections.emptyMap()); - - final RS3TableFactory factory = newTestFactory(); - assertThrows( - ConfigException.class, - () -> factory.kvTable(table, config, NO_TTL, metrics, scopeBuilder, () -> 5) - ); - } - private RS3TableFactory newTestFactory() { final var connector = mock(Connector.class); lenient().when(connector.connect()).thenReturn(client); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java index fe1b7f246..aa1cc1c75 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java @@ -14,7 +14,7 @@ import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpsRs3TestUtil.newEndOfStreamResult; import static dev.responsive.kafka.internal.utils.Utils.lssIdProto; -import static dev.responsive.kafka.internal.utils.Utils.uuidToUuidProto; +import static dev.responsive.kafka.internal.utils.Utils.uuidToProto; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -32,8 +32,9 @@ import static org.mockito.Mockito.when; import com.google.protobuf.ByteString; -import dev.responsive.kafka.internal.db.rs3.client.CreateStoreOptions; -import dev.responsive.kafka.internal.db.rs3.client.CreateStoreOptions.ClockType; +import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes; +import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.ClockType; +import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions; import dev.responsive.kafka.internal.db.rs3.client.LssId; import dev.responsive.kafka.internal.db.rs3.client.Put; import dev.responsive.kafka.internal.db.rs3.client.RS3Exception; @@ -68,6 +69,7 @@ @ExtendWith(MockitoExtension.class) class GrpcRS3ClientTest { + private static final String STORE_NAME = "my_store"; private static final UUID STORE_ID = new UUID(100, 200); private static final LssId LSS_ID = new LssId(10); private static final int PSS_ID = 1; @@ -260,7 +262,7 @@ public void shouldWriteWalEntriesWithExpectedFields() { verify(writeWALSegmentRequestObserver).onNext(Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidToUuidProto(STORE_ID)) + .setStoreId(uuidToProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) .setPut(putProto(put1)) @@ -269,7 +271,7 @@ public void shouldWriteWalEntriesWithExpectedFields() { verify(writeWALSegmentRequestObserver).onNext(Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidToUuidProto(STORE_ID)) + .setStoreId(uuidToProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) .setPut(putProto(put2)) @@ -299,7 +301,7 @@ public void shouldWriteWalEntriesWithExpectedWrittenOffsetNone() { verify(writeWALSegmentRequestObserver).onNext(Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidToUuidProto(STORE_ID)) + .setStoreId(uuidToProto(STORE_ID)) .setExpectedWrittenOffset(GrpcRS3Client.WAL_OFFSET_NONE) .setEndOffset(20) .setPut(putProto(put1)) @@ -417,7 +419,7 @@ public void shouldWriteWalSegmentSync() { Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidToUuidProto(STORE_ID)) + .setStoreId(uuidToProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) .setPut(putProto((Put) entries.get(0))) @@ -427,7 +429,7 @@ public void shouldWriteWalSegmentSync() { Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidToUuidProto(STORE_ID)) + .setStoreId(uuidToProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) .setPut(putProto((Put) entries.get(1))) @@ -480,7 +482,7 @@ public void shouldRetryWriteWalSegmentSync() { Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidToUuidProto(STORE_ID)) + .setStoreId(uuidToProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) .setPut(putProto((Put) entries.get(0))) @@ -490,7 +492,7 @@ public void shouldRetryWriteWalSegmentSync() { Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidToUuidProto(STORE_ID)) + .setStoreId(uuidToProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) .setPut(putProto((Put) entries.get(1))) @@ -541,7 +543,7 @@ public void shouldRetryWriteWalSegmentSyncWithErrorInOnNext() { Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidToUuidProto(STORE_ID)) + .setStoreId(uuidToProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) .setPut(putProto((Put) entries.get(0))) @@ -591,7 +593,7 @@ public void shouldRetryWriteWalSegmentSyncWithErrorInFinish() { Rs3.WriteWALSegmentRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidToUuidProto(STORE_ID)) + .setStoreId(uuidToProto(STORE_ID)) .setExpectedWrittenOffset(15L) .setEndOffset(20) .setPut(putProto((Put) entries.get(0))) @@ -652,7 +654,7 @@ public void shouldGetWithExpectedWrittenOffset() { verify(stub).get(Rs3.GetRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidToUuidProto(STORE_ID)) + .setStoreId(uuidToProto(STORE_ID)) .setExpectedWrittenOffset(123L) .setKey(ByteString.copyFromUtf8("foo")) .build() @@ -679,7 +681,7 @@ public void shouldGet() { verify(stub).get(Rs3.GetRequest.newBuilder() .setLssId(lssIdProto(LSS_ID)) .setPssId(PSS_ID) - .setStoreId(uuidToUuidProto(STORE_ID)) + .setStoreId(uuidToProto(STORE_ID)) .setKey(ByteString.copyFromUtf8("foo")) .build() ); @@ -877,7 +879,8 @@ public void shouldListStores() { when(stub.listStores(any())).thenReturn( ListStoresResult.newBuilder() .addStores(Store.newBuilder() - .setStoreId(uuidToUuidProto(STORE_ID)) + .setStoreName(STORE_NAME) + .setStoreId(uuidToProto(STORE_ID)) .addAllPssIds(List.of(PSS_ID, PSS_ID_2)) .build() ).build() @@ -888,8 +891,9 @@ public void shouldListStores() { // then: final var expected = new dev.responsive.kafka.internal.db.rs3.client.Store( - STORE_ID, List.of(PSS_ID, PSS_ID_2) + STORE_NAME, STORE_ID, List.of(PSS_ID, PSS_ID_2) ); + assertThat(result.size(), is(1)); assertThat(result.get(0), equalTo(expected)); verify(stub).listStores(Rs3.ListStoresRequest.newBuilder().build()); @@ -964,6 +968,7 @@ public void shouldCreateStore() { when(stub.createStore(any())) .thenReturn(CreateStoreResult .newBuilder() + .setStoreId(uuidToProto(STORE_ID)) .addAllPssIds(pss_ids) .build() ); @@ -975,12 +980,12 @@ public void shouldCreateStore() { ); // when: - final var result = client.createStore(STORE_ID, logicalShards, options); + final var result = client.createStore(STORE_NAME, logicalShards, options); // then: - assertThat(result, equalTo(pss_ids)); + assertThat(result, equalTo(new CreateStoreTypes.CreateStoreResult(STORE_ID, pss_ids))); verify(stub).createStore(Rs3.CreateStoreRequest.newBuilder() - .setStoreId(uuidToUuidProto(STORE_ID)) + .setStoreName(STORE_NAME) .setLogicalShards(logicalShards) .setOptions(options.toProto()).build()); } @@ -994,6 +999,7 @@ public void shouldRetryCreateStore() { .thenThrow(new StatusRuntimeException(Status.UNAVAILABLE)) .thenReturn(CreateStoreResult .newBuilder() + .setStoreId(uuidToProto(STORE_ID)) .addAllPssIds(pss_ids) .build() ); @@ -1005,12 +1011,12 @@ public void shouldRetryCreateStore() { ); // when: - final var result = client.createStore(STORE_ID, logicalShards, options); + final var result = client.createStore(STORE_NAME, logicalShards, options); // then: - assertThat(result, equalTo(pss_ids)); + assertThat(result, equalTo(new CreateStoreTypes.CreateStoreResult(STORE_ID, pss_ids))); verify(stub, times(2)).createStore(Rs3.CreateStoreRequest.newBuilder() - .setStoreId(uuidToUuidProto(STORE_ID)) + .setStoreName(STORE_NAME) .setLogicalShards(logicalShards) .setOptions(options.toProto()).build()); } @@ -1031,7 +1037,7 @@ public void shouldPropagateUnexpectedExceptionsFromCreateStore() { // when: final RS3Exception exception = assertThrows( RS3Exception.class, - () -> client.createStore(STORE_ID, logicalShards, options) + () -> client.createStore(STORE_NAME, logicalShards, options) ); // then: @@ -1056,7 +1062,7 @@ public void shouldTimeoutCreateStore() { // when: assertThrows( RS3TimeoutException.class, - () -> client.createStore(STORE_ID, logicalShards, options) + () -> client.createStore(STORE_NAME, logicalShards, options) ); // then: