Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka-client/src/main/external-protos/rs3
Submodule rs3 updated from 5b91fb to f385e7
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,6 @@ public class ResponsiveConfig extends AbstractConfig {
public static final String RS3_PORT_CONFIG = "responsive.rs3.port";
private static final String RS3_PORT_DOC = "The port to use when connecting to RS3.";

public static final String RS3_LOGICAL_STORE_MAPPING_CONFIG = "responsive.rs3.logical.store.mapping";
public static final String RS3_LOGICAL_STORE_MAPPING_DOC = "Mapping from table name to RS3 logical store ID (e.g. 'table:b1a45157-e2f0-4698-be0e-5bf3a9b8e9d1,...')";

public static final String RS3_TLS_ENABLED_CONFIG = "responsive.rs3.tls.enabled";
private static final String RS3_TLS_ENABLED_DOC = "Enables/disable tls for rs3 connection";
public static final boolean RS3_TLS_ENABLED_DEFAULT = true;
Expand Down Expand Up @@ -632,12 +629,6 @@ public class ResponsiveConfig extends AbstractConfig {
50051,
Importance.MEDIUM,
RS3_PORT_DOC
).define(
RS3_LOGICAL_STORE_MAPPING_CONFIG,
Type.STRING,
"",
Importance.HIGH,
RS3_LOGICAL_STORE_MAPPING_DOC
).define(
RS3_TLS_ENABLED_CONFIG,
Type.BOOLEAN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ public class RS3KVTable implements RemoteKVTable<WalEntry> {
private RS3KVFlushManager flushManager;

public RS3KVTable(
final String name,
final String storeName,
final UUID storeId,
final RS3Client rs3Client,
final PssPartitioner pssPartitioner,
final ResponsiveMetrics responsiveMetrics,
final ResponsiveMetrics.MetricScopeBuilder scopeBuilder
) {
this.name = Objects.requireNonNull(name);
this.name = Objects.requireNonNull(storeName);
this.storeId = Objects.requireNonNull(storeId);
this.rs3Client = new MeteredRS3Client(
Objects.requireNonNull(rs3Client),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,19 @@

package dev.responsive.kafka.internal.db.rs3;

import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.db.RemoteKVTable;
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreOptions;
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreOptions.ClockType;
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.ClockType;
import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions;
import dev.responsive.kafka.internal.db.rs3.client.RS3Client;
import dev.responsive.kafka.internal.db.rs3.client.WalEntry;
import dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRS3Client;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.stores.TtlResolver;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.apache.kafka.common.config.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -35,51 +33,24 @@ public class RS3TableFactory {
private final GrpcRS3Client.Connector connector;

// kafka store names to track which stores we've created in RS3
private final Set<String> createdStores = new ConcurrentSkipListSet<>();
private final Map<String, UUID> createdStores = new ConcurrentHashMap<>();

public RS3TableFactory(GrpcRS3Client.Connector connector) {
this.connector = connector;
}

public RemoteKVTable<WalEntry> kvTable(
final String storeName,
final ResponsiveConfig config,
final Optional<TtlResolver<?, ?>> ttlResolver,
final ResponsiveMetrics responsiveMetrics,
final ResponsiveMetrics.MetricScopeBuilder scopeBuilder,
final Supplier<Integer> computeNumKafkaPartitions
) {
final Map<String, String> storeIdMapping = config.getMap(
ResponsiveConfig.RS3_LOGICAL_STORE_MAPPING_CONFIG);
final String storeIdHex = storeIdMapping.get(storeName);
if (storeIdHex == null) {
throw new ConfigException("Failed to find store ID mapping for table " + storeName);
}

final UUID storeId = UUID.fromString(storeIdHex);

final var rs3Client = connector.connect();

if (!createdStores.contains(storeName)) {
final int kafkaPartitions = computeNumKafkaPartitions.get();

final Optional<Long> defaultTtl =
ttlResolver.isPresent() && ttlResolver.get().defaultTtl().isFinite()
? Optional.of(ttlResolver.get().defaultTtl().duration().toMillis())
: Optional.empty();

final var options = new CreateStoreOptions(
ttlResolver.isPresent() ? Optional.of(ClockType.WALL_CLOCK) : Optional.empty(),
defaultTtl,
Optional.empty()
);

final var pss_ids = rs3Client.createStore(storeId, kafkaPartitions, options);
LOG.info("Created store {} with {} logical shards and {} physical shards",
storeName, kafkaPartitions, pss_ids.size());

createdStores.add(storeName);
}
final UUID storeId = createdStores.computeIfAbsent(storeName, n -> createStore(
storeName, ttlResolver, computeNumKafkaPartitions.get(), rs3Client
));

final PssPartitioner pssPartitioner = new PssDirectPartitioner();
return new RS3KVTable(
Expand All @@ -92,6 +63,31 @@ public RemoteKVTable<WalEntry> kvTable(
);
}

public static UUID createStore(
final String storeName,
final Optional<TtlResolver<?, ?>> ttlResolver,
final int numKafkaPartitions,
final RS3Client rs3Client
) {

final Optional<Long> defaultTtl =
ttlResolver.isPresent() && ttlResolver.get().defaultTtl().isFinite()
? Optional.of(ttlResolver.get().defaultTtl().duration().toMillis())
: Optional.empty();

final var options = new CreateStoreOptions(
ttlResolver.isPresent() ? Optional.of(ClockType.WALL_CLOCK) : Optional.empty(),
defaultTtl,
Optional.empty()
);

final var result = rs3Client.createStore(storeName, numKafkaPartitions, options);
LOG.info("Created store {} ({}) with {} logical shards and {} physical shards",
storeName, result.storeId(), numKafkaPartitions, result.pssIds().size());

return result.storeId();
}

public void close() {
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package dev.responsive.kafka.internal.db.rs3.client;

import dev.responsive.rs3.Rs3;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

public class CreateStoreTypes {

public enum ClockType {
WALL_CLOCK,
STREAM_TIME
}

public static class CreateStoreOptions {

private final Optional<ClockType> clockType;
private final Optional<Long> defaultTtl;
private final Optional<Integer> filterBitsPerKey;


public CreateStoreOptions(
final Optional<ClockType> clockType,
final Optional<Long> defaultTtl,
final Optional<Integer> filterBitsPerKey
) {
this.clockType = clockType;
this.defaultTtl = defaultTtl;
this.filterBitsPerKey = filterBitsPerKey;
}

public Optional<ClockType> clockType() {
return clockType;
}

public Optional<Long> defaultTtl() {
return defaultTtl;
}

public Optional<Integer> filterBitsPerKey() {
return filterBitsPerKey;
}

public Rs3.CreateStoreOptions toProto() {
final var builder = Rs3.CreateStoreOptions.newBuilder();
clockType.ifPresent(
type -> builder.setClockType(Rs3.CreateStoreOptions.ClockType.forNumber(type.ordinal()))
);
defaultTtl.ifPresent(builder::setDefaultTtl);
filterBitsPerKey.ifPresent(builder::setFilterBitsPerKey);
return builder.build();
}

@Override
public String toString() {
return "CreateStoreOptions{"
+ "clockType=" + clockType
+ ", defaultTtl=" + defaultTtl
+ ", filterBitsPerKey=" + filterBitsPerKey
+ '}';
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

final CreateStoreOptions that = (CreateStoreOptions) o;

if (!clockType.equals(that.clockType)) {
return false;
}
if (!defaultTtl.equals(that.defaultTtl)) {
return false;
}
return filterBitsPerKey.equals(that.filterBitsPerKey);
}

@Override
public int hashCode() {
int result = clockType.hashCode();
result = 31 * result + defaultTtl.hashCode();
result = 31 * result + filterBitsPerKey.hashCode();
return result;
}
}

public static class CreateStoreResult {
private final UUID storeId;
private final List<Integer> pssIds;

public CreateStoreResult(final UUID storeId, final List<Integer> pssIds) {
this.storeId = storeId;
this.pssIds = pssIds;
}

public UUID storeId() {
return storeId;
}

public List<Integer> pssIds() {
return pssIds;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

final CreateStoreResult that = (CreateStoreResult) o;

if (!storeId.equals(that.storeId)) {
return false;
}
return pssIds.equals(that.pssIds);
}

@Override
public int hashCode() {
int result = storeId.hashCode();
result = 31 * result + pssIds.hashCode();
return result;
}
}

}
Loading
Loading