Skip to content

Commit

Permalink
KAFKA-3935; Fix test_restart_failed_task system test for SinkTasks
Browse files Browse the repository at this point in the history
Fix the test by using a more liberal timeout and forcing more frequent SinkTask.put() calls. Also add some logging to aid future debugging.

Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Jason Gustafson <[email protected]>, Ismael Juma <[email protected]>

Closes apache#1663 from ewencp/kafka-3935-fix-restart-system-test
  • Loading branch information
ewencp authored and ijuma committed Jul 26, 2016
1 parent 8a417c8 commit d154696
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -49,6 +51,8 @@ public class MockConnector extends Connector {

public static final long DEFAULT_FAILURE_DELAY_MS = 15000;

private static final Logger log = LoggerFactory.getLogger(MockConnector.class);

private Map<String, String> config;
private ScheduledExecutorService executor;

Expand All @@ -69,10 +73,12 @@ public void start(Map<String, String> config) {
if (delayMsString != null)
delayMs = Long.parseLong(delayMsString);

log.debug("Started MockConnector with failure delay of {} ms", delayMs);
executor = Executors.newSingleThreadScheduledExecutor();
executor.schedule(new Runnable() {
@Override
public void run() {
log.debug("Triggering connector failure");
context.raiseError(new RuntimeException());
}
}, delayMs, TimeUnit.MILLISECONDS);
Expand All @@ -86,6 +92,7 @@ public Class<? extends Task> taskClass() {

/**
 * Builds the task configuration list for this connector.
 *
 * The connector's own configuration map is handed to exactly one task; the
 * {@code maxTasks} argument is not consulted.
 *
 * @param maxTasks upper bound on the number of tasks (unused here)
 * @return an immutable single-element list wrapping the connector configuration
 */
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    log.debug("Creating single task for MockConnector");
    final List<Map<String, String>> singleTaskConfig = Collections.singletonList(config);
    return singleTaskConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Map;

public class MockSinkTask extends SinkTask {
private static final Logger log = LoggerFactory.getLogger(MockSinkTask.class);

private String mockMode;
private long startTimeMs;
Expand All @@ -47,15 +50,21 @@ public void start(Map<String, String> config) {
this.failureDelayMs = MockConnector.DEFAULT_FAILURE_DELAY_MS;
if (delayMsString != null)
failureDelayMs = Long.parseLong(delayMsString);

log.debug("Started MockSinkTask at {} with failure scheduled in {} ms", startTimeMs, failureDelayMs);
setTimeout();
}
}

/**
 * Handles a delivery of records. The records themselves are never inspected; in
 * task-failure mode each call is used only as an opportunity to check whether the
 * configured failure delay has elapsed since the task was started.
 *
 * @param records the delivered records (ignored)
 * @throws RuntimeException once more than {@code failureDelayMs} milliseconds have
 *         passed since {@code startTimeMs} while in task-failure mode
 */
@Override
public void put(Collection<SinkRecord> records) {
    if (MockConnector.TASK_FAILURE.equals(mockMode)) {
        long now = System.currentTimeMillis();
        // Single deadline check (the condition was previously duplicated on two nested ifs).
        if (now > startTimeMs + failureDelayMs) {
            log.debug("Triggering sink task failure");
            throw new RuntimeException();
        }
        // Deadline not reached yet: request another timeout via setTimeout() so the
        // check is repeated soon.
        setTimeout();
    }
}

Expand All @@ -68,4 +77,12 @@ public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
public void stop() {
    // Intentionally empty: the visible body performs no work when the task is stopped.
    // NOTE(review): presumably this mock task holds nothing that needs releasing — confirm.
}

/**
 * Requests a timeout from the task context so that this task is woken up regularly.
 *
 * This mock task may not actually consume any data from Kafka, so it may only see
 * put() calls triggered by wakeups for offset commits. To avoid being tied to the
 * offset commit interval, a wakeup is forced after 250 ms or after the failure delay,
 * whichever is smaller. That is not overly aggressive, yet keeps any scheduled action
 * reasonably close to its target time.
 */
private void setTimeout() {
    final long maxWakeupIntervalMs = 250;
    final long wakeupDelayMs = Math.min(failureDelayMs, maxWakeupIntervalMs);
    context.timeout(wakeupDelayMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class MockSourceTask extends SourceTask {
private static final Logger log = LoggerFactory.getLogger(MockSourceTask.class);

private String mockMode;
private long startTimeMs;
Expand All @@ -46,15 +49,19 @@ public void start(Map<String, String> config) {
this.failureDelayMs = MockConnector.DEFAULT_FAILURE_DELAY_MS;
if (delayMsString != null)
failureDelayMs = Long.parseLong(delayMsString);

log.debug("Started MockSourceTask at {} with failure scheduled in {} ms", startTimeMs, failureDelayMs);
}
}

/**
 * Polls for new source records. This mock never produces data; in task-failure mode
 * each call checks whether the configured failure delay has elapsed since the task
 * was started and, if so, fails the task.
 *
 * @return always an empty list when no failure is triggered
 * @throws RuntimeException once more than {@code failureDelayMs} milliseconds have
 *         passed since {@code startTimeMs} while in task-failure mode
 * @throws InterruptedException declared by the signature; not thrown by the visible body
 */
@Override
public List<SourceRecord> poll() throws InterruptedException {
    if (MockConnector.TASK_FAILURE.equals(mockMode)) {
        long now = System.currentTimeMillis();
        // Single deadline check (the condition was previously duplicated on two nested ifs).
        if (now > startTimeMs + failureDelayMs) {
            log.debug("Triggering source task failure");
            throw new RuntimeException();
        }
    }
    return Collections.emptyList();
}
Expand Down
2 changes: 1 addition & 1 deletion tests/kafkatest/tests/connect/connect_distributed_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def test_restart_failed_task(self, connector_type):
connector.start()

task_id = 0
wait_until(lambda: self.task_is_failed(connector, task_id), timeout_sec=15,
wait_until(lambda: self.task_is_failed(connector, task_id), timeout_sec=20,
err_msg="Failed to see task transition to the FAILED state")

self.cc.restart_task(connector.name, task_id)
Expand Down

0 comments on commit d154696

Please sign in to comment.