Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import com.datastax.oss.driver.api.querybuilder.schema.CreateKeyspace;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import dev.responsive.kafka.api.config.StorageBackend;
import java.net.InetSocketAddress;
import java.time.Duration;
Expand All @@ -49,6 +51,14 @@
public class E2ETestUtils {
private static final Logger LOG = LoggerFactory.getLogger(E2ETestUtils.class);

private static final ObjectMapper MAPPER = new ObjectMapper();

public static ObjectNode buildAssertionContext(final String message) {
final var details = MAPPER.createObjectNode();
details.put("msg", message);
return details;
}

/**
* Creates topics if they do not already exist.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.datastax.oss.driver.api.core.servererrors.UnavailableException;
import com.datastax.oss.driver.api.core.servererrors.WriteFailureException;
import com.datastax.oss.driver.api.core.servererrors.WriteTimeoutException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.mongodb.MongoNotPrimaryException;
import com.mongodb.MongoQueryException;
import com.mongodb.MongoSocketReadException;
Expand Down Expand Up @@ -54,7 +56,13 @@ public StreamThreadExceptionResponse handle(final Throwable exception) {
causalSummary(exception, new LinkedList<>()),
exception
);
Assert.unreachable("Uncaught exception on test app stream thread", null);

final ObjectNode assertNode = new ObjectMapper().createObjectNode();
assertNode.put("exceptionClass", exception.getClass().getName());
assertNode.put("exceptionMessage", exception.getMessage());
assertNode.put("summary", causalSummary(exception, new LinkedList<>()));
Assert.unreachable("Uncaught exception on test app stream thread", assertNode);

}
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
}
Expand All @@ -69,6 +77,13 @@ private String causalSummary(final Throwable t, final List<Throwable> seen) {
}

private boolean shouldLogError(final Throwable throwable, List<Throwable> seen) {
if (throwable instanceof InjectedE2ETestException) {
final ObjectNode assertNode = new ObjectMapper().createObjectNode();
assertNode.put("seenExceptions", seen.toString());
Assert.reachable("Caught injected e2e test exception", assertNode);
return false;
}

final List<Class<? extends Throwable>> dontcare = List.of(
AllNodesFailedException.class,
ConnectException.class,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package dev.responsive.examples.e2etest;

import static dev.responsive.examples.common.E2ETestUtils.buildAssertionContext;

import com.antithesis.sdk.Assert;
import com.antithesis.sdk.Lifecycle;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import dev.responsive.examples.common.E2ETestUtils;
Expand Down Expand Up @@ -46,8 +47,6 @@
public class E2ETestDriver {
private static final Logger LOG = LoggerFactory.getLogger(E2ETestDriver.class);

private static final ObjectMapper MAPPER = new ObjectMapper();

private final UrandomGenerator randomGenerator = new UrandomGenerator();
private final Map<String, Object> properties;
private final int numKeys;
Expand Down Expand Up @@ -351,17 +350,24 @@ private synchronized List<RecordMetadata> popOffsets(final long upTo) {
}
int timeout = 300;
if (Duration.between(start, Instant.now()).getSeconds() > timeout) {

final String errorMessage = String.format(
"waited longer than %d seconds for offset %d %d %s",
"waited longer than %d seconds for offset %d of partition %d (earliest sent: %s)",
timeout,
upTo,
partition,
sent.isEmpty() ? "null" : sent.get(0).toString()
);
Assert.unreachable(
String.format("waited longer than %d seconds for offset", timeout),
E2ETestDriver.buildDetails(errorMessage)
);

final ObjectNode errorDetails = buildAssertionContext(errorMessage);
errorDetails.put("key", key);
errorDetails.put("partition", partition);
errorDetails.put("offset", upTo);
errorDetails.put("timeoutSec", timeout);
errorDetails.put("earliestSent", sent.isEmpty() ? "null" : sent.get(0).toString());

Assert.unreachable("Waited longer than timeout to pop offsets", errorDetails);

LOG.error(errorMessage);
throw new IllegalStateException(errorMessage);
}
Expand All @@ -376,12 +382,6 @@ private synchronized List<RecordMetadata> popOffsets(final long upTo) {
}
}

private static ObjectNode buildDetails(final String errorMessage) {
final var details = MAPPER.createObjectNode();
details.put("msg", errorMessage);
return details;
}

private Instant lastCommittedOffsetLog = Instant.EPOCH;

private void maybeCheckCommittedAndEndOffsets() {
Expand All @@ -401,30 +401,45 @@ private void checkCommittedAndEndOffsets() {
}

private void checkNoStalledPartitions(final CommittedAndEndOffsets offsets) {
for (int p = 0; p < partitions; p++) {
final var tp = new TopicPartition(inputTopic, p);
if (stalledPartitions.containsKey(p)) {
final var stalledPartition = stalledPartitions.get(p);
for (int partition = 0; partition < partitions; partition++) {
final var tp = new TopicPartition(inputTopic, partition);
if (stalledPartitions.containsKey(partition)) {
final var stalledPartition = stalledPartitions.get(partition);
if (offsets.inputCommitted().get(tp) > stalledPartition.offset()) {
LOG.info("resume faults");
faultStopper.resumeFaults();
stalledPartitions.remove(p);
stalledPartitions.remove(partition);
} else if (offsets.timestamp()
.isAfter(stalledPartition.detected().plus(stalledPartitionThreshold))) {

final long inputCommittedOffset = offsets.inputCommitted.get(tp);
final Duration timeStalled = Duration.between(
stalledPartition.detected(),
offsets.timestamp()
);

final String errorMessage = String.format(
"Partition %d has not made progress from offset %d (current %d) for %s",
p,
partition,
stalledPartition.offset(),
offsets.inputCommitted.get(tp),
Duration.between(stalledPartition.detected(), offsets.timestamp())
inputCommittedOffset,
timeStalled
);

final ObjectNode errorDetails = buildAssertionContext(errorMessage);
errorDetails.put("partition", partition);
errorDetails.put("lastCommittedOffset", stalledPartition.offset());
errorDetails.put("inputCommittedOffset", inputCommittedOffset);
errorDetails.put("timeStalledMs", timeStalled.toMillis());
Assert.unreachable("Stalled partition", errorDetails);

Assert.unreachable(
String.format("Partition %d has not made progress from offset %d (current %d)",
p,
partition,
stalledPartition.offset(),
offsets.inputCommitted.get(tp)
),
E2ETestDriver.buildDetails(errorMessage)
errorDetails
);
LOG.error(errorMessage);
throw new IllegalStateException(errorMessage);
Expand All @@ -446,12 +461,15 @@ private void checkNoStalledPartitions(final CommittedAndEndOffsets offsets) {
if (Duration.between(os.timestamp(), offsets.timestamp())
.compareTo(faultStopThreshold) > 0) {
LOG.info("pausing faults due stall on partition {} at {} {}",
p,
partition,
currentCommitted,
offsets.timestamp
);
faultStopper.pauseFaults();
stalledPartitions.put(p, new StalledPartition(offsets.timestamp(), currentCommitted));
stalledPartitions.put(
partition,
new StalledPartition(offsets.timestamp(), currentCommitted)
);
break;
}
}
Expand Down Expand Up @@ -580,14 +598,14 @@ private void updateReceived(
}
final var expectedChecksum = checksum.current();
if (!Arrays.equals(expectedChecksum, observedChecksum)) {
Assert.unreachable(
String.format("checksum mismatch - key(%s), recvdCount(%d), %s %s",
key,
recvdCount,
Arrays.toString(checksum.current()),
Arrays.toString(observedChecksum)), null
);
throw new IllegalStateException("checksum mismatch");
final String errorMessage = "checksum mismatch for key " + key;
final ObjectNode errorDetails = buildAssertionContext(errorMessage);
errorDetails.put("key", key);
errorDetails.put("expectedCheckSum", expectedChecksum);
errorDetails.put("observedCheckSum", observedChecksum);
errorDetails.put("numRecordsReceived", recvdCount);
Assert.unreachable("Checksum mismatch", errorDetails);
throw new IllegalStateException(errorMessage);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package dev.responsive.examples.regression;

import static dev.responsive.examples.common.E2ETestUtils.buildAssertionContext;
import static dev.responsive.examples.regression.RegConstants.NUM_PARTITIONS;
import static dev.responsive.examples.regression.RegConstants.resultsTopic;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
Expand All @@ -24,6 +25,7 @@

import com.antithesis.sdk.Assert;
import com.antithesis.sdk.Lifecycle;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import dev.responsive.examples.common.EventSignals;
Expand Down Expand Up @@ -129,13 +131,24 @@ protected void run() throws Exception {
baseline.remove();
matches++;
} else if (responsive.hasNext() && baseline.hasNext()) {
Assert.unreachable(String.format(
final String errorMessage = String.format(
"Expected to see identical output records in identical order, but the next set "
+ "of records did not match up. Most recent record from responsive is %s "
+ "and most recent record from baseline is %s",
r.record,
b.record
), null);
b.record);

final ObjectNode errorDetails = buildAssertionContext(errorMessage);
errorDetails.put("responsiveKey", r.record.key());
errorDetails.put("baselineKey", b.record.key());
errorDetails.put("responsiveValue", r.record.value().toString());
errorDetails.put("baselineValue", b.record.value().toString());
errorDetails.put("responsiveRecord", r.record.toString());
errorDetails.put("baselineRecord", b.record.toString());

Assert.unreachable("Mismatch between next record of Responsive & baseline",
errorDetails);

} else {
// one of the streams is behind so we'll wait for the next
// poll to see if any records come up here
Expand Down
Loading