Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ADDITIONAL_CONNECTION_STRING_PARAMS_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_CONNECTION_STRING_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_WINDOWED_KEY_TIMESTAMP_FIRST_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.RS3_CONFIG_SETTER_CLASS_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.RS3_HOSTNAME_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.RS3_PORT_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.RS3_RETRY_TIMEOUT_CONFIG;
Expand All @@ -30,6 +31,7 @@
import static org.apache.kafka.streams.StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG;

import dev.responsive.kafka.api.config.RS3ConfigSetter;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.config.StorageBackend;
import dev.responsive.kafka.api.stores.ResponsiveDslStoreSuppliers;
Expand Down Expand Up @@ -552,10 +554,13 @@ public Params build() {
rs3Connector.retryTimeoutMs(responsiveConfig.getLong(RS3_RETRY_TIMEOUT_CONFIG));
rs3Connector.useTls(responsiveConfig.getBoolean(RS3_TLS_ENABLED_CONFIG));

final var configSetter = responsiveConfig.getConfiguredInstance(
RS3_CONFIG_SETTER_CLASS_CONFIG, RS3ConfigSetter.class, responsiveConfig.originals()
);
sessionClients = new SessionClients(
Optional.empty(),
Optional.empty(),
Optional.of(new RS3TableFactory(rs3Connector)),
Optional.of(new RS3TableFactory(rs3Connector, configSetter)),
storageBackend,
admin
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
*
* Copyright 2025 Responsive Computing, Inc.
*
* This source code is licensed under the Responsive Business Source License Agreement v1.0
* available at:
*
* https://www.responsive.dev/legal/responsive-bsl-10
*
* This software requires a valid Commercial License Key for production use. Trial and commercial
* licenses can be obtained at https://www.responsive.dev
*/

package dev.responsive.kafka.api.config;

import dev.responsive.kafka.internal.stores.SchemaTypes.KVSchema;
import dev.responsive.kafka.internal.stores.SchemaTypes.SessionSchema;
import dev.responsive.kafka.internal.stores.SchemaTypes.WindowSchema;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.Configurable;

public interface RS3ConfigSetter extends Configurable {

/**
* Sets the configs for a {@link org.apache.kafka.streams.state.KeyValueStore}
* with the provided name and schema. Can return {@link Optional#empty()} to defer to
* the default settings for the given store type.
*
* @return the configs to override for this store
*/
RS3StoreParams keyValueStoreConfig(String storeName, KVSchema schema);

/**
* Sets the configs for a {@link org.apache.kafka.streams.state.WindowStore}
* with the provided name and schema. Can return {@link Optional#empty()} to defer to
* the default settings for the given store type.
*
* @return the configs to override for this store
*/
RS3StoreParams windowStoreConfig(String storeName, WindowSchema schema);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we could return a more specific type (e.g. RS3WindowStoreParams) in case there are different configurations for the respective store?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that's probably the right thing to do...I considered that but ended up consolidating them to reduce API surface. But the problem with a generic RS3StoreParams is that it's up to the user to use the correct base/default configs for the given store type. Which will only become worse if we do have different configs for different store types in the furure

i'll go back to the store-specific params


/**
* Sets the configs for a {@link org.apache.kafka.streams.state.SessionStore}
* with the provided name and schema. Can return {@code super.sessionStoreConfig} to defer to
* the default settings for the given store type.
*
* @return the configs to override for this store
*/
RS3StoreParams sessionStoreConfig(String storeName, SessionSchema schema);

@Override
default void configure(final Map<String, ?> configs) {
}

class DefaultRS3ConfigSetter implements RS3ConfigSetter {

@Override
public RS3StoreParams keyValueStoreConfig(final String storeName, final KVSchema schema) {
return RS3StoreParams.defaultKV(schema);
}

@Override
public RS3StoreParams windowStoreConfig(final String storeName, final WindowSchema schema) {
return RS3StoreParams.defaultWindow(schema);
}

@Override
public RS3StoreParams sessionStoreConfig(final String storeName, final SessionSchema schema) {
return RS3StoreParams.defaultSession(schema);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
*
* Copyright 2025 Responsive Computing, Inc.
*
* This source code is licensed under the Responsive Business Source License Agreement v1.0
* available at:
*
* https://www.responsive.dev/legal/responsive-bsl-10
*
* This software requires a valid Commercial License Key for production use. Trial and commercial
* licenses can be obtained at https://www.responsive.dev
*/

package dev.responsive.kafka.api.config;

import dev.responsive.kafka.internal.stores.SchemaTypes.KVSchema;
import dev.responsive.kafka.internal.stores.SchemaTypes.SessionSchema;
import dev.responsive.kafka.internal.stores.SchemaTypes.WindowSchema;
import java.util.Optional;

public class RS3StoreParams {

public static final Optional<Integer> DEFAULT_FACT_STORE_FILTER_BITS = Optional.of(20);

private final Optional<Integer> filterBitsPerKey;

private RS3StoreParams(final Optional<Integer> filterBitsPerKey) {
this.filterBitsPerKey = filterBitsPerKey;
}

public static RS3StoreParams defaultKV(final KVSchema schema) {
final Optional<Integer> defaultFilterBitsPerKey = schema == KVSchema.FACT
? DEFAULT_FACT_STORE_FILTER_BITS
: Optional.empty();
return new RS3StoreParams(defaultFilterBitsPerKey);
}

public static RS3StoreParams defaultWindow(final WindowSchema schema) {
return new RS3StoreParams(Optional.empty());
}

public static RS3StoreParams defaultSession(final SessionSchema schema) {
return new RS3StoreParams(Optional.empty());
}

public RS3StoreParams withFilterBitsPerKey(final int filterBitsPerKey) {
return new RS3StoreParams(Optional.of(filterBitsPerKey));
}

public Optional<Integer> filterBitsPerKey() {
return filterBitsPerKey;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
import dev.responsive.kafka.api.ResponsiveKafkaStreams;
import dev.responsive.kafka.api.config.RS3ConfigSetter.DefaultRS3ConfigSetter;
import dev.responsive.kafka.internal.db.partitioning.Murmur3Hasher;
import java.time.Duration;
import java.util.Arrays;
Expand Down Expand Up @@ -141,6 +142,12 @@ public class ResponsiveConfig extends AbstractConfig {
private static final String RS3_RETRY_TIMEOUT_DOC = "Timeout in milliseconds for retries when RS3 endpoint is unavailable";
public static final long RS3_RETRY_TIMEOUT_DEFAULT = 120000;

public static final String RS3_CONFIG_SETTER_CLASS_CONFIG = "responsive.rs3.config.setter.class";
private static final String RS3_CONFIG_SETTER_CLASS_DOC = "Class name implementing RS3ConfigSetter which can be used to"
+ " configure individual rs3 stores by name and store type";
private static final Class<? extends RS3ConfigSetter> RS3_CONFIG_SETTER_CLASS_DEFAULT = DefaultRS3ConfigSetter.class;


// ------------------ ScyllaDB specific configurations ----------------------

public static final String CASSANDRA_USERNAME_CONFIG = "responsive.cassandra.username";
Expand Down Expand Up @@ -642,6 +649,12 @@ public class ResponsiveConfig extends AbstractConfig {
atLeast(0),
Importance.MEDIUM,
RS3_RETRY_TIMEOUT_DOC
).define(
RS3_CONFIG_SETTER_CLASS_CONFIG,
Type.CLASS,
RS3_CONFIG_SETTER_CLASS_DEFAULT,
Importance.LOW,
RS3_CONFIG_SETTER_CLASS_DOC
).define(
ORIGIN_EVENT_REPORT_INTERVAL_MS_CONFIG,
Type.LONG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

package dev.responsive.kafka.internal.db.rs3;

import dev.responsive.kafka.api.config.RS3ConfigSetter;
import dev.responsive.kafka.api.config.RS3StoreParams;
import dev.responsive.kafka.internal.db.RemoteKVTable;
import dev.responsive.kafka.internal.db.RemoteWindowTable;
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes;
Expand All @@ -23,6 +25,7 @@
import dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRS3Client;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.stores.SchemaTypes.KVSchema;
import dev.responsive.kafka.internal.stores.SchemaTypes.WindowSchema;
import dev.responsive.kafka.internal.stores.TtlResolver;
import java.time.Duration;
import java.util.Map;
Expand All @@ -36,16 +39,18 @@
public class RS3TableFactory {
private static final Logger LOG = LoggerFactory.getLogger(RS3TableFactory.class);

// TODO: move this and make it configurable
private static final int DEFAULT_FACT_STORE_FILTER_BITS = 20;

private final GrpcRS3Client.Connector connector;
private final RS3ConfigSetter configSetter;

// kafka store names to track which stores we've created in RS3
private final Map<String, UUID> createdStores = new ConcurrentHashMap<>();

public RS3TableFactory(GrpcRS3Client.Connector connector) {
public RS3TableFactory(
final GrpcRS3Client.Connector connector,
final RS3ConfigSetter configSetter
) {
this.connector = connector;
this.configSetter = configSetter;
}

public RemoteKVTable<WalEntry> kvTable(
Expand All @@ -63,9 +68,9 @@ public RemoteKVTable<WalEntry> kvTable(
final Optional<ClockType> clockType = ttlResolver.isPresent()
? Optional.of(ClockType.WALL_CLOCK)
: Optional.empty();
final Optional<SlateDbStorageOptions> storageOptions = schema == KVSchema.FACT
? Optional.of(new SlateDbStorageOptions(Optional.of(DEFAULT_FACT_STORE_FILTER_BITS)))
: Optional.empty();

final RS3StoreParams params = configSetter.keyValueStoreConfig(storeName, schema);
final var storageOptions = new SlateDbStorageOptions(params.filterBitsPerKey());

final var rs3Client = connector.connect();

Expand All @@ -76,7 +81,7 @@ public RemoteKVTable<WalEntry> kvTable(
CreateStoreTypes.StoreType.BASIC,
clockType,
defaultTtl,
storageOptions,
Optional.of(storageOptions),
computeNumKafkaPartitions.get(),
rs3Client
));
Expand All @@ -97,15 +102,20 @@ public RemoteWindowTable<WalEntry> windowTable(
final Duration defaultTtl,
final ResponsiveMetrics responsiveMetrics,
final ResponsiveMetrics.MetricScopeBuilder scopeBuilder,
final Supplier<Integer> computeNumKafkaPartitions
final Supplier<Integer> computeNumKafkaPartitions,
final WindowSchema schema
) {
final var rs3Client = connector.connect();

final RS3StoreParams params = configSetter.windowStoreConfig(storeName, schema);
final var storageOptions = new SlateDbStorageOptions(params.filterBitsPerKey());

final UUID storeId = createdStores.computeIfAbsent(storeName, n -> createStore(
storeName,
CreateStoreTypes.StoreType.WINDOW,
Optional.of(ClockType.WALL_CLOCK),
Optional.of(defaultTtl),
Optional.empty(),
Optional.of(storageOptions),
computeNumKafkaPartitions.get(),
rs3Client
));
Expand All @@ -121,7 +131,7 @@ public RemoteWindowTable<WalEntry> windowTable(
);
}

public static UUID createStore(
private static UUID createStore(
final String storeName,
final CreateStoreTypes.StoreType storeType,
final Optional<ClockType> clockType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ private static RemoteWindowTable<?> createRs3(
defaultTtl,
responsiveMetrics,
scopeBuilder,
() -> numPartitionsForKafkaTopic(sessionClients.admin(), changelogTopicName)
() -> numPartitionsForKafkaTopic(sessionClients.admin(), changelogTopicName),
params.schemaType()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import dev.responsive.kafka.api.config.RS3ConfigSetter.DefaultRS3ConfigSetter;
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes;
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions;
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreResult;
Expand All @@ -30,6 +31,7 @@
import dev.responsive.kafka.internal.metrics.ClientVersionMetadata;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.stores.SchemaTypes.KVSchema;
import dev.responsive.kafka.internal.stores.SchemaTypes.WindowSchema;
import java.time.Duration;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -150,7 +152,8 @@ public void testWindowTableMapping() {
defaultTtl,
metrics,
scopeBuilder,
() -> partitions
() -> partitions,
WindowSchema.WINDOW
);
assertEquals(tableName, rs3Table.name());
assertEquals(storeId, rs3Table.storeId());
Expand All @@ -168,7 +171,7 @@ public void testWindowTableMapping() {
private RS3TableFactory newTestFactory() {
final var connector = mock(Connector.class);
lenient().when(connector.connect()).thenReturn(client);
return new RS3TableFactory(connector);
return new RS3TableFactory(connector, new DefaultRS3ConfigSetter());
}

}
Loading