1 change: 1 addition & 0 deletions kafka-client/build.gradle.kts
@@ -137,6 +137,7 @@ dependencies {
implementation("dev.responsive:controller-api:0.16.0")
implementation(libs.bundles.scylla)
implementation(libs.bundles.commons)
+ implementation(libs.jackson)
implementation(libs.mongodb.driver.sync)
implementation(libs.bundles.otel)
implementation(libs.bundles.grpc)
2 changes: 1 addition & 1 deletion kafka-client/src/main/external-protos/rs3
Submodule rs3 updated from fb2d8f to 125749
@@ -41,14 +41,14 @@ private void onCommit(
final TopicPartition p = e.getKey().getPartition();
for (final ResponsiveStoreRegistration storeRegistration
: storeRegistry.getRegisteredStoresForChangelog(p, threadId)) {
- storeRegistration.onCommit().accept(e.getValue());
+ storeRegistration.callbacks().notifyCommit(e.getValue());
}
}
for (final var e : writtenOffsets.entrySet()) {
final TopicPartition p = e.getKey();
for (final ResponsiveStoreRegistration storeRegistration
: storeRegistry.getRegisteredStoresForChangelog(p, threadId)) {
- storeRegistration.onCommit().accept(e.getValue());
+ storeRegistration.callbacks().notifyCommit(e.getValue());
}
}
}
@@ -94,4 +94,8 @@ <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefix(
* @return the approximate number of entries for this kafka partition
*/
long approximateNumEntries(int kafkaPartition);

+ default byte[] checkpoint() {
+   throw new UnsupportedOperationException("checkpoints not supported for this store type");
+ }
}
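
Because checkpoint() is a default method, existing implementations of this table interface compile unchanged and simply report that they do not support checkpoints. A minimal caller-side sketch; the enclosing interface's name is not visible in this hunk, so RemoteKVTable below is an assumption:

  // Sketch only: RemoteKVTable is an assumed name for the interface above.
  // Stores that do not override checkpoint() throw, so callers can treat
  // checkpoint support as optional.
  static java.util.Optional<byte[]> tryCheckpoint(final RemoteKVTable table) {
    try {
      return java.util.Optional.of(table.checkpoint());
    } catch (final UnsupportedOperationException e) {
      return java.util.Optional.empty(); // this store type cannot checkpoint
    }
  }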
@@ -18,6 +18,7 @@
import dev.responsive.kafka.internal.db.rs3.client.LssId;
import dev.responsive.kafka.internal.db.rs3.client.LssMetadata;
import dev.responsive.kafka.internal.db.rs3.client.MeteredRS3Client;
+ import dev.responsive.kafka.internal.db.rs3.client.PssCheckpoint;
import dev.responsive.kafka.internal.db.rs3.client.Put;
import dev.responsive.kafka.internal.db.rs3.client.RS3Client;
import dev.responsive.kafka.internal.db.rs3.client.RS3ClientUtil;
@@ -30,6 +31,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+ import java.util.Optional;
import java.util.UUID;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
@@ -97,6 +99,7 @@ public KVFlushManager init(final int kafkaPartition) {

@Override
public byte[] get(final int kafkaPartition, final Bytes key, final long minValidTs) {
+ checkInitialized();
final int pssId = pssPartitioner.pss(key.get(), this.lssId);
return rs3Client.get(
storeId,
@@ -114,6 +117,7 @@ public KeyValueIterator<Bytes, byte[]> range(
final Bytes to,
final long streamTimeMs
) {
+ checkInitialized();
final var range = new Range<>(RangeBound.inclusive(from), RangeBound.exclusive(to));
final List<KeyValueIterator<Bytes, byte[]>> pssIters = new ArrayList<>();

@@ -181,4 +185,31 @@ public WalEntry delete(final int kafkaPartition, final Bytes key) {
public long lastWrittenOffset(final int kafkaPartition) {
return fetchOffset;
}

+ @Override
+ public byte[] checkpoint() {
+   checkInitialized();
+   final List<TableCheckpoint.TablePssCheckpoint> checkpoints = new ArrayList<>();
+   for (final int pss : pssPartitioner.pssForLss(this.lssId)) {
+     final Optional<Long> writtenOffset = flushManager.writtenOffset(pss);
+     final PssCheckpoint rs3Checkpoint = rs3Client.createCheckpoint(
+         storeId,
+         lssId,
+         pss,
+         writtenOffset
+     );
+     checkpoints.add(new TableCheckpoint.TablePssCheckpoint(
+         writtenOffset,
+         rs3Checkpoint
+     ));
+   }
+   final TableCheckpoint checkpoint = new TableCheckpoint(checkpoints);
+   return TableCheckpoint.serialize(checkpoint);
+ }
+
+ private void checkInitialized() {
+   if (this.lssId == null) {
+     throw new IllegalStateException("table not initialized");
+   }
+ }
}
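
checkpoint() thus snapshots every PSS backing this LSS and packs the per-PSS checkpoints, each tagged with its last written offset, into a single serialized blob. A hedged sketch of the read side, assuming a caller later hands those bytes back (restore wiring is not part of this diff):

  // Sketch: unpacking the blob produced by checkpoint() above, using the
  // TableCheckpoint helpers added in this PR. What a caller does with each
  // PssCheckpoint afterwards is not shown in this diff.
  final TableCheckpoint restored = TableCheckpoint.deserialize(checkpointBytes);
  for (final TableCheckpoint.TablePssCheckpoint pss : restored.pssCheckpoints()) {
    final Optional<Long> offset = pss.writtenOffset(); // presumably empty if nothing was flushed yet
    final PssCheckpoint checkpoint = pss.checkpoint();
  }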
@@ -0,0 +1,114 @@
package dev.responsive.kafka.internal.db.rs3;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import dev.responsive.kafka.internal.db.rs3.client.PssCheckpoint;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

public class TableCheckpoint {
private static final ObjectMapper MAPPER = new ObjectMapper();

static {
MAPPER.registerModule(new Jdk8Module());
}

final List<TablePssCheckpoint> pssCheckpoints;

@JsonCreator
public TableCheckpoint(
@JsonProperty("pssCheckpoints") final List<TablePssCheckpoint> pssCheckpoints
) {
this.pssCheckpoints = List.copyOf(Objects.requireNonNull(pssCheckpoints));
}

@JsonProperty("pssCheckpoints")
public List<TablePssCheckpoint> pssCheckpoints() {
return pssCheckpoints;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (!(o instanceof TableCheckpoint)) {
return false;
}
final TableCheckpoint that = (TableCheckpoint) o;
return Objects.equals(pssCheckpoints, that.pssCheckpoints);
}

@Override
public int hashCode() {
return Objects.hashCode(pssCheckpoints);
}

public static class TablePssCheckpoint {
final Optional<Long> writtenOffset;
final PssCheckpoint checkpoint;

@JsonCreator
public TablePssCheckpoint(
@JsonProperty("writtenOffset") final Optional<Long> writtenOffset,
@JsonProperty("checkpoint") final PssCheckpoint checkpoint
) {
this.writtenOffset = Objects.requireNonNull(writtenOffset);
this.checkpoint = Objects.requireNonNull(checkpoint);
}

@JsonProperty("writtenOffset")
public Optional<Long> writtenOffset() {
return writtenOffset;
}

@JsonProperty("checkpoint")
public PssCheckpoint checkpoint() {
return checkpoint;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (!(o instanceof TablePssCheckpoint)) {
return false;
}
final TablePssCheckpoint that = (TablePssCheckpoint) o;
return Objects.equals(writtenOffset, that.writtenOffset)
&& Objects.equals(checkpoint, that.checkpoint);
}

@Override
public int hashCode() {
return Objects.hash(writtenOffset, checkpoint);
}
}

@Override
public String toString() {
return new String(TableCheckpoint.serialize(this), StandardCharsets.UTF_8);
}

public static byte[] serialize(final TableCheckpoint tableCheckpoint) {
try {
return MAPPER.writeValueAsBytes(tableCheckpoint);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

public static TableCheckpoint deserialize(final byte[] serialized) {
try {
return MAPPER.readValue(serialized, TableCheckpoint.class);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
}
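
Note the Jdk8Module registration: without it, Jackson would not serialize the Optional<Long> writtenOffset as a plain nullable value. A round-trip sketch; PssCheckpoint construction is not part of this diff, so somePssCheckpoint below is a placeholder for a value returned by RS3Client#createCheckpoint:

  // Round-trip sketch: serialize() and deserialize() are inverses, and the
  // equals()/hashCode() overrides make that easy to assert in a test.
  final TableCheckpoint before = new TableCheckpoint(List.of(
      new TableCheckpoint.TablePssCheckpoint(Optional.of(42L), somePssCheckpoint)
  ));
  final byte[] bytes = TableCheckpoint.serialize(before);
  final TableCheckpoint after = TableCheckpoint.deserialize(bytes);
  assert before.equals(after);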
@@ -41,4 +41,9 @@ public boolean equals(final Object o) {
public int hashCode() {
return Objects.hashCode(id);
}

+ @Override
+ public String toString() {
+   return "LssId{" + "id=" + id + '}';
+ }
}
@@ -168,6 +168,15 @@ public CreateStoreResult createStore(
return delegate.createStore(storeName, options);
}

+ @Override
+ public PssCheckpoint createCheckpoint(
+     final UUID storeId,
+     final LssId lssId,
+     final int pssId,
+     final Optional<Long> expectedWrittenOffset) {
+   return delegate.createCheckpoint(storeId, lssId, pssId, expectedWrittenOffset);
+ }

public void close() {
this.metrics.removeSensor(GET_SENSOR_NAME);
}
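
MeteredRS3Client passes createCheckpoint straight through to its delegate without a sensor of its own (only the get path appears to be metered, judging by the GET_SENSOR_NAME cleanup in close()). A hedged call-site sketch with placeholder arguments:

  // All argument values below are placeholders for illustration; in
  // RS3KVTable the storeId, lssId, and pssId come from the table's own
  // init() state rather than being constructed at the call site.
  final PssCheckpoint checkpoint = meteredClient.createCheckpoint(
      storeId,          // UUID identifying the RS3 store
      lssId,            // the logical store shard (LssId)
      pssId,            // the physical shard within that LSS
      Optional.empty()  // no expected written offset
  );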