diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactory.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactory.java index 6bc592782..d0a732011 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactory.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactory.java @@ -17,10 +17,12 @@ import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.ClockType; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions; +import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.SlateDbStorageOptions; import dev.responsive.kafka.internal.db.rs3.client.RS3Client; import dev.responsive.kafka.internal.db.rs3.client.WalEntry; import dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRS3Client; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; +import dev.responsive.kafka.internal.stores.SchemaTypes.KVSchema; import dev.responsive.kafka.internal.stores.TtlResolver; import java.time.Duration; import java.util.Map; @@ -33,6 +35,10 @@ public class RS3TableFactory { private static final Logger LOG = LoggerFactory.getLogger(RS3TableFactory.class); + + // TODO: move this and make it configurable + private static final int DEFAULT_FACT_STORE_FILTER_BITS = 20; + private final GrpcRS3Client.Connector connector; // kafka store names to track which stores we've created in RS3 @@ -47,9 +53,9 @@ public RemoteKVTable kvTable( final Optional> ttlResolver, final ResponsiveMetrics responsiveMetrics, final ResponsiveMetrics.MetricScopeBuilder scopeBuilder, - final Supplier computeNumKafkaPartitions + final Supplier computeNumKafkaPartitions, + final KVSchema schema ) { - final Optional defaultTtl = ttlResolver.isPresent() && ttlResolver.get().defaultTtl().isFinite() ? Optional.of(ttlResolver.get().defaultTtl().duration()) @@ -57,17 +63,23 @@ public RemoteKVTable kvTable( final Optional clockType = ttlResolver.isPresent() ? Optional.of(ClockType.WALL_CLOCK) : Optional.empty(); + final Optional storageOptions = schema == KVSchema.FACT + ? Optional.of(new SlateDbStorageOptions(Optional.of(DEFAULT_FACT_STORE_FILTER_BITS))) + : Optional.empty(); final var rs3Client = connector.connect(); - final UUID storeId = createdStores.computeIfAbsent(storeName, n -> createStore( + final UUID storeId = createdStores.computeIfAbsent( storeName, - CreateStoreTypes.StoreType.BASIC, - clockType, - defaultTtl, - computeNumKafkaPartitions.get(), - rs3Client - )); + n -> createStore( + storeName, + CreateStoreTypes.StoreType.BASIC, + clockType, + defaultTtl, + storageOptions, + computeNumKafkaPartitions.get(), + rs3Client + )); final PssPartitioner pssPartitioner = new PssDirectPartitioner(); return new RS3KVTable( @@ -93,6 +105,7 @@ public RemoteWindowTable windowTable( CreateStoreTypes.StoreType.WINDOW, Optional.of(ClockType.WALL_CLOCK), Optional.of(defaultTtl), + Optional.empty(), computeNumKafkaPartitions.get(), rs3Client )); @@ -113,6 +126,7 @@ public static UUID createStore( final CreateStoreTypes.StoreType storeType, final Optional clockType, final Optional defaultTtl, + final Optional storageOptions, final int numKafkaPartitions, final RS3Client rs3Client ) { @@ -121,7 +135,7 @@ public static UUID createStore( storeType, clockType, defaultTtl.map(Duration::toMillis), - Optional.empty() + storageOptions ); final var result = rs3Client.createStore(storeName, options); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/CreateStoreTypes.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/CreateStoreTypes.java index bf2faa68d..469dca508 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/CreateStoreTypes.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/CreateStoreTypes.java @@ -96,7 +96,8 @@ public static class SlateDbStorageOptions { private final Optional filterBitsPerKey; public SlateDbStorageOptions( - final Optional filterBitsPerKey) { + final Optional filterBitsPerKey + ) { this.filterBitsPerKey = Objects.requireNonNull(filterBitsPerKey); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java index 604dcbe81..aa026a871 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java @@ -279,7 +279,8 @@ private static RemoteKVTable createRS3( ttlResolver, responsiveMetrics, scopeBuilder, - () -> numPartitionsForKafkaTopic(sessionClients.admin(), changelogTopicName) + () -> numPartitionsForKafkaTopic(sessionClients.admin(), changelogTopicName), + params.schemaType() ); } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactoryTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactoryTest.java index 12e6caf63..2a0937efd 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactoryTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactoryTest.java @@ -24,10 +24,12 @@ import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreResult; +import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.SlateDbStorageOptions; import dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRS3Client; import dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRS3Client.Connector; import dev.responsive.kafka.internal.metrics.ClientVersionMetadata; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; +import dev.responsive.kafka.internal.stores.SchemaTypes.KVSchema; import java.time.Duration; import java.util.List; import java.util.Map; @@ -70,7 +72,7 @@ public void setup() { } @Test - public void testBasicTableMapping() { + public void testBasicKeyValueTableMapping() { final UUID storeId = new UUID(100, 200); final String tableName = "test-table"; final int partitions = 5; @@ -84,7 +86,8 @@ public void testBasicTableMapping() { NO_TTL, metrics, scopeBuilder, - () -> partitions + () -> partitions, + KVSchema.KEY_VALUE ); assertEquals(tableName, rs3Table.name()); assertEquals(storeId, rs3Table.storeId()); @@ -99,6 +102,38 @@ public void testBasicTableMapping() { verify(client).createStore(tableName, expectedOptions); } + @Test + public void testFactTableMapping() { + final UUID storeId = new UUID(100, 200); + final String tableName = "test-table"; + final int partitions = 5; + + when(client.createStore(anyString(), any(CreateStoreOptions.class))) + .thenReturn(new CreateStoreResult(storeId, List.of(1, 2, 3, 4, 5))); + + final RS3TableFactory factory = newTestFactory(); + final RS3KVTable rs3Table = (RS3KVTable) factory.kvTable( + tableName, + NO_TTL, + metrics, + scopeBuilder, + () -> partitions, + KVSchema.FACT + ); + assertEquals(tableName, rs3Table.name()); + assertEquals(storeId, rs3Table.storeId()); + + final var expectedStorageOptions = Optional.of(new SlateDbStorageOptions(Optional.of(20))); + final var expectedOptions = new CreateStoreOptions( + partitions, + CreateStoreTypes.StoreType.BASIC, + Optional.empty(), + Optional.empty(), + expectedStorageOptions + ); + verify(client).createStore(tableName, expectedOptions); + } + @Test public void testWindowTableMapping() { final UUID storeId = new UUID(100, 200);