Skip to content

Commit 339c583

Browse files
authored
add wiring for creating checkpoints of responsive kv stores (#458)
* add wiring for creating checkpoints of responsive kv stores - integrate with rs3 protocol for checkpoint creation - rs3 store takes checkpoints - can take a checkpoint from registration * review feedback * consolidate proto utils fns
1 parent 72351ac commit 339c583

File tree

20 files changed

+613
-54
lines changed

20 files changed

+613
-54
lines changed

kafka-client/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ dependencies {
137137
implementation("dev.responsive:controller-api:0.16.0")
138138
implementation(libs.bundles.scylla)
139139
implementation(libs.bundles.commons)
140+
implementation(libs.jackson)
140141
implementation(libs.mongodb.driver.sync)
141142
implementation(libs.bundles.otel)
142143
implementation(libs.bundles.grpc)
Submodule rs3 updated from fb2d8f8 to 125749a

kafka-client/src/main/java/dev/responsive/kafka/internal/clients/StoreCommitListener.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,14 @@ private void onCommit(
4141
final TopicPartition p = e.getKey().getPartition();
4242
for (final ResponsiveStoreRegistration storeRegistration
4343
: storeRegistry.getRegisteredStoresForChangelog(p, threadId)) {
44-
storeRegistration.onCommit().accept(e.getValue());
44+
storeRegistration.callbacks().notifyCommit(e.getValue());
4545
}
4646
}
4747
for (final var e : writtenOffsets.entrySet()) {
4848
final TopicPartition p = e.getKey();
4949
for (final ResponsiveStoreRegistration storeRegistration
5050
: storeRegistry.getRegisteredStoresForChangelog(p, threadId)) {
51-
storeRegistration.onCommit().accept(e.getValue());
51+
storeRegistration.callbacks().notifyCommit(e.getValue());
5252
}
5353
}
5454
}

kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteKVTable.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,4 +94,8 @@ <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefix(
9494
* @return the approximate number of entries for this kafka partition
9595
*/
9696
long approximateNumEntries(int kafkaPartition);
97+
98+
default byte[] checkpoint() {
99+
throw new UnsupportedOperationException("checkpoints not supported for this store type");
100+
}
97101
}

kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVTable.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import dev.responsive.kafka.internal.db.rs3.client.LssId;
1919
import dev.responsive.kafka.internal.db.rs3.client.LssMetadata;
2020
import dev.responsive.kafka.internal.db.rs3.client.MeteredRS3Client;
21+
import dev.responsive.kafka.internal.db.rs3.client.PssCheckpoint;
2122
import dev.responsive.kafka.internal.db.rs3.client.Put;
2223
import dev.responsive.kafka.internal.db.rs3.client.RS3Client;
2324
import dev.responsive.kafka.internal.db.rs3.client.RS3ClientUtil;
@@ -30,6 +31,7 @@
3031
import java.util.ArrayList;
3132
import java.util.List;
3233
import java.util.Objects;
34+
import java.util.Optional;
3335
import java.util.UUID;
3436
import org.apache.kafka.common.serialization.Serializer;
3537
import org.apache.kafka.common.utils.Bytes;
@@ -97,6 +99,7 @@ public KVFlushManager init(final int kafkaPartition) {
9799

98100
@Override
99101
public byte[] get(final int kafkaPartition, final Bytes key, final long minValidTs) {
102+
checkInitialized();
100103
final int pssId = pssPartitioner.pss(key.get(), this.lssId);
101104
return rs3Client.get(
102105
storeId,
@@ -114,6 +117,7 @@ public KeyValueIterator<Bytes, byte[]> range(
114117
final Bytes to,
115118
final long streamTimeMs
116119
) {
120+
checkInitialized();
117121
final var range = new Range<>(RangeBound.inclusive(from), RangeBound.exclusive(to));
118122
final List<KeyValueIterator<Bytes, byte[]>> pssIters = new ArrayList<>();
119123

@@ -181,4 +185,31 @@ public WalEntry delete(final int kafkaPartition, final Bytes key) {
181185
public long lastWrittenOffset(final int kafkaPartition) {
182186
return fetchOffset;
183187
}
188+
189+
@Override
190+
public byte[] checkpoint() {
191+
checkInitialized();
192+
final List<TableCheckpoint.TablePssCheckpoint> checkpoints = new ArrayList<>();
193+
for (final int pss : pssPartitioner.pssForLss(this.lssId)) {
194+
final Optional<Long> writtenOffset = flushManager.writtenOffset(pss);
195+
final PssCheckpoint rs3Checkpoint = rs3Client.createCheckpoint(
196+
storeId,
197+
lssId,
198+
pss,
199+
writtenOffset
200+
);
201+
checkpoints.add(new TableCheckpoint.TablePssCheckpoint(
202+
writtenOffset,
203+
rs3Checkpoint
204+
));
205+
}
206+
final TableCheckpoint checkpoint = new TableCheckpoint(checkpoints);
207+
return TableCheckpoint.serialize(checkpoint);
208+
}
209+
210+
private void checkInitialized() {
211+
if (this.lssId == null) {
212+
throw new IllegalStateException("table not initialized");
213+
}
214+
}
184215
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package dev.responsive.kafka.internal.db.rs3;
2+
3+
import com.fasterxml.jackson.annotation.JsonCreator;
4+
import com.fasterxml.jackson.annotation.JsonProperty;
5+
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
7+
import dev.responsive.kafka.internal.db.rs3.client.PssCheckpoint;
8+
import java.io.IOException;
9+
import java.nio.charset.Charset;
10+
import java.util.List;
11+
import java.util.Objects;
12+
import java.util.Optional;
13+
14+
public class TableCheckpoint {
15+
private static final ObjectMapper MAPPER = new ObjectMapper();
16+
17+
static {
18+
MAPPER.registerModule(new Jdk8Module());
19+
}
20+
21+
final List<TablePssCheckpoint> pssCheckpoints;
22+
23+
@JsonCreator
24+
public TableCheckpoint(
25+
@JsonProperty("pssCheckpoints") final List<TablePssCheckpoint> pssCheckpoints
26+
) {
27+
this.pssCheckpoints = List.copyOf(Objects.requireNonNull(pssCheckpoints));
28+
}
29+
30+
@JsonProperty("pssCheckpoints")
31+
public List<TablePssCheckpoint> pssCheckpoints() {
32+
return pssCheckpoints;
33+
}
34+
35+
@Override
36+
public boolean equals(final Object o) {
37+
if (this == o) {
38+
return true;
39+
}
40+
if (!(o instanceof TableCheckpoint)) {
41+
return false;
42+
}
43+
final TableCheckpoint that = (TableCheckpoint) o;
44+
return Objects.equals(pssCheckpoints, that.pssCheckpoints);
45+
}
46+
47+
@Override
48+
public int hashCode() {
49+
return Objects.hashCode(pssCheckpoints);
50+
}
51+
52+
public static class TablePssCheckpoint {
53+
final Optional<Long> writtenOffset;
54+
final PssCheckpoint checkpoint;
55+
56+
@JsonCreator
57+
public TablePssCheckpoint(
58+
@JsonProperty("writtenOffset") final Optional<Long> writtenOffset,
59+
@JsonProperty("checkpoint") final PssCheckpoint checkpoint
60+
) {
61+
this.writtenOffset = Objects.requireNonNull(writtenOffset);
62+
this.checkpoint = Objects.requireNonNull(checkpoint);
63+
}
64+
65+
@JsonProperty("writtenOffset")
66+
public Optional<Long> writtenOffset() {
67+
return writtenOffset;
68+
}
69+
70+
@JsonProperty("checkpoint")
71+
public PssCheckpoint checkpoint() {
72+
return checkpoint;
73+
}
74+
75+
@Override
76+
public boolean equals(final Object o) {
77+
if (this == o) {
78+
return true;
79+
}
80+
if (!(o instanceof TablePssCheckpoint)) {
81+
return false;
82+
}
83+
final TablePssCheckpoint that = (TablePssCheckpoint) o;
84+
return Objects.equals(writtenOffset, that.writtenOffset)
85+
&& Objects.equals(checkpoint, that.checkpoint);
86+
}
87+
88+
@Override
89+
public int hashCode() {
90+
return Objects.hash(writtenOffset, checkpoint);
91+
}
92+
}
93+
94+
@Override
95+
public String toString() {
96+
return new String(TableCheckpoint.serialize(this), Charset.defaultCharset());
97+
}
98+
99+
public static byte[] serialize(TableCheckpoint tableCheckpoint) {
100+
try {
101+
return MAPPER.writeValueAsBytes(tableCheckpoint);
102+
} catch (final IOException e) {
103+
throw new RuntimeException(e);
104+
}
105+
}
106+
107+
public static TableCheckpoint deserialize(byte[] serialized) {
108+
try {
109+
return MAPPER.readValue(serialized, TableCheckpoint.class);
110+
} catch (final IOException e) {
111+
throw new RuntimeException(e);
112+
}
113+
}
114+
}

kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/LssId.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,9 @@ public boolean equals(final Object o) {
4141
public int hashCode() {
4242
return Objects.hashCode(id);
4343
}
44+
45+
@Override
46+
public String toString() {
47+
return "LssId{" + "id=" + id + '}';
48+
}
4449
}

kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,15 @@ public CreateStoreResult createStore(
168168
return delegate.createStore(storeName, options);
169169
}
170170

171+
@Override
172+
public PssCheckpoint createCheckpoint(
173+
final UUID storeId,
174+
final LssId lssId,
175+
final int pssId,
176+
final Optional<Long> expectedWrittenOffset) {
177+
return delegate.createCheckpoint(storeId, lssId, pssId, expectedWrittenOffset);
178+
}
179+
171180
public void close() {
172181
this.metrics.removeSensor(GET_SENSOR_NAME);
173182
}

0 commit comments

Comments
 (0)