diff --git a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/common/E2ETestUtils.java b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/common/E2ETestUtils.java index be55abea0..992c3c6ee 100644 --- a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/common/E2ETestUtils.java +++ b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/common/E2ETestUtils.java @@ -27,6 +27,8 @@ import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.datastax.oss.driver.api.querybuilder.SchemaBuilder; import com.datastax.oss.driver.api.querybuilder.schema.CreateKeyspace; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import dev.responsive.kafka.api.config.StorageBackend; import java.net.InetSocketAddress; import java.time.Duration; @@ -49,6 +51,14 @@ public class E2ETestUtils { private static final Logger LOG = LoggerFactory.getLogger(E2ETestUtils.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + + public static ObjectNode buildAssertionContext(final String message) { + final var details = MAPPER.createObjectNode(); + details.put("msg", message); + return details; + } + /** * Creates topics if they do not already exist. * diff --git a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/common/UncaughtStreamsAntithesisHandler.java b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/common/UncaughtStreamsAntithesisHandler.java index 663d6fa7e..1a2438562 100644 --- a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/common/UncaughtStreamsAntithesisHandler.java +++ b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/common/UncaughtStreamsAntithesisHandler.java @@ -25,6 +25,8 @@ import com.datastax.oss.driver.api.core.servererrors.UnavailableException; import com.datastax.oss.driver.api.core.servererrors.WriteFailureException; import com.datastax.oss.driver.api.core.servererrors.WriteTimeoutException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.mongodb.MongoNotPrimaryException; import com.mongodb.MongoQueryException; import com.mongodb.MongoSocketReadException; @@ -54,7 +56,13 @@ public StreamThreadExceptionResponse handle(final Throwable exception) { causalSummary(exception, new LinkedList<>()), exception ); - Assert.unreachable("Uncaught exception on test app stream thread", null); + + final ObjectNode assertNode = new ObjectMapper().createObjectNode(); + assertNode.put("exceptionClass", exception.getClass().getName()); + assertNode.put("exceptionMessage", exception.getMessage()); + assertNode.put("summary", causalSummary(exception, new LinkedList<>())); + Assert.unreachable("Uncaught exception on test app stream thread", assertNode); + } return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD; } @@ -69,6 +77,13 @@ private String causalSummary(final Throwable t, final List seen) { } private boolean shouldLogError(final Throwable throwable, List seen) { + if (throwable instanceof InjectedE2ETestException) { + final ObjectNode assertNode = new ObjectMapper().createObjectNode(); + assertNode.put("seenExceptions", seen.toString()); + Assert.reachable("Caught injected e2e test exception", assertNode); + return false; + } + final List> dontcare = List.of( AllNodesFailedException.class, ConnectException.class, diff --git a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestDriver.java b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestDriver.java index bea4db1a1..0db5601bc 100644 --- a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestDriver.java +++ b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestDriver.java @@ -1,8 +1,9 @@ package dev.responsive.examples.e2etest; +import static dev.responsive.examples.common.E2ETestUtils.buildAssertionContext; + import com.antithesis.sdk.Assert; import com.antithesis.sdk.Lifecycle; -import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import dev.responsive.examples.common.E2ETestUtils; @@ -46,8 +47,6 @@ public class E2ETestDriver { private static final Logger LOG = LoggerFactory.getLogger(E2ETestDriver.class); - private static final ObjectMapper MAPPER = new ObjectMapper(); - private final UrandomGenerator randomGenerator = new UrandomGenerator(); private final Map properties; private final int numKeys; @@ -351,17 +350,24 @@ private synchronized List popOffsets(final long upTo) { } int timeout = 300; if (Duration.between(start, Instant.now()).getSeconds() > timeout) { + final String errorMessage = String.format( - "waited longer than %d seconds for offset %d %d %s", + "waited longer than %d seconds for offset %d of partition %d (earliest sent: %s)", timeout, upTo, partition, sent.isEmpty() ? "null" : sent.get(0).toString() ); - Assert.unreachable( - String.format("waited longer than %d seconds for offset", timeout), - E2ETestDriver.buildDetails(errorMessage) - ); + + final ObjectNode errorDetails = buildAssertionContext(errorMessage); + errorDetails.put("key", key); + errorDetails.put("partition", partition); + errorDetails.put("offset", upTo); + errorDetails.put("timeoutSec", timeout); + errorDetails.put("earliestSent", sent.isEmpty() ? "null" : sent.get(0).toString()); + + Assert.unreachable("Waited longer than timeout to pop offsets", errorDetails); + LOG.error(errorMessage); throw new IllegalStateException(errorMessage); } @@ -376,12 +382,6 @@ private synchronized List popOffsets(final long upTo) { } } - private static ObjectNode buildDetails(final String errorMessage) { - final var details = MAPPER.createObjectNode(); - details.put("msg", errorMessage); - return details; - } - private Instant lastCommittedOffsetLog = Instant.EPOCH; private void maybeCheckCommittedAndEndOffsets() { @@ -401,30 +401,45 @@ private void checkCommittedAndEndOffsets() { } private void checkNoStalledPartitions(final CommittedAndEndOffsets offsets) { - for (int p = 0; p < partitions; p++) { - final var tp = new TopicPartition(inputTopic, p); - if (stalledPartitions.containsKey(p)) { - final var stalledPartition = stalledPartitions.get(p); + for (int partition = 0; partition < partitions; partition++) { + final var tp = new TopicPartition(inputTopic, partition); + if (stalledPartitions.containsKey(partition)) { + final var stalledPartition = stalledPartitions.get(partition); if (offsets.inputCommitted().get(tp) > stalledPartition.offset()) { LOG.info("resume faults"); faultStopper.resumeFaults(); - stalledPartitions.remove(p); + stalledPartitions.remove(partition); } else if (offsets.timestamp() .isAfter(stalledPartition.detected().plus(stalledPartitionThreshold))) { + + final long inputCommittedOffset = offsets.inputCommitted.get(tp); + final Duration timeStalled = Duration.between( + stalledPartition.detected(), + offsets.timestamp() + ); + final String errorMessage = String.format( "Partition %d has not made progress from offset %d (current %d) for %s", - p, + partition, stalledPartition.offset(), - offsets.inputCommitted.get(tp), - Duration.between(stalledPartition.detected(), offsets.timestamp()) + inputCommittedOffset, + timeStalled ); + + final ObjectNode errorDetails = buildAssertionContext(errorMessage); + errorDetails.put("partition", partition); + errorDetails.put("lastCommittedOffset", stalledPartition.offset()); + errorDetails.put("inputCommittedOffset", inputCommittedOffset); + errorDetails.put("timeStalledMs", timeStalled.toMillis()); + Assert.unreachable("Stalled partition", errorDetails); + Assert.unreachable( String.format("Partition %d has not made progress from offset %d (current %d)", - p, + partition, stalledPartition.offset(), offsets.inputCommitted.get(tp) ), - E2ETestDriver.buildDetails(errorMessage) + errorDetails ); LOG.error(errorMessage); throw new IllegalStateException(errorMessage); @@ -446,12 +461,15 @@ private void checkNoStalledPartitions(final CommittedAndEndOffsets offsets) { if (Duration.between(os.timestamp(), offsets.timestamp()) .compareTo(faultStopThreshold) > 0) { LOG.info("pausing faults due stall on partition {} at {} {}", - p, + partition, currentCommitted, offsets.timestamp ); faultStopper.pauseFaults(); - stalledPartitions.put(p, new StalledPartition(offsets.timestamp(), currentCommitted)); + stalledPartitions.put( + partition, + new StalledPartition(offsets.timestamp(), currentCommitted) + ); break; } } @@ -580,14 +598,14 @@ private void updateReceived( } final var expectedChecksum = checksum.current(); if (!Arrays.equals(expectedChecksum, observedChecksum)) { - Assert.unreachable( - String.format("checksum mismatch - key(%s), recvdCount(%d), %s %s", - key, - recvdCount, - Arrays.toString(checksum.current()), - Arrays.toString(observedChecksum)), null - ); - throw new IllegalStateException("checksum mismatch"); + final String errorMessage = "checksum mismatch for key " + key; + final ObjectNode errorDetails = buildAssertionContext(errorMessage); + errorDetails.put("key", key); + errorDetails.put("expectedCheckSum", expectedChecksum); + errorDetails.put("observedCheckSum", observedChecksum); + errorDetails.put("numRecordsReceived", recvdCount); + Assert.unreachable("Checksum mismatch", errorDetails); + throw new IllegalStateException(errorMessage); } } } diff --git a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/ResultsComparatorService.java b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/ResultsComparatorService.java index e627a3992..3ae4076fe 100644 --- a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/ResultsComparatorService.java +++ b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/ResultsComparatorService.java @@ -16,6 +16,7 @@ package dev.responsive.examples.regression; +import static dev.responsive.examples.common.E2ETestUtils.buildAssertionContext; import static dev.responsive.examples.regression.RegConstants.NUM_PARTITIONS; import static dev.responsive.examples.regression.RegConstants.resultsTopic; import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG; @@ -24,6 +25,7 @@ import com.antithesis.sdk.Assert; import com.antithesis.sdk.Lifecycle; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ArrayListMultimap; import com.google.common.util.concurrent.AbstractExecutionThreadService; import dev.responsive.examples.common.EventSignals; @@ -129,13 +131,24 @@ protected void run() throws Exception { baseline.remove(); matches++; } else if (responsive.hasNext() && baseline.hasNext()) { - Assert.unreachable(String.format( + final String errorMessage = String.format( "Expected to see identical output records in identical order, but the next set " + "of records did not match up. Most recent record from responsive is %s " + "and most recent record from baseline is %s", r.record, - b.record - ), null); + b.record); + + final ObjectNode errorDetails = buildAssertionContext(errorMessage); + errorDetails.put("responsiveKey", r.record.key()); + errorDetails.put("baselineKey", b.record.key()); + errorDetails.put("responsiveValue", r.record.value().toString()); + errorDetails.put("baselineValue", b.record.value().toString()); + errorDetails.put("responsiveRecord", r.record.toString()); + errorDetails.put("baselineRecord", b.record.toString()); + + Assert.unreachable("Mismatch between next record of Responsive & baseline", + errorDetails); + } else { // one of the streams is behind so we'll wait for the next // poll to see if any records come up here