Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes;
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.ClockType;
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions;
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.SlateDbStorageOptions;
import dev.responsive.kafka.internal.db.rs3.client.RS3Client;
import dev.responsive.kafka.internal.db.rs3.client.WalEntry;
import dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRS3Client;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.stores.SchemaTypes.KVSchema;
import dev.responsive.kafka.internal.stores.TtlResolver;
import java.time.Duration;
import java.util.Map;
Expand All @@ -33,6 +35,10 @@

public class RS3TableFactory {
private static final Logger LOG = LoggerFactory.getLogger(RS3TableFactory.class);

// TODO: move this and make it configurable
private static final int DEFAULT_FACT_STORE_FILTER_BITS = 20;

private final GrpcRS3Client.Connector connector;

// kafka store names to track which stores we've created in RS3
Expand All @@ -47,27 +53,33 @@ public RemoteKVTable<WalEntry> kvTable(
final Optional<TtlResolver<?, ?>> ttlResolver,
final ResponsiveMetrics responsiveMetrics,
final ResponsiveMetrics.MetricScopeBuilder scopeBuilder,
final Supplier<Integer> computeNumKafkaPartitions
final Supplier<Integer> computeNumKafkaPartitions,
final KVSchema schema
) {

final Optional<Duration> defaultTtl = ttlResolver.isPresent()
&& ttlResolver.get().defaultTtl().isFinite()
? Optional.of(ttlResolver.get().defaultTtl().duration())
: Optional.empty();
final Optional<ClockType> clockType = ttlResolver.isPresent()
? Optional.of(ClockType.WALL_CLOCK)
: Optional.empty();
final Optional<SlateDbStorageOptions> storageOptions = schema == KVSchema.FACT
? Optional.of(new SlateDbStorageOptions(Optional.of(DEFAULT_FACT_STORE_FILTER_BITS)))
: Optional.empty();

final var rs3Client = connector.connect();

final UUID storeId = createdStores.computeIfAbsent(storeName, n -> createStore(
final UUID storeId = createdStores.computeIfAbsent(
storeName,
CreateStoreTypes.StoreType.BASIC,
clockType,
defaultTtl,
computeNumKafkaPartitions.get(),
rs3Client
));
n -> createStore(
storeName,
CreateStoreTypes.StoreType.BASIC,
clockType,
defaultTtl,
storageOptions,
computeNumKafkaPartitions.get(),
rs3Client
));

final PssPartitioner pssPartitioner = new PssDirectPartitioner();
return new RS3KVTable(
Expand All @@ -93,6 +105,7 @@ public RemoteWindowTable<WalEntry> windowTable(
CreateStoreTypes.StoreType.WINDOW,
Optional.of(ClockType.WALL_CLOCK),
Optional.of(defaultTtl),
Optional.empty(),
computeNumKafkaPartitions.get(),
rs3Client
));
Expand All @@ -113,6 +126,7 @@ public static UUID createStore(
final CreateStoreTypes.StoreType storeType,
final Optional<ClockType> clockType,
final Optional<Duration> defaultTtl,
final Optional<SlateDbStorageOptions> storageOptions,
final int numKafkaPartitions,
final RS3Client rs3Client
) {
Expand All @@ -121,7 +135,7 @@ public static UUID createStore(
storeType,
clockType,
defaultTtl.map(Duration::toMillis),
Optional.empty()
storageOptions
);

final var result = rs3Client.createStore(storeName, options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ public static class SlateDbStorageOptions {
private final Optional<Integer> filterBitsPerKey;

public SlateDbStorageOptions(
final Optional<Integer> filterBitsPerKey) {
final Optional<Integer> filterBitsPerKey
) {
this.filterBitsPerKey = Objects.requireNonNull(filterBitsPerKey);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,8 @@ private static RemoteKVTable<?> createRS3(
ttlResolver,
responsiveMetrics,
scopeBuilder,
() -> numPartitionsForKafkaTopic(sessionClients.admin(), changelogTopicName)
() -> numPartitionsForKafkaTopic(sessionClients.admin(), changelogTopicName),
params.schemaType()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes;
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions;
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreResult;
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.SlateDbStorageOptions;
import dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRS3Client;
import dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRS3Client.Connector;
import dev.responsive.kafka.internal.metrics.ClientVersionMetadata;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.stores.SchemaTypes.KVSchema;
import java.time.Duration;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -70,7 +72,7 @@ public void setup() {
}

@Test
public void testBasicTableMapping() {
public void testBasicKeyValueTableMapping() {
final UUID storeId = new UUID(100, 200);
final String tableName = "test-table";
final int partitions = 5;
Expand All @@ -84,7 +86,8 @@ public void testBasicTableMapping() {
NO_TTL,
metrics,
scopeBuilder,
() -> partitions
() -> partitions,
KVSchema.KEY_VALUE
);
assertEquals(tableName, rs3Table.name());
assertEquals(storeId, rs3Table.storeId());
Expand All @@ -99,6 +102,38 @@ public void testBasicTableMapping() {
verify(client).createStore(tableName, expectedOptions);
}

@Test
public void testFactTableMapping() {
final UUID storeId = new UUID(100, 200);
final String tableName = "test-table";
final int partitions = 5;

when(client.createStore(anyString(), any(CreateStoreOptions.class)))
.thenReturn(new CreateStoreResult(storeId, List.of(1, 2, 3, 4, 5)));

final RS3TableFactory factory = newTestFactory();
final RS3KVTable rs3Table = (RS3KVTable) factory.kvTable(
tableName,
NO_TTL,
metrics,
scopeBuilder,
() -> partitions,
KVSchema.FACT
);
assertEquals(tableName, rs3Table.name());
assertEquals(storeId, rs3Table.storeId());

final var expectedStorageOptions = Optional.of(new SlateDbStorageOptions(Optional.of(20)));
final var expectedOptions = new CreateStoreOptions(
partitions,
CreateStoreTypes.StoreType.BASIC,
Optional.empty(),
Optional.empty(),
expectedStorageOptions
);
verify(client).createStore(tableName, expectedOptions);
}

@Test
public void testWindowTableMapping() {
final UUID storeId = new UUID(100, 200);
Expand Down
Loading