diff --git a/README.md b/README.md index 5bb208b..2ba0d4a 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,14 @@ Kafka topic to write the messages to. *Type:* List +##### `rabbitmq.queue.topic.mapping` +*Importance:* High + +*Type:* List + +A list containing a mapping between a RabbitMQ queue and a Kafka topic. + This setting is an alternative to the 'rabbitmq.queue' and 'kafka.topic' settings. It allows a single connector instance to have a many-to-many mapping, instead of only a many-queues-to-one-topic mapping. + When both settings are present, the 'rabbitmq.queue' and 'kafka.topic' settings take precedence. Example of a mapping config: 'queue1:topic1,queue2:topic2' rabbitmq.queue ##### `rabbitmq.host` @@ -63,7 +71,8 @@ The username to authenticate to RabbitMQ with. See `ConnectionFactory.setUsernam *Default Value:* / -Converter to compose the Kafka message. +The virtual host to use when connecting to the broker. See `ConnectionFactory.setVirtualHost(java.lang.String) `_ + ##### `message.converter` *Importance:* Medium @@ -71,7 +80,13 @@ Converter to compose the Kafka message. *Default Value:* com.github.themeetgroup.kafka.connect.rabbitmq.source.data.MessageConverter -The virtual host to use when connecting to the broker. See `ConnectionFactory.setVirtualHost(java.lang.String) `_ +*Other allowed values*: +- com.github.themeetgroup.kafka.connect.rabbitmq.source.data.BytesSourceMessageConverter +- com.github.themeetgroup.kafka.connect.rabbitmq.source.data.StringSourceMessageConverter + +Converter to compose the Kafka message. + + ##### `rabbitmq.port` *Importance:* Medium @@ -256,6 +271,20 @@ exchange to publish the messages on. routing key used for publishing the messages. + +##### `rabbitmq.format` +*Importance:* High + +*Type:* String + +*Default Value:* bytes + +*Other allowed values*: +- json +- avro (non-Confluent Avro) + +The format type to use when writing data to RabbitMQ. + ##### `topics` *Importance:* High diff --git a/bin/create-topic.sh b/bin/create-topics.sh similarity index 81% rename from bin/create-topic.sh rename to bin/create-topics.sh index 56ad312..bed1168 100755 --- a/bin/create-topic.sh +++ b/bin/create-topics.sh @@ -15,4 +15,5 @@ # limitations under the License. # -kafka-topics --create --topic rabbitmq.test --bootstrap-server 127.0.0.1:9092 \ No newline at end of file +kafka-topics --create --topic topic1 --bootstrap-server 127.0.0.1:9092 +kafka-topics --create --topic topic2 --bootstrap-server 127.0.0.1:9092 \ No newline at end of file diff --git a/bin/debug.sh b/bin/debug.sh index 806ef00..e52c42c 100755 --- a/bin/debug.sh +++ b/bin/debug.sh @@ -21,5 +21,5 @@ export KAFKA_DEBUG='y' set -e -mvn clean package +mvn clean package -Dcheckstyle.skip connect-standalone config/connect-avro-docker.properties config/RabbitMQSourceConnector.properties \ No newline at end of file diff --git a/bin/read-topic-with-headers.sh b/bin/read-topic-with-headers.sh index 3abe8cd..fbc7402 100755 --- a/bin/read-topic-with-headers.sh +++ b/bin/read-topic-with-headers.sh @@ -15,7 +15,7 @@ # limitations under the License. 
# -kafkacat -b localhost:9092 -t rabbitmq.test -C \ +kafkacat -b localhost:9092 -t topic1 -C \ -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T diff --git a/config/RabbitMQSinkConnector.properties b/config/RabbitMQSinkConnector.properties new file mode 100644 index 0000000..598daab --- /dev/null +++ b/config/RabbitMQSinkConnector.properties @@ -0,0 +1,7 @@ +name=rabbitmq-sink +tasks.max=1 +connector.class=com.github.themeetgroup.kafka.connect.rabbitmq.sink.RabbitMQSinkConnector +rabbitmq.exchange=exchange +rabbitmq.routing.key=routingkey +rabbitmq.format=json +topics=rabbitmq-test \ No newline at end of file diff --git a/config/RabbitMQSourceConnector.properties b/config/RabbitMQSourceConnector.properties new file mode 100644 index 0000000..49a6baa --- /dev/null +++ b/config/RabbitMQSourceConnector.properties @@ -0,0 +1,7 @@ +name=rabbitmq-source +tasks.max=1 +connector.class=com.github.themeetgroup.kafka.connect.rabbitmq.source.RabbitMQSourceConnector +#rabbitmq.queue=test1,test2 +#kafka.topic=rabbitmq-test +rabbitmq.queue.topic.mapping=test1:topic1,test2:topic2 +message.converter=com.github.themeetgroup.kafka.connect.rabbitmq.source.data.BytesSourceMessageConverter \ No newline at end of file diff --git a/config/connect-avro-docker.properties b/config/connect-avro-docker.properties new file mode 100644 index 0000000..3a49888 --- /dev/null +++ b/config/connect-avro-docker.properties @@ -0,0 +1,6 @@ +bootstrap.servers=localhost:9092 +key.converter=org.apache.kafka.connect.storage.StringConverter +value.converter=io.confluent.connect.avro.AvroConverter +value.converter.schema.registry.url=http://localhost:8081 +offset.storage.file.filename=/tmp/connect.offsets +plugin.path=target/kafka-connect-target/usr/share/kafka-connect \ No newline at end of file diff --git a/pom.xml b/pom.xml index d18eca4..1f282e4 100644 --- a/pom.xml +++ b/pom.xml @@ -22,6 +22,13 @@ + + + Confluent + https://packages.confluent.io/maven/ + + + jcustenborder @@ -32,13 +39,21 @@ - insidn + insidin Jan Uyttenhove https://github.com/insidin Committer + + jelledv + Jelle De Vleminck + https://github.com/jelledv + + Committer + + @@ -55,32 +70,64 @@ 5.10.0 3.3.0 + 2.6.0 + 6.0.0 + + 2.8.5 + 1.9.2 + + io.confluent + kafka-connect-avro-converter + ${confluent.version} + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + ${jackson.version} + + com.rabbitmq amqp-client ${rabbitmq.version} - com.github.jcustenborder.kafka.connect - connect-utils-testing-data - ${connect-utils.version} - - - org.apache.avro - avro - 1.8.2 + org.apache.kafka + connect-json + ${kafka.version} - io.confluent - kafka-avro-serializer - 5.1.0 + org.apache.kafka + connect-runtime + ${kafka.version} + provided + + + org.apache.maven.plugins + maven-assembly-plugin + 3.1.1 + + + jar-with-dependencies + + + + + fat-jar + package + + single + + + + org.apache.maven.plugins maven-javadoc-plugin @@ -118,6 +165,35 @@ + + org.apache.avro + avro-maven-plugin + ${avro.version} + + ${project.basedir}/src/test/resources + + ${project.basedir}/src/test/resources/payment.avsc + + true + private + + + + second + generate-sources + + schema + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + + true + + diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/RabbitMQSinkConnectorConfig.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/RabbitMQSinkConnectorConfig.java index 21497c3..bc99c2c 100644 --- 
a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/RabbitMQSinkConnectorConfig.java +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/RabbitMQSinkConnectorConfig.java @@ -24,6 +24,7 @@ import com.github.jcustenborder.kafka.connect.utils.template.StructTemplate; public class RabbitMQSinkConnectorConfig extends CommonRabbitMQConnectorConfig { + static final String KAFKA_TOPIC_TEMPLATE = "kafkaTopicTemplate"; public static final String TOPIC_CONF = "topics"; static final String TOPIC_DOC = "Kafka topic to read the messages from."; @@ -36,14 +37,17 @@ public class RabbitMQSinkConnectorConfig extends CommonRabbitMQConnectorConfig { public static final String ROUTING_KEY_CONF = "rabbitmq.routing.key"; static final String ROUTING_KEY_DOC = "routing key used for publishing the messages."; + public static final String FORMAT_CONF = "rabbitmq.format"; + public static final String FORMAT_CONF_DOC = "The format type to use when writing data to RabbitMQ."; + public static final String FORMAT_CONF_DEFAULT = "bytes"; public static final String HEADER_CONF = "rabbitmq.headers"; public static final String HEADER_CONF_DOC = "Headers to set for outbound messages. Set with `headername1`:`headervalue1`,`headername2`:`headervalue2`"; - //TODO: include other config variables here public final StructTemplate kafkaTopic; public final String exchange; public final String routingKey; + public final String format; public RabbitMQSinkConnectorConfig(Map<String, String> settings) { super(config(), settings); @@ -52,16 +56,15 @@ public RabbitMQSinkConnectorConfig(Map<String, String> settings) { this.kafkaTopic.addTemplate(KAFKA_TOPIC_TEMPLATE, kafkaTopicFormat); this.exchange = this.getString(EXCHANGE_CONF); this.routingKey = this.getString(ROUTING_KEY_CONF); + this.format = this.getString(FORMAT_CONF); } public static ConfigDef config() { return CommonRabbitMQConnectorConfig.config() - .define(TOPIC_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC) - .define(EXCHANGE_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, EXCHANGE_DOC) - .define(ROUTING_KEY_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, ROUTING_KEY_DOC) - .define(HEADER_CONF, ConfigDef.Type.STRING, null, null, ConfigDef.Importance.LOW, HEADER_CONF_DOC); - - + .define(TOPIC_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC) + .define(EXCHANGE_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, EXCHANGE_DOC) + .define(ROUTING_KEY_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, ROUTING_KEY_DOC) + .define(FORMAT_CONF, ConfigDef.Type.STRING, FORMAT_CONF_DEFAULT, ConfigDef.Importance.HIGH, FORMAT_CONF_DOC) + .define(HEADER_CONF, ConfigDef.Type.STRING, null, null, ConfigDef.Importance.LOW, HEADER_CONF_DOC); } - } diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/RabbitMQSinkTask.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/RabbitMQSinkTask.java index 56d2d42..0e2a9ad 100644 --- a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/RabbitMQSinkTask.java +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/RabbitMQSinkTask.java @@ -17,6 +17,7 @@ package com.github.themeetgroup.kafka.connect.rabbitmq.sink; import com.github.jcustenborder.kafka.connect.utils.VersionUtil; +import com.github.themeetgroup.kafka.connect.rabbitmq.sink.format.RecordFormatter; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; @@ -35,12 +36,12 @@ import 
static com.github.themeetgroup.kafka.connect.rabbitmq.sink.RabbitMQSinkConnectorConfig.HEADER_CONF; public class RabbitMQSinkTask extends SinkTask { - private static final Logger log = LoggerFactory.getLogger(RabbitMQSinkTask.class); - RabbitMQSinkConnectorConfig config; - - Channel channel; - Connection connection; + private static final Logger log = LoggerFactory.getLogger(RabbitMQSinkTask.class); + private RabbitMQSinkConnectorConfig config; + private RecordFormatter recordFormatter; + private Channel channel; + private Connection connection; @Override public String version() { @@ -51,12 +52,9 @@ public String version() { public void put(Collection<SinkRecord> sinkRecords) { for (SinkRecord record : sinkRecords) { log.trace("current sinkRecord value: " + record.value()); - if (!(record.value() instanceof byte[])) { - throw new ConnectException("the value of the record has an invalid type (must be of type byte[])"); - } try { channel.basicPublish(this.config.exchange, this.config.routingKey, - RabbitMQSinkHeaderParser.parse(config.getString(HEADER_CONF)), (byte[]) record.value()); + RabbitMQSinkHeaderParser.parse(config.getString(HEADER_CONF)), recordFormatter.format(record)); } catch (IOException e) { log.error("There was an error while publishing the outgoing message to RabbitMQ"); throw new RetriableException(e); @@ -67,6 +65,7 @@ public void put(Collection<SinkRecord> sinkRecords) { @Override public void start(Map<String, String> settings) { this.config = new RabbitMQSinkConnectorConfig(settings); + this.recordFormatter = RecordFormatter.getInstance(config.format); ConnectionFactory connectionFactory = this.config.connectionFactory(); try { log.info("Opening connection to {}:{}/{} (SSL: {})", this.config.host, this.config.port, this.config.virtualHost, this.config.useSsl); diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatter.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatter.java new file mode 100644 index 0000000..e84369e --- /dev/null +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatter.java @@ -0,0 +1,91 @@ +/** + * Copyright © 2017 Kyumars Sheykh Esmaili (kyumarss@gmail.com) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.themeetgroup.kafka.connect.rabbitmq.sink.format; + +import io.confluent.connect.avro.AvroData; +import io.confluent.connect.avro.AvroDataConfig; +import io.confluent.kafka.serializers.NonRecordContainer; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.avro.specific.SpecificRecord; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.sink.SinkRecord; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +public class AvroFormatter implements RecordFormatter { + + private final AvroData avroData; + private final EncoderFactory encoderFactory; + + public AvroFormatter() { + Map<String, Object> avroDataConfigMap = new HashMap<String, Object>() { { + put(AvroDataConfig.ENHANCED_AVRO_SCHEMA_SUPPORT_CONFIG, true); + put(AvroDataConfig.SCHEMAS_CACHE_SIZE_CONFIG, 10); + } }; + avroData = new AvroData(new AvroDataConfig(avroDataConfigMap)); + encoderFactory = EncoderFactory.get(); + } + + @Override + public byte[] format(SinkRecord sinkRecord) { + Schema avroSchema = avroData.fromConnectSchema(sinkRecord.valueSchema()); + Object o = avroData.fromConnectData(sinkRecord.valueSchema(), sinkRecord.value()); + if (o == null) { + return null; + } + return serialize(o, avroSchema); + } + + private byte[] serialize(Object object, Schema schema) { + Object value = object instanceof NonRecordContainer ? ((NonRecordContainer) object).getValue() : object; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + if (schema.getType() == Schema.Type.BYTES) { + if (value instanceof byte[]) { + out.write(((byte[]) value)); + } else { + if (!(value instanceof ByteBuffer)) { + throw new DataException("Error serializing message to format Avro. Unrecognized bytes object of type: " + value.getClass().getName()); + } + out.write(((ByteBuffer) value).array()); + } + } else { + BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null); + DatumWriter<Object> writer; + if (value instanceof SpecificRecord) { + writer = new SpecificDatumWriter<>(schema); + } else { + writer = new GenericDatumWriter<>(schema); + } + writer.write(value, encoder); + encoder.flush(); + } + + return out.toByteArray(); + } catch (IOException e) { + throw new DataException("Error serializing message to format Avro", e); + } + } +} diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/BytesRecordFormatter.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/BytesRecordFormatter.java new file mode 100644 index 0000000..715c908 --- /dev/null +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/BytesRecordFormatter.java @@ -0,0 +1,37 @@ +/** + * Copyright © 2017 Kyumars Sheykh Esmaili (kyumarss@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.themeetgroup.kafka.connect.rabbitmq.sink.format; + +import org.apache.kafka.connect.converters.ByteArrayConverter; +import org.apache.kafka.connect.sink.SinkRecord; + +public class BytesRecordFormatter implements RecordFormatter { + + private final ByteArrayConverter converter; + + public BytesRecordFormatter() { + converter = new ByteArrayConverter(); + } + + @Override + public byte[] format(SinkRecord record) { + return converter.fromConnectData( + record.topic(), + record.valueSchema(), + record.value()); + } +} diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/JsonRecordFormatter.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/JsonRecordFormatter.java new file mode 100644 index 0000000..0b6b01c --- /dev/null +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/JsonRecordFormatter.java @@ -0,0 +1,65 @@ +/** + * Copyright © 2017 Kyumars Sheykh Esmaili (kyumarss@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.github.themeetgroup.kafka.connect.rabbitmq.sink.format; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.sink.SinkRecord; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class JsonRecordFormatter implements RecordFormatter { + + private final ObjectMapper mapper; + private final JsonConverter converter; + + public JsonRecordFormatter() { + this.mapper = new ObjectMapper(); + mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); + mapper.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false); + mapper.registerModule(new JavaTimeModule()); + converter = new JsonConverter(); + Map converterConfig = new HashMap<>(); + converterConfig.put("schemas.enable", "false"); + converterConfig.put("schemas.cache.size", "10"); + this.converter.configure(converterConfig, false); + } + + @Override + public byte[] format(SinkRecord record) { + try { + Object value = record.value(); + if (value instanceof Struct) { + return converter.fromConnectData( + record.topic(), + record.valueSchema(), + value + ); + } else { + return mapper.writeValueAsBytes(value); + } + } catch (IOException e) { + throw new ConnectException(e); + } + } +} diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/RecordFormatter.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/RecordFormatter.java new file mode 100644 index 0000000..189a15d --- /dev/null +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/RecordFormatter.java @@ -0,0 +1,36 @@ +/** + * Copyright © 2017 Kyumars Sheykh Esmaili (kyumarss@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.github.themeetgroup.kafka.connect.rabbitmq.sink.format; + +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; + +public interface RecordFormatter { + + byte[] format(SinkRecord sinkRecord); + + static RecordFormatter getInstance(String type) { + if ("bytes".equals(type)) { + return new BytesRecordFormatter(); + } else if ("json".equals(type)) { + return new JsonRecordFormatter(); + } else if ("avro".equals(type)) { + return new AvroFormatter(); + } + throw new ConnectException("The provided format type is not one of 'bytes', 'json' or 'avro', but: " + type); + } +} diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/ConnectConsumer.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/ConnectConsumer.java index f07d0b3..0a504e9 100644 --- a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/ConnectConsumer.java +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/ConnectConsumer.java @@ -32,10 +32,14 @@ class ConnectConsumer implements Consumer { private static final Logger log = LoggerFactory.getLogger(ConnectConsumer.class); private final SourceRecordConcurrentLinkedDeque records; private final SourceRecordBuilder sourceRecordBuilder; + private final String queue; - ConnectConsumer(SourceRecordConcurrentLinkedDeque records, RabbitMQSourceConnectorConfig config) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException { + ConnectConsumer(SourceRecordConcurrentLinkedDeque records, + RabbitMQSourceConnectorConfig config, + String queue) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException { this.records = records; this.sourceRecordBuilder = new SourceRecordBuilder(config); + this.queue = queue; } @Override @@ -67,7 +71,7 @@ public void handleRecoverOk(String s) { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) { log.trace("handleDelivery({})", consumerTag); - SourceRecord sourceRecord = this.sourceRecordBuilder.sourceRecord(consumerTag, envelope, basicProperties, bytes); + SourceRecord sourceRecord = this.sourceRecordBuilder.sourceRecord(queue, consumerTag, envelope, basicProperties, bytes); this.records.add(sourceRecord); } } diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnector.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnector.java index 079546c..46b730a 100644 --- a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnector.java +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnector.java @@ -21,14 +21,22 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; +import org.apache.kafka.connect.util.ConnectorUtils; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; + +import static com.github.themeetgroup.kafka.connect.rabbitmq.source.RabbitMQSourceConnectorConfig.QUEUE_TOPIC_MAPPING_CONF; @Description("Connector is used to read from a RabbitMQ Queue or Topic.") public class RabbitMQSourceConnector extends SourceConnector { private Map 
settings; + private RabbitMQSourceConnectorConfig config; @Override public String version() { @@ -38,6 +46,7 @@ @Override public void start(Map<String, String> settings) { this.settings = settings; + this.config = new RabbitMQSourceConnectorConfig(settings); } @Override @@ -47,7 +56,22 @@ public Class<? extends Task> taskClass() { @Override public List<Map<String, String>> taskConfigs(int maxTasks) { - return TaskConfigs.multiple(this.settings, maxTasks); + String queueToTopicMapping = settings.get(QUEUE_TOPIC_MAPPING_CONF); + if (queueToTopicMapping == null || queueToTopicMapping.isEmpty()) { + return TaskConfigs.multiple(this.settings, maxTasks); + } + + List<String> listQueueToTopicMapping = Arrays.stream(queueToTopicMapping.split(",")).collect(Collectors.toList()); + List<List<String>> partitionedQueueToTopicMapping = ConnectorUtils.groupPartitions(listQueueToTopicMapping, maxTasks); + + List<Map<String, String>> connectorConfig = new ArrayList<>(); + for (List<String> partition : partitionedQueueToTopicMapping) { + Map<String, String> taskConfig = new HashMap<>(settings); + taskConfig.put(QUEUE_TOPIC_MAPPING_CONF, String.join(",", partition)); + connectorConfig.add(taskConfig); + } + + return connectorConfig; } @Override diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorConfig.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorConfig.java index 3a1a83b..7709c35 100644 --- a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorConfig.java +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorConfig.java @@ -17,9 +17,15 @@ import com.github.themeetgroup.kafka.connect.rabbitmq.CommonRabbitMQConnectorConfig; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.errors.ConnectException; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static java.util.stream.Collectors.toMap; public class RabbitMQSourceConnectorConfig extends CommonRabbitMQConnectorConfig { @@ -42,28 +48,47 @@ public class RabbitMQSourceConnectorConfig extends CommonRabbitMQConnectorConfig public static final String MESSAGE_CONVERTER_CLASSNAME_DOC = "Converter to compose the Kafka message. Optional, defaults to " + "com.github.themeetgroup.kafka.connect.rabbitmq.source.data.MessageConverter"; - public final String kafkaTopic; - public final List<String> queues; + public static final String QUEUE_TOPIC_MAPPING_CONF = "rabbitmq.queue.topic.mapping"; + public static final String QUEUE_TOPIC_MAPPING_DOC = "A list containing a mapping between a RabbitMQ queue and a Kafka topic.\n" + + " This setting is an alternative to the 'rabbitmq.queue' and 'kafka.topic' settings.\n" + + " When both settings are present, the 'rabbitmq.queue' and 'kafka.topic' settings take precedence. Example of a mapping config: 'queue1:topic1,queue2:topic2'"; + public final int prefetchCount; public final boolean prefetchGlobal; public final String messageConverter; + public final Map<String, String> queueToTopicMap; public RabbitMQSourceConnectorConfig(Map<String, String> settings) { super(config(), settings); - this.kafkaTopic = this.getString(TOPIC_CONF); - this.queues = this.getList(QUEUE_CONF); this.prefetchCount = this.getInt(PREFETCH_COUNT_CONF); this.prefetchGlobal = this.getBoolean(PREFETCH_GLOBAL_CONF); this.messageConverter = this.getString(MESSAGE_CONVERTER_CLASSNAME_CONF); + + String topic = this.getString(TOPIC_CONF); + List<String> queues = this.getList(QUEUE_CONF); + List<String> queueTopicMappingList = this.getList(QUEUE_TOPIC_MAPPING_CONF); + + if (!queues.isEmpty() && !topic.isEmpty()) { + queueToTopicMap = queues.stream() + .collect(Collectors.toMap(x -> x, x -> topic)); + } else if (!queueTopicMappingList.isEmpty()) { + queueToTopicMap = queueTopicMappingList.stream() + .map(x -> x.split(":")) + .collect(toMap(x -> x[0], x -> x[1])); + } else { + throw new ConnectException("No valid queue / topic configuration has been found. Either use the combination of " + + "" + TOPIC_CONF + " and " + QUEUE_CONF + " or use the " + QUEUE_TOPIC_MAPPING_CONF + " setting."); + } } public static ConfigDef config() { return CommonRabbitMQConnectorConfig.config() - .define(TOPIC_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC) + .define(TOPIC_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, TOPIC_DOC) .define(PREFETCH_COUNT_CONF, ConfigDef.Type.INT, 0, ConfigDef.Importance.MEDIUM, PREFETCH_COUNT_DOC) .define(PREFETCH_GLOBAL_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, PREFETCH_GLOBAL_DOC) - .define(QUEUE_CONF, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, QUEUE_DOC) - .define(MESSAGE_CONVERTER_CLASSNAME_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, MESSAGE_CONVERTER_CLASSNAME_DOC); + .define(QUEUE_CONF, ConfigDef.Type.LIST, new ArrayList<>(), ConfigDef.Importance.HIGH, QUEUE_DOC) + .define(MESSAGE_CONVERTER_CLASSNAME_CONF, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, MESSAGE_CONVERTER_CLASSNAME_DOC) + .define(QUEUE_TOPIC_MAPPING_CONF, ConfigDef.Type.LIST, new ArrayList<>(), ConfigDef.Importance.HIGH, QUEUE_TOPIC_MAPPING_DOC); } } diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceTask.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceTask.java index 5f07045..86a3d45 100644 --- a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceTask.java +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceTask.java @@ -20,6 +20,8 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.ShutdownListener; +import com.rabbitmq.client.ShutdownSignalException; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.source.SourceRecord; @@ -29,6 +31,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.TimeoutException; @@ -49,43 +52,25 @@ public String version() { public void start(Map<String, String> settings) { RabbitMQSourceConnectorConfig config = new RabbitMQSourceConnectorConfig(settings); this.records = new SourceRecordConcurrentLinkedDeque(); - 
ConnectConsumer consumer; - try { - consumer = new ConnectConsumer(this.records, config); - } catch (Exception e) { - throw new ConnectException(e); - } - ConnectionFactory connectionFactory = config.connectionFactory(); try { + ConnectionFactory connectionFactory = config.connectionFactory(); log.info("Opening connection to {}:{}/{} (SSL: {})", config.host, config.port, config.virtualHost, config.useSsl); this.connection = connectionFactory.newConnection(); - } catch (IOException | TimeoutException e) { - throw new ConnectException(e); - } - try { log.info("Creating Channel"); this.channel = this.connection.createChannel(); - log.info("Declaring queues"); - for (String queue : config.queues) { - this.channel.queueDeclare(queue, true, false, false, null); - } - } catch (IOException e) { - throw new ConnectException(e); - } - for (String queue : config.queues) { - try { - log.info("Starting consumer"); + for (String queue : config.queueToTopicMap.keySet()) { + log.info("Declaring queue {} & starting consumer for queue", queue); + ConnectConsumer consumer = new ConnectConsumer(this.records, config, queue); + this.channel.queueDeclare(queue, true, false, false, null); this.channel.basicConsume(queue, consumer); - log.info("Setting channel.basicQos({}, {});", config.prefetchCount, config.prefetchGlobal); this.channel.basicQos(config.prefetchCount, config.prefetchGlobal); - } catch (IOException ex) { - throw new ConnectException(ex); } + } catch (Exception e) { + throw new ConnectException(e); } - } @Override diff --git a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/data/SourceRecordBuilder.java b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/data/SourceRecordBuilder.java index 09b2fe1..bed7ce1 100644 --- a/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/data/SourceRecordBuilder.java +++ b/src/main/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/data/SourceRecordBuilder.java @@ -22,10 +22,12 @@ import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.source.SourceRecord; import java.lang.reflect.InvocationTargetException; +import java.util.Optional; public class SourceRecordBuilder { @@ -41,13 +43,16 @@ public SourceRecordBuilder(RabbitMQSourceConnectorConfig config) throws ClassNot (SourceMessageConverter) (Class.forName(messageConverterClassName).getConstructor().newInstance()); } - public SourceRecord sourceRecord(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) { + public SourceRecord sourceRecord(String queue, String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) { Object key = this.messageConverter.key(consumerTag, envelope, basicProperties, bytes); Schema keySchema = this.messageConverter.keySchema(); Object value = this.messageConverter.value(consumerTag, envelope, basicProperties, bytes); Schema valueSchema = this.messageConverter.valueSchema(); Headers headers = this.messageConverter.headers(consumerTag, envelope, basicProperties, bytes); - String topic = this.config.kafkaTopic; + + String topic = Optional + .ofNullable(config.queueToTopicMap.get(queue)) + .orElseThrow(() -> new ConnectException("There was no Kafka topic found for the consumed queue '" + queue + "'")); return new SourceRecord( 
ImmutableMap.of("routingKey", envelope.getRoutingKey()), diff --git a/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatterTest.java b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatterTest.java new file mode 100644 index 0000000..6db839c --- /dev/null +++ b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/AvroFormatterTest.java @@ -0,0 +1,91 @@ +package com.github.themeetgroup.kafka.connect.rabbitmq.sink.format; + +import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.util.Utf8; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URL; + +import static com.github.themeetgroup.kafka.connect.rabbitmq.sink.format.TestData.createSinkRecord; +import static com.github.themeetgroup.kafka.connect.rabbitmq.sink.format.TestData.paymentSchema; +import static com.github.themeetgroup.kafka.connect.rabbitmq.sink.format.TestData.paymentValue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class AvroFormatterTest { + + private final RecordFormatter avroRecordFormatter = new AvroFormatter(); + private final DecoderFactory decoderFactory = DecoderFactory.get(); + private final KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(new MockSchemaRegistryClient()); + private final PlainAvroDeserializer plainAvroDeserializer = new PlainAvroDeserializer<>(Payment.class); + + private Schema schema; + + @BeforeEach + void setUp() throws IOException, URISyntaxException { + URL resource = this.getClass().getClassLoader().getResource("payment.avsc"); + schema = new Schema.Parser().parse(new File(resource.toURI())); + } + + @Test + void givenAStruct_whenFormattingWithAvroRecordFormatter_expectStructToAvro() throws IOException { + Struct payment = paymentValue(1, true, Currency.EURO, "testSender"); + SinkRecord sinkRecord = createSinkRecord(paymentSchema(), payment); + + byte[] output = avroRecordFormatter.format(sinkRecord); + + GenericRecord record = toGenericRecord(schema, output); + assertEquals(1, record.get("id")); + assertEquals(true, record.get("isCashPayment")); + assertEquals(new Utf8("testSender"), record.get("sender")); + assertEquals(new GenericData.EnumSymbol(Currency.SCHEMA$, "EURO"), record.get("currency")); + + Payment specificOutput = plainAvroDeserializer.deserialize(null, output); + assertEquals(1, specificOutput.getId()); + assertTrue(specificOutput.getIsCashPayment()); + assertEquals("testSender", specificOutput.getSender().toString()); + assertNull(specificOutput.getComment()); + assertEquals(Currency.EURO, 
specificOutput.getCurrency()); + } + + // The "avro" formatter is serializing data in NON-confluent avro, meaning the first bytes do not contain the schema id + // see: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format + // If you want confluent avro bytes using the RabbitMQ sink connector + // you can use the "org.apache.kafka.connect.converters.ByteArrayConverter" converter + // after putting confluent avro serialized data on your topic + @Test + void validateExceptionIsThrown_whenTryingToDeserializeOutputWithKafkaAvroDeserializer() { + Struct payment = paymentValue(1, true, Currency.EURO, "testSender"); + SinkRecord sinkRecord = createSinkRecord(paymentSchema(), payment); + + byte[] output = avroRecordFormatter.format(sinkRecord); + + assertThrows(SerializationException.class, () -> kafkaAvroDeserializer.deserialize("test", output)); + } + + private GenericRecord toGenericRecord(Schema schema, byte[] avroBytes) throws IOException { + DatumReader reader = new GenericDatumReader<>(schema); + ByteArrayInputStream stream = new ByteArrayInputStream(avroBytes); + BinaryDecoder binaryDecoder = decoderFactory.binaryDecoder(stream, null); + return reader.read(null, binaryDecoder); + } +} \ No newline at end of file diff --git a/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/BytesRecordFormatterTest.java b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/BytesRecordFormatterTest.java new file mode 100644 index 0000000..d397f83 --- /dev/null +++ b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/BytesRecordFormatterTest.java @@ -0,0 +1,39 @@ +package com.github.themeetgroup.kafka.connect.rabbitmq.sink.format; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; + +import static com.github.themeetgroup.kafka.connect.rabbitmq.sink.format.TestData.createSinkRecord; +import static com.github.themeetgroup.kafka.connect.rabbitmq.sink.format.TestData.paymentValue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class BytesRecordFormatterTest { + + private final RecordFormatter bytesRecordFormatter = new BytesRecordFormatter(); + + @Test + void givenABytesSchemaAndValue_expectCorrectFormat() { + byte[] testString = "test".getBytes(StandardCharsets.UTF_8); + SinkRecord record = createSinkRecord(Schema.BYTES_SCHEMA, testString); + + byte[] output = bytesRecordFormatter.format(record); + + assertEquals(testString, output); + } + + // When using the BytesFormatter, the "org.apache.kafka.connect.converters.ByteArrayConverter" must be used as value converter + // This is also the default behaviour when not specifying a formatter + @Test + void givenAStruct_whenFormattingWithBytesRecordFormatter_expectDataException() { + Struct payment = paymentValue(1, true, Currency.EURO, "testSender"); + SinkRecord record = createSinkRecord(TestData.paymentSchema(), payment); + + assertThrows(DataException.class, () -> bytesRecordFormatter.format(record)); + } +} \ No newline at end of file diff --git a/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/JsonRecordFormatterTest.java b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/JsonRecordFormatterTest.java 
new file mode 100644 index 0000000..d454a40 --- /dev/null +++ b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/JsonRecordFormatterTest.java @@ -0,0 +1,85 @@ +package com.github.themeetgroup.kafka.connect.rabbitmq.sink.format; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +import static com.github.themeetgroup.kafka.connect.rabbitmq.sink.format.TestData.createSinkRecord; +import static com.github.themeetgroup.kafka.connect.rabbitmq.sink.format.TestData.paymentValue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +class JsonRecordFormatterTest { + + private final RecordFormatter jsonRecordFormatter = new JsonRecordFormatter(); + private ObjectMapper objectMapper; + + @BeforeEach + void setUp() { + objectMapper = new ObjectMapper(); + objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); + objectMapper.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false); + objectMapper.registerModule(new JavaTimeModule()); + } + + @Test + void givenAStringSchemaAndValue_whenFormattingWithJsonRecordFormatter_expectQuotedString() { + String value = "test"; + SinkRecord sinkRecord = TestData.createSinkRecord(Schema.STRING_SCHEMA, value); + + byte[] output = jsonRecordFormatter.format(sinkRecord); + + assertEquals("\"test\"", new String(output, StandardCharsets.UTF_8)); + } + + @Test + void givenAnIntSchemaAndValue_whenFormattingWithJsonRecordFormatter_expectQuotedInt() { + int value = 44; + SinkRecord sinkRecord = TestData.createSinkRecord(Schema.INT32_SCHEMA, value); + + byte[] output = jsonRecordFormatter.format(sinkRecord); + + assertEquals("44", new String(output, StandardCharsets.UTF_8)); + } + + @Test + void givenAStruct_whenFormattingWithJsonRecordFormatter_expectStructToJson() throws IOException { + Struct payment = paymentValue(1, true, Currency.EURO, "testSender"); + SinkRecord sinkRecord = createSinkRecord(TestData.paymentSchema(), payment); + + byte[] output = jsonRecordFormatter.format(sinkRecord); + + Map map = objectMapper.readValue(output, Map.class); + assertEquals(5, map.size()); + assertEquals(1, map.get("id")); + assertEquals(true, map.get("isCashPayment")); + assertEquals("testSender", map.get("sender")); + assertNull(map.get("comment")); + assertEquals("EURO", map.get("currency")); + } + + @Test + void givenASchemalessValue_whenFormattingWithJsonRecordFormatter_expectMapToJson() throws IOException { + Map schemalessValue = new HashMap<>(); + schemalessValue.put("id", 1); + schemalessValue.put("sender", "testSender"); + SinkRecord sinkRecord = createSinkRecord(null, schemalessValue); + + byte[] output = jsonRecordFormatter.format(sinkRecord); + + Map map = objectMapper.readValue(output, Map.class); + assertEquals(2, map.size()); + assertEquals(1, map.get("id")); + assertEquals("testSender", map.get("sender")); + } +} \ No newline at end of file diff --git a/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/PlainAvroDeserializer.java 
b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/PlainAvroDeserializer.java new file mode 100644 index 0000000..86254b6 --- /dev/null +++ b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/PlainAvroDeserializer.java @@ -0,0 +1,55 @@ +package com.github.themeetgroup.kafka.connect.rabbitmq.sink.format; + +import org.apache.avro.Schema; +import org.apache.avro.data.TimeConversions; +import org.apache.avro.generic.GenericData; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; +import org.apache.kafka.common.serialization.Deserializer; + +import java.io.ByteArrayInputStream; +import java.io.IOException; + +public class PlainAvroDeserializer<T extends SpecificRecord> implements Deserializer<T> { + + static { + SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion()); + SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion()); + SpecificData.get().addLogicalTypeConversion(new TimeConversions.DateConversion()); + + GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion()); + GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion()); + GenericData.get().addLogicalTypeConversion(new TimeConversions.DateConversion()); + } + + private final DecoderFactory decoderFactory = DecoderFactory.get(); + private final DatumReader<T> datumReader; + + public PlainAvroDeserializer(Class<T> cls) { + this(SpecificData.get().getSchema(cls)); + } + + public PlainAvroDeserializer(Schema schema) { + datumReader = new SpecificDatumReader<>(schema); + } + + @Override + public T deserialize(String topic, byte[] data) { + try { + if (data == null) { + return null; + } + + try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) { + BinaryDecoder binaryDecoder = decoderFactory.binaryDecoder(stream, null); + return datumReader.read(null, binaryDecoder); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/TestData.java b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/TestData.java new file mode 100644 index 0000000..472a268 --- /dev/null +++ b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/sink/format/TestData.java @@ -0,0 +1,48 @@ +package com.github.themeetgroup.kafka.connect.rabbitmq.sink.format; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.sink.SinkRecord; + +public class TestData { + + public static Schema paymentSchema() { + return SchemaBuilder.struct() + .name("com.github.themeetgroup.kafka.connect.rabbitmq.sink.format.Payment") + .doc("Payment schema used in unit tests") + .field("id", SchemaBuilder.int32().build()) + .field("isCashPayment", SchemaBuilder.bool().build()) + .field("currency", SchemaBuilder.string() + .parameter( + "io.confluent.connect.avro.Enum", + "com.github.themeetgroup.kafka.connect.rabbitmq.sink.format.Currency") + .parameter( + "io.confluent.connect.avro.Enum.EURO", + "EURO") + .parameter( + "io.confluent.connect.avro.Enum.DOLLAR", + "DOLLAR") + .build()) + .field("sender", SchemaBuilder.string().build()) + .field("comment", 
SchemaBuilder.string().optional().build()) + .build(); + } + + public static Struct paymentValue(int id, boolean isCashPayment, Currency currency, String sender) { + return paymentValue(id, isCashPayment, currency, sender, null); + } + + public static Struct paymentValue(int id, boolean isCashPayment, Currency currency, String sender, String comment) { + return new Struct(paymentSchema()) + .put("id", id) + .put("isCashPayment", isCashPayment) + .put("currency", currency.toString()) + .put("sender", sender) + .put("comment", comment); + } + + public static SinkRecord createSinkRecord(Schema valueSchema, Object value) { + return new SinkRecord("test", 0, null, null, valueSchema, value, 0); + } +} diff --git a/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorConfigTest.java b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorConfigTest.java new file mode 100644 index 0000000..8a91da5 --- /dev/null +++ b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorConfigTest.java @@ -0,0 +1,53 @@ +package com.github.themeetgroup.kafka.connect.rabbitmq.source; + +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.DataException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static com.github.themeetgroup.kafka.connect.rabbitmq.source.RabbitMQSourceConnectorConfig.QUEUE_CONF; +import static com.github.themeetgroup.kafka.connect.rabbitmq.source.RabbitMQSourceConnectorConfig.QUEUE_TOPIC_MAPPING_CONF; +import static com.github.themeetgroup.kafka.connect.rabbitmq.source.RabbitMQSourceConnectorConfig.TOPIC_CONF; +import static org.junit.jupiter.api.Assertions.*; + +class RabbitMQSourceConnectorConfigTest { + + @Test + void givenNoTopicAndQueueAndQueueToTopicConfig_whenCreatingConfig_expectConnectException() { + Map<String, String> settings = new HashMap<>(); + + assertThrows(ConnectException.class, () -> new RabbitMQSourceConnectorConfig(settings)); + } + + @Test + void givenTopicAndQueuesCombination_whenCreatingConfig_expectEveryQueueMappedToSameTopic() { + Map<String, String> settings = new HashMap<>(); + settings.put(TOPIC_CONF, "test_topic"); + settings.put(QUEUE_CONF, "queue1,queue2,queue3"); + + RabbitMQSourceConnectorConfig config = new RabbitMQSourceConnectorConfig(settings); + + Map<String, String> queueToTopicMap = config.queueToTopicMap; + assertEquals(3, queueToTopicMap.size()); + assertEquals("test_topic", queueToTopicMap.get("queue1")); + assertEquals("test_topic", queueToTopicMap.get("queue2")); + assertEquals("test_topic", queueToTopicMap.get("queue3")); + } + + @Test + void givenOnlyTopicToQueueMapping_whenCreatingConfig_expectTopicCorrectlyMapped() { + Map<String, String> settings = new HashMap<>(); + settings.put(QUEUE_TOPIC_MAPPING_CONF, "queue1:topic1,queue2:topic2,queue3:topic2"); + + RabbitMQSourceConnectorConfig config = new RabbitMQSourceConnectorConfig(settings); + + Map<String, String> queueToTopicMap = config.queueToTopicMap; + assertEquals(3, queueToTopicMap.size()); + assertEquals("topic1", queueToTopicMap.get("queue1")); + assertEquals("topic2", queueToTopicMap.get("queue2")); + assertEquals("topic2", queueToTopicMap.get("queue3")); + } +} \ No newline at end of file diff --git a/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorTest.java b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorTest.java new file mode 100644 
index 0000000..22da381 --- /dev/null +++ b/src/test/java/com/github/themeetgroup/kafka/connect/rabbitmq/source/RabbitMQSourceConnectorTest.java @@ -0,0 +1,50 @@ +package com.github.themeetgroup.kafka.connect.rabbitmq.source; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static com.github.themeetgroup.kafka.connect.rabbitmq.source.RabbitMQSourceConnectorConfig.QUEUE_CONF; +import static com.github.themeetgroup.kafka.connect.rabbitmq.source.RabbitMQSourceConnectorConfig.QUEUE_TOPIC_MAPPING_CONF; +import static com.github.themeetgroup.kafka.connect.rabbitmq.source.RabbitMQSourceConnectorConfig.TOPIC_CONF; +import static org.junit.jupiter.api.Assertions.*; + +class RabbitMQSourceConnectorTest { + + private RabbitMQSourceConnector connector; + + @BeforeEach + void setUp() { + connector = new RabbitMQSourceConnector(); + } + + @Test + void givenNormalTopicAndQueueConfig_whenCreatingTaskConfig_expectSameConfigForEveryTask() { + Map<String, String> settings = new HashMap<>(); + settings.put(TOPIC_CONF, "test_topic"); + settings.put(QUEUE_CONF, "queue1,queue2,queue3"); + + connector.start(settings); + List<Map<String, String>> taskConfigs = connector.taskConfigs(2); + + assertEquals(2, taskConfigs.size()); + assertEquals(taskConfigs.get(0), taskConfigs.get(1)); + } + + @Test + void givenQueueToTopicMappingConfig_whenCreatingTaskConfig_expectConfigSplitUpPerTask() { + Map<String, String> settings = new HashMap<>(); + settings.put(QUEUE_TOPIC_MAPPING_CONF, "queue1:topic1,queue2:topic2,queue3:topic2"); + + connector.start(settings); + List<Map<String, String>> taskConfigs = connector.taskConfigs(3); + + assertEquals(3, taskConfigs.size()); + assertEquals("queue1:topic1", taskConfigs.get(0).get(QUEUE_TOPIC_MAPPING_CONF)); + assertEquals("queue2:topic2", taskConfigs.get(1).get(QUEUE_TOPIC_MAPPING_CONF)); + assertEquals("queue3:topic2", taskConfigs.get(2).get(QUEUE_TOPIC_MAPPING_CONF)); + } +} \ No newline at end of file diff --git a/src/test/resources/payment.avsc b/src/test/resources/payment.avsc new file mode 100644 index 0000000..b54c4c6 --- /dev/null +++ b/src/test/resources/payment.avsc @@ -0,0 +1,12 @@ +{ + "type": "record", + "name": "Payment", + "namespace": "com.github.themeetgroup.kafka.connect.rabbitmq.sink.format", + "fields": [ + {"name": "id", "type": "int"}, + {"name": "isCashPayment", "type": "boolean"}, + {"name": "currency", "type": { "type": "enum", "name": "Currency", "symbols" : ["EURO","DOLLAR"]}}, + {"name": "sender", "type": "string"}, + {"name": "comment", "type": ["null", "string"], "default": null} + ] +} \ No newline at end of file
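
---

Reviewer notes on the patch above (not part of the diff itself):

The new many-to-many mapping is fanned out across tasks by `taskConfigs` via `ConnectorUtils.groupPartitions`, which splits the mapping list into groups whose sizes differ by at most one. A quick standalone illustration of what a task receives — the class name and the mapping values are illustrative only:

```java
import org.apache.kafka.connect.util.ConnectorUtils;

import java.util.Arrays;
import java.util.List;

public class MappingPartitionDemo {

  public static void main(String[] args) {
    // Three queue:topic pairs spread over two tasks.
    List<String> mappings = Arrays.asList("queue1:topic1", "queue2:topic2", "queue3:topic2");
    List<List<String>> grouped = ConnectorUtils.groupPartitions(mappings, 2);

    // Each group becomes one task's rabbitmq.queue.topic.mapping value:
    //   queue1:topic1,queue2:topic2
    //   queue3:topic2
    grouped.forEach(group -> System.out.println(String.join(",", group)));
  }
}
```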
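One design note on the sink side: `RecordFormatter.getInstance` is a fixed switch over 'bytes', 'json' and 'avro', so supporting another wire format means adding a branch there, unlike `message.converter` on the source side, which is loaded by class name. To show the extension point, here is a hypothetical extra formatter — `StringRecordFormatter` and a 'string' format type are not part of this patch:

```java
package com.github.themeetgroup.kafka.connect.rabbitmq.sink.format;

import org.apache.kafka.connect.sink.SinkRecord;

import java.nio.charset.StandardCharsets;

// Hypothetical example: publishes the record value's toString() as UTF-8 bytes.
// Wiring it up would need an extra "string" branch in RecordFormatter.getInstance.
public class StringRecordFormatter implements RecordFormatter {

  @Override
  public byte[] format(SinkRecord record) {
    Object value = record.value();
    // Nulls become an empty payload rather than a NullPointerException.
    return value == null ? new byte[0] : value.toString().getBytes(StandardCharsets.UTF_8);
  }
}
```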
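The bundled `config/RabbitMQSinkConnector.properties` demonstrates `rabbitmq.format=json`. For `rabbitmq.format=avro`, `AvroFormatter` serializes from `record.valueSchema()`, so the worker must hand the task a schema-carrying value, for example via the Confluent `AvroConverter`. A hypothetical standalone config sketch — the connector name, topic and registry URL are placeholders:

```properties
name=rabbitmq-sink-avro
tasks.max=1
connector.class=com.github.themeetgroup.kafka.connect.rabbitmq.sink.RabbitMQSinkConnector
topics=payments
rabbitmq.exchange=exchange
rabbitmq.routing.key=routingkey
rabbitmq.format=avro
# AvroFormatter reads the Connect value schema, so use a schema-aware value converter:
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
```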
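Finally, as the comment in `AvroFormatterTest` points out, the `avro` format writes plain Avro binary without the Confluent wire-format prefix (magic byte plus schema id), so a downstream RabbitMQ consumer decodes the body directly with the writer schema instead of a schema-registry deserializer. A rough consumer sketch, assuming the `payment.avsc` schema from the tests and a made-up queue name:

```java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;

import java.io.File;
import java.io.IOException;

public class PlainAvroQueueReader {

  public static void main(String[] args) throws Exception {
    // The writer schema shipped with this patch's tests.
    Schema schema = new Schema.Parser().parse(new File("src/test/resources/payment.avsc"));
    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.basicConsume("payments-queue", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        // No Confluent magic byte / schema id to strip: the body is the Avro binary itself.
        GenericRecord record = reader.read(null, DecoderFactory.get().binaryDecoder(body, null));
        System.out.println(record);
      }
    });

    Thread.currentThread().join(); // keep the demo consumer alive
  }
}
```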