Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,16 @@ private void onCommit(
final TopicPartition p = e.getKey().getPartition();
for (final ResponsiveStoreRegistration storeRegistration
: storeRegistry.getRegisteredStoresForChangelog(p, threadId)) {
// Committed offsets are already exclusive (one more than last consumed offset)
storeRegistration.callbacks().notifyCommit(e.getValue());
}
}
for (final var e : writtenOffsets.entrySet()) {
final TopicPartition p = e.getKey();
for (final ResponsiveStoreRegistration storeRegistration
: storeRegistry.getRegisteredStoresForChangelog(p, threadId)) {
storeRegistration.callbacks().notifyCommit(e.getValue());
// Add one since the written offset is inclusive
storeRegistration.callbacks().notifyCommit(e.getValue() + 1);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

package dev.responsive.kafka.integration;

import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_CONNECTION_STRING_CONFIG;
import static dev.responsive.kafka.testutils.IntegrationTestUtils.getCassandraValidName;
import static dev.responsive.kafka.testutils.IntegrationTestUtils.pipeInput;
import static dev.responsive.kafka.testutils.IntegrationTestUtils.slurpPartition;
Expand Down Expand Up @@ -57,14 +56,12 @@
import dev.responsive.kafka.internal.db.CassandraClientFactory;
import dev.responsive.kafka.internal.db.DefaultCassandraClientFactory;
import dev.responsive.kafka.internal.db.RemoteKVTable;
import dev.responsive.kafka.internal.db.mongo.CollectionCreationOptions;
import dev.responsive.kafka.internal.db.mongo.MongoKVTable;
import dev.responsive.kafka.internal.db.partitioning.SubPartitioner;
import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
import dev.responsive.kafka.internal.db.spec.DefaultTableSpec;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.stores.SchemaTypes.KVSchema;
import dev.responsive.kafka.internal.stores.TtlResolver;
import dev.responsive.kafka.internal.utils.SessionUtil;
import dev.responsive.kafka.testutils.IntegrationTestUtils;
import dev.responsive.kafka.testutils.IntegrationTestUtils.MockResponsiveKafkaStreams;
import dev.responsive.kafka.testutils.ResponsiveConfigParam;
Expand All @@ -75,6 +72,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -107,10 +105,12 @@
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.LoggerFactory;

public class ResponsiveKeyValueStoreRestoreIntegrationTest {

Expand Down Expand Up @@ -185,7 +185,8 @@ public void shouldFlushStoresBeforeClose(final KVSchema type) throws Exception {
final List<ConsumerRecord<Long, Long>> changelogRecords
= slurpPartition(changelog, properties);
final long last = changelogRecords.get(changelogRecords.size() - 1).offset();
assertThat(cassandraOffset, equalTo(last));
// Written offset is exclusive
assertThat(cassandraOffset, equalTo(last + 1));
}
}

Expand Down Expand Up @@ -259,9 +260,11 @@ public void shouldRepairOffsetsIfOutOfRangeAndConfigured(final KVSchema type) th
}
}

@ParameterizedTest
@EnumSource(KVSchema.class)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fault injector doesn't actually work with KVSchema.FACT because it uses a different statement type. I think the off-by-one error was allowing the assertions to pass because we always reconsumed one extra record.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't found a good way to target the update from flush when using the FACT table. It uses a DefaultBoundStatement which is used by several other queries prior to flush. Without the targeted fault, the task crashes before it reads any input.

public void shouldRestoreUnflushedChangelog(final KVSchema type) throws Exception {
// @ParameterizedTest
// @EnumSource(KVSchema.class)
@Test
public void shouldRestoreUnflushedChangelog() throws Exception {
final KVSchema type = KVSchema.KEY_VALUE;
final Map<String, Object> properties = getMutableProperties(type);
final KafkaProducer<Long, Long> producer = new KafkaProducer<>(properties);
final KafkaClientSupplier defaultClientSupplier = new DefaultKafkaClientSupplier();
Expand All @@ -280,6 +283,12 @@ public void shouldRestoreUnflushedChangelog(final KVSchema type) throws Exceptio
waitTillFullyConsumed(admin, input, name, Duration.ofSeconds(120));
}

final TopicPartition changelog = new TopicPartition(name + "-" + aggName() + "-changelog", 0);
final ResponsiveConfig config = ResponsiveConfig.responsiveConfig(properties);

final RemoteKVTable<?> changelogTable = remoteKVTable(type, defaultFactory, config, changelog);
assertThat(changelogTable.lastWrittenOffset(0), greaterThan(0L));

// restart with fault injecting cassandra client
final FaultInjectingCassandraClientSupplier cassandraFaultInjector
= new FaultInjectingCassandraClientSupplier();
Expand All @@ -299,22 +308,12 @@ public void shouldRestoreUnflushedChangelog(final KVSchema type) throws Exceptio
IntegrationTestUtils
.waitTillConsumedPast(admin, input, name, endInput + 1, Duration.ofSeconds(30));
}
final TopicPartition changelog = new TopicPartition(name + "-" + aggName() + "-changelog", 0);

// Make sure changelog is ahead of remote
final ResponsiveConfig config = ResponsiveConfig.responsiveConfig(properties);
final RemoteKVTable<?> table;

table = remoteKVTable(type, defaultFactory, config, changelog);

final long remoteOffset = table.lastWrittenOffset(0);
final long remoteOffset = changelogTable.lastWrittenOffset(0);
assertThat(remoteOffset, greaterThan(0L));

final long changelogOffset = admin.listOffsets(Map.of(changelog, OffsetSpec.latest())).all()
.get()
.get(changelog)
.offset();
assertThat(remoteOffset, lessThan(changelogOffset));
final long changelogEndOffset = IntegrationTestUtils.endOffset(admin, changelog);
assertThat(remoteOffset, lessThan(changelogEndOffset));

// Restart with restore recorder
final TestKafkaClientSupplier recordingClientSupplier = new TestKafkaClientSupplier();
Expand Down Expand Up @@ -351,30 +350,33 @@ private RemoteKVTable<?> remoteKVTable(
) throws InterruptedException, TimeoutException {
final RemoteKVTable<?> table;

if (type == KVSchema.FACT) {
final CassandraClient cassandraClient = defaultFactory.createClient(
defaultFactory.createCqlSession(config, null),
config);
final CassandraClient cassandraClient = defaultFactory.createClient(
defaultFactory.createCqlSession(config, null),
config);

if (type == KVSchema.FACT) {
table = cassandraClient.factFactory()
.create(new DefaultTableSpec(
aggName(),
TablePartitioner.defaultPartitioner(),
TtlResolver.NO_TTL,
config
));

} else if (type == KVSchema.KEY_VALUE) {
final var connectionString = config.getPassword(MONGO_CONNECTION_STRING_CONFIG).value();
final var mongoClient = SessionUtil.connect(connectionString, "", null);
table = new MongoKVTable(
mongoClient,
final SubPartitioner partitioner = SubPartitioner.create(
OptionalInt.empty(),
NUM_PARTITIONS,
aggName(),
CollectionCreationOptions.fromConfig(config),
TtlResolver.NO_TTL,
config
config,
changelog.topic()
);
table.init(0);
table = cassandraClient.kvFactory()
Copy link
Contributor Author

@hachikuji hachikuji May 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ableegoldman I had to partially revert #383 here. One of the tests shouldRestoreUnflushedChangelog used a cassandra fault injector which did nothing when mongo was used as the backend.

.create(new DefaultTableSpec(
aggName(),
partitioner,
TtlResolver.NO_TTL,
config
));
} else {
throw new IllegalArgumentException("Unsupported type: " + type);
}
Expand Down Expand Up @@ -537,14 +539,7 @@ public ConsumerRecords<byte[], byte[]> record(final ConsumerRecords<byte[], byte
private Map<String, Object> getMutableProperties(final KVSchema type) {
final Map<String, Object> properties = new HashMap<>(responsiveProps);

if (type == KVSchema.FACT) {
properties.put(ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG, StorageBackend.CASSANDRA.name());
} else if (type == KVSchema.KEY_VALUE) {
properties.put(ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG, StorageBackend.MONGO_DB.name());
} else {
throw new IllegalArgumentException("Unexpected schema type: " + type.name());
}

properties.put(ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG, StorageBackend.CASSANDRA.name());
properties.put(KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
properties.put(VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
properties.put(KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void shouldNotifyStoresOnCommittedChangelogWrites() {
sendWrittenOffsets(Map.of(PARTITION2, 456L));

// then:
verify(store2Callbacks).notifyCommit(456L);
verify(store2Callbacks).notifyCommit(457L);
verifyNoInteractions(store1Callbacks);
}

Expand Down
Loading