Skip to content

Commit 3f30ffd

Browse files
committed
fix: allow headers when message body isnt jms
Signed-off-by: Joel Hanson <[email protected]>
1 parent dafd80e commit 3f30ffd

File tree

4 files changed

+537
-53
lines changed

4 files changed

+537
-53
lines changed

CONTRIBUTING.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ To contribute code or documentation, please submit a [pull request](https://gith
1111
A good way to familiarize yourself with the codebase and contribution process is
1212
to look for and tackle low-hanging fruit in the [issue tracker](https://github.com/ibm-messaging/kafka-connect-mq-source/issues).
1313

14+
## Testing with ARM64
15+
16+
For instructions on testing with ARM64, see [IBM MQ 9.3.3.0 container image now available for Apple Silicon](https://community.ibm.com/community/user/blogs/richard-coppen/2023/06/30/ibm-mq-9330-container-image-now-available-for-appl).
17+
1418
## Create issues
1519

1620
If you would like to implement a new feature, please [raise an issue](https://github.com/ibm-messaging/kafka-connect-mq-source/issues)
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
/**
2+
* Copyright 2025 IBM Corporation
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.ibm.eventstreams.connect.mqsource;
17+
18+
import org.apache.kafka.connect.header.Headers;
19+
import org.apache.kafka.connect.source.SourceRecord;
20+
import org.junit.Test;
21+
22+
import javax.jms.BytesMessage;
23+
import javax.jms.TextMessage;
24+
import java.nio.charset.StandardCharsets;
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
28+
import static org.assertj.core.api.Assertions.assertThat;
29+
import static org.junit.Assert.assertArrayEquals;
30+
import static org.junit.Assert.assertEquals;
31+
import static org.junit.Assert.assertNull;
32+
33+
public class BaseRecordBuilderIT extends AbstractJMSContextIT {
34+
@Test
35+
public void testToSourceRecord_DefaultRecordBuilder_AnyMessage_JmsFalse_ByteArrayConverter() throws Exception {
36+
// Test: DefaultRecordBuilder with mq.message.body.jms=false
37+
// Expected: String data output
38+
Map<String, String> connectorProps = getDefaultConnectorProperties();
39+
connectorProps.put("mq.message.body.jms", "false");
40+
connectorProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
41+
connectorProps.put("mq.jms.properties.copy.to.kafka.headers", "true");
42+
43+
JMSWorker worker = new JMSWorker();
44+
worker.configure(getPropertiesConfig(connectorProps));
45+
worker.connect();
46+
47+
try {
48+
TextMessage textMessage = getJmsContext().createTextMessage("Text message");
49+
textMessage.setStringProperty("customHeader", "headerValue");
50+
textMessage.setIntProperty("priority", 5);
51+
textMessage.setDoubleProperty("price", 99.99);
52+
53+
Map<String, Long> sourceOffset = new HashMap<>();
54+
sourceOffset.put("sequence-id", 1L);
55+
56+
Map<String, String> sourcePartition = new HashMap<>();
57+
sourcePartition.put("source", "myqmgr/myq");
58+
59+
SourceRecord sourceRecord = worker.toSourceRecord(textMessage, true, sourceOffset, sourcePartition);
60+
61+
assertThat(sourceRecord).isNotNull();
62+
assertThat(sourceRecord.value()).isInstanceOf(String.class);
63+
assertThat(sourceRecord.valueSchema()).isNull();
64+
65+
// Verify data
66+
String value = (String) sourceRecord.value();
67+
assertThat(value).isNotNull();
68+
69+
// Verify JMS properties are copied to Kafka headers
70+
Headers headers = sourceRecord.headers();
71+
assertThat(headers.lastWithName("customHeader").value()).isEqualTo("headerValue");
72+
assertThat(headers.lastWithName("priority").value()).isEqualTo("5");
73+
assertThat(headers.lastWithName("price").value()).isEqualTo("99.99");
74+
} finally {
75+
worker.stop();
76+
}
77+
}
78+
79+
@Test
80+
public void testToSourceRecord_DefaultRecordBuilder_BytesMessage_JmsTrue_ByteArrayConverter() throws Exception {
81+
// Test: DefaultRecordBuilder with JMS BytesMessage, mq.message.body.jms=true
82+
// Expected: Binary data output
83+
Map<String, String> connectorProps = getDefaultConnectorProperties();
84+
connectorProps.put("mq.message.body.jms", "true");
85+
connectorProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
86+
connectorProps.put("mq.jms.properties.copy.to.kafka.headers", "true");
87+
88+
JMSWorker worker = new JMSWorker();
89+
worker.configure(getPropertiesConfig(connectorProps));
90+
worker.connect();
91+
92+
try {
93+
BytesMessage bytesMessage = getJmsContext().createBytesMessage();
94+
byte[] testData = "Binary bytes message".getBytes(StandardCharsets.UTF_8);
95+
bytesMessage.writeBytes(testData);
96+
bytesMessage.setStringProperty("messageType", "binary");
97+
bytesMessage.setIntProperty("version", 2);
98+
bytesMessage.setLongProperty("timestamp", 1234567890L);
99+
bytesMessage.reset(); // Reset to make the message readable
100+
101+
Map<String, Long> sourceOffset = new HashMap<>();
102+
sourceOffset.put("sequence-id", 2L);
103+
104+
Map<String, String> sourcePartition = new HashMap<>();
105+
sourcePartition.put("source", "myqmgr/myq");
106+
107+
SourceRecord sourceRecord = worker.toSourceRecord(bytesMessage, true, sourceOffset, sourcePartition);
108+
109+
assertThat(sourceRecord).isNotNull();
110+
assertThat(sourceRecord.value()).isInstanceOf(byte[].class);
111+
assertThat(sourceRecord.valueSchema()).isNull();
112+
113+
// Verify binary data matches
114+
byte[] value = (byte[]) sourceRecord.value();
115+
assertArrayEquals(testData, value);
116+
117+
// Verify JMS properties are copied to Kafka headers
118+
Headers headers = sourceRecord.headers();
119+
assertThat(headers.lastWithName("messageType").value()).isEqualTo("binary");
120+
assertThat(headers.lastWithName("version").value()).isEqualTo("2");
121+
assertThat(headers.lastWithName("timestamp").value()).isEqualTo("1234567890");
122+
} finally {
123+
worker.stop();
124+
}
125+
}
126+
127+
@Test
128+
public void testToSourceRecord_DefaultRecordBuilder_TextMessage_JmsTrue_StringConverter() throws Exception {
129+
// Test: DefaultRecordBuilder with JMS TextMessage, mq.message.body.jms=true and StringConverter
130+
// Expected: String data output
131+
Map<String, String> connectorProps = getDefaultConnectorProperties();
132+
connectorProps.put("mq.message.body.jms", "true");
133+
connectorProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
134+
connectorProps.put("mq.jms.properties.copy.to.kafka.headers", "true");
135+
136+
JMSWorker worker = new JMSWorker();
137+
worker.configure(getPropertiesConfig(connectorProps));
138+
worker.connect();
139+
140+
try {
141+
String testText = "This is a text message";
142+
TextMessage textMessage = getJmsContext().createTextMessage(testText);
143+
textMessage.setStringProperty("source", "system-a");
144+
textMessage.setIntProperty("retryCount", 3);
145+
textMessage.setDoubleProperty("threshold", 0.95);
146+
textMessage.setBooleanProperty("enabled", true);
147+
148+
Map<String, Long> sourceOffset = new HashMap<>();
149+
sourceOffset.put("sequence-id", 3L);
150+
151+
Map<String, String> sourcePartition = new HashMap<>();
152+
sourcePartition.put("source", "myqmgr/myq");
153+
154+
SourceRecord sourceRecord = worker.toSourceRecord(textMessage, true, sourceOffset, sourcePartition);
155+
156+
assertThat(sourceRecord).isNotNull();
157+
assertThat(sourceRecord.value()).isInstanceOf(String.class);
158+
assertNull(sourceRecord.valueSchema());
159+
160+
// Verify string data matches
161+
String value = (String) sourceRecord.value();
162+
assertEquals(testText, value);
163+
164+
// Verify JMS properties are copied to Kafka headers
165+
Headers headers = sourceRecord.headers();
166+
assertThat(headers.lastWithName("source").value()).isEqualTo("system-a");
167+
assertThat(headers.lastWithName("retryCount").value()).isEqualTo("3");
168+
assertThat(headers.lastWithName("threshold").value()).isEqualTo("0.95");
169+
assertThat(headers.lastWithName("enabled").value()).isEqualTo("true");
170+
} finally {
171+
worker.stop();
172+
}
173+
}
174+
175+
@Test
176+
public void testToSourceRecord_JsonRecordBuilder_JsonMessage_JsonConverter() throws Exception {
177+
// Test: JsonRecordBuilder with JSON message and JsonConverter
178+
// Expected: JSON output with no schema
179+
Map<String, String> connectorProps = getDefaultConnectorProperties();
180+
connectorProps.put("mq.message.body.jms", "true"); // Not used by JsonRecordBuilder
181+
connectorProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
182+
connectorProps.put("mq.jms.properties.copy.to.kafka.headers", "true");
183+
184+
JMSWorker worker = new JMSWorker();
185+
worker.configure(getPropertiesConfig(connectorProps));
186+
worker.connect();
187+
188+
try {
189+
String jsonText = "{ \"id\": 123, \"name\": \"test\", \"active\": true }";
190+
TextMessage textMessage = getJmsContext().createTextMessage(jsonText);
191+
textMessage.setStringProperty("source", "system-a");
192+
textMessage.setIntProperty("retryCount", 3);
193+
textMessage.setDoubleProperty("threshold", 0.95);
194+
textMessage.setBooleanProperty("enabled", true);
195+
196+
Map<String, Long> sourceOffset = new HashMap<>();
197+
sourceOffset.put("sequence-id", 4L);
198+
199+
Map<String, String> sourcePartition = new HashMap<>();
200+
sourcePartition.put("source", "myqmgr/myq");
201+
202+
SourceRecord sourceRecord = worker.toSourceRecord(textMessage, true, sourceOffset, sourcePartition);
203+
204+
assertThat(sourceRecord).isNotNull();
205+
assertThat(sourceRecord.value()).isInstanceOf(Map.class);
206+
assertNull(sourceRecord.valueSchema()); // JSON with no schema
207+
208+
// Verify JSON data
209+
@SuppressWarnings("unchecked")
210+
Map<String, Object> value = (Map<String, Object>) sourceRecord.value();
211+
assertEquals(123L, value.get("id"));
212+
assertEquals("test", value.get("name"));
213+
assertEquals(true, value.get("active"));
214+
215+
// Verify JMS properties are copied to Kafka headers
216+
Headers headers = sourceRecord.headers();
217+
assertThat(headers.lastWithName("source").value()).isEqualTo("system-a");
218+
assertThat(headers.lastWithName("retryCount").value()).isEqualTo("3");
219+
assertThat(headers.lastWithName("threshold").value()).isEqualTo("0.95");
220+
assertThat(headers.lastWithName("enabled").value()).isEqualTo("true");
221+
} finally {
222+
worker.stop();
223+
}
224+
}
225+
}

0 commit comments

Comments
 (0)