Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kafka-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ dependencies {
implementation("dev.responsive:controller-api:0.16.0")
implementation(libs.bundles.scylla)
implementation(libs.bundles.commons)
implementation(libs.jackson)
implementation(libs.mongodb.driver.sync)
implementation(libs.bundles.otel)
implementation(libs.bundles.grpc)
Expand Down
2 changes: 1 addition & 1 deletion kafka-client/src/main/external-protos/rs3
Submodule rs3 updated from fb2d8f to 7454ff
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ private void onCommit(
final TopicPartition p = e.getKey().getPartition();
for (final ResponsiveStoreRegistration storeRegistration
: storeRegistry.getRegisteredStoresForChangelog(p, threadId)) {
storeRegistration.onCommit().accept(e.getValue());
storeRegistration.callbacks().notifyCommit(e.getValue());
}
}
for (final var e : writtenOffsets.entrySet()) {
final TopicPartition p = e.getKey();
for (final ResponsiveStoreRegistration storeRegistration
: storeRegistry.getRegisteredStoresForChangelog(p, threadId)) {
storeRegistration.onCommit().accept(e.getValue());
storeRegistration.callbacks().notifyCommit(e.getValue());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,8 @@ <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefix(
* @return the approximate number of entries for this kafka partition
*/
long approximateNumEntries(int kafkaPartition);

default byte[] checkpoint() {
throw new UnsupportedOperationException("checkpoints not supported for this store type");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import dev.responsive.kafka.internal.db.rs3.client.LssId;
import dev.responsive.kafka.internal.db.rs3.client.LssMetadata;
import dev.responsive.kafka.internal.db.rs3.client.MeteredRS3Client;
import dev.responsive.kafka.internal.db.rs3.client.PssCheckpoint;
import dev.responsive.kafka.internal.db.rs3.client.Put;
import dev.responsive.kafka.internal.db.rs3.client.RS3Client;
import dev.responsive.kafka.internal.db.rs3.client.RS3ClientUtil;
Expand All @@ -30,6 +31,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
Expand Down Expand Up @@ -97,6 +99,7 @@ public KVFlushManager init(final int kafkaPartition) {

@Override
public byte[] get(final int kafkaPartition, final Bytes key, final long minValidTs) {
checkInitialized();
final int pssId = pssPartitioner.pss(key.get(), this.lssId);
return rs3Client.get(
storeId,
Expand All @@ -114,6 +117,7 @@ public KeyValueIterator<Bytes, byte[]> range(
final Bytes to,
final long streamTimeMs
) {
checkInitialized();
final var range = new Range<>(RangeBound.inclusive(from), RangeBound.exclusive(to));
final List<KeyValueIterator<Bytes, byte[]>> pssIters = new ArrayList<>();

Expand Down Expand Up @@ -181,4 +185,31 @@ public WalEntry delete(final int kafkaPartition, final Bytes key) {
public long lastWrittenOffset(final int kafkaPartition) {
return fetchOffset;
}

@Override
public byte[] checkpoint() {
checkInitialized();
final List<TableCheckpoint.TablePssCheckpoint> checkpoints = new ArrayList<>();
for (final int pss : pssPartitioner.pssForLss(this.lssId)) {
final Optional<Long> writtenOffset = flushManager.writtenOffset(pss);
final PssCheckpoint rs3Checkpoint = rs3Client.createCheckpoint(
storeId,
lssId,
pss,
writtenOffset
);
checkpoints.add(new TableCheckpoint.TablePssCheckpoint(
writtenOffset,
rs3Checkpoint
));
}
final TableCheckpoint checkpoint = new TableCheckpoint(checkpoints);
return TableCheckpoint.serialize(checkpoint);
}

private void checkInitialized() {
if (this.lssId == null) {
throw new IllegalStateException("table not initialized");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package dev.responsive.kafka.internal.db.rs3;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import dev.responsive.kafka.internal.db.rs3.client.PssCheckpoint;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

public class TableCheckpoint {
private static final ObjectMapper MAPPER = new ObjectMapper();

static {
MAPPER.registerModule(new Jdk8Module());
}

final List<TablePssCheckpoint> pssCheckpoints;

@JsonCreator
public TableCheckpoint(
@JsonProperty("pssCheckpoints") final List<TablePssCheckpoint> pssCheckpoints
) {
this.pssCheckpoints = List.copyOf(Objects.requireNonNull(pssCheckpoints));
}

@JsonProperty("pssCheckpoints")
public List<TablePssCheckpoint> pssCheckpoints() {
return pssCheckpoints;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (!(o instanceof TableCheckpoint)) {
return false;
}
final TableCheckpoint that = (TableCheckpoint) o;
return Objects.equals(pssCheckpoints, that.pssCheckpoints);
}

@Override
public int hashCode() {
return Objects.hashCode(pssCheckpoints);
}

public static class TablePssCheckpoint {
final Optional<Long> writtenOffset;
final PssCheckpoint checkpoint;

@JsonCreator
public TablePssCheckpoint(
@JsonProperty("writtenOffset") final Optional<Long> writtenOffset,
@JsonProperty("checkpoint") final PssCheckpoint checkpoint
) {
this.writtenOffset = Objects.requireNonNull(writtenOffset);
this.checkpoint = Objects.requireNonNull(checkpoint);
}

@JsonProperty("writtenOffset")
public Optional<Long> writtenOffset() {
return writtenOffset;
}

@JsonProperty("checkpoint")
public PssCheckpoint checkpoint() {
return checkpoint;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (!(o instanceof TablePssCheckpoint)) {
return false;
}
final TablePssCheckpoint that = (TablePssCheckpoint) o;
return Objects.equals(writtenOffset, that.writtenOffset)
&& Objects.equals(checkpoint, that.checkpoint);
}

@Override
public int hashCode() {
return Objects.hash(writtenOffset, checkpoint);
}
}

public static byte[] serialize(TableCheckpoint tableCheckpoint) {
try {
return MAPPER.writeValueAsBytes(tableCheckpoint);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

public static TableCheckpoint deserialize(byte[] serialized) {
try {
return MAPPER.readValue(serialized, TableCheckpoint.class);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,15 @@ public CreateStoreResult createStore(
return delegate.createStore(storeName, options);
}

@Override
public PssCheckpoint createCheckpoint(
final UUID storeId,
final LssId lssId,
final int pssId,
final Optional<Long> expectedWrittenOffset) {
return delegate.createCheckpoint(storeId, lssId, pssId, expectedWrittenOffset);
}

public void close() {
this.metrics.removeSensor(GET_SENSOR_NAME);
}
Expand Down
Loading
Loading