@@ -64,7 +64,7 @@ public RemoteWriter<Bytes, Integer> createWriter(
@Override
public String failedFlushInfo(final long batchOffset, final Integer failedTablePartition) {
return String.format("<batchOffset=%d, persistedOffset=%d>",
-batchOffset, table.fetchOffset(kafkaPartition));
+batchOffset, table.lastWrittenOffset(kafkaPartition));
}

@Override
@@ -248,7 +248,7 @@ public CassandraFactFlushManager init(
}

@Override
-public long fetchOffset(final int kafkaPartition) {
+public long lastWrittenOffset(final int kafkaPartition) {
final BoundStatement bound = fetchOffset
.bind()
.setInt(PARTITION_KEY.bind(), kafkaPartition);
@@ -71,7 +71,7 @@ public RemoteWriter<Bytes, Integer> createWriter(
@Override
public String failedFlushInfo(final long batchOffset, final Integer failedTablePartition) {
return String.format("<batchOffset=%d, persistedOffset=%d>, <localEpoch=%d, persistedEpoch=%d>",
-batchOffset, table.fetchOffset(kafkaPartition),
+batchOffset, table.lastWrittenOffset(kafkaPartition),
epoch, table.fetchEpoch(failedTablePartition));
}

@@ -507,7 +507,7 @@ public BoundStatement delete(
}

@Override
-public long fetchOffset(final int kafkaPartition) {
+public long lastWrittenOffset(final int kafkaPartition) {
final int metadataTablePartition = partitioner.metadataTablePartition(kafkaPartition);

final List<Row> result = client.execute(
@@ -98,7 +98,7 @@ public String failedFlushInfo(
final SegmentPartition failedTablePartition
) {
return String.format("<batchOffset=%d, persistedOffset=%d>, <localEpoch=%d, persistedEpoch=%d>",
-batchOffset, table.fetchOffset(kafkaPartition),
+batchOffset, table.lastWrittenOffset(kafkaPartition),
epoch, table.fetchEpoch(failedTablePartition));
}

@@ -627,7 +627,7 @@ private BoundStatement expireSegment(final Segmenter.SegmentPartition segmentToD
}

@Override
-public long fetchOffset(final int kafkaPartition) {
+public long lastWrittenOffset(final int kafkaPartition) {
final Segmenter.SegmentPartition metadataPartition =
partitioner.metadataTablePartition(kafkaPartition);
final List<Row> result = client.execute(
@@ -71,7 +71,7 @@ public RemoteWriter<Bytes, Integer> createWriter(
@Override
public String failedFlushInfo(final long batchOffset, final Integer failedTablePartition) {
return String.format("<batchOffset=%d, persistedOffset=%d>, <localEpoch=%d, persistedEpoch=%d>",
-batchOffset, table.fetchOffset(kafkaPartition),
+batchOffset, table.lastWrittenOffset(kafkaPartition),
table.localEpoch(kafkaPartition), table.fetchEpoch(kafkaPartition));
}

@@ -104,7 +104,7 @@ public String failedFlushInfo(
final SegmentPartition failedTablePartition
) {
return String.format("<batchOffset=%d, persistedOffset=%d>, <localEpoch=%d, persistedEpoch=%d>",
-batchOffset, table.fetchOffset(kafkaPartition),
+batchOffset, table.lastWrittenOffset(kafkaPartition),
table.localEpoch(kafkaPartition), table.fetchEpoch(kafkaPartition));
}

@@ -103,7 +103,7 @@ public String failedFlushInfo(
final SegmentPartition failedTablePartition
) {
return String.format("<batchOffset=%d, persistedOffset=%d>, <localEpoch=%d, persistedEpoch=%d>",
-batchOffset, table.fetchOffset(kafkaPartition),
+batchOffset, table.lastWrittenOffset(kafkaPartition),
table.localEpoch(kafkaPartition), table.fetchEpoch(kafkaPartition));
}

@@ -61,9 +61,12 @@ S delete(
);

/**
* Get the offset corresponding to the last write to the table for a specific
* Kafka partition.
*
* @param kafkaPartition the kafka partition
* @return the current offset fetched from the metadata table
* partition for the given kafka partition
*/
-long fetchOffset(final int kafkaPartition);
+long lastWrittenOffset(final int kafkaPartition);
}
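As context for the rename, the sketch below shows how a caller can use the new lastWrittenOffset during changelog restore, mirroring the restoreBatch change further down in this diff. It is a minimal illustration only: RestoreSketch and RemoteTableView are hypothetical names introduced here, not part of this PR's API.

```java
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRecord;

final class RestoreSketch {

  // Hypothetical caller-side view of the remote table; only lastWrittenOffset
  // comes from this PR, the rest is illustrative.
  interface RemoteTableView {
    long lastWrittenOffset(int kafkaPartition);
  }

  void restoreBatch(
      final RemoteTableView table,
      final Collection<ConsumerRecord<byte[], byte[]>> records
  ) {
    for (final ConsumerRecord<byte[], byte[]> rec : records) {
      // skip records that the remote table has already persisted, using the
      // same check as the restoreBatch change in this diff
      final long lastWritten = table.lastWrittenOffset(rec.partition());
      if (rec.offset() < lastWritten) {
        continue;
      }
      // ... write rec to the remote table ...
    }
  }
}
```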
@@ -180,7 +180,7 @@ public BoundStatement delete(int kafkaPartition, Bytes key) {
}

@Override
-public long fetchOffset(int kafkaPartition) {
+public long lastWrittenOffset(int kafkaPartition) {
checkKafkaPartition(kafkaPartition);
return ResponsiveStoreRegistration.NO_COMMITTED_OFFSET;
}
@@ -288,7 +288,7 @@ public WriteModel<KVDoc> delete(final int kafkaPartition, final Bytes key) {
}

@Override
-public long fetchOffset(final int kafkaPartition) {
+public long lastWrittenOffset(final int kafkaPartition) {
final KVMetadataDoc result = metadata.find(
Filters.eq(KVMetadataDoc.PARTITION, kafkaPartition)
).first();
@@ -448,7 +448,7 @@ public long fetchEpoch(final int kafkaPartition) {
}

@Override
-public long fetchOffset(final int kafkaPartition) {
+public long lastWrittenOffset(final int kafkaPartition) {
final SessionMetadataDoc remoteMetadata = metadata.find(
Filters.eq(SessionMetadataDoc.PARTITION, kafkaPartition)
).first();
@@ -332,7 +332,7 @@ public long fetchEpoch(final int kafkaPartition) {
}

@Override
-public long fetchOffset(final int kafkaPartition) {
+public long lastWrittenOffset(final int kafkaPartition) {
final WindowMetadataDoc remoteMetadata = metadata.find(
Filters.eq(WindowMetadataDoc.PARTITION, kafkaPartition)
).first();
@@ -188,7 +188,7 @@ public WalEntry delete(final int kafkaPartition, final Bytes key) {
}

@Override
-public long fetchOffset(final int kafkaPartition) {
+public long lastWrittenOffset(final int kafkaPartition) {
return fetchOffset;
}
}
@@ -182,7 +182,7 @@ public void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> record
// of auto.commit.interval.ms) unlike the changelog equivalent which
// always restores from scratch
final int partition = rec.partition();
-final long offset = table.fetchOffset(partition);
+final long offset = table.lastWrittenOffset(partition);
if (rec.offset() < offset) {
// ignore records that have already been processed - race conditions
// are not important since the worst case we'll have just not written
@@ -155,7 +155,7 @@ public static PartitionedOperations create(
config
);

-final long restoreStartOffset = table.fetchOffset(changelog.partition());
+final long restoreStartOffset = table.lastWrittenOffset(changelog.partition());
registration = new ResponsiveStoreRegistration(
name.kafkaName(),
changelog,
@@ -140,7 +140,7 @@ public static RemoteWindowOperations create(
params.retainDuplicates(),
responsiveConfig
);
-final long restoreStartOffset = table.fetchOffset(changelog.partition());
+final long restoreStartOffset = table.lastWrittenOffset(changelog.partition());
registration = new ResponsiveStoreRegistration(
name.kafkaName(),
changelog,
@@ -117,7 +117,7 @@ public static SessionOperationsImpl create(
false,
responsiveConfig
);
-final long restoreStartOffset = table.fetchOffset(changelog.partition());
+final long restoreStartOffset = table.lastWrittenOffset(changelog.partition());
final var registration = new ResponsiveStoreRegistration(
name.kafkaName(),
changelog,
@@ -179,7 +179,7 @@ public void shouldFlushStoresBeforeClose(final KVSchema type) throws Exception {
final ResponsiveConfig config = ResponsiveConfig.responsiveConfig(properties);
final RemoteKVTable<?> table = remoteKVTable(type, defaultFactory, config, changelog);

-final long cassandraOffset = table.fetchOffset(0);
+final long cassandraOffset = table.lastWrittenOffset(0);
assertThat(cassandraOffset, greaterThan(0L));

final List<ConsumerRecord<Long, Long>> changelogRecords
@@ -307,7 +307,7 @@ public void shouldRestoreUnflushedChangelog(final KVSchema type) throws Exception

table = remoteKVTable(type, defaultFactory, config, changelog);

-final long remoteOffset = table.fetchOffset(0);
+final long remoteOffset = table.lastWrittenOffset(0);
assertThat(remoteOffset, greaterThan(0L));

final long changelogOffset = admin.listOffsets(Map.of(changelog, OffsetSpec.latest())).all()
@@ -257,13 +257,13 @@ public void shouldFlushToRemoteTableWithoutSubpartitions() throws Exception {
final CassandraFactTable table = CassandraFactTable.create(
new DefaultTableSpec(cassandraName, partitioner, TtlResolver.NO_TTL, config), client);

-final var offset0 = table.fetchOffset(0);
-final var offset1 = table.fetchOffset(1);
+final var offset0 = table.lastWrittenOffset(0);
+final var offset1 = table.lastWrittenOffset(1);

assertThat(offset0, is(notNullValue()));
assertThat(offset1, is(notNullValue()));

-Assertions.assertEquals(table.fetchOffset(2), NO_COMMITTED_OFFSET);
+Assertions.assertEquals(table.lastWrittenOffset(2), NO_COMMITTED_OFFSET);

LOG.info("Checking data in remote table");
// these store ValueAndTimestamp, so we need to just pluck the last 8 bytes
@@ -92,8 +92,8 @@ public void shouldInitializeWithCorrectMetadata() throws Exception {
final var token = schema.init(1);
schema.init(2);
client.execute(schema.setOffset(2, 10));
-final long offset1 = schema.fetchOffset(1);
-final long offset2 = schema.fetchOffset(2);
+final long offset1 = schema.lastWrittenOffset(1);
+final long offset2 = schema.lastWrittenOffset(2);

// Then:
assertThat(offset1, is(-1L));
@@ -177,7 +177,7 @@ public void shouldRestoreFromLowestPssWrittenOffset() {
table.init(PARTITION_ID);

// when:
-final var restorePartition = table.fetchOffset(PARTITION_ID);
+final var restorePartition = table.lastWrittenOffset(PARTITION_ID);
assertThat(restorePartition, is(100L));
}

@@ -203,7 +203,7 @@ public void shouldRestoreFromStartIfLowestPssFlushedOffsetIsUnspecified() {
table.init(PARTITION_ID);

// when:
-final var restorePartition = table.fetchOffset(PARTITION_ID);
+final var restorePartition = table.lastWrittenOffset(PARTITION_ID);
assertThat(restorePartition, is(ResponsiveStoreRegistration.NO_COMMITTED_OFFSET));
}

@@ -347,7 +347,7 @@ public void shouldFenceOffsetFlushBasedOnMetadataRowEpoch() {
assertThat(table.fetchEpoch(8), is(1L));
assertThat(table.fetchEpoch(9), is(2L));
assertThat(table.fetchEpoch(10), is(1L));
-assertThat(table.fetchOffset(changelog.partition()), is(-1L));
+assertThat(table.lastWrittenOffset(changelog.partition()), is(-1L));
}
}

@@ -449,10 +449,10 @@ public void shouldOnlyFlushWhenBufferFullWithRecordsTrigger() {
buffer.flush(9L);

// Then:
-assertThat(table.fetchOffset(KAFKA_PARTITION), is(-1L));
+assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(-1L));
buffer.put(Bytes.wrap(new byte[]{10}), VALUE, CURRENT_TS);
buffer.flush(10L);
-assertThat(table.fetchOffset(KAFKA_PARTITION), is(10L));
+assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(10L));
}
}

@@ -473,10 +473,10 @@ public void shouldOnlyFlushWhenBufferFullWithBytesTrigger() {
buffer.flush(9L);

// Then:
-assertThat(table.fetchOffset(KAFKA_PARTITION), is(-1L));
+assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(-1L));
buffer.put(Bytes.wrap(new byte[]{10}), value, CURRENT_TS);
buffer.flush(10L);
-assertThat(table.fetchOffset(KAFKA_PARTITION), is(10L));
+assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(10L));
}
}

@@ -496,10 +496,10 @@ public void shouldOnlyFlushWhenIntervalTriggerElapsed() {
buffer.flush(1L);

// Then:
-assertThat(table.fetchOffset(KAFKA_PARTITION), is(-1L));
+assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(-1L));
clock.set(clock.get().plus(Duration.ofSeconds(35)));
buffer.flush(5L);
-assertThat(table.fetchOffset(KAFKA_PARTITION), is(5L));
+assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(5L));
}
}

@@ -518,7 +518,7 @@ public void shouldUpdateOffsetWhenNoRecordsInBuffer() {
buffer.flush(10L);

// Then:
-assertThat(table.fetchOffset(KAFKA_PARTITION), is(10L));
+assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(10L));
}
}

@@ -534,7 +534,7 @@ private Optional<Instant> measureFlushTime(
buffer.put(Bytes.wrap(new byte[]{18}), VALUE, CURRENT_TS);
while (clock.get().isBefore(to)) {
buffer.flush(flushOffset);
-if (table.fetchOffset(KAFKA_PARTITION) == flushOffset) {
+if (table.lastWrittenOffset(KAFKA_PARTITION) == flushOffset) {
return Optional.of(clock.get());
}
clock.set(clock.get().plus(Duration.ofSeconds(1)));
@@ -613,7 +613,7 @@ public void shouldRestoreRecords() {
buffer.restoreBatch(List.of(record), -1L);

// Then:
-assertThat(table.fetchOffset(KAFKA_PARTITION), is(100L));
+assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(100L));
final byte[] value = table.get(KAFKA_PARTITION, KEY, MIN_VALID_TS);
assertThat(value, is(VALUE));
}
@@ -31,7 +31,7 @@ public TTDTable(final TTDCassandraClient client) {
public abstract long count();

@Override
-public long fetchOffset(final int kafkaPartition) {
+public long lastWrittenOffset(final int kafkaPartition) {
return 0;
}
