Skip to content

Commit 194ba6b

Browse files
authored
Pass through event timestamp for windowed key values (#459)
Follow-up for responsivedev/rs3#112. We added the `event_timestamp` to the window key value so that the server could track stream time for the purpose of TTL expiration. This patch passes through the timestamp in the GRPC API for writing segments.
1 parent 339c583 commit 194ba6b

File tree

3 files changed

+113
-1
lines changed

3 files changed

+113
-1
lines changed
Submodule rs3 updated from 125749a to d724b01

kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/WalEntryPutWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public void visit(final WindowedPut windowedPut) {
5151
.setKey(ByteString.copyFrom(windowedPut.key()))
5252
.setWindowTimestamp(windowedPut.windowTimestamp());
5353
final var valueProto = Rs3.WindowValue.newBuilder()
54+
.setEventTimestamp(windowedPut.timestamp())
5455
.setValue(ByteString.copyFrom(windowedPut.value()));
5556
final var kvProto = Rs3.WindowKeyValue.newBuilder()
5657
.setKey(keyProto)
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package dev.responsive.kafka.internal.db.rs3.client.grpc;
2+
3+
import static org.hamcrest.MatcherAssert.assertThat;
4+
import static org.hamcrest.Matchers.is;
5+
6+
import dev.responsive.kafka.internal.db.rs3.client.Delete;
7+
import dev.responsive.kafka.internal.db.rs3.client.Put;
8+
import dev.responsive.kafka.internal.db.rs3.client.WindowedDelete;
9+
import dev.responsive.kafka.internal.db.rs3.client.WindowedPut;
10+
import dev.responsive.rs3.Rs3;
11+
import java.nio.charset.StandardCharsets;
12+
import org.junit.jupiter.api.Test;
13+
14+
class WalEntryPutWriterTest {
15+
16+
@Test
17+
public void shouldCreatePutRequest() {
18+
final var builder = Rs3.WriteWALSegmentRequest.newBuilder();
19+
final var writer = new WalEntryPutWriter(builder);
20+
21+
final var key = "foo";
22+
final var value = "bar";
23+
writer.visit(new Put(
24+
key.getBytes(StandardCharsets.UTF_8),
25+
value.getBytes(StandardCharsets.UTF_8)
26+
));
27+
28+
final var request = builder.build();
29+
assertThat(request.hasPut(), is(true));
30+
31+
final var kv = request.getPut().getKv();
32+
assertThat(kv.hasBasicKv(), is(true));
33+
34+
final var windowKv = kv.getBasicKv();
35+
assertThat(windowKv.getKey().getKey().toStringUtf8(), is(key));
36+
assertThat(windowKv.getValue().getValue().toStringUtf8(), is(value));
37+
}
38+
39+
@Test
40+
public void shouldCreateWindowedPutRequest() {
41+
final var builder = Rs3.WriteWALSegmentRequest.newBuilder();
42+
final var writer = new WalEntryPutWriter(builder);
43+
44+
final var key = "foo";
45+
final var value = "bar";
46+
final var timestamp = 15L;
47+
final var windowStartTimeMs = 10L;
48+
writer.visit(new WindowedPut(
49+
key.getBytes(StandardCharsets.UTF_8),
50+
value.getBytes(StandardCharsets.UTF_8),
51+
timestamp,
52+
windowStartTimeMs
53+
));
54+
55+
final var request = builder.build();
56+
assertThat(request.hasPut(), is(true));
57+
58+
final var kv = request.getPut().getKv();
59+
assertThat(kv.hasWindowKv(), is(true));
60+
61+
final var windowKv = kv.getWindowKv();
62+
assertThat(windowKv.getKey().getKey().toStringUtf8(), is(key));
63+
assertThat(windowKv.getKey().getWindowTimestamp(), is(windowStartTimeMs));
64+
assertThat(windowKv.getValue().getValue().toStringUtf8(), is(value));
65+
assertThat(windowKv.getValue().getEventTimestamp(), is(timestamp));
66+
}
67+
68+
@Test
69+
public void shouldCreateDeleteRequest() {
70+
final var builder = Rs3.WriteWALSegmentRequest.newBuilder();
71+
final var writer = new WalEntryPutWriter(builder);
72+
73+
final var key = "foo";
74+
75+
writer.visit(new Delete(key.getBytes(StandardCharsets.UTF_8)));
76+
77+
final var request = builder.build();
78+
assertThat(request.hasDelete(), is(true));
79+
80+
final var builtKey = request.getDelete().getKey();
81+
assertThat(builtKey.hasBasicKey(), is(true));
82+
83+
final var windowKey = builtKey.getBasicKey();
84+
assertThat(windowKey.getKey().toStringUtf8(), is(key));
85+
}
86+
87+
@Test
88+
public void shouldCreateWindowedDeleteRequest() {
89+
final var builder = Rs3.WriteWALSegmentRequest.newBuilder();
90+
final var writer = new WalEntryPutWriter(builder);
91+
92+
final var key = "foo";
93+
final var windowStartTimeMs = 10L;
94+
95+
writer.visit(new WindowedDelete(
96+
key.getBytes(StandardCharsets.UTF_8),
97+
windowStartTimeMs
98+
));
99+
100+
final var request = builder.build();
101+
assertThat(request.hasDelete(), is(true));
102+
103+
final var builtKey = request.getDelete().getKey();
104+
assertThat(builtKey.hasWindowKey(), is(true));
105+
106+
final var windowKey = builtKey.getWindowKey();
107+
assertThat(windowKey.getKey().toStringUtf8(), is(key));
108+
assertThat(windowKey.getWindowTimestamp(), is(windowStartTimeMs));
109+
}
110+
111+
}

0 commit comments

Comments
 (0)