Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class ResponsiveConfig extends AbstractConfig {

public static final String STORAGE_BACKEND_TYPE_CONFIG = "responsive.storage.backend.type";
private static final String STORAGE_BACKEND_TYPE_DOC = "The storage backend";
private static final StorageBackend STORAGE_BACKEND_TYPE_DEFAULT = StorageBackend.CASSANDRA;
private static final StorageBackend STORAGE_BACKEND_TYPE_DEFAULT = StorageBackend.MONGO_DB;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I'm a little anxious about this, not sure whether or not existing customers set this explicitly or not. please check with them first!

I originally thought you were just going to change the default in the test configurations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah sorry, good call. We should check with them though because I think we do want to shift to Mongo as the default (or do we?)

I'll extract this out into a separate PR that we can merge if/when we're ready. Does it make sense to change the default for the tests before we change the default itself?


public static final String RESPONSIVE_MODE = "responsive.mode";
public static final String RESPONSIVE_MODE_DEFAULT = ResponsiveMode.RUN.name();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.datastax.oss.driver.api.core.CqlSession;
import dev.responsive.kafka.api.config.CompatibilityMode;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.config.StorageBackend;
import dev.responsive.kafka.internal.db.CassandraClient;
import dev.responsive.kafka.internal.db.CassandraClientFactory;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
Expand Down Expand Up @@ -90,7 +91,7 @@ public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> con
}
};

private final CassandraClientFactory mockCassandryFactory = new CassandraClientFactory() {
private final CassandraClientFactory mockCassandraFactory = new CassandraClientFactory() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😆

@Override
public CqlSession createCqlSession(
final ResponsiveConfig config,
Expand Down Expand Up @@ -126,6 +127,7 @@ public void setUp() {

properties.put(RESPONSIVE_ORG_CONFIG, "responsive");
properties.put(RESPONSIVE_ENV_CONFIG, "test");
properties.put(ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG, StorageBackend.CASSANDRA.name());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought the point of this PR is to change our tests to run with Mongo?

Copy link
Contributor Author

@ableegoldman ableegoldman Nov 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of them are Cassandra-specific or rely on a mocked Cassandra client to do something like fault injection, as is the case here. I went through and left a note on all the other tests that I hardcoded to use the Cassandra test harness

This specific test actually isn't using the test harness and only had to be updated because I changed the default. Since I'm backing that change out this test actually doesn't need to be updated yet anyways, and I'll back out this change too (so that we remember to update this test if/when we do change the actual default)

}

@SuppressWarnings("resource")
Expand All @@ -145,7 +147,7 @@ public void shouldInvalidateBadConfigs() {
builder.build(),
properties,
supplier,
mockCassandryFactory
mockCassandraFactory
)
);
assertThat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG;

import dev.responsive.kafka.api.ResponsiveKafkaStreams;
import dev.responsive.kafka.api.config.StorageBackend;
import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams;
import dev.responsive.kafka.api.stores.ResponsiveStores;
import dev.responsive.kafka.testutils.ResponsiveConfigParam;
Expand All @@ -56,11 +57,13 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;

@ExtendWith(ResponsiveExtension.class)
public class GlobalStoreIntegrationTest {

@RegisterExtension
static ResponsiveExtension EXTENSION = new ResponsiveExtension(StorageBackend.CASSANDRA);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

global stores are incompatible with Mongo(I guess we should fix this...?)


private static final String INPUT_TOPIC = "input";
private static final String GLOBAL_TOPIC = "global";
private static final String OUTPUT_TOPIC = "output";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
import dev.responsive.kafka.internal.db.RemoteKVTable;
import dev.responsive.kafka.internal.db.mongo.CollectionCreationOptions;
import dev.responsive.kafka.internal.db.mongo.MongoKVTable;
import dev.responsive.kafka.internal.db.partitioning.SubPartitioner;
import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
import dev.responsive.kafka.internal.db.spec.DefaultTableSpec;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
Expand All @@ -82,7 +81,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -123,11 +121,8 @@

public class ResponsiveKeyValueStoreRestoreIntegrationTest {

// hardcoding CASSANDRA, but this test passes with MONGODB as well
private static final StorageBackend BACKEND = StorageBackend.CASSANDRA;

@RegisterExtension
static ResponsiveExtension EXTENSION = new ResponsiveExtension(BACKEND);
static ResponsiveExtension EXTENSION = new ResponsiveExtension();

private static final int NUM_PARTITIONS = 1;
private static final int MAX_POLL_MS = 5000;
Expand Down Expand Up @@ -172,7 +167,7 @@ public void after() {
@ParameterizedTest
@EnumSource(KVSchema.class)
public void shouldFlushStoresBeforeClose(final KVSchema type) throws Exception {
final Map<String, Object> properties = getMutableProperties();
final Map<String, Object> properties = getMutableProperties(type);
final KafkaProducer<Long, Long> producer = new KafkaProducer<>(properties);
final KafkaClientSupplier defaultClientSupplier = new DefaultKafkaClientSupplier();
final CassandraClientFactory defaultFactory = new DefaultCassandraClientFactory();
Expand Down Expand Up @@ -205,7 +200,7 @@ public void shouldFlushStoresBeforeClose(final KVSchema type) throws Exception {
@EnumSource(KVSchema.class)
public void shouldRepairOffsetsIfOutOfRangeAndConfigured(final KVSchema type) throws Exception {
// Given:
final Map<String, Object> properties = getMutableProperties();
final Map<String, Object> properties = getMutableProperties(type);
properties.put(ResponsiveConfig.RESTORE_OFFSET_REPAIR_ENABLED_CONFIG, true);
final KafkaProducer<Long, Long> producer = new KafkaProducer<>(properties);
final KafkaClientSupplier defaultClientSupplier = new DefaultKafkaClientSupplier();
Expand Down Expand Up @@ -274,7 +269,7 @@ public void shouldRepairOffsetsIfOutOfRangeAndConfigured(final KVSchema type) th
@ParameterizedTest
@EnumSource(KVSchema.class)
public void shouldRestoreUnflushedChangelog(final KVSchema type) throws Exception {
final Map<String, Object> properties = getMutableProperties();
final Map<String, Object> properties = getMutableProperties(type);
final KafkaProducer<Long, Long> producer = new KafkaProducer<>(properties);
final KafkaClientSupplier defaultClientSupplier = new DefaultKafkaClientSupplier();
final CassandraClientFactory defaultFactory = new DefaultCassandraClientFactory();
Expand Down Expand Up @@ -362,35 +357,20 @@ private RemoteKVTable<?> remoteKVTable(
final TopicPartition changelog
) throws InterruptedException, TimeoutException {
final RemoteKVTable<?> table;
if (EXTENSION.backend == StorageBackend.CASSANDRA) {

if (type == KVSchema.FACT) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like we're removing the ability to test with cassandra and instead tying that to the KVSchema type? Am I reading that correctly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for this specific test (sorry I meant to leave a comment to clarify before you read this)

My thought was, since we still have at least one customer running Cassandra fact stores then it makes sense to keep some tests parametrized to run that. But no one is running Cassandra KV stores, we would recommend Mongo for that, so we might as well change this to run the KV schema variation against mongo.

Is the method of parametrizing confusing though? I can change it so that the storage backend is the parameter and we determine the schema type based on that.

final CassandraClient cassandraClient = defaultFactory.createClient(
defaultFactory.createCqlSession(config, null),
config);

switch (type) {
case KEY_VALUE:
final SubPartitioner partitioner = SubPartitioner.create(
OptionalInt.empty(),
NUM_PARTITIONS,
table = cassandraClient.factFactory()
.create(new DefaultTableSpec(
aggName(),
config,
changelog.topic()
);
table = cassandraClient.kvFactory()
.create(new DefaultTableSpec(aggName(), partitioner, TtlResolver.NO_TTL));
break;
case FACT:
table = cassandraClient.factFactory()
.create(new DefaultTableSpec(
aggName(),
TablePartitioner.defaultPartitioner(),
TtlResolver.NO_TTL
));
break;
default:
throw new IllegalArgumentException("Unexpected type " + type);
}
} else if (EXTENSION.backend == StorageBackend.MONGO_DB) {
TablePartitioner.defaultPartitioner(),
TtlResolver.NO_TTL
));

} else if (type == KVSchema.KEY_VALUE) {
final var hostname = config.getString(MONGO_ENDPOINT_CONFIG);
final String user = config.getString(MONGO_USERNAME_CONFIG);
final Password pass = config.getPassword(MONGO_PASSWORD_CONFIG);
Expand All @@ -409,7 +389,7 @@ private RemoteKVTable<?> remoteKVTable(
);
table.init(0);
} else {
throw new IllegalArgumentException(EXTENSION.backend + " Unsupported");
throw new IllegalArgumentException("Unsupported type: " + type);
}
return table;
}
Expand Down Expand Up @@ -573,9 +553,17 @@ public ConsumerRecords<byte[], byte[]> record(final ConsumerRecords<byte[], byte
}
}

private Map<String, Object> getMutableProperties() {
private Map<String, Object> getMutableProperties(final KVSchema type) {
final Map<String, Object> properties = new HashMap<>(responsiveProps);

if (type == KVSchema.FACT) {
properties.put(ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG, StorageBackend.CASSANDRA.name());
} else if (type == KVSchema.KEY_VALUE) {
properties.put(ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG, StorageBackend.MONGO_DB.name());
} else {
throw new IllegalArgumentException("Unexpected schema type: " + type.name());
}

properties.put(KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
properties.put(VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
properties.put(KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
Expand Down Expand Up @@ -603,14 +591,6 @@ private Map<String, Object> getMutableProperties() {
return properties;
}

private ResponsiveKeyValueParams params(final KVSchema type, final String name) {
switch (type) {
case KEY_VALUE: return ResponsiveKeyValueParams.keyValue(name);
case FACT: return ResponsiveKeyValueParams.fact(name);
default: throw new IllegalArgumentException();
}
}

private long firstOffset(final TopicPartition topic)
throws ExecutionException, InterruptedException {
return admin.listOffsets(Map.of(topic, OffsetSpec.earliest())).all().get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.datastax.oss.driver.api.core.CqlSession;
import dev.responsive.kafka.api.ResponsiveKafkaStreams;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.config.StorageBackend;
import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams;
import dev.responsive.kafka.api.stores.ResponsiveStores;
import dev.responsive.kafka.api.stores.ResponsiveWindowParams;
Expand Down Expand Up @@ -84,12 +85,14 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.containers.CassandraContainer;

@ExtendWith(ResponsiveExtension.class)
public class TablePartitionerIntegrationTest {

@RegisterExtension
static ResponsiveExtension EXTENSION = new ResponsiveExtension(StorageBackend.CASSANDRA);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The table partitioner is an exclusively cassandra concept


private static final int NUM_PARTITIONS_INPUT = 2;
private static final int NUM_PARTITIONS_OUTPUT = 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.config.StorageBackend;
import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams;
import dev.responsive.kafka.api.stores.TtlProvider;
import dev.responsive.kafka.api.stores.TtlProvider.TtlDuration;
Expand All @@ -50,12 +51,14 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.containers.CassandraContainer;

@ExtendWith(ResponsiveExtension.class)
class CassandraFactTableIntegrationTest {

@RegisterExtension
static ResponsiveExtension EXTENSION = new ResponsiveExtension(StorageBackend.CASSANDRA);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Obviously this test is cassandra specific 😄 (ditto the one below)


private String storeName; // ie the "kafkaName", NOT the "cassandraName"
private ResponsiveKeyValueParams params;
private CassandraClient client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.config.StorageBackend;
import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams;
import dev.responsive.kafka.api.stores.TtlProvider;
import dev.responsive.kafka.internal.db.partitioning.SubPartitioner;
Expand All @@ -49,12 +50,14 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.containers.CassandraContainer;

@ExtendWith(ResponsiveExtension.class)
public class CassandraKVTableIntegrationTest {

@RegisterExtension
static ResponsiveExtension EXTENSION = new ResponsiveExtension(StorageBackend.CASSANDRA);

// Set up with 4 subpartitions per kafka partition
private static final int NUM_SUBPARTITIONS_TOTAL = 8;
private static final int NUM_KAFKA_PARTITIONS = NUM_SUBPARTITIONS_TOTAL / 4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.config.StorageBackend;
import dev.responsive.kafka.internal.db.BatchFlusher;
import dev.responsive.kafka.internal.db.BytesKeySpec;
import dev.responsive.kafka.internal.db.CassandraClient;
Expand Down Expand Up @@ -101,18 +102,22 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.testcontainers.containers.CassandraContainer;

@ExtendWith({MockitoExtension.class, ResponsiveExtension.class})
@ExtendWith({MockitoExtension.class})
@MockitoSettings(strictness = Strictness.LENIENT)

public class CommitBufferTest {

@RegisterExtension
static ResponsiveExtension EXTENSION = new ResponsiveExtension(StorageBackend.CASSANDRA);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should eventually migrate this test to Mongo but that will be a pretty huge shift, so imo it makes sense to leave this one as Cassandra for now since it's pretty much hardcoded into the test setup


private static final KeySpec<Bytes> KEY_SPEC = new BytesKeySpec();
private static final Bytes KEY = Bytes.wrap(ByteBuffer.allocate(4).putInt(0).array());
private static final byte[] VALUE = new byte[]{1};
Expand Down
Loading