Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
832bb61
Initial implementation of RS3 window table
hachikuji Mar 31, 2025
e30d0ab
Merge remote-tracking branch 'origin/main' into rs3-window-operations
hachikuji Apr 3, 2025
d50d841
Remove unneeded comment
hachikuji Apr 3, 2025
db3e73d
Test cases for serde
hachikuji Apr 3, 2025
68e1718
Update kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs…
hachikuji Apr 3, 2025
8d47b4d
use more final variables
hachikuji Apr 4, 2025
2770de1
review comments
hachikuji Apr 4, 2025
0cb4b5e
Remove TODO
hachikuji Apr 4, 2025
d47f03a
Update kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs…
hachikuji Apr 4, 2025
4452308
reduce visibility on RS3Writer
hachikuji Apr 4, 2025
a1e7916
Add more info to fail message
hachikuji Apr 4, 2025
2a6e247
Add basic writer test for window table
hachikuji Apr 4, 2025
8b5d2f5
Merge remote-tracking branch 'origin/main' into rs3-window-operations
hachikuji Apr 5, 2025
7c8fdd1
Fix breakage from merge
hachikuji Apr 5, 2025
e46e96f
Add range implementations for window table
hachikuji Apr 8, 2025
b385f5e
Update rs3 proto
hachikuji Apr 8, 2025
03bdf57
Fix failing tests
hachikuji Apr 8, 2025
d210126
Merge remote-tracking branch 'origin/main' into rs3-window-operations
hachikuji Apr 15, 2025
3e7ab01
Reset opentelemetry to main branch
hachikuji Apr 15, 2025
2a1e26f
Merge remote-tracking branch 'origin/main' into rs3-window-operations
hachikuji Apr 23, 2025
c158a38
Fix breakage wip
hachikuji Apr 23, 2025
b07c812
Closer to compiling
hachikuji Apr 24, 2025
a9d7615
compiling again
hachikuji Apr 24, 2025
173b798
Fix checkstyle
hachikuji Apr 24, 2025
cfa3e71
Pass through default ttl/use stream time clock
hachikuji Apr 25, 2025
e188cd8
Fix failing end to end tests
hachikuji Apr 25, 2025
873b644
Implement windowed end-to-end test
hachikuji Apr 25, 2025
0728022
Merge remote-tracking branch 'origin/main' into rs3-window-operations
hachikuji Apr 25, 2025
e2ae90c
fix merge breakage
hachikuji Apr 25, 2025
9a8045e
Fix remaining broken tests
hachikuji Apr 25, 2025
68e9fa8
Fix window value iteration with test case
hachikuji Apr 25, 2025
f80d98d
Merge remote-tracking branch 'origin/main' into rs3-window-operations
hachikuji Apr 25, 2025
b93a539
Get rid of TODO
hachikuji Apr 25, 2025
4c5ee03
Don't commit jqwik database
hachikuji Apr 25, 2025
db8fe92
Use wall clock time for now
hachikuji Apr 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added kafka-client/.jqwik-database
Binary file not shown.
1 change: 1 addition & 0 deletions kafka-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,5 @@ dependencies {
testImplementation("io.grpc:grpc-inprocess:${libs.versions.grpc.orNull}")
testImplementation("software.amazon.awssdk:kms:2.20.0")
testImplementation("software.amazon.awssdk:sso:2.20.0")
testImplementation("net.jqwik:jqwik:1.9.2")
}
2 changes: 1 addition & 1 deletion kafka-client/src/main/external-protos/rs3
Submodule rs3 updated from 92841a to da6498
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ public class ResponsiveWindowBytesStoreSupplier implements WindowBytesStoreSuppl

public ResponsiveWindowBytesStoreSupplier(final ResponsiveWindowParams params) {
this.params = params;
this.segmentIntervalMs = computeSegmentInterval(params.retentionPeriod(), params.numSegments());
this.segmentIntervalMs = computeSegmentInterval(
params.retentionPeriodMs(),
params.numSegments()
);
}

@Override
Expand Down Expand Up @@ -57,7 +60,7 @@ public boolean retainDuplicates() {

@Override
public long retentionPeriod() {
return params.retentionPeriod();
return params.retentionPeriodMs();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public long windowSize() {
return windowSizeMs;
}

public long retentionPeriod() {
public long retentionPeriodMs() {
return retentionPeriodMs;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,20 @@
import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
import dev.responsive.kafka.internal.db.rs3.client.LssId;
import java.util.Objects;
import org.apache.kafka.common.utils.Bytes;

public class PssTablePartitioner implements TablePartitioner<Bytes, Integer> {
public abstract class PssTablePartitioner<K> implements TablePartitioner<K, Integer> {
private final PssPartitioner pssPartitioner;

public PssTablePartitioner(final PssPartitioner pssPartitioner) {
this.pssPartitioner = Objects.requireNonNull(pssPartitioner);
}

public abstract byte[] serialize(K key);

@Override
public Integer tablePartition(int kafkaPartition, Bytes key) {
return pssPartitioner.pss(key.get(), new LssId(kafkaPartition));
public Integer tablePartition(int kafkaPartition, K key) {
final var serializedKey = serialize(key);
return pssPartitioner.pss(serializedKey, new LssId(kafkaPartition));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,23 @@
import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
import dev.responsive.kafka.internal.db.rs3.client.LssId;
import dev.responsive.kafka.internal.db.rs3.client.RS3Client;
import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException;
import dev.responsive.kafka.internal.db.rs3.client.StreamSender;
import dev.responsive.kafka.internal.db.rs3.client.StreamSenderMessageReceiver;
import dev.responsive.kafka.internal.db.rs3.client.WalEntry;
import dev.responsive.kafka.internal.stores.RemoteWriteResult;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import org.apache.kafka.common.utils.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class RS3KVFlushManager extends KVFlushManager {
private static final Logger LOG = LoggerFactory.getLogger(RS3KVFlushManager.class);

private final UUID storeId;
private final RS3Client rs3Client;
private final LssId lssId;
private final RS3KVTable table;
private final HashMap<Integer, Optional<Long>> writtenOffsets;
private final Map<Integer, Optional<Long>> writtenOffsets;
private final int kafkaPartition;
private final PssPartitioner pssPartitioner;
private final HashMap<Integer, RS3KVWriter> writers = new HashMap<>();
Expand All @@ -53,7 +43,7 @@ public RS3KVFlushManager(
final RS3Client rs3Client,
final LssId lssId,
final RS3KVTable table,
final HashMap<Integer, Optional<Long>> writtenOffsets,
final Map<Integer, Optional<Long>> writtenOffsets,
final int kafkaPartition,
final PssPartitioner pssPartitioner
) {
Expand All @@ -73,7 +63,12 @@ public String tableName() {

@Override
public TablePartitioner<Bytes, Integer> partitioner() {
return new PssTablePartitioner(pssPartitioner);
return new PssTablePartitioner<>(pssPartitioner) {
@Override
public byte[] serialize(final Bytes key) {
return key.get();
}
};
}

@Override
Expand Down Expand Up @@ -132,8 +127,10 @@ public RemoteWriteResult<Integer> updateOffset(final long consumedOffset) {

@Override
public String failedFlushInfo(final long batchOffset, final Integer failedTablePartition) {
// TODO: fill me in with info about last written offsets
return "";
return String.format("<tablePartition=%d, batchOffset=%d, persistedOffset=%d, "
+ "storeId=%s, lssId=%d>>",
failedTablePartition, batchOffset, table.lastWrittenOffset(kafkaPartition),
storeId, lssId.id());
}

@Override
Expand Down Expand Up @@ -162,136 +159,43 @@ public CompletionStage<RemoteWriteResult<Integer>> flush() {
}
}

private static final class RS3StreamFactory {
private final UUID storeId;
private final RS3Client rs3Client;
private final int pssId;
private final LssId lssId;
private final long endOffset;
private final Optional<Long> expectedWrittenOffset;

private RS3StreamFactory(
final UUID storeId,
final RS3Client rs3Client,
final int pssId,
final LssId lssId,
final long endOffset,
final Optional<Long> expectedWrittenOffset
) {
this.storeId = storeId;
this.rs3Client = rs3Client;
this.pssId = pssId;
this.lssId = lssId;
this.endOffset = endOffset;
this.expectedWrittenOffset = expectedWrittenOffset;
}

StreamSenderMessageReceiver<WalEntry, Optional<Long>> writeWalSegmentAsync() {
return rs3Client.writeWalSegmentAsync(
storeId,
lssId,
pssId,
expectedWrittenOffset,
endOffset
);
}

Optional<Long> writeWalSegmentSync(List<WalEntry> entries) {
return rs3Client.writeWalSegment(
storeId,
lssId,
pssId,
expectedWrittenOffset,
endOffset,
entries
);
}

}

private static final class RS3KVWriter implements RemoteWriter<Bytes, Integer> {
private final RS3StreamFactory streamFactory;
private static final class RS3KVWriter extends RS3Writer<Bytes> {
private final RS3KVTable table;
private final int kafkaPartition;
private final List<WalEntry> retryBuffer = new ArrayList<>();
private final StreamSenderMessageReceiver<WalEntry, Optional<Long>> sendRecv;

private RS3KVWriter(
final UUID storeId,
final RS3Client rs3Client,
final RS3KVTable table,
final RS3KVTable rs3Table,
final int pssId,
final LssId lssId,
final long endOffset,
final Optional<Long> expectedWrittenOffset,
final int kafkaPartition
) {
this.table = Objects.requireNonNull(table);
this.streamFactory = new RS3StreamFactory(
storeId,
rs3Client,
pssId,
lssId,
endOffset,
expectedWrittenOffset
);
this.kafkaPartition = kafkaPartition;
this.sendRecv = streamFactory.writeWalSegmentAsync();
}

long endOffset() {
return streamFactory.endOffset;
super(storeId, rs3Client, pssId, lssId, endOffset, expectedWrittenOffset, kafkaPartition);
this.table = rs3Table;
}

@Override
public void insert(final Bytes key, final byte[] value, final long timestampMs) {
maybeSendNext(table.insert(kafkaPartition, key, value, timestampMs));
}

@Override
public void delete(final Bytes key) {
maybeSendNext(table.delete(kafkaPartition, key));
}

private void maybeSendNext(WalEntry entry) {
retryBuffer.add(entry);
ifActiveStream(sender -> sender.sendNext(entry));
}

private void ifActiveStream(Consumer<StreamSender<WalEntry>> streamConsumer) {
if (sendRecv.isActive()) {
try {
streamConsumer.accept(sendRecv.sender());
} catch (final RS3TransientException e) {
// Retry the stream in flush()
}
}
protected WalEntry createInsert(
final Bytes key,
final byte[] value,
final long timestampMs
) {
return table.insert(
kafkaPartition(),
key,
value,
timestampMs
);
}

@Override
public CompletionStage<RemoteWriteResult<Integer>> flush() {
ifActiveStream(StreamSender::finish);

return sendRecv.completion().handle((result, throwable) -> {
Optional<Long> flushedOffset = result;

var cause = throwable;
if (throwable instanceof CompletionException) {
cause = throwable.getCause();
}

if (cause instanceof RS3TransientException) {
flushedOffset = streamFactory.writeWalSegmentSync(retryBuffer);
} else if (cause instanceof RuntimeException) {
throw (RuntimeException) throwable;
} else if (cause != null) {
throw new RuntimeException(throwable);
}

LOG.debug("last flushed offset for pss/lss {}/{} is {}",
streamFactory.pssId, streamFactory.lssId, flushedOffset);
return RemoteWriteResult.success(kafkaPartition);
});
protected WalEntry createDelete(final Bytes key) {
return table.delete(
kafkaPartition(),
key
);
}
}

Expand Down
Loading
Loading