Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ To contribute code or documentation, please submit a [pull request](https://gith
A good way to familiarize yourself with the codebase and contribution process is
to look for and tackle low-hanging fruit in the [issue tracker](https://github.com/ibm-messaging/kafka-connect-mq-source/issues).

## Testing with ARM64

For instructions on testing with ARM64, see [IBM MQ 9.3.3.0 container image now available for Apple Silicon](https://community.ibm.com/community/user/blogs/richard-coppen/2023/06/30/ibm-mq-9330-container-image-now-available-for-appl).

## Create issues

If you would like to implement a new feature, please [raise an issue](https://github.com/ibm-messaging/kafka-connect-mq-source/issues)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1067,34 +1067,21 @@ public void verifyHeadersWithErrorTolerance() throws Exception {
assertThat(headers.lastWithName("decimalmeaning").value()).isEqualTo("42.0");

// Expected DLQ Headers
// ConnectHeaders(headers=[ConnectHeader(key=__connect.errors.topic,
// value=mytopic, schema=Schema{STRING}),
// ConnectHeader(key=__connect.errors.class.name,
// value=org.apache.kafka.connect.errors.DataException, schema=Schema{STRING}),
// ConnectHeader(key=__connect.errors.exception.message, value=Converting byte[]
// to Kafka Connect data failed due to serialization error: ,
// schema=Schema{STRING}), ConnectHeader(key=__connect.errors.timestamp,
// value=1749036171558, schema=Schema{STRING}),
// ConnectHeader(key=__connect.errors.cause.message,
// value=com.fasterxml.jackson.core.JsonParseException: Unrecognized token
// 'Invalid': was expecting (JSON String, Number, Array, Object or token 'null',
// 'true' or 'false')
// at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION`
// disabled); line: 1, column: 9], schema=Schema{STRING}),
// ConnectHeader(key=__connect.errors.cause.class,
// value=org.apache.kafka.common.errors.SerializationException,
// schema=Schema{STRING}),
// ConnectHeader(key=__connect.errors.exception.stacktrace,
// value=org.apache.kafka.connect.errors.DataException: Converting byte[] to
// Kafka Connect data failed due to serialization error:
// at
// org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:333)
// at
// com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder.getValue(JsonRecordBuilder.java:81)
// at
// com.ibm.eventstreams.connect.mqsource.builders.BaseRecordBuilder.toSourceRecord(BaseRecordBuilder.java:238)
// at com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord(JMSWork...
// [truncated], schema=Schema{STRING})])
/**
* ConnectHeaders(headers=[ConnectHeader(key=__connect.errors.topic, value=mytopic, schema=Schema{STRING}),
* ConnectHeader(key=__connect.errors.class.name, value=org.apache.kafka.connect.errors.DataException, schema=Schema{STRING}),
* ConnectHeader(key=__connect.errors.exception.message, value=Converting byte[] to Kafka Connect data failed due to serialization error: , schema=Schema{STRING}),
* ConnectHeader(key=__connect.errors.timestamp, value=1749036171558, schema=Schema{STRING}),
* ConnectHeader(key=__connect.errors.cause.message, value=com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Invalid': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
* at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 9], schema=Schema{STRING}),
* ConnectHeader(key=__connect.errors.cause.class, value=org.apache.kafka.common.errors.SerializationException, schema=Schema{STRING}),
* ConnectHeader(key=__connect.errors.exception.stacktrace, value=org.apache.kafka.connect.errors.DataException:
* Converting byte[] to Kafka Connect data failed due to serialization error:
* at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:333)
* at com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder.getValue(JsonRecordBuilder.java:81)
* at com.ibm.eventstreams.connect.mqsource.builders.BaseRecordBuilder.toSourceRecord(BaseRecordBuilder.java:238)
* at com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord(JMSWork... [truncated], schema=Schema{STRING})])
*/
assertThat(headers.lastWithName("__connect.errors.topic").value())
.isEqualTo("mytopic");
assertThat(headers.lastWithName("__connect.errors.exception.class.name").value())
Expand Down Expand Up @@ -1497,4 +1484,247 @@ public void testPollWithShortMaxPollTime() throws Exception {

assertThat(records.size() < 100);
}

@Test
public void shouldSetJmsPropertiesIFJMSIsDisabled() throws Exception {
connectTask = getSourceTaskWithEmptyKafkaOffset();

final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "false");
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true");
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");

connectTask.start(connectorConfigProps);

final TextMessage message = getJmsContext().createTextMessage("message");
message.setStringProperty("teststring", "myvalue");
message.setIntProperty("volume", 11);
message.setDoubleProperty("decimalmeaning", 42.0);

// Both invalid and valid messages are received
final List<Message> testMessages = Arrays.asList(
message
);
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages);

final List<SourceRecord> processedRecords = connectTask.poll();
assertThat(processedRecords).hasSize(1);
assertThat(processedRecords.get(0).topic()).isEqualTo("mytopic");

final Headers headers = processedRecords.get(0).headers();

// Actual headers
assertThat(headers.lastWithName("teststring").value()).isEqualTo("myvalue");
assertThat(headers.lastWithName("volume").value()).isEqualTo("11");
assertThat(headers.lastWithName("decimalmeaning").value()).isEqualTo("42.0");
}

@Test
public void shouldSetJmsPropertiesWithDefaultRecordBuilderWhenJMSIsEnabled() throws Exception {
connectTask = getSourceTaskWithEmptyKafkaOffset();

final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true");
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");

connectTask.start(connectorConfigProps);

final TextMessage message = getJmsContext().createTextMessage("test message content");
message.setStringProperty("customHeader", "headerValue");
message.setIntProperty("priority", 5);
message.setDoubleProperty("price", 99.99);
message.setBooleanProperty("isActive", true);

putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(message));

final List<SourceRecord> processedRecords = connectTask.poll();
assertThat(processedRecords).hasSize(1);
assertThat(processedRecords.get(0).topic()).isEqualTo("mytopic");
assertThat(processedRecords.get(0).value()).isEqualTo("test message content");

final Headers headers = processedRecords.get(0).headers();

// Verify JMS properties are copied to Kafka headers
assertThat(headers.lastWithName("customHeader").value()).isEqualTo("headerValue");
assertThat(headers.lastWithName("priority").value()).isEqualTo("5");
assertThat(headers.lastWithName("price").value()).isEqualTo("99.99");
assertThat(headers.lastWithName("isActive").value()).isEqualTo("true");
}

@Test
public void shouldNotSetJmsPropertiesWithDefaultRecordBuilderWhenCopyIsDisabled() throws Exception {
connectTask = getSourceTaskWithEmptyKafkaOffset();

final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "false");
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");

connectTask.start(connectorConfigProps);

final TextMessage message = getJmsContext().createTextMessage("test message");
message.setStringProperty("shouldNotAppear", "value");
message.setIntProperty("count", 42);

putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(message));

final List<SourceRecord> processedRecords = connectTask.poll();
assertThat(processedRecords).hasSize(1);
assertThat(processedRecords.get(0).value()).isEqualTo("test message");

final Headers headers = processedRecords.get(0).headers();

// Verify no JMS properties are copied when disabled
assertThat(headers.lastWithName("shouldNotAppear")).isNull();
assertThat(headers.lastWithName("count")).isNull();
}

@Test
public void shouldSetJmsPropertiesWithJsonRecordBuilderWhenJMSIsEnabled() throws Exception {
connectTask = getSourceTaskWithEmptyKafkaOffset();

final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true");
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");

connectTask.start(connectorConfigProps);

final TextMessage message = getJmsContext().createTextMessage("{ \"id\": 123, \"name\": \"test\" }");
message.setStringProperty("correlationId", "corr-123");
message.setIntProperty("retryCount", 3);
message.setDoubleProperty("amount", 150.75);

putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(message));

final List<SourceRecord> processedRecords = connectTask.poll();
assertThat(processedRecords).hasSize(1);
assertThat(processedRecords.get(0).topic()).isEqualTo("mytopic");

// Verify JSON content is parsed correctly
final Map<?, ?> value = (Map<?, ?>) processedRecords.get(0).value();
assertThat(value.get("id")).isEqualTo(123L);
assertThat(value.get("name")).isEqualTo("test");

final Headers headers = processedRecords.get(0).headers();

// Verify JMS properties are copied to Kafka headers
assertThat(headers.lastWithName("correlationId").value()).isEqualTo("corr-123");
assertThat(headers.lastWithName("retryCount").value()).isEqualTo("3");
assertThat(headers.lastWithName("amount").value()).isEqualTo("150.75");
}

@Test
public void shouldNotSetJmsPropertiesWithJsonRecordBuilderWhenCopyIsDisabled() throws Exception {
connectTask = getSourceTaskWithEmptyKafkaOffset();

final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "false");
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");

connectTask.start(connectorConfigProps);

final TextMessage message = getJmsContext().createTextMessage("{ \"data\": \"value\" }");
message.setStringProperty("hiddenProperty", "shouldNotAppear");
message.setIntProperty("hiddenCount", 99);

putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(message));

final List<SourceRecord> processedRecords = connectTask.poll();
assertThat(processedRecords).hasSize(1);

final Headers headers = processedRecords.get(0).headers();

// Verify no JMS properties are copied when disabled
assertThat(headers.lastWithName("hiddenProperty")).isNull();
assertThat(headers.lastWithName("hiddenCount")).isNull();
}

@Test
public void shouldHandleMultipleJmsPropertiesWithDifferentTypesDefaultBuilder() throws Exception {
connectTask = getSourceTaskWithEmptyKafkaOffset();

final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true");
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");

connectTask.start(connectorConfigProps);

final TextMessage message = getJmsContext().createTextMessage("multi-property message");
message.setStringProperty("stringProp", "text");
message.setIntProperty("intProp", 100);
message.setLongProperty("longProp", 999999999L);
message.setFloatProperty("floatProp", 3.14f);
message.setDoubleProperty("doubleProp", 2.71828);
message.setBooleanProperty("boolProp", false);
message.setByteProperty("byteProp", (byte) 127);
message.setShortProperty("shortProp", (short) 32000);

putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(message));

final List<SourceRecord> processedRecords = connectTask.poll();
assertThat(processedRecords).hasSize(1);

final Headers headers = processedRecords.get(0).headers();

// Verify all property types are correctly converted to string headers
assertThat(headers.lastWithName("stringProp").value()).isEqualTo("text");
assertThat(headers.lastWithName("intProp").value()).isEqualTo("100");
assertThat(headers.lastWithName("longProp").value()).isEqualTo("999999999");
assertThat(headers.lastWithName("floatProp").value()).isEqualTo("3.14");
assertThat(headers.lastWithName("doubleProp").value()).isEqualTo("2.71828");
assertThat(headers.lastWithName("boolProp").value()).isEqualTo("false");
assertThat(headers.lastWithName("byteProp").value()).isEqualTo("127");
assertThat(headers.lastWithName("shortProp").value()).isEqualTo("32000");
}

@Test
public void shouldHandleMultipleJmsPropertiesWithDifferentTypesJsonBuilder() throws Exception {
connectTask = getSourceTaskWithEmptyKafkaOffset();

final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true");
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");

connectTask.start(connectorConfigProps);

final TextMessage message = getJmsContext().createTextMessage("{ \"type\": \"multi-prop\" }");
message.setStringProperty("env", "production");
message.setIntProperty("maxRetries", 5);
message.setLongProperty("createdAt", 1609459200000L);
message.setDoubleProperty("threshold", 0.95);
message.setBooleanProperty("enabled", true);

putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(message));

final List<SourceRecord> processedRecords = connectTask.poll();
assertThat(processedRecords).hasSize(1);

final Headers headers = processedRecords.get(0).headers();

// Verify all property types are correctly converted
assertThat(headers.lastWithName("env").value()).isEqualTo("production");
assertThat(headers.lastWithName("maxRetries").value()).isEqualTo("5");
assertThat(headers.lastWithName("createdAt").value()).isEqualTo("1609459200000");
assertThat(headers.lastWithName("threshold").value()).isEqualTo("0.95");
assertThat(headers.lastWithName("enabled").value()).isEqualTo("true");
}
}
Loading