Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
832bb61
Initial implementation of RS3 window table
hachikuji Mar 31, 2025
e30d0ab
Merge remote-tracking branch 'origin/main' into rs3-window-operations
hachikuji Apr 3, 2025
d50d841
Remove unneeded comment
hachikuji Apr 3, 2025
db3e73d
Test cases for serde
hachikuji Apr 3, 2025
68e1718
Update kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs…
hachikuji Apr 3, 2025
8d47b4d
use more final variables
hachikuji Apr 4, 2025
2770de1
review comments
hachikuji Apr 4, 2025
0cb4b5e
Remove TODO
hachikuji Apr 4, 2025
d47f03a
Update kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs…
hachikuji Apr 4, 2025
4452308
reduce visibility on RS3Writer
hachikuji Apr 4, 2025
a1e7916
Add more info to fail message
hachikuji Apr 4, 2025
2a6e247
Add basic writer test for window table
hachikuji Apr 4, 2025
8b5d2f5
Merge remote-tracking branch 'origin/main' into rs3-window-operations
hachikuji Apr 5, 2025
7c8fdd1
Fix breakage from merge
hachikuji Apr 5, 2025
e46e96f
Add range implementations for window table
hachikuji Apr 8, 2025
b385f5e
Update rs3 proto
hachikuji Apr 8, 2025
03bdf57
Fix failing tests
hachikuji Apr 8, 2025
d210126
Merge remote-tracking branch 'origin/main' into rs3-window-operations
hachikuji Apr 15, 2025
3e7ab01
Reset opentelemetry to main branch
hachikuji Apr 15, 2025
2a1e26f
Merge remote-tracking branch 'origin/main' into rs3-window-operations
hachikuji Apr 23, 2025
c158a38
Fix breakage wip
hachikuji Apr 23, 2025
b07c812
Closer to compiling
hachikuji Apr 24, 2025
a9d7615
compiling again
hachikuji Apr 24, 2025
173b798
Fix checkstyle
hachikuji Apr 24, 2025
cfa3e71
Pass through default ttl/use stream time clock
hachikuji Apr 25, 2025
e188cd8
Fix failing end to end tests
hachikuji Apr 25, 2025
873b644
Implement windowed end-to-end test
hachikuji Apr 25, 2025
0728022
Merge remote-tracking branch 'origin/main' into rs3-window-operations
hachikuji Apr 25, 2025
e2ae90c
fix merge breakage
hachikuji Apr 25, 2025
9a8045e
Fix remaining broken tests
hachikuji Apr 25, 2025
68e9fa8
Fix window value iteration with test case
hachikuji Apr 25, 2025
f80d98d
Merge remote-tracking branch 'origin/main' into rs3-window-operations
hachikuji Apr 25, 2025
b93a539
Get rid of TODO
hachikuji Apr 25, 2025
4c5ee03
Don't commit jwik database
hachikuji Apr 25, 2025
db8fe92
Use wall clock time for now
hachikuji Apr 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kafka-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,5 @@ dependencies {
testImplementation("io.grpc:grpc-inprocess:${libs.versions.grpc.orNull}")
testImplementation("software.amazon.awssdk:kms:2.20.0")
testImplementation("software.amazon.awssdk:sso:2.20.0")
testImplementation("net.jqwik:jqwik:1.9.2")
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,20 @@
import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
import dev.responsive.kafka.internal.db.rs3.client.LssId;
import java.util.Objects;
import org.apache.kafka.common.utils.Bytes;

public class PssTablePartitioner implements TablePartitioner<Bytes, Integer> {
public abstract class PssTablePartitioner<K> implements TablePartitioner<K, Integer> {
private final PssPartitioner pssPartitioner;

public PssTablePartitioner(final PssPartitioner pssPartitioner) {
this.pssPartitioner = Objects.requireNonNull(pssPartitioner);
}

public abstract byte[] serialize(K key);

@Override
public Integer tablePartition(int kafkaPartition, Bytes key) {
return pssPartitioner.pss(key.get(), new LssId(kafkaPartition));
public Integer tablePartition(int kafkaPartition, K key) {
final var serializedKey = serialize(key);
return pssPartitioner.pss(serializedKey, new LssId(kafkaPartition));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,23 @@
import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
import dev.responsive.kafka.internal.db.rs3.client.LssId;
import dev.responsive.kafka.internal.db.rs3.client.RS3Client;
import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException;
import dev.responsive.kafka.internal.db.rs3.client.StreamSender;
import dev.responsive.kafka.internal.db.rs3.client.StreamSenderMessageReceiver;
import dev.responsive.kafka.internal.db.rs3.client.WalEntry;
import dev.responsive.kafka.internal.stores.RemoteWriteResult;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import org.apache.kafka.common.utils.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class RS3KVFlushManager extends KVFlushManager {
private static final Logger LOG = LoggerFactory.getLogger(RS3KVFlushManager.class);

private final UUID storeId;
private final RS3Client rs3Client;
private final LssId lssId;
private final RS3KVTable table;
private final HashMap<Integer, Optional<Long>> writtenOffsets;
private final Map<Integer, Optional<Long>> writtenOffsets;
private final int kafkaPartition;
private final PssPartitioner pssPartitioner;
private final HashMap<Integer, RS3KVWriter> writers = new HashMap<>();
Expand All @@ -53,7 +43,7 @@ public RS3KVFlushManager(
final RS3Client rs3Client,
final LssId lssId,
final RS3KVTable table,
final HashMap<Integer, Optional<Long>> writtenOffsets,
final Map<Integer, Optional<Long>> writtenOffsets,
final int kafkaPartition,
final PssPartitioner pssPartitioner
) {
Expand All @@ -73,7 +63,12 @@ public String tableName() {

@Override
public TablePartitioner<Bytes, Integer> partitioner() {
return new PssTablePartitioner(pssPartitioner);
return new PssTablePartitioner<>(pssPartitioner) {
@Override
public byte[] serialize(final Bytes key) {
return key.get();
}
};
}

@Override
Expand Down Expand Up @@ -162,136 +157,43 @@ public CompletionStage<RemoteWriteResult<Integer>> flush() {
}
}

/**
 * Holds the fixed arguments of a WAL segment write (store, LSS, PSS, expected written
 * offset and end offset) and forwards them to the {@link RS3Client}, either as a
 * streaming write or as a single call carrying all entries.
 */
private static final class RS3StreamFactory {
  private final UUID storeId;
  private final RS3Client rs3Client;
  private final int pssId;
  private final LssId lssId;
  private final long endOffset;
  private final Optional<Long> expectedWrittenOffset;

  private RS3StreamFactory(
      final UUID storeId,
      final RS3Client rs3Client,
      final int pssId,
      final LssId lssId,
      final long endOffset,
      final Optional<Long> expectedWrittenOffset
  ) {
    // Plain field capture; no validation or copying is performed here.
    this.storeId = storeId;
    this.rs3Client = rs3Client;
    this.pssId = pssId;
    this.lssId = lssId;
    this.endOffset = endOffset;
    this.expectedWrittenOffset = expectedWrittenOffset;
  }

  /**
   * Starts a streaming WAL segment write using the captured arguments.
   *
   * @return the sender/receiver handle produced by the client
   */
  StreamSenderMessageReceiver<WalEntry, Optional<Long>> writeWalSegmentAsync() {
    final StreamSenderMessageReceiver<WalEntry, Optional<Long>> stream =
        rs3Client.writeWalSegmentAsync(
            storeId, lssId, pssId,
            expectedWrittenOffset, endOffset);
    return stream;
  }

  /**
   * Writes the given entries as one WAL segment in a single client call.
   *
   * @param entries the WAL entries to write
   * @return the optional offset reported by the client
   */
  Optional<Long> writeWalSegmentSync(List<WalEntry> entries) {
    final Optional<Long> reported =
        rs3Client.writeWalSegment(
            storeId, lssId, pssId,
            expectedWrittenOffset, endOffset,
            entries);
    return reported;
  }
}

private static final class RS3KVWriter implements RemoteWriter<Bytes, Integer> {
private final RS3StreamFactory streamFactory;
private static final class RS3KVWriter extends RS3Writer<Bytes> {
private final RS3KVTable table;
private final int kafkaPartition;
private final List<WalEntry> retryBuffer = new ArrayList<>();
private final StreamSenderMessageReceiver<WalEntry, Optional<Long>> sendRecv;

private RS3KVWriter(
final UUID storeId,
final RS3Client rs3Client,
final RS3KVTable table,
final RS3KVTable rs3Table,
final int pssId,
final LssId lssId,
final long endOffset,
final Optional<Long> expectedWrittenOffset,
final int kafkaPartition
) {
this.table = Objects.requireNonNull(table);
this.streamFactory = new RS3StreamFactory(
storeId,
rs3Client,
pssId,
lssId,
endOffset,
expectedWrittenOffset
);
this.kafkaPartition = kafkaPartition;
this.sendRecv = streamFactory.writeWalSegmentAsync();
}

long endOffset() {
return streamFactory.endOffset;
super(storeId, rs3Client, pssId, lssId, endOffset, expectedWrittenOffset, kafkaPartition);
this.table = rs3Table;
}

@Override
public void insert(final Bytes key, final byte[] value, final long timestampMs) {
maybeSendNext(table.insert(kafkaPartition, key, value, timestampMs));
}

@Override
public void delete(final Bytes key) {
maybeSendNext(table.delete(kafkaPartition, key));
}

private void maybeSendNext(WalEntry entry) {
retryBuffer.add(entry);
ifActiveStream(sender -> sender.sendNext(entry));
}

private void ifActiveStream(Consumer<StreamSender<WalEntry>> streamConsumer) {
if (sendRecv.isActive()) {
try {
streamConsumer.accept(sendRecv.sender());
} catch (final RS3TransientException e) {
// Retry the stream in flush()
}
}
protected WalEntry createInsert(
final Bytes key,
final byte[] value,
final long timestampMs
) {
return table.insert(
kafkaPartition(),
key,
value,
timestampMs
);
}

@Override
public CompletionStage<RemoteWriteResult<Integer>> flush() {
ifActiveStream(StreamSender::finish);

return sendRecv.completion().handle((result, throwable) -> {
Optional<Long> flushedOffset = result;

var cause = throwable;
if (throwable instanceof CompletionException) {
cause = throwable.getCause();
}

if (cause instanceof RS3TransientException) {
flushedOffset = streamFactory.writeWalSegmentSync(retryBuffer);
} else if (cause instanceof RuntimeException) {
throw (RuntimeException) throwable;
} else if (cause != null) {
throw new RuntimeException(throwable);
}

LOG.debug("last flushed offset for pss/lss {}/{} is {}",
streamFactory.pssId, streamFactory.lssId, flushedOffset);
return RemoteWriteResult.success(kafkaPartition);
});
protected WalEntry createDelete(final Bytes key) {
return table.delete(
kafkaPartition(),
key
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,16 @@
import dev.responsive.kafka.internal.db.KVFlushManager;
import dev.responsive.kafka.internal.db.RemoteKVTable;
import dev.responsive.kafka.internal.db.rs3.client.LssId;
import dev.responsive.kafka.internal.db.rs3.client.LssMetadata;
import dev.responsive.kafka.internal.db.rs3.client.MeteredRS3Client;
import dev.responsive.kafka.internal.db.rs3.client.Put;
import dev.responsive.kafka.internal.db.rs3.client.RS3Client;
import dev.responsive.kafka.internal.db.rs3.client.RS3ClientUtil;
import dev.responsive.kafka.internal.db.rs3.client.WalEntry;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistration;
import java.util.HashMap;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueIterator;
Expand All @@ -38,6 +37,7 @@ public class RS3KVTable implements RemoteKVTable<WalEntry> {
private final String name;
private final UUID storeId;
private final RS3Client rs3Client;
private final RS3ClientUtil rs3ClientUtil;
private final PssPartitioner pssPartitioner;
private LssId lssId;
private Long fetchOffset = ResponsiveStoreRegistration.NO_COMMITTED_OFFSET;
Expand All @@ -58,6 +58,7 @@ public RS3KVTable(
Objects.requireNonNull(responsiveMetrics),
Objects.requireNonNull(scopeBuilder)
);
this.rs3ClientUtil = new RS3ClientUtil(storeId, rs3Client, pssPartitioner);
this.pssPartitioner = Objects.requireNonNull(pssPartitioner);
}

Expand All @@ -74,40 +75,14 @@ public KVFlushManager init(final int kafkaPartition) {

this.lssId = new LssId(kafkaPartition);

// TODO: we should write an empty segment periodically to any PSS that we haven't
// written to to bump the written offset
final HashMap<Integer, Optional<Long>> lastWrittenOffset = new HashMap<>();
for (final int pss : pssPartitioner.pssForLss(this.lssId)) {
final var offsets = rs3Client.getCurrentOffsets(storeId, lssId, pss);
lastWrittenOffset.put(pss, offsets.writtenOffset());
}
final var fetchOffsetOrMinusOne = lastWrittenOffset.values().stream()
.map(v -> v.orElse(-1L))
.min(Long::compare)
.orElse(-1L);
if (fetchOffsetOrMinusOne == -1) {
this.fetchOffset = ResponsiveStoreRegistration.NO_COMMITTED_OFFSET;
} else {
this.fetchOffset = fetchOffsetOrMinusOne;
}

final var writtenOffsetsStr = lastWrittenOffset.entrySet().stream()
.map(e -> String.format("%s -> %s",
e.getKey(),
e.getValue().map(Object::toString).orElse("none")))
.collect(Collectors.joining(","));
LOG.info("restore rs3 kv table from offset {} for {}. recorded written offsets: {}",
fetchOffset,
kafkaPartition,
writtenOffsetsStr
);

flushManager = new RS3KVFlushManager(
LssMetadata lssMetadata = rs3ClientUtil.fetchLssMetadata(lssId);
this.fetchOffset = lssMetadata.lastWrittenOffset();
this.flushManager = new RS3KVFlushManager(
storeId,
rs3Client,
lssId,
this,
lastWrittenOffset,
lssMetadata.writtenOffsets(),
kafkaPartition,
pssPartitioner
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.db.RemoteKVTable;
import dev.responsive.kafka.internal.db.RemoteWindowTable;
import dev.responsive.kafka.internal.db.rs3.client.WalEntry;
import dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRS3Client;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
Expand All @@ -34,15 +35,9 @@ public RemoteKVTable<WalEntry> kvTable(
final ResponsiveMetrics responsiveMetrics,
final ResponsiveMetrics.MetricScopeBuilder scopeBuilder
) {
Map<String, String> storeIdMapping = config.getMap(
ResponsiveConfig.RS3_LOGICAL_STORE_MAPPING_CONFIG);
final String storeIdHex = storeIdMapping.get(name);
if (storeIdHex == null) {
throw new ConfigException("Failed to find store ID mapping for table " + name);
}

final UUID storeId = UUID.fromString(storeIdHex);
final PssPartitioner pssPartitioner = new PssDirectPartitioner();
final var storeId = lookupStoreId(config, name);
final var pssPartitioner = new PssDirectPartitioner();
final var rs3Client = connector.connect();
return new RS3KVTable(
name,
Expand All @@ -54,6 +49,35 @@ public RemoteKVTable<WalEntry> kvTable(
);
}

/**
 * Builds an {@link RS3WindowTable} for the named store.
 *
 * <p>The store ID is resolved through {@code lookupStoreId}, a new
 * {@link PssDirectPartitioner} is created, and a client is obtained from
 * {@code connector.connect()}, in that order.
 *
 * @param name the table name, also used as the key for the store ID lookup
 * @param config the configuration holding the logical store mapping
 * @param responsiveMetrics metrics handle passed through to the table
 * @param scopeBuilder metric scope builder passed through to the table
 * @return the newly constructed window table
 * @throws ConfigException if no store ID is mapped for {@code name}
 */
public RemoteWindowTable<WalEntry> windowTable(
    final String name,
    final ResponsiveConfig config,
    final ResponsiveMetrics responsiveMetrics,
    final ResponsiveMetrics.MetricScopeBuilder scopeBuilder
) {
  final UUID tableStoreId = lookupStoreId(config, name);
  final var partitioner = new PssDirectPartitioner();
  final var client = connector.connect();

  final RemoteWindowTable<WalEntry> table = new RS3WindowTable(
      name, tableStoreId, client, partitioner, responsiveMetrics, scopeBuilder);
  return table;
}

/**
 * Resolves the store ID configured for a table.
 *
 * <p>Reads the map stored under
 * {@link ResponsiveConfig#RS3_LOGICAL_STORE_MAPPING_CONFIG} and parses the entry for
 * {@code name} as a {@link UUID}.
 *
 * @param config the configuration to read the mapping from
 * @param name the table name to look up
 * @return the parsed store ID
 * @throws ConfigException if the mapping has no entry for {@code name}
 * @throws IllegalArgumentException if the mapped value is not a valid UUID string
 */
private UUID lookupStoreId(ResponsiveConfig config, String name) {
  final Map<String, String> idsByTable =
      config.getMap(ResponsiveConfig.RS3_LOGICAL_STORE_MAPPING_CONFIG);
  final String rawId = idsByTable.get(name);
  if (rawId != null) {
    return UUID.fromString(rawId);
  }
  throw new ConfigException("Failed to find store ID mapping for table " + name);
}

// Currently a no-op: the body is empty, so calling this releases nothing.
// NOTE(review): clients obtained from connector.connect() in the table factory methods
// are not closed here — confirm their lifecycle is managed elsewhere.
public void close() {
}
}
Loading
Loading