From b42633cc6085ff5408c41ce6e80af929b03633d2 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov <8622884+dlg99@users.noreply.github.com> Date: Fri, 10 Mar 2023 01:58:40 -0800 Subject: [PATCH] [fix][io] KCA: Option to use kafka connector's SourceConnector class to create task and task config (#19772) (cherry picked from commit 90b0f0a17579d22d413853ed4941d81debbe0cbe) --- .../connect/AbstractKafkaConnectSource.java | 48 ++++++++++++++--- .../kafka/connect/KafkaConnectSourceTest.java | 54 ++++++++++++------- 2 files changed, 77 insertions(+), 25 deletions(-) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java index 97390a85fcae4..1d71d41b357c7 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.io.kafka.connect; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -30,7 +32,9 @@ import java.util.concurrent.atomic.AtomicInteger; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.runtime.TaskConfig; +import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.source.SourceTaskContext; @@ -55,6 +59,7 @@ public abstract class AbstractKafkaConnectSource implements Source { // kafka connect related variables private SourceTaskContext sourceTaskContext; + private SourceConnector connector; @Getter private SourceTask sourceTask; public Converter keyConverter; @@ -71,6 +76,8 @@ public abstract class AbstractKafkaConnectSource implements Source { // number of outstandingRecords that have been polled but not been acked private final AtomicInteger outstandingRecords = new AtomicInteger(0); + public static final String CONNECTOR_CLASS = "kafkaConnectorSourceClass"; + @Override public void open(Map config, SourceContext sourceContext) throws Exception { Map stringConfig = new HashMap<>(); @@ -80,12 +87,6 @@ public void open(Map config, SourceContext sourceContext) throws } }); - // get the source class name from config and create source task from reflection - sourceTask = Class.forName(stringConfig.get(TaskConfig.TASK_CLASS_CONFIG)) - .asSubclass(SourceTask.class) - .getDeclaredConstructor() - .newInstance(); - topicNamespace = stringConfig.get(PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG); // initialize the key and value converter @@ -129,8 +130,36 @@ public void open(Map config, SourceContext sourceContext) throws sourceTaskContext = new PulsarIOSourceTaskContext(offsetReader, pulsarKafkaWorkerConfig); + final Map taskConfig; + if (config.get(CONNECTOR_CLASS) != null) { + String kafkaConnectorFQClassName = config.get(CONNECTOR_CLASS).toString(); + Class clazz = Class.forName(kafkaConnectorFQClassName); + connector = (SourceConnector) clazz.getConstructor().newInstance(); + + Class taskClass = connector.taskClass(); + sourceTask = (SourceTask) taskClass.getConstructor().newInstance(); + + connector.initialize(new PulsarKafkaSinkContext()); + connector.start(stringConfig); + + List> configs = connector.taskConfigs(1); + checkNotNull(configs); + checkArgument(configs.size() == 1); + taskConfig = configs.get(0); + } else { + // for backward compatibility with old configuration + // that use the task directly + + // get the source class name from config and create source task from reflection + sourceTask = Class.forName(stringConfig.get(TaskConfig.TASK_CLASS_CONFIG)) + .asSubclass(SourceTask.class) + .getDeclaredConstructor() + .newInstance(); + taskConfig = stringConfig; + } + sourceTask.initialize(sourceTaskContext); - sourceTask.start(stringConfig); + sourceTask.start(taskConfig); } @Override @@ -178,6 +207,11 @@ public void close() { sourceTask = null; } + if (connector != null) { + connector.stop(); + connector = null; + } + if (offsetStore != null) { offsetStore.stop(); offsetStore = null; diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java index 26ecf21aa0b5c..39c7b0b740837 100644 --- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java +++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java @@ -23,7 +23,6 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; - import java.io.File; import java.io.OutputStream; import java.nio.file.Files; @@ -47,7 +46,6 @@ @Slf4j public class KafkaConnectSourceTest extends ProducerConsumerBase { - private Map config = new HashMap<>(); private String offsetTopicName; // The topic to publish data to, for kafkaSource private String topicName; @@ -62,18 +60,10 @@ protected void setup() throws Exception { super.internalSetup(); super.producerBaseSetup(); - config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask"); - config.put(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); - config.put(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); - this.offsetTopicName = "persistent://my-property/my-ns/kafka-connect-source-offset"; - config.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, offsetTopicName); - this.topicName = "persistent://my-property/my-ns/kafka-connect-source"; - config.put(FileStreamSourceConnector.TOPIC_CONFIG, topicName); tempFile = File.createTempFile("some-file-name", null); - config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsoluteFile().toString()); - config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, String.valueOf(FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE)); + tempFile.deleteOnExit(); this.context = mock(SourceContext.class); this.client = PulsarClient.builder() @@ -91,16 +81,44 @@ protected void cleanup() throws Exception { tempFile.delete(); super.internalCleanup(); } - protected void completedFlush(Throwable error, Void result) { - if (error != null) { - log.error("Failed to flush {} offsets to storage: ", this, error); - } else { - log.info("Finished flushing {} offsets to storage", this); - } + + @Test + public void testOpenAndReadConnectorConfig() throws Exception { + Map config = getConfig(); + config.put(AbstractKafkaConnectSource.CONNECTOR_CLASS, + "org.apache.kafka.connect.file.FileStreamSourceConnector"); + + testOpenAndReadTask(config); } @Test - public void testOpenAndRead() throws Exception { + public void testOpenAndReadTaskDirect() throws Exception { + Map config = getConfig(); + + config.put(TaskConfig.TASK_CLASS_CONFIG, + "org.apache.kafka.connect.file.FileStreamSourceTask"); + + testOpenAndReadTask(config); + } + + private Map getConfig() { + Map config = new HashMap<>(); + + config.put(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, + "org.apache.kafka.connect.storage.StringConverter"); + config.put(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, + "org.apache.kafka.connect.storage.StringConverter"); + + config.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, offsetTopicName); + + config.put(FileStreamSourceConnector.TOPIC_CONFIG, topicName); + config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsoluteFile().toString()); + config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, + String.valueOf(FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE)); + return config; + } + + private void testOpenAndReadTask(Map config) throws Exception { kafkaConnectSource = new KafkaConnectSource(); kafkaConnectSource.open(config, context);