From f7f200859aa24aa1a5aec7181e0f333e456ab5e7 Mon Sep 17 00:00:00 2001 From: Jason Gustafson <12502538+hachikuji@users.noreply.github.com> Date: Fri, 9 May 2025 10:07:00 -0700 Subject: [PATCH] Pass through event timestamp for windowed key values --- kafka-client/src/main/external-protos/rs3 | 2 +- .../db/rs3/client/grpc/WalEntryPutWriter.java | 1 + .../client/grpc/WalEntryPutWriterTest.java | 111 ++++++++++++++++++ 3 files changed, 113 insertions(+), 1 deletion(-) create mode 100644 kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/WalEntryPutWriterTest.java diff --git a/kafka-client/src/main/external-protos/rs3 b/kafka-client/src/main/external-protos/rs3 index fb2d8f80c..d724b0183 160000 --- a/kafka-client/src/main/external-protos/rs3 +++ b/kafka-client/src/main/external-protos/rs3 @@ -1 +1 @@ -Subproject commit fb2d8f80c9a976f7999a131a75c74d9b82bdcc4c +Subproject commit d724b0183fc26554fdc79bf4626a67857f6fea03 diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/WalEntryPutWriter.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/WalEntryPutWriter.java index 6bf05c277..2a2d0884a 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/WalEntryPutWriter.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/WalEntryPutWriter.java @@ -51,6 +51,7 @@ public void visit(final WindowedPut windowedPut) { .setKey(ByteString.copyFrom(windowedPut.key())) .setWindowTimestamp(windowedPut.windowTimestamp()); final var valueProto = Rs3.WindowValue.newBuilder() + .setEventTimestamp(windowedPut.timestamp()) .setValue(ByteString.copyFrom(windowedPut.value())); final var kvProto = Rs3.WindowKeyValue.newBuilder() .setKey(keyProto) diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/WalEntryPutWriterTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/WalEntryPutWriterTest.java new file mode 100644 index 000000000..5b2034d24 --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/WalEntryPutWriterTest.java @@ -0,0 +1,111 @@ +package dev.responsive.kafka.internal.db.rs3.client.grpc; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import dev.responsive.kafka.internal.db.rs3.client.Delete; +import dev.responsive.kafka.internal.db.rs3.client.Put; +import dev.responsive.kafka.internal.db.rs3.client.WindowedDelete; +import dev.responsive.kafka.internal.db.rs3.client.WindowedPut; +import dev.responsive.rs3.Rs3; +import java.nio.charset.StandardCharsets; +import org.junit.jupiter.api.Test; + +class WalEntryPutWriterTest { + + @Test + public void shouldCreatePutRequest() { + final var builder = Rs3.WriteWALSegmentRequest.newBuilder(); + final var writer = new WalEntryPutWriter(builder); + + final var key = "foo"; + final var value = "bar"; + writer.visit(new Put( + key.getBytes(StandardCharsets.UTF_8), + value.getBytes(StandardCharsets.UTF_8) + )); + + final var request = builder.build(); + assertThat(request.hasPut(), is(true)); + + final var kv = request.getPut().getKv(); + assertThat(kv.hasBasicKv(), is(true)); + + final var windowKv = kv.getBasicKv(); + assertThat(windowKv.getKey().getKey().toStringUtf8(), is(key)); + assertThat(windowKv.getValue().getValue().toStringUtf8(), is(value)); + } + + @Test + public void shouldCreateWindowedPutRequest() { + final var builder = Rs3.WriteWALSegmentRequest.newBuilder(); + final var writer = new WalEntryPutWriter(builder); + + final var key = "foo"; + final var value = "bar"; + final var timestamp = 15L; + final var windowStartTimeMs = 10L; + writer.visit(new WindowedPut( + key.getBytes(StandardCharsets.UTF_8), + value.getBytes(StandardCharsets.UTF_8), + timestamp, + windowStartTimeMs + )); + + final var request = builder.build(); + assertThat(request.hasPut(), is(true)); + + final var kv = request.getPut().getKv(); + assertThat(kv.hasWindowKv(), is(true)); + + final var windowKv = kv.getWindowKv(); + assertThat(windowKv.getKey().getKey().toStringUtf8(), is(key)); + assertThat(windowKv.getKey().getWindowTimestamp(), is(windowStartTimeMs)); + assertThat(windowKv.getValue().getValue().toStringUtf8(), is(value)); + assertThat(windowKv.getValue().getEventTimestamp(), is(timestamp)); + } + + @Test + public void shouldCreateDeleteRequest() { + final var builder = Rs3.WriteWALSegmentRequest.newBuilder(); + final var writer = new WalEntryPutWriter(builder); + + final var key = "foo"; + + writer.visit(new Delete(key.getBytes(StandardCharsets.UTF_8))); + + final var request = builder.build(); + assertThat(request.hasDelete(), is(true)); + + final var builtKey = request.getDelete().getKey(); + assertThat(builtKey.hasBasicKey(), is(true)); + + final var windowKey = builtKey.getBasicKey(); + assertThat(windowKey.getKey().toStringUtf8(), is(key)); + } + + @Test + public void shouldCreateWindowedDeleteRequest() { + final var builder = Rs3.WriteWALSegmentRequest.newBuilder(); + final var writer = new WalEntryPutWriter(builder); + + final var key = "foo"; + final var windowStartTimeMs = 10L; + + writer.visit(new WindowedDelete( + key.getBytes(StandardCharsets.UTF_8), + windowStartTimeMs + )); + + final var request = builder.build(); + assertThat(request.hasDelete(), is(true)); + + final var builtKey = request.getDelete().getKey(); + assertThat(builtKey.hasWindowKey(), is(true)); + + final var windowKey = builtKey.getWindowKey(); + assertThat(windowKey.getKey().toStringUtf8(), is(key)); + assertThat(windowKey.getWindowTimestamp(), is(windowStartTimeMs)); + } + +} \ No newline at end of file