Skip to content

Commit

Permalink
[fix][io] Kafka Source connector maybe stuck (apache#22511)
Browse files Browse the repository at this point in the history
(cherry picked from commit bbff29d)
(cherry picked from commit a0120d0)
  • Loading branch information
shibd authored and mukesh-ctds committed Apr 16, 2024
1 parent 6963e6b commit 8c5ad45
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -62,6 +63,7 @@ public abstract class KafkaAbstractSource<V> extends KafkaPushSource<V> {
private volatile boolean running = false;
private KafkaSourceConfig kafkaSourceConfig;
private Thread runnerThread;
private long maxPollIntervalMs;

@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
Expand Down Expand Up @@ -125,6 +127,13 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaSourceConfig.getAutoOffsetReset());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getKeyDeserializationClass());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getValueDeserializationClass());
if (props.containsKey(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) {
maxPollIntervalMs = Long.parseLong(props.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG).toString());
} else {
maxPollIntervalMs = Long.parseLong(
ConsumerConfig.configDef().defaultValues().get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)
.toString());
}
try {
consumer = new KafkaConsumer<>(beforeCreateConsumer(props));
} catch (Exception ex) {
Expand Down Expand Up @@ -174,7 +183,9 @@ public void start() {
index++;
}
if (!kafkaSourceConfig.isAutoCommitEnabled()) {
CompletableFuture.allOf(futures).get();
// Wait about 2/3 of the time of maxPollIntervalMs.
// so as to avoid waiting for the timeout to be kicked out of the consumer group.
CompletableFuture.allOf(futures).get(maxPollIntervalMs * 2 / 3, TimeUnit.MILLISECONDS);
consumer.commitSync();
}
} catch (Exception e) {
Expand Down Expand Up @@ -252,6 +263,21 @@ public void ack() {
completableFuture.complete(null);
}

@Override
public void fail() {
completableFuture.completeExceptionally(
new RuntimeException(
String.format(
"Failed to process record with kafka topic: %s partition: %d offset: %d key: %s",
record.topic(),
record.partition(),
record.offset(),
getKey()
)
)
);
}

@Override
public Schema<V> getSchema() {
return schema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@
import com.google.common.collect.ImmutableMap;
import java.time.Duration;
import java.util.Collections;
import java.util.Arrays;
import java.lang.reflect.Field;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.kafka.KafkaAbstractSource;
import org.apache.pulsar.io.kafka.KafkaSourceConfig;
Expand All @@ -46,6 +52,7 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;
import static org.testng.Assert.fail;

Expand Down Expand Up @@ -218,6 +225,88 @@ public final void throwExceptionByPoll() throws Exception {
source.read();
}

@Test
public final void throwExceptionBySendFail() throws Exception {
KafkaAbstractSource source = new DummySource();

KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig();
kafkaSourceConfig.setTopic("test-topic");
kafkaSourceConfig.setAutoCommitEnabled(false);
Field kafkaSourceConfigField = KafkaAbstractSource.class.getDeclaredField("kafkaSourceConfig");
kafkaSourceConfigField.setAccessible(true);
kafkaSourceConfigField.set(source, kafkaSourceConfig);

Field defaultMaxPollIntervalMsField = KafkaAbstractSource.class.getDeclaredField("maxPollIntervalMs");
defaultMaxPollIntervalMsField.setAccessible(true);
defaultMaxPollIntervalMsField.set(source, 300000);

Consumer consumer = mock(Consumer.class);
ConsumerRecord<String, byte[]> consumerRecord = new ConsumerRecord<>("topic", 0, 0,
"t-key", "t-value".getBytes(StandardCharsets.UTF_8));
ConsumerRecords<String, byte[]> consumerRecords = new ConsumerRecords<>(Collections.singletonMap(
new TopicPartition("topic", 0),
Arrays.asList(consumerRecord)));
Mockito.doReturn(consumerRecords).when(consumer).poll(Mockito.any(Duration.class));

Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer");
consumerField.setAccessible(true);
consumerField.set(source, consumer);
source.start();

// Mock send message fail
Record record = source.read();
record.fail();

// read again will throw RuntimeException.
try {
source.read();
fail("Should throw exception");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof RuntimeException);
assertTrue(e.getCause().getMessage().contains("Failed to process record with kafka topic"));
}
}

@Test
public final void throwExceptionBySendTimeOut() throws Exception {
KafkaAbstractSource source = new DummySource();

KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig();
kafkaSourceConfig.setTopic("test-topic");
kafkaSourceConfig.setAutoCommitEnabled(false);
Field kafkaSourceConfigField = KafkaAbstractSource.class.getDeclaredField("kafkaSourceConfig");
kafkaSourceConfigField.setAccessible(true);
kafkaSourceConfigField.set(source, kafkaSourceConfig);

Field defaultMaxPollIntervalMsField = KafkaAbstractSource.class.getDeclaredField("maxPollIntervalMs");
defaultMaxPollIntervalMsField.setAccessible(true);
defaultMaxPollIntervalMsField.set(source, 1);

Consumer consumer = mock(Consumer.class);
ConsumerRecord<String, byte[]> consumerRecord = new ConsumerRecord<>("topic", 0, 0,
"t-key", "t-value".getBytes(StandardCharsets.UTF_8));
ConsumerRecords<String, byte[]> consumerRecords = new ConsumerRecords<>(Collections.singletonMap(
new TopicPartition("topic", 0),
Arrays.asList(consumerRecord)));
Mockito.doReturn(consumerRecords).when(consumer).poll(Mockito.any(Duration.class));

Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer");
consumerField.setAccessible(true);
consumerField.set(source, consumer);
source.start();

// Mock send message fail, just read do noting.
source.read();

// read again will throw TimeOutException.
try {
source.read();
fail("Should throw exception");
} catch (Exception e) {
assertTrue(e instanceof TimeoutException);
}
}

private File getFile(String name) {
ClassLoader classLoader = getClass().getClassLoader();
return new File(classLoader.getResource(name).getFile());
Expand Down

0 comments on commit 8c5ad45

Please sign in to comment.