diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraFactFlushManager.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraFactFlushManager.java index 2d1c0e79b..cf7d68242 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraFactFlushManager.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraFactFlushManager.java @@ -64,7 +64,7 @@ public RemoteWriter createWriter( @Override public String failedFlushInfo(final long batchOffset, final Integer failedTablePartition) { return String.format("", - batchOffset, table.fetchOffset(kafkaPartition)); + batchOffset, table.lastWrittenOffset(kafkaPartition)); } @Override diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraFactTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraFactTable.java index 662dc3429..8d97d0313 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraFactTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraFactTable.java @@ -248,7 +248,7 @@ public CassandraFactFlushManager init( } @Override - public long fetchOffset(final int kafkaPartition) { + public long lastWrittenOffset(final int kafkaPartition) { final BoundStatement bound = fetchOffset .bind() .setInt(PARTITION_KEY.bind(), kafkaPartition); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraKVFlushManager.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraKVFlushManager.java index b96b89447..6861353f5 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraKVFlushManager.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraKVFlushManager.java @@ -71,7 +71,7 @@ public RemoteWriter createWriter( @Override public String failedFlushInfo(final long batchOffset, final Integer failedTablePartition) { return String.format(", ", - batchOffset, table.fetchOffset(kafkaPartition), + batchOffset, table.lastWrittenOffset(kafkaPartition), epoch, table.fetchEpoch(failedTablePartition)); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraKeyValueTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraKeyValueTable.java index f658055f0..d830f02ab 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraKeyValueTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraKeyValueTable.java @@ -507,7 +507,7 @@ public BoundStatement delete( } @Override - public long fetchOffset(final int kafkaPartition) { + public long lastWrittenOffset(final int kafkaPartition) { final int metadataTablePartition = partitioner.metadataTablePartition(kafkaPartition); final List result = client.execute( diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowFlushManager.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowFlushManager.java index b73d8a17c..25eaaeef6 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowFlushManager.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowFlushManager.java @@ -98,7 +98,7 @@ public String failedFlushInfo( final SegmentPartition failedTablePartition ) { return String.format(", ", - batchOffset, table.fetchOffset(kafkaPartition), + batchOffset, table.lastWrittenOffset(kafkaPartition), epoch, table.fetchEpoch(failedTablePartition)); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowTable.java index 176ca2e02..37dff6749 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowTable.java @@ -627,7 +627,7 @@ private BoundStatement expireSegment(final Segmenter.SegmentPartition segmentToD } @Override - public long fetchOffset(final int kafkaPartition) { + public long lastWrittenOffset(final int kafkaPartition) { final Segmenter.SegmentPartition metadataPartition = partitioner.metadataTablePartition(kafkaPartition); final List result = client.execute( diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/MongoKVFlushManager.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/MongoKVFlushManager.java index b900c9e9f..c283e87b9 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/MongoKVFlushManager.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/MongoKVFlushManager.java @@ -71,7 +71,7 @@ public RemoteWriter createWriter( @Override public String failedFlushInfo(final long batchOffset, final Integer failedTablePartition) { return String.format(", ", - batchOffset, table.fetchOffset(kafkaPartition), + batchOffset, table.lastWrittenOffset(kafkaPartition), table.localEpoch(kafkaPartition), table.fetchEpoch(kafkaPartition)); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/MongoSessionFlushManager.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/MongoSessionFlushManager.java index b912e6ffb..63023891d 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/MongoSessionFlushManager.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/MongoSessionFlushManager.java @@ -104,7 +104,7 @@ public String failedFlushInfo( final SegmentPartition failedTablePartition ) { return String.format(", ", - batchOffset, table.fetchOffset(kafkaPartition), + batchOffset, table.lastWrittenOffset(kafkaPartition), table.localEpoch(kafkaPartition), table.fetchEpoch(kafkaPartition)); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/MongoWindowFlushManager.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/MongoWindowFlushManager.java index 133c95403..803760d2a 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/MongoWindowFlushManager.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/MongoWindowFlushManager.java @@ -103,7 +103,7 @@ public String failedFlushInfo( final SegmentPartition failedTablePartition ) { return String.format(", ", - batchOffset, table.fetchOffset(kafkaPartition), + batchOffset, table.lastWrittenOffset(kafkaPartition), table.localEpoch(kafkaPartition), table.fetchEpoch(kafkaPartition)); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteTable.java index f0c450fa3..79ca898ee 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteTable.java @@ -61,9 +61,12 @@ S delete( ); /** + * Get the offset corresponding to the last write to the table for a specific + * Kafka partition. + * * @param kafkaPartition the kafka partition * @return the current offset fetched from the metadata table * partition for the given kafka partition */ - long fetchOffset(final int kafkaPartition); + long lastWrittenOffset(final int kafkaPartition); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/inmemory/InMemoryKVTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/inmemory/InMemoryKVTable.java index 5430ea604..4bf8c871d 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/inmemory/InMemoryKVTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/inmemory/InMemoryKVTable.java @@ -180,7 +180,7 @@ public BoundStatement delete(int kafkaPartition, Bytes key) { } @Override - public long fetchOffset(int kafkaPartition) { + public long lastWrittenOffset(int kafkaPartition) { checkKafkaPartition(kafkaPartition); return ResponsiveStoreRegistration.NO_COMMITTED_OFFSET; } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoKVTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoKVTable.java index e9436a110..cb3c93207 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoKVTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoKVTable.java @@ -288,7 +288,7 @@ public WriteModel delete(final int kafkaPartition, final Bytes key) { } @Override - public long fetchOffset(final int kafkaPartition) { + public long lastWrittenOffset(final int kafkaPartition) { final KVMetadataDoc result = metadata.find( Filters.eq(KVMetadataDoc.PARTITION, kafkaPartition) ).first(); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoSessionTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoSessionTable.java index 8aa44daf1..a8d3d8b7c 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoSessionTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoSessionTable.java @@ -448,7 +448,7 @@ public long fetchEpoch(final int kafkaPartition) { } @Override - public long fetchOffset(final int kafkaPartition) { + public long lastWrittenOffset(final int kafkaPartition) { final SessionMetadataDoc remoteMetadata = metadata.find( Filters.eq(SessionMetadataDoc.PARTITION, kafkaPartition) ).first(); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoWindowTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoWindowTable.java index 9c1b05bb9..7bbfaa24d 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoWindowTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoWindowTable.java @@ -332,7 +332,7 @@ public long fetchEpoch(final int kafkaPartition) { } @Override - public long fetchOffset(final int kafkaPartition) { + public long lastWrittenOffset(final int kafkaPartition) { final WindowMetadataDoc remoteMetadata = metadata.find( Filters.eq(WindowMetadataDoc.PARTITION, kafkaPartition) ).first(); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVTable.java index a0c59b562..6d6da3b02 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3KVTable.java @@ -188,7 +188,7 @@ public WalEntry delete(final int kafkaPartition, final Bytes key) { } @Override - public long fetchOffset(final int kafkaPartition) { + public long lastWrittenOffset(final int kafkaPartition) { return fetchOffset; } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/GlobalOperations.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/GlobalOperations.java index 9fee2b353..a046afffe 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/GlobalOperations.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/GlobalOperations.java @@ -182,7 +182,7 @@ public void restoreBatch(final Collection> record // of auto.commit.interval.ms) unlike the changelog equivalent which // always restores from scratch final int partition = rec.partition(); - final long offset = table.fetchOffset(partition); + final long offset = table.lastWrittenOffset(partition); if (rec.offset() < offset) { // ignore records that have already been processed - race conditions // are not important since the worst case we'll have just not written diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java index 89d339d71..772cbe26f 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java @@ -155,7 +155,7 @@ public static PartitionedOperations create( config ); - final long restoreStartOffset = table.fetchOffset(changelog.partition()); + final long restoreStartOffset = table.lastWrittenOffset(changelog.partition()); registration = new ResponsiveStoreRegistration( name.kafkaName(), changelog, diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/RemoteWindowOperations.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/RemoteWindowOperations.java index f33c0f8f2..93926bb7d 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/RemoteWindowOperations.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/RemoteWindowOperations.java @@ -140,7 +140,7 @@ public static RemoteWindowOperations create( params.retainDuplicates(), responsiveConfig ); - final long restoreStartOffset = table.fetchOffset(changelog.partition()); + final long restoreStartOffset = table.lastWrittenOffset(changelog.partition()); registration = new ResponsiveStoreRegistration( name.kafkaName(), changelog, diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SessionOperationsImpl.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SessionOperationsImpl.java index af6ae93c2..3d3e554af 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SessionOperationsImpl.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SessionOperationsImpl.java @@ -117,7 +117,7 @@ public static SessionOperationsImpl create( false, responsiveConfig ); - final long restoreStartOffset = table.fetchOffset(changelog.partition()); + final long restoreStartOffset = table.lastWrittenOffset(changelog.partition()); final var registration = new ResponsiveStoreRegistration( name.kafkaName(), changelog, diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreRestoreIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreRestoreIntegrationTest.java index 3555bfc61..2163a97b8 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreRestoreIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreRestoreIntegrationTest.java @@ -179,7 +179,7 @@ public void shouldFlushStoresBeforeClose(final KVSchema type) throws Exception { final ResponsiveConfig config = ResponsiveConfig.responsiveConfig(properties); final RemoteKVTable table = remoteKVTable(type, defaultFactory, config, changelog); - final long cassandraOffset = table.fetchOffset(0); + final long cassandraOffset = table.lastWrittenOffset(0); assertThat(cassandraOffset, greaterThan(0L)); final List> changelogRecords @@ -307,7 +307,7 @@ public void shouldRestoreUnflushedChangelog(final KVSchema type) throws Exceptio table = remoteKVTable(type, defaultFactory, config, changelog); - final long remoteOffset = table.fetchOffset(0); + final long remoteOffset = table.lastWrittenOffset(0); assertThat(remoteOffset, greaterThan(0L)); final long changelogOffset = admin.listOffsets(Map.of(changelog, OffsetSpec.latest())).all() diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/TablePartitionerIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/TablePartitionerIntegrationTest.java index 4c4150d99..39e24f6dd 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/integration/TablePartitionerIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/TablePartitionerIntegrationTest.java @@ -257,13 +257,13 @@ public void shouldFlushToRemoteTableWithoutSubpartitions() throws Exception { final CassandraFactTable table = CassandraFactTable.create( new DefaultTableSpec(cassandraName, partitioner, TtlResolver.NO_TTL, config), client); - final var offset0 = table.fetchOffset(0); - final var offset1 = table.fetchOffset(1); + final var offset0 = table.lastWrittenOffset(0); + final var offset1 = table.lastWrittenOffset(1); assertThat(offset0, is(notNullValue())); assertThat(offset1, is(notNullValue())); - Assertions.assertEquals(table.fetchOffset(2), NO_COMMITTED_OFFSET); + Assertions.assertEquals(table.lastWrittenOffset(2), NO_COMMITTED_OFFSET); LOG.info("Checking data in remote table"); // these store ValueAndTimestamp, so we need to just pluck the last 8 bytes diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraFactTableIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraFactTableIntegrationTest.java index a773e10db..0a0a415d0 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraFactTableIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraFactTableIntegrationTest.java @@ -92,8 +92,8 @@ public void shouldInitializeWithCorrectMetadata() throws Exception { final var token = schema.init(1); schema.init(2); client.execute(schema.setOffset(2, 10)); - final long offset1 = schema.fetchOffset(1); - final long offset2 = schema.fetchOffset(2); + final long offset1 = schema.lastWrittenOffset(1); + final long offset2 = schema.lastWrittenOffset(2); // Then: assertThat(offset1, is(-1L)); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3KVTableIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3KVTableIntegrationTest.java index 76a2ed8b3..9f6f28434 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3KVTableIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/RS3KVTableIntegrationTest.java @@ -177,7 +177,7 @@ public void shouldRestoreFromLowestPssWrittenOffset() { table.init(PARTITION_ID); // when: - final var restorePartition = table.fetchOffset(PARTITION_ID); + final var restorePartition = table.lastWrittenOffset(PARTITION_ID); assertThat(restorePartition, is(100L)); } @@ -203,7 +203,7 @@ public void shouldRestoreFromStartIfLowestPssFlushedOffsetIsUnspecified() { table.init(PARTITION_ID); // when: - final var restorePartition = table.fetchOffset(PARTITION_ID); + final var restorePartition = table.lastWrittenOffset(PARTITION_ID); assertThat(restorePartition, is(ResponsiveStoreRegistration.NO_COMMITTED_OFFSET)); } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java index befe3f254..2b14e27ee 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java @@ -347,7 +347,7 @@ public void shouldFenceOffsetFlushBasedOnMetadataRowEpoch() { assertThat(table.fetchEpoch(8), is(1L)); assertThat(table.fetchEpoch(9), is(2L)); assertThat(table.fetchEpoch(10), is(1L)); - assertThat(table.fetchOffset(changelog.partition()), is(-1L)); + assertThat(table.lastWrittenOffset(changelog.partition()), is(-1L)); } } @@ -449,10 +449,10 @@ public void shouldOnlyFlushWhenBufferFullWithRecordsTrigger() { buffer.flush(9L); // Then: - assertThat(table.fetchOffset(KAFKA_PARTITION), is(-1L)); + assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(-1L)); buffer.put(Bytes.wrap(new byte[]{10}), VALUE, CURRENT_TS); buffer.flush(10L); - assertThat(table.fetchOffset(KAFKA_PARTITION), is(10L)); + assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(10L)); } } @@ -473,10 +473,10 @@ public void shouldOnlyFlushWhenBufferFullWithBytesTrigger() { buffer.flush(9L); // Then: - assertThat(table.fetchOffset(KAFKA_PARTITION), is(-1L)); + assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(-1L)); buffer.put(Bytes.wrap(new byte[]{10}), value, CURRENT_TS); buffer.flush(10L); - assertThat(table.fetchOffset(KAFKA_PARTITION), is(10L)); + assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(10L)); } } @@ -496,10 +496,10 @@ public void shouldOnlyFlushWhenIntervalTriggerElapsed() { buffer.flush(1L); // Then: - assertThat(table.fetchOffset(KAFKA_PARTITION), is(-1L)); + assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(-1L)); clock.set(clock.get().plus(Duration.ofSeconds(35))); buffer.flush(5L); - assertThat(table.fetchOffset(KAFKA_PARTITION), is(5L)); + assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(5L)); } } @@ -518,7 +518,7 @@ public void shouldUpdateOffsetWhenNoRecordsInBuffer() { buffer.flush(10L); // Then: - assertThat(table.fetchOffset(KAFKA_PARTITION), is(10L)); + assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(10L)); } } @@ -534,7 +534,7 @@ private Optional measureFlushTime( buffer.put(Bytes.wrap(new byte[]{18}), VALUE, CURRENT_TS); while (clock.get().isBefore(to)) { buffer.flush(flushOffset); - if (table.fetchOffset(KAFKA_PARTITION) == flushOffset) { + if (table.lastWrittenOffset(KAFKA_PARTITION) == flushOffset) { return Optional.of(clock.get()); } clock.set(clock.get().plus(Duration.ofSeconds(1))); @@ -613,7 +613,7 @@ public void shouldRestoreRecords() { buffer.restoreBatch(List.of(record), -1L); // Then: - assertThat(table.fetchOffset(KAFKA_PARTITION), is(100L)); + assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(100L)); final byte[] value = table.get(KAFKA_PARTITION, KEY, MIN_VALID_TS); assertThat(value, is(VALUE)); } diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDTable.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDTable.java index d22d782fc..1bd01a0fe 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDTable.java +++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDTable.java @@ -31,7 +31,7 @@ public TTDTable(final TTDCassandraClient client) { public abstract long count(); @Override - public long fetchOffset(final int kafkaPartition) { + public long lastWrittenOffset(final int kafkaPartition) { return 0; }