Skip to content
This repository was archived by the owner on Apr 5, 2022. It is now read-only.

Commit 980d2a6

Browse files
Marius Bogoevicigaryrussell
authored andcommitted
XD-3621 Kafka bus: Pass header propagation config
Allow the headers passed by the kafka bus to be configured. Update kafka-bus.xml Polishing Polishing on Kafka tests
1 parent b10e4f0 commit 980d2a6

File tree

7 files changed

+49
-9
lines changed

7 files changed

+49
-9
lines changed

config/servers.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959

6060
# redis:
6161
# headers:
62-
# comman-delimited list of additional (string-valued) header names to transport
62+
# comma-delimited list of additional header names to transport
6363
# default:
6464
# default bus properties, if not specified at the module level
6565
# backOffInitialInterval: 1000
@@ -71,6 +71,8 @@
7171
# brokers: localhost:9092
7272
# zkAddress: localhost:2181
7373
# mode: embeddedHeaders
74+
# headers:
75+
# comma-delimited list of additional header names to transport
7476
# socketBufferSize: 2097152
7577
# offsetStoreTopic: SpringXdOffsets
7678
# offsetStoreSegmentSize: 25000000

spring-xd-dirt/src/main/resources/application.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ xd:
142142
txSize: 1
143143
redis:
144144
headers:
145-
# comman-delimited list of additional (string-valued) header names to transport
145+
# comma-delimited list of additional header names to transport
146146
default:
147147
# default bus properties, if not specified at the module level
148148
backOffInitialInterval: 1000
@@ -154,6 +154,8 @@ xd:
154154
brokers: localhost:9092
155155
zkAddress: localhost:2181
156156
mode: embeddedHeaders
157+
headers:
158+
# comma-delimited list of additional header names to transport
157159
socketBufferSize: 2097152
158160
offsetStoreTopic: SpringXdOffsets
159161
offsetStoreSegmentSize: 25000000

spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/kafka/KafkaMessageBusTests.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
2121
import static org.hamcrest.core.IsEqual.equalTo;
2222
import static org.junit.Assert.assertArrayEquals;
23+
import static org.junit.Assert.assertEquals;
2324
import static org.junit.Assert.assertNotNull;
2425
import static org.junit.Assert.assertThat;
26+
import static org.junit.Assert.assertTrue;
2527

2628
import java.util.Arrays;
2729
import java.util.Collection;
@@ -32,10 +34,9 @@
3234
import java.util.concurrent.BlockingQueue;
3335
import java.util.concurrent.TimeUnit;
3436

35-
import kafka.api.OffsetRequest;
3637
import org.apache.kafka.clients.producer.KafkaProducer;
3738
import org.apache.kafka.clients.producer.ProducerConfig;
38-
import org.hamcrest.CoreMatchers;
39+
import org.hamcrest.collection.IsMapContaining;
3940
import org.junit.Ignore;
4041
import org.junit.Rule;
4142
import org.junit.Test;
@@ -50,15 +51,19 @@
5051
import org.springframework.integration.kafka.listener.KafkaMessageListenerContainer;
5152
import org.springframework.integration.kafka.listener.MessageListener;
5253
import org.springframework.integration.kafka.support.ProducerConfiguration;
54+
import org.springframework.integration.test.util.TestUtils;
5355
import org.springframework.messaging.Message;
5456
import org.springframework.messaging.MessageHandler;
5557
import org.springframework.xd.dirt.integration.bus.Binding;
5658
import org.springframework.xd.dirt.integration.bus.BusProperties;
5759
import org.springframework.xd.dirt.integration.bus.MessageBus;
5860
import org.springframework.xd.dirt.integration.bus.PartitionCapableBusTests;
61+
import org.springframework.xd.dirt.integration.bus.XdHeaders;
5962
import org.springframework.xd.dirt.integration.kafka.KafkaMessageBus;
6063
import org.springframework.xd.test.kafka.KafkaTestSupport;
6164

65+
import kafka.api.OffsetRequest;
66+
6267

6368
/**
6469
* Integration tests for the {@link KafkaMessageBus}.
@@ -368,6 +373,35 @@ public void testKafkaSpecificProducerPropertiesAreSet() {
368373
messageBus.unbindProducers("foo" + uniqueBindingId);
369374
}
370375

376+
@Test
377+
public void testMoreHeaders() throws Exception {
378+
KafkaTestMessageBus bus =
379+
new KafkaTestMessageBus(kafkaTestSupport, getCodec(), KafkaMessageBus.Mode.embeddedHeaders,
380+
"propagatedHeader");
381+
Collection<String> headers = Arrays.asList(TestUtils.getPropertyValue(bus.getCoreMessageBus(), "headersToMap",
382+
String[].class));
383+
assertTrue(headers.contains("propagatedHeader"));
384+
assertEquals(XdHeaders.STANDARD_HEADERS.length + 1, headers.size());
385+
DirectChannel moduleOutputChannel = new DirectChannel();
386+
QueueChannel moduleInputChannel = new QueueChannel();
387+
long uniqueBindingId = System.currentTimeMillis();
388+
Properties emptyProperties = new Properties();
389+
bus.bindProducer("foo" + uniqueBindingId + ".0", moduleOutputChannel, emptyProperties);
390+
bus.bindConsumer("foo" + uniqueBindingId + ".0", moduleInputChannel, emptyProperties);
391+
Message<?> message = org.springframework.integration.support.MessageBuilder.
392+
withPayload("payload").setHeader("propagatedHeader","propagatedValue").build();
393+
// Let the consumer actually bind to the producer before sending a msg
394+
busBindUnbindLatency();
395+
moduleOutputChannel.send(message);
396+
Message<?> inbound = moduleInputChannel.receive(2000);
397+
assertThat(inbound.getHeaders(), IsMapContaining.hasKey("propagatedHeader"));
398+
assertThat((String)inbound.getHeaders().get("propagatedHeader"), equalTo("propagatedValue"));
399+
bus.unbindProducers("foo" + uniqueBindingId + ".0");
400+
bus.unbindConsumers("foo" + uniqueBindingId + ".0");
401+
bus.cleanup();
402+
}
403+
404+
371405
@Test
372406
@Ignore("Kafka message bus does not support direct binding")
373407
@Override

spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/kafka/KafkaTestMessageBus.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,14 @@ public KafkaTestMessageBus(KafkaTestSupport kafkaTestSupport) {
4343

4444

4545
public KafkaTestMessageBus(KafkaTestSupport kafkaTestSupport, Codec codec,
46-
KafkaMessageBus.Mode mode) {
46+
KafkaMessageBus.Mode mode, String... headers) {
4747

4848
try {
4949
ZookeeperConnect zookeeperConnect = new ZookeeperConnect();
5050
zookeeperConnect.setZkConnect(kafkaTestSupport.getZkConnectString());
5151
KafkaMessageBus messageBus = new KafkaMessageBus(zookeeperConnect,
5252
kafkaTestSupport.getBrokerAddress(),
53-
kafkaTestSupport.getZkConnectString(), codec);
53+
kafkaTestSupport.getZkConnectString(), codec, headers);
5454
messageBus.setDefaultBatchingEnabled(false);
5555
messageBus.setMode(mode);
5656
messageBus.afterPropertiesSet();

spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/redis/RedisMessageBusTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,14 @@
3737
import org.junit.Rule;
3838
import org.junit.Test;
3939

40-
import org.springframework.amqp.utils.test.TestUtils;
4140
import org.springframework.data.redis.connection.RedisConnectionFactory;
4241
import org.springframework.data.redis.core.RedisTemplate;
4342
import org.springframework.data.redis.serializer.StringRedisSerializer;
4443
import org.springframework.expression.Expression;
4544
import org.springframework.integration.channel.DirectChannel;
4645
import org.springframework.integration.endpoint.AbstractEndpoint;
4746
import org.springframework.integration.redis.inbound.RedisQueueMessageDrivenEndpoint;
47+
import org.springframework.integration.test.util.TestUtils;
4848
import org.springframework.messaging.support.GenericMessage;
4949
import org.springframework.retry.support.RetryTemplate;
5050
import org.springframework.xd.dirt.integration.bus.Binding;

spring-xd-messagebus-kafka/src/main/resources/META-INF/spring-xd/bus/kafka-bus.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
<constructor-arg value="${xd.messagebus.kafka.brokers}"/>
1717
<constructor-arg value="${xd.messagebus.kafka.zkAddress}"/>
1818
<constructor-arg ref="codec"/>
19-
<constructor-arg value="#{new String[0]}"/>
19+
<constructor-arg value="${xd.messagebus.kafka.headers:}" />
2020

2121
<property name="mode" value="${xd.messagebus.kafka.mode}"/>
2222

spring-xd-yarn/site/config/servers.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ xd:
166166

167167
# redis:
168168
# headers:
169-
# comman-delimited list of additional (string-valued) header names to transport
169+
# comma-delimited list of additional header names to transport
170170
# default:
171171
# default bus properties, if not specified at the module level
172172
# backOffInitialInterval: 1000
@@ -179,6 +179,8 @@ xd:
179179
# brokers: localhost:9092
180180
# zkAddress: localhost:2181
181181
# mode: embeddedHeaders
182+
# headers:
183+
# comma-delimited list of additional header names to transport
182184
# socketBufferSize: 2097152
183185
# offsetStoreTopic: SpringXdOffsets
184186
# offsetStoreSegmentSize: 25000000

0 commit comments

Comments
 (0)