From 4cff18c6924e3192bd5e58f1a5edd295ade80898 Mon Sep 17 00:00:00 2001 From: Rohan Desai Date: Thu, 17 Apr 2025 00:00:13 -0700 Subject: [PATCH 1/3] add wiring for creating checkpoints of responsive kv stores - integrate with rs3 protocol for checkpoint creation - rs3 store takes checkpoints - can take a checkpoint from registration --- kafka-client/build.gradle.kts | 1 + kafka-client/src/main/external-protos/rs3 | 2 +- .../internal/clients/StoreCommitListener.java | 4 +- .../kafka/internal/db/RemoteKVTable.java | 4 + .../kafka/internal/db/rs3/RS3KVTable.java | 31 ++++ .../internal/db/rs3/TableCheckpoint.java | 108 +++++++++++ .../db/rs3/client/MeteredRS3Client.java | 9 + .../internal/db/rs3/client/PssCheckpoint.java | 175 ++++++++++++++++++ .../internal/db/rs3/client/RS3Client.java | 7 + .../db/rs3/client/grpc/GrpcRS3Client.java | 33 +++- .../db/rs3/client/grpc/GrpcRs3Util.java | 31 ++++ .../stores/PartitionedOperations.java | 13 +- .../stores/ResponsiveStoreRegistration.java | 19 +- .../clients/StoreCommitListenerTest.java | 17 +- .../kafka/internal/db/rs3/RS3KVTableTest.java | 56 ++++++ .../internal/db/rs3/TableCheckpointTest.java | 56 ++++++ .../internal/clients/TTDCassandraClient.java | 2 +- 17 files changed, 540 insertions(+), 28 deletions(-) create mode 100644 kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/TableCheckpoint.java create mode 100644 kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/PssCheckpoint.java create mode 100644 kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/TableCheckpointTest.java diff --git a/kafka-client/build.gradle.kts b/kafka-client/build.gradle.kts index 148cc5af8..08efad9c3 100644 --- a/kafka-client/build.gradle.kts +++ b/kafka-client/build.gradle.kts @@ -137,6 +137,7 @@ dependencies { implementation("dev.responsive:controller-api:0.16.0") implementation(libs.bundles.scylla) implementation(libs.bundles.commons) + implementation(libs.jackson) implementation(libs.mongodb.driver.sync) implementation(libs.bundles.otel) implementation(libs.bundles.grpc) diff --git a/kafka-client/src/main/external-protos/rs3 b/kafka-client/src/main/external-protos/rs3 index fb2d8f80c..7454ff942 160000 --- a/kafka-client/src/main/external-protos/rs3 +++ b/kafka-client/src/main/external-protos/rs3 @@ -1 +1 @@ -Subproject commit fb2d8f80c9a976f7999a131a75c74d9b82bdcc4c +Subproject commit 7454ff9425f9297bb7570f3099dddb6d7e79d53e diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/StoreCommitListener.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/StoreCommitListener.java index aef4ca38b..e55b24a31 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/StoreCommitListener.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/StoreCommitListener.java @@ -41,14 +41,14 @@ private void onCommit( final TopicPartition p = e.getKey().getPartition(); for (final ResponsiveStoreRegistration storeRegistration : storeRegistry.getRegisteredStoresForChangelog(p, threadId)) { - storeRegistration.onCommit().accept(e.getValue()); + storeRegistration.callbacks().notifyCommit(e.getValue()); } } for (final var e : writtenOffsets.entrySet()) { final TopicPartition p = e.getKey(); for (final ResponsiveStoreRegistration storeRegistration : storeRegistry.getRegisteredStoresForChangelog(p, threadId)) { - storeRegistration.onCommit().accept(e.getValue()); + storeRegistration.callbacks().notifyCommit(e.getValue()); } } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteKVTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteKVTable.java index c71fe4cea..cd1aab956 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteKVTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteKVTable.java @@ -94,4 +94,8 @@ , P> KeyValueIterator prefix( * @return the approximate number of entries for this kafka partition */ long approximateNumEntries(int kafkaPartition); + + default byte[] checkpoint() { + throw new UnsupportedOperationException("checkpoints not supported for this store type"); + } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVTable.java index b71635a7f..fb4d0fda4 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVTable.java @@ -18,6 +18,7 @@ import dev.responsive.kafka.internal.db.rs3.client.LssId; import dev.responsive.kafka.internal.db.rs3.client.LssMetadata; import dev.responsive.kafka.internal.db.rs3.client.MeteredRS3Client; +import dev.responsive.kafka.internal.db.rs3.client.PssCheckpoint; import dev.responsive.kafka.internal.db.rs3.client.Put; import dev.responsive.kafka.internal.db.rs3.client.RS3Client; import dev.responsive.kafka.internal.db.rs3.client.RS3ClientUtil; @@ -30,6 +31,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.UUID; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; @@ -97,6 +99,7 @@ public KVFlushManager init(final int kafkaPartition) { @Override public byte[] get(final int kafkaPartition, final Bytes key, final long minValidTs) { + checkInitialized(); final int pssId = pssPartitioner.pss(key.get(), this.lssId); return rs3Client.get( storeId, @@ -114,6 +117,7 @@ public KeyValueIterator range( final Bytes to, final long streamTimeMs ) { + checkInitialized(); final var range = new Range<>(RangeBound.inclusive(from), RangeBound.exclusive(to)); final List> pssIters = new ArrayList<>(); @@ -181,4 +185,31 @@ public WalEntry delete(final int kafkaPartition, final Bytes key) { public long lastWrittenOffset(final int kafkaPartition) { return fetchOffset; } + + @Override + public byte[] checkpoint() { + checkInitialized(); + final List checkpoints = new ArrayList<>(); + for (final int pss : pssPartitioner.pssForLss(this.lssId)) { + final Optional writtenOffset = flushManager.writtenOffset(pss); + final PssCheckpoint rs3Checkpoint = rs3Client.createCheckpoint( + storeId, + lssId, + pss, + writtenOffset + ); + checkpoints.add(new TableCheckpoint.TablePssCheckpoint( + writtenOffset, + rs3Checkpoint + )); + } + final TableCheckpoint checkpoint = new TableCheckpoint(checkpoints); + return TableCheckpoint.serialize(checkpoint); + } + + private void checkInitialized() { + if (this.lssId == null) { + throw new IllegalStateException("table not initialized"); + } + } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/TableCheckpoint.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/TableCheckpoint.java new file mode 100644 index 000000000..e361a2cf8 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/TableCheckpoint.java @@ -0,0 +1,108 @@ +package dev.responsive.kafka.internal.db.rs3; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import dev.responsive.kafka.internal.db.rs3.client.PssCheckpoint; +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +public class TableCheckpoint { + private static final ObjectMapper MAPPER = new ObjectMapper(); + + static { + MAPPER.registerModule(new Jdk8Module()); + } + + final List pssCheckpoints; + + @JsonCreator + public TableCheckpoint( + @JsonProperty("pssCheckpoints") final List pssCheckpoints + ) { + this.pssCheckpoints = List.copyOf(Objects.requireNonNull(pssCheckpoints)); + } + + @JsonProperty("pssCheckpoints") + public List pssCheckpoints() { + return pssCheckpoints; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TableCheckpoint)) { + return false; + } + final TableCheckpoint that = (TableCheckpoint) o; + return Objects.equals(pssCheckpoints, that.pssCheckpoints); + } + + @Override + public int hashCode() { + return Objects.hashCode(pssCheckpoints); + } + + public static class TablePssCheckpoint { + final Optional writtenOffset; + final PssCheckpoint checkpoint; + + @JsonCreator + public TablePssCheckpoint( + @JsonProperty("writtenOffset") final Optional writtenOffset, + @JsonProperty("checkpoint") final PssCheckpoint checkpoint + ) { + this.writtenOffset = Objects.requireNonNull(writtenOffset); + this.checkpoint = Objects.requireNonNull(checkpoint); + } + + @JsonProperty("writtenOffset") + public Optional writtenOffset() { + return writtenOffset; + } + + @JsonProperty("checkpoint") + public PssCheckpoint checkpoint() { + return checkpoint; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TablePssCheckpoint)) { + return false; + } + final TablePssCheckpoint that = (TablePssCheckpoint) o; + return Objects.equals(writtenOffset, that.writtenOffset) + && Objects.equals(checkpoint, that.checkpoint); + } + + @Override + public int hashCode() { + return Objects.hash(writtenOffset, checkpoint); + } + } + + public static byte[] serialize(TableCheckpoint tableCheckpoint) { + try { + return MAPPER.writeValueAsBytes(tableCheckpoint); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + + public static TableCheckpoint deserialize(byte[] serialized) { + try { + return MAPPER.readValue(serialized, TableCheckpoint.class); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java index 5452eb4a3..acc31f315 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/MeteredRS3Client.java @@ -168,6 +168,15 @@ public CreateStoreResult createStore( return delegate.createStore(storeName, options); } + @Override + public PssCheckpoint createCheckpoint( + final UUID storeId, + final LssId lssId, + final int pssId, + final Optional expectedWrittenOffset) { + return delegate.createCheckpoint(storeId, lssId, pssId, expectedWrittenOffset); + } + public void close() { this.metrics.removeSensor(GET_SENSOR_NAME); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/PssCheckpoint.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/PssCheckpoint.java new file mode 100644 index 000000000..4fc443248 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/PssCheckpoint.java @@ -0,0 +1,175 @@ +package dev.responsive.kafka.internal.db.rs3.client; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import java.util.Objects; +import java.util.UUID; + +public class PssCheckpoint { + private final UUID storeId; + private final int pssId; + private final StorageCheckpoint checkpoint; + + @JsonCreator + public PssCheckpoint( + @JsonProperty("storeId") final UUID storeId, + @JsonProperty("pssId") final int pssId, + @JsonProperty("checkpoint") StorageCheckpoint checkpoint, + @JsonProperty("slateDbStorageCheckpoint") + final SlateDbStorageCheckpoint slateDbStorageCheckpoint + ) { + this.storeId = Objects.requireNonNull(storeId); + this.pssId = pssId; + if (slateDbStorageCheckpoint != null) { + this.checkpoint = slateDbStorageCheckpoint; + } else { + this.checkpoint = Objects.requireNonNull(checkpoint); + } + } + + @JsonProperty("storeId") + public UUID getStoreId() { + return storeId; + } + + @JsonProperty("pssId") + public int pssId() { + return pssId; + } + + @JsonProperty("checkpoint") + public StorageCheckpoint checkpoint() { + return checkpoint; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (!(o instanceof PssCheckpoint)) { + return false; + } + final PssCheckpoint that = (PssCheckpoint) o; + return Objects.equals(storeId, that.storeId) + && pssId == that.pssId + && Objects.equals(checkpoint, that.checkpoint); + } + + @Override + public int hashCode() { + return Objects.hash(storeId, pssId, checkpoint); + } + + public static final String SLATEDB_STORAGE_TYPE = "SlateDbStorage"; + + @JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, + include = JsonTypeInfo.As.EXISTING_PROPERTY, + property = "type", + visible = true, + defaultImpl = SlateDbStorageCheckpoint.class + ) + @JsonSubTypes({ + @JsonSubTypes.Type(value = SlateDbStorageCheckpoint.class, name = SLATEDB_STORAGE_TYPE) + }) + public abstract static class StorageCheckpoint { + private final String type; + + StorageCheckpoint(final String type) { + this.type = type; + } + + @JsonProperty("type") + public String type() { + return type; + } + + public abstract void visit(CheckpointVisitor visitor); + + public abstract T map(CheckpointMapper mapper); + } + + public interface CheckpointVisitor { + void visit(SlateDbStorageCheckpoint checkpoint); + } + + public interface CheckpointMapper { + T map(SlateDbStorageCheckpoint checkpoint); + } + + public static class SlateDbStorageCheckpoint extends StorageCheckpoint { + private final String path; + private final UUID checkpointId; + + @JsonCreator + public SlateDbStorageCheckpoint( + @JsonProperty("type") final String type, + @JsonProperty("path") final String path, + @JsonProperty("checkpointId") final UUID checkpointId + ) { + super(type); + this.path = path; + this.checkpointId = checkpointId; + } + + public SlateDbStorageCheckpoint( + @JsonProperty("path") final String path, + @JsonProperty("checkpointId") final UUID checkpointId + ) { + super(SLATEDB_STORAGE_TYPE); + this.path = path; + this.checkpointId = checkpointId; + } + + @JsonProperty("path") + public String path() { + return path; + } + + @JsonProperty("checkpointId") + public UUID checkpointId() { + return checkpointId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof SlateDbStorageCheckpoint)) { + return false; + } + final SlateDbStorageCheckpoint that = (SlateDbStorageCheckpoint) o; + return Objects.equals(path, that.path) + && Objects.deepEquals(checkpointId, that.checkpointId); + } + + @Override + public void visit(final CheckpointVisitor visitor) { + visitor.visit(this); + } + + @Override + public T map(final CheckpointMapper mapper) { + return mapper.map(this); + } + + @Override + public int hashCode() { + return Objects.hash(path, checkpointId); + } + + @Override + public String toString() { + return "SlateDbStorageCheckpoint{" + + "path='" + path + '\'' + + ", checkpointId=" + checkpointId + + '}'; + } + } + + +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java index 4ca0eedca..b58a49dc5 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/RS3Client.java @@ -80,5 +80,12 @@ CreateStoreResult createStore( CreateStoreOptions options ); + PssCheckpoint createCheckpoint( + UUID storeId, + LssId lssId, + int pssId, + Optional expectedWrittenOffset + ); + void close(); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java index c0eb12e19..2a0fd44ff 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java @@ -13,6 +13,7 @@ package dev.responsive.kafka.internal.db.rs3.client.grpc; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicKeyProto; +import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.checkField; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.storeStatusFromProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.storeTypeFromProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.walOffsetFromProto; @@ -28,6 +29,7 @@ import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreResult; import dev.responsive.kafka.internal.db.rs3.client.CurrentOffsets; import dev.responsive.kafka.internal.db.rs3.client.LssId; +import dev.responsive.kafka.internal.db.rs3.client.PssCheckpoint; import dev.responsive.kafka.internal.db.rs3.client.RS3Client; import dev.responsive.kafka.internal.db.rs3.client.RS3TimeoutException; import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException; @@ -84,8 +86,8 @@ public CurrentOffsets getCurrentOffsets(final UUID storeId, final LssId lssId, f () -> stub.getOffsets(request), () -> "GetOffsets(storeId=" + storeId + ", lssId=" + lssId + ", pssId=" + pssId + ")" ); - checkField(result::hasWrittenOffset, "writtenOffset"); - checkField(result::hasFlushedOffset, "flushedOffset"); + GrpcRs3Util.checkField(result::hasWrittenOffset, "writtenOffset"); + GrpcRs3Util.checkField(result::hasFlushedOffset, "flushedOffset"); return new CurrentOffsets( walOffsetFromProto(result.getWrittenOffset()), walOffsetFromProto(result.getFlushedOffset()) @@ -157,7 +159,7 @@ public StreamSenderMessageReceiver> writeWalSegmentAsyn streamSender, resultObserver.message() .thenApply(r -> { - checkField(r::hasFlushedOffset, "flushedOffset"); + GrpcRs3Util.checkField(r::hasFlushedOffset, "flushedOffset"); return walOffsetFromProto(r.getFlushedOffset()); }) ); @@ -293,7 +295,7 @@ public Optional get( expectedWrittenOffset ); return kvOpt.map(kv -> { - checkField(kv::hasBasicKv, "value"); + GrpcRs3Util.checkField(kv::hasBasicKv, "value"); final var value = kv.getBasicKv().getValue().getValue(); return value.toByteArray(); }); @@ -319,7 +321,7 @@ public Optional windowedGet( expectedWrittenOffset ); return kvOpt.map(kv -> { - checkField(kv::hasWindowKv, "value"); + GrpcRs3Util.checkField(kv::hasWindowKv, "value"); final var value = kv.getWindowKv().getValue().getValue(); return value.toByteArray(); }); @@ -391,10 +393,25 @@ public CreateStoreResult createStore( return new CreateStoreResult(uuidFromProto(result.getStoreId()), result.getPssIdsList()); } - private void checkField(final Supplier check, final String field) { - if (!check.get()) { - throw new RuntimeException("rs3 resp proto missing field " + field); + @Override + public PssCheckpoint createCheckpoint( + final UUID storeId, + final LssId lssId, + final int pssId, + final Optional expectedWrittenOffset) { + final var request = Rs3.CreateCheckpointRequest.newBuilder() + .setLssId(lssIdProto(lssId)) + .setExpectedWrittenOffset(walOffsetProto(expectedWrittenOffset)) + .build(); + final RS3Grpc.RS3BlockingStub stub = stubs.stubs(storeId, pssId).syncStub(); + final Rs3.CreateCheckpointResult result; + try { + result = stub.createCheckpoint(request); + } catch (final RuntimeException e) { + throw GrpcRs3Util.wrapThrowable(e); } + checkField(result::hasCheckpoint, "checkpoint"); + return GrpcRs3Util.pssCheckpointFromProto(storeId, pssId, result.getCheckpoint()); } private class RangeProxy> implements GrpcRangeRequestProxy { diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java index b11e306a5..8afae26ab 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java @@ -15,6 +15,7 @@ import com.google.protobuf.ByteString; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes; import dev.responsive.kafka.internal.db.rs3.client.Delete; +import dev.responsive.kafka.internal.db.rs3.client.PssCheckpoint; import dev.responsive.kafka.internal.db.rs3.client.Put; import dev.responsive.kafka.internal.db.rs3.client.RS3Exception; import dev.responsive.kafka.internal.db.rs3.client.RS3TransientException; @@ -25,6 +26,8 @@ import io.grpc.StatusException; import io.grpc.StatusRuntimeException; import java.util.Optional; +import java.util.UUID; +import java.util.function.Supplier; public class GrpcRs3Util { public static final Rs3.WALOffset UNWRITTEN_WAL_OFFSET = Rs3.WALOffset.newBuilder() @@ -201,4 +204,32 @@ public static Optional walOffsetFromProto(final Rs3.WALOffset walOffset) { return Optional.empty(); } } + + public static UUID uuidFromProto(final Rs3.UUID uuidProto) { + checkField(uuidProto::hasHigh, "high"); + checkField(uuidProto::hasLow, "low"); + return new UUID(uuidProto.getHigh(), uuidProto.getLow()); + } + + public static PssCheckpoint pssCheckpointFromProto( + final UUID storeId, + final int pssId, + final Rs3.StorageCheckpoint checkpointProto + ) { + checkField(checkpointProto::hasSlatedbStorageCheckpoint, "slatedbStorageCheckpoint"); + final var slateDbCheckpointProto = checkpointProto.getSlatedbStorageCheckpoint(); + checkField(slateDbCheckpointProto::hasPath, "path"); + checkField(slateDbCheckpointProto::hasCheckpointId, "checkpointId"); + final var slateDbCheckpoint = new PssCheckpoint.SlateDbStorageCheckpoint( + slateDbCheckpointProto.getPath(), + uuidFromProto(slateDbCheckpointProto.getCheckpointId()) + ); + return new PssCheckpoint(storeId, pssId, slateDbCheckpoint, null); + } + + public static void checkField(final Supplier check, final String field) { + if (!check.get()) { + throw new RuntimeException("rs3 resp proto missing field " + field); + } + } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java index aa026a871..9496d33da 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java @@ -163,13 +163,24 @@ public static PartitionedOperations create( ); final long restoreStartOffset = table.lastWrittenOffset(changelog.partition()); + final CommitBuffer initializedBuffer = buffer; registration = new ResponsiveStoreRegistration( name.kafkaName(), changelog, restoreStartOffset == NO_COMMITTED_OFFSET ? OptionalLong.empty() : OptionalLong.of(restoreStartOffset), - buffer::flush, + new ResponsiveStoreRegistration.StoreCallbacks() { + @Override + public void notifyCommit(long committedOffset) { + initializedBuffer.flush(committedOffset); + } + + @Override + public byte[] checkpoint() { + return table.checkpoint(); + } + }, streamThreadId() ); storeRegistry.registerStore(registration); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistration.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistration.java index 0c9f6fa7d..7d4b50a55 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistration.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistration.java @@ -15,7 +15,6 @@ import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting; import java.util.Objects; import java.util.OptionalLong; -import java.util.function.Consumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; import org.slf4j.Logger; @@ -26,7 +25,7 @@ public final class ResponsiveStoreRegistration { private final Logger log; private final String storeName; private final TopicPartition changelogTopicPartition; - private final Consumer onCommit; + private final StoreCallbacks callbacks; private final String threadId; private final InjectedStoreArgs injectedStoreArgs = new InjectedStoreArgs(); @@ -37,13 +36,13 @@ public ResponsiveStoreRegistration( final String storeName, final TopicPartition changelogTopicPartition, final OptionalLong startOffset, - final Consumer onCommit, + final StoreCallbacks callbacks, final String threadId ) { this.storeName = Objects.requireNonNull(storeName); this.changelogTopicPartition = Objects.requireNonNull(changelogTopicPartition); this.startOffset = startOffset; - this.onCommit = Objects.requireNonNull(onCommit); + this.callbacks = Objects.requireNonNull(callbacks); this.threadId = Objects.requireNonNull(threadId); this.log = new LogContext( String.format("changelog [%s]", changelogTopicPartition) @@ -63,8 +62,8 @@ public String storeName() { return storeName; } - public Consumer onCommit() { - return onCommit; + public StoreCallbacks callbacks() { + return callbacks; } public InjectedStoreArgs injectedStoreArgs() { @@ -74,4 +73,12 @@ public InjectedStoreArgs injectedStoreArgs() { public String threadId() { return threadId; } + + public interface StoreCallbacks { + void notifyCommit(long committedOffset); + + default byte[] checkpoint() { + throw new UnsupportedOperationException("checkpoints not supported for store type"); + } + } } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/StoreCommitListenerTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/StoreCommitListenerTest.java index 6ec8789d8..10cef9fcf 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/StoreCommitListenerTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/StoreCommitListenerTest.java @@ -20,7 +20,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.OptionalLong; -import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.RecordMetadata; @@ -37,9 +36,9 @@ class StoreCommitListenerTest { private static final TopicPartition PARTITION2 = new TopicPartition("booboo", 2); @Mock - private Consumer store1Flush; + private ResponsiveStoreRegistration.StoreCallbacks store1Callbacks; @Mock - private Consumer store2Flush; + private ResponsiveStoreRegistration.StoreCallbacks store2Callbacks; private final ResponsiveStoreRegistry registry = new ResponsiveStoreRegistry(); private final OffsetRecorder offsetRecorder = new OffsetRecorder(true, "thread1"); private StoreCommitListener commitListener; @@ -50,14 +49,14 @@ public void setup() { "store1", PARTITION1, OptionalLong.of(0), - store1Flush, + store1Callbacks, "thread1" )); registry.registerStore(new ResponsiveStoreRegistration( "store2", PARTITION2, OptionalLong.of(0), - store2Flush, + store2Callbacks, "thread1" )); commitListener = new StoreCommitListener(registry, offsetRecorder); @@ -69,8 +68,8 @@ public void shouldNotifyStoresOnCommittedChangelogOffsets() { sendCommittedOffsets(Map.of(PARTITION1, 123L)); // then: - verify(store1Flush).accept(123L); - verifyNoInteractions(store2Flush); + verify(store1Callbacks).notifyCommit(123L); + verifyNoInteractions(store2Callbacks); } @Test @@ -79,8 +78,8 @@ public void shouldNotifyStoresOnCommittedChangelogWrites() { sendWrittenOffsets(Map.of(PARTITION2, 456L)); // then: - verify(store2Flush).accept(456L); - verifyNoInteractions(store1Flush); + verify(store2Callbacks).notifyCommit(456L); + verifyNoInteractions(store1Callbacks); } private void sendCommittedOffsets(final Map offsets) { diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3KVTableTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3KVTableTest.java index 0d087de5a..0ad7a539e 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3KVTableTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3KVTableTest.java @@ -17,12 +17,15 @@ import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import dev.responsive.kafka.internal.db.rs3.client.CurrentOffsets; import dev.responsive.kafka.internal.db.rs3.client.LssId; +import dev.responsive.kafka.internal.db.rs3.client.PssCheckpoint; import dev.responsive.kafka.internal.db.rs3.client.RS3Client; import dev.responsive.kafka.internal.db.rs3.client.RS3Exception; import dev.responsive.kafka.internal.db.rs3.client.RS3TimeoutException; @@ -35,6 +38,7 @@ import io.grpc.StatusRuntimeException; import java.util.Collections; import java.util.LinkedHashMap; +import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -395,4 +399,56 @@ public void shouldPropagateUnexpectedExceptionFromStream() { assertThat(statusRuntimeException.getStatus(), is(Status.UNKNOWN)); } + @Test + public void shouldCreateCheckpoint() { + // given: + final var storeId = UUID.randomUUID(); + final int partition = 1; + final var pssCheckpoint = new PssCheckpoint( + storeId, + partition, + new PssCheckpoint.SlateDbStorageCheckpoint( + "/foo/bar", + UUID.randomUUID() + ), + null + ); + when(metrics.addSensor(any())) + .thenReturn(Mockito.mock(Sensor.class)); + when(client.createCheckpoint(any(), any(), anyInt(), any())).thenReturn(pssCheckpoint); + var lssId = new LssId(partition); + when(partitioner.pssForLss(lssId)).thenReturn(List.of(partition)); + when(client.getCurrentOffsets(storeId, lssId, partition)) + .thenReturn(new CurrentOffsets(Optional.empty(), Optional.empty())); + final var table = new RS3KVTable( + "store", + storeId, + client, + partitioner, + metrics, + metricsScopeBuilder + ); + table.init(partition); + + // when: + final byte[] serializedCheckpoint = table.checkpoint(); + + // then: + verify(client).createCheckpoint( + storeId, new LssId(partition), partition, Optional.empty()); + final TableCheckpoint checkpoint = TableCheckpoint.deserialize(serializedCheckpoint); + assertThat( + checkpoint, + is( + new TableCheckpoint( + List.of( + new TableCheckpoint.TablePssCheckpoint( + Optional.empty(), + pssCheckpoint + ) + ) + ) + ) + ); + } } \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/TableCheckpointTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/TableCheckpointTest.java new file mode 100644 index 000000000..c18d22686 --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/TableCheckpointTest.java @@ -0,0 +1,56 @@ +package dev.responsive.kafka.internal.db.rs3; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import dev.responsive.kafka.internal.db.rs3.client.PssCheckpoint; +import java.nio.charset.Charset; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import org.junit.jupiter.api.Test; + +class TableCheckpointTest { + + @Test + public void shouldSerializeTableCheckpoint() { + // given: + final var storeId = UUID.randomUUID(); + final TableCheckpoint checkpoint = new TableCheckpoint( + List.of( + new TableCheckpoint.TablePssCheckpoint( + Optional.of(123L), + new PssCheckpoint( + storeId, + 10, + new PssCheckpoint.SlateDbStorageCheckpoint( + "/foo/bar/10", + UUID.randomUUID() + ), + null + ) + ), + new TableCheckpoint.TablePssCheckpoint( + Optional.empty(), + new PssCheckpoint( + storeId, + 11, + new PssCheckpoint.SlateDbStorageCheckpoint( + "/foo/bar/11", + UUID.randomUUID() + ), + null + ) + ) + ) + ); + + // when: + final byte[] serialized = TableCheckpoint.serialize(checkpoint); + System.out.println(new String(serialized, Charset.defaultCharset())); + + // then: + final TableCheckpoint deserialized = TableCheckpoint.deserialize(serialized); + assertThat(deserialized, is(checkpoint)); + } +} \ No newline at end of file diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/clients/TTDCassandraClient.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/clients/TTDCassandraClient.java index aa027fc61..6aedf543c 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/clients/TTDCassandraClient.java +++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/clients/TTDCassandraClient.java @@ -71,7 +71,7 @@ public void advanceWallClockTime(final Duration advance) { } public void flush() { - storeRegistry.stores().forEach(s -> s.onCommit().accept(0L)); + storeRegistry.stores().forEach(s -> s.callbacks().notifyCommit(0L)); } @Override From 83412c09ccbcb6f3b77978bb97f74588aade102c Mon Sep 17 00:00:00 2001 From: Rohan Desai Date: Fri, 9 May 2025 17:06:02 -0700 Subject: [PATCH 2/3] review feedback --- .../internal/db/rs3/TableCheckpoint.java | 6 +++ .../kafka/internal/db/rs3/client/LssId.java | 5 +++ .../db/rs3/client/grpc/GrpcRS3Client.java | 13 ++++--- .../db/rs3/client/grpc/GrpcRS3ClientTest.java | 39 +++++++++++++++++++ 4 files changed, 58 insertions(+), 5 deletions(-) diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/TableCheckpoint.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/TableCheckpoint.java index e361a2cf8..3f2a1e686 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/TableCheckpoint.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/TableCheckpoint.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import dev.responsive.kafka.internal.db.rs3.client.PssCheckpoint; import java.io.IOException; +import java.nio.charset.Charset; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -90,6 +91,11 @@ public int hashCode() { } } + @Override + public String toString() { + return new String(TableCheckpoint.serialize(this), Charset.defaultCharset()); + } + public static byte[] serialize(TableCheckpoint tableCheckpoint) { try { return MAPPER.writeValueAsBytes(tableCheckpoint); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/LssId.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/LssId.java index 06dde1b7f..5497e7adc 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/LssId.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/LssId.java @@ -41,4 +41,9 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hashCode(id); } + + @Override + public String toString() { + return "LssId{" + "id=" + id + '}'; + } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java index 2a0fd44ff..c73b4f8e6 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java @@ -405,11 +405,14 @@ public PssCheckpoint createCheckpoint( .build(); final RS3Grpc.RS3BlockingStub stub = stubs.stubs(storeId, pssId).syncStub(); final Rs3.CreateCheckpointResult result; - try { - result = stub.createCheckpoint(request); - } catch (final RuntimeException e) { - throw GrpcRs3Util.wrapThrowable(e); - } + result = withRetry( + () -> stub.createCheckpoint(request), + () -> String.format("CreateCheckpoint(storeId=%s, lssId=%s, pssId=%d", + storeId.toString(), + lssId, + pssId + ) + ); checkField(result::hasCheckpoint, "checkpoint"); return GrpcRs3Util.pssCheckpointFromProto(storeId, pssId, result.getCheckpoint()); } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java index a021fd07c..94295e405 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java @@ -14,6 +14,7 @@ import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicPutProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.createStoreOptionsProto; +import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.walOffsetProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpsRs3TestUtil.newEndOfStreamResult; import static dev.responsive.kafka.internal.utils.Utils.lssIdProto; import static dev.responsive.kafka.internal.utils.Utils.uuidToProto; @@ -38,6 +39,7 @@ import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.CreateStoreOptions; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes.StoreType; import dev.responsive.kafka.internal.db.rs3.client.LssId; +import dev.responsive.kafka.internal.db.rs3.client.PssCheckpoint; import dev.responsive.kafka.internal.db.rs3.client.Put; import dev.responsive.kafka.internal.db.rs3.client.RS3Exception; import dev.responsive.kafka.internal.db.rs3.client.RS3TimeoutException; @@ -1252,6 +1254,43 @@ public void shouldPropagateUnexpectedExceptionFromObserverOnError() { } } + @Test + public void shouldCreateCheckpoint() { + final var checkpointId = UUID.randomUUID(); + final var checkpointPath = "/some/checkpoint/path"; + final var storageCheckpoint = Rs3.StorageCheckpoint.newBuilder().setSlatedbStorageCheckpoint( + Rs3.SlateDbStorageCheckpoint + .newBuilder() + .setCheckpointId(uuidToProto(checkpointId)) + .setPath(checkpointPath) + .build() + ); + when(stub.createCheckpoint(any())) + .thenReturn(Rs3.CreateCheckpointResult + .newBuilder() + .setCheckpoint(storageCheckpoint) + .build()); + + final var checkpoint = client.createCheckpoint( + STORE_ID, + LSS_ID, + PSS_ID, + Optional.of(10L) + ).checkpoint(); + + assertThat( + checkpoint.map(PssCheckpoint.SlateDbStorageCheckpoint::checkpointId), is(checkpointId)); + assertThat(checkpoint.map(PssCheckpoint.SlateDbStorageCheckpoint::path), is(checkpointPath)); + + verify(stub).createCheckpoint( + Rs3.CreateCheckpointRequest + .newBuilder() + .setLssId(lssIdProto(LSS_ID)) + .setExpectedWrittenOffset(walOffsetProto(10)) + .build() + ); + } + private StreamObserver verifyWalSegmentResultObserver() { verify(asyncStub).writeWALSegmentStream(writeWALSegmentResultObserverCaptor.capture()); return writeWALSegmentResultObserverCaptor.getValue(); From 9889efd7b94a9e515c86e09232d5c854bb60be08 Mon Sep 17 00:00:00 2001 From: Rohan Desai Date: Fri, 9 May 2025 20:05:58 -0700 Subject: [PATCH 3/3] consolidate proto utils fns --- kafka-client/src/main/external-protos/rs3 | 2 +- .../db/rs3/client/grpc/GrpcRS3Client.java | 6 +++--- .../db/rs3/client/grpc/GrpcRs3Util.java | 14 +++++++++++++ .../kafka/internal/utils/Utils.java | 20 ------------------- .../db/rs3/client/grpc/GrpcRS3ClientTest.java | 4 ++-- 5 files changed, 20 insertions(+), 26 deletions(-) diff --git a/kafka-client/src/main/external-protos/rs3 b/kafka-client/src/main/external-protos/rs3 index 7454ff942..125749a3a 160000 --- a/kafka-client/src/main/external-protos/rs3 +++ b/kafka-client/src/main/external-protos/rs3 @@ -1 +1 @@ -Subproject commit 7454ff9425f9297bb7570f3099dddb6d7e79d53e +Subproject commit 125749a3ab825c808056755dcdb19eef27b9a0e9 diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java index c73b4f8e6..302c6934a 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java @@ -14,14 +14,14 @@ import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicKeyProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.checkField; +import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.lssIdProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.storeStatusFromProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.storeTypeFromProto; +import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.uuidFromProto; +import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.uuidToProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.walOffsetFromProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.walOffsetProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.windowKeyProto; -import static dev.responsive.kafka.internal.utils.Utils.lssIdProto; -import static dev.responsive.kafka.internal.utils.Utils.uuidFromProto; -import static dev.responsive.kafka.internal.utils.Utils.uuidToProto; import com.google.common.annotations.VisibleForTesting; import dev.responsive.kafka.api.config.ResponsiveConfig; diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java index 8afae26ab..cc3b85117 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRs3Util.java @@ -15,6 +15,7 @@ import com.google.protobuf.ByteString; import dev.responsive.kafka.internal.db.rs3.client.CreateStoreTypes; import dev.responsive.kafka.internal.db.rs3.client.Delete; +import dev.responsive.kafka.internal.db.rs3.client.LssId; import dev.responsive.kafka.internal.db.rs3.client.PssCheckpoint; import dev.responsive.kafka.internal.db.rs3.client.Put; import dev.responsive.kafka.internal.db.rs3.client.RS3Exception; @@ -211,6 +212,13 @@ public static UUID uuidFromProto(final Rs3.UUID uuidProto) { return new UUID(uuidProto.getHigh(), uuidProto.getLow()); } + public static Rs3.UUID uuidToProto(final UUID uuid) { + return Rs3.UUID.newBuilder() + .setHigh(uuid.getMostSignificantBits()) + .setLow(uuid.getLeastSignificantBits()) + .build(); + } + public static PssCheckpoint pssCheckpointFromProto( final UUID storeId, final int pssId, @@ -227,6 +235,12 @@ public static PssCheckpoint pssCheckpointFromProto( return new PssCheckpoint(storeId, pssId, slateDbCheckpoint, null); } + public static Rs3.LSSId lssIdProto(final LssId lssId) { + return Rs3.LSSId.newBuilder() + .setId(lssId.id()) + .build(); + } + public static void checkField(final Supplier check, final String field) { if (!check.get()) { throw new RuntimeException("rs3 resp proto missing field " + field); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/Utils.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/Utils.java index 3c84eadaf..053ed7b33 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/Utils.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/Utils.java @@ -12,9 +12,6 @@ package dev.responsive.kafka.internal.utils; -import dev.responsive.kafka.internal.db.rs3.client.LssId; -import dev.responsive.rs3.Rs3; -import java.util.UUID; import java.util.regex.Pattern; import org.apache.kafka.common.utils.Bytes; import org.slf4j.Logger; @@ -129,21 +126,4 @@ public static Bytes incrementWithoutOverflow(final Bytes input) { return null; } } - - public static UUID uuidFromProto(final Rs3.UUID uuid) { - return new UUID(uuid.getHigh(), uuid.getLow()); - } - - public static Rs3.UUID uuidToProto(final UUID uuid) { - return Rs3.UUID.newBuilder() - .setHigh(uuid.getMostSignificantBits()) - .setLow(uuid.getLeastSignificantBits()) - .build(); - } - - public static Rs3.LSSId lssIdProto(final LssId lssId) { - return Rs3.LSSId.newBuilder() - .setId(lssId.id()) - .build(); - } } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java index 94295e405..473e0fb47 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java @@ -14,10 +14,10 @@ import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.basicPutProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.createStoreOptionsProto; +import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.lssIdProto; +import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.uuidToProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.walOffsetProto; import static dev.responsive.kafka.internal.db.rs3.client.grpc.GrpsRs3TestUtil.newEndOfStreamResult; -import static dev.responsive.kafka.internal.utils.Utils.lssIdProto; -import static dev.responsive.kafka.internal.utils.Utils.uuidToProto; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan;