Skip to content

Commit 2b71a65

Browse files
committedJul 4, 2020
+ tests
1 parent 89d183b commit 2b71a65

File tree

1 file changed

+267
-0
lines changed

1 file changed

+267
-0
lines changed
 
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
package org.spring.integration.dynamic.tcp;
2+
3+
4+
import org.junit.jupiter.api.BeforeEach;
5+
import org.junit.jupiter.api.Test;
6+
import org.junit.jupiter.api.extension.ExtendWith;
7+
import org.spring.integration.dynamic.tcp.spec.DynamicTcpOutboundGatewaySpec;
8+
import org.springframework.beans.factory.annotation.Autowired;
9+
import org.springframework.context.annotation.Bean;
10+
import org.springframework.context.annotation.Configuration;
11+
import org.springframework.integration.annotation.IntegrationComponentScan;
12+
import org.springframework.integration.annotation.MessagingGateway;
13+
import org.springframework.integration.annotation.ServiceActivator;
14+
import org.springframework.integration.channel.QueueChannel;
15+
import org.springframework.integration.config.EnableIntegration;
16+
import org.springframework.integration.dsl.*;
17+
import org.springframework.integration.dsl.context.IntegrationFlowContext;
18+
import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter;
19+
import org.springframework.integration.ip.tcp.TcpSendingMessageHandler;
20+
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
21+
import org.springframework.integration.scheduling.PollerMetadata;
22+
import org.springframework.messaging.Message;
23+
import org.springframework.messaging.MessageChannel;
24+
import org.springframework.messaging.handler.annotation.Header;
25+
import org.springframework.messaging.handler.annotation.Payload;
26+
import org.springframework.messaging.support.MessageBuilder;
27+
import org.springframework.test.context.junit.jupiter.SpringExtension;
28+
29+
import static org.assertj.core.api.Assertions.assertThat;
30+
31+
32+
@ExtendWith(SpringExtension.class)
33+
public class DynamicTcpOutboundGatewayTest {
34+
public static final String COMMAND = "test";
35+
public static final String FLOW_ID_ONE = DynamicTcpOutboundGateway.getFlowId("127.0.0.1", 5678);
36+
public static final String FLOW_ID_TWO = DynamicTcpOutboundGateway.getFlowId("127.0.0.1", 5679);
37+
public static final String FLOW_ID_THREE = DynamicTcpOutboundGateway.getFlowId("127.0.0.1", 5677);
38+
@Autowired
39+
private IntegrationFlowContext flowContext;
40+
@Autowired
41+
private SendEchoGateway echoGateway;
42+
private DynamicTcpOutboundGatewaySpec gatewaySpec;
43+
44+
@BeforeEach
45+
public void setUp() {
46+
gatewaySpec = new DynamicTcpOutboundGatewaySpec(flowContext);
47+
}
48+
49+
@Test
50+
public void testCacheable() {
51+
final IntegrationFlow flow = IntegrationFlows.from("sendChannel")
52+
.route(gatewaySpec
53+
.host("headers['host']").port("headers['port']")
54+
.cacheable("headers['port']!=5679")
55+
.singleConnection("headers['port']!=5677")
56+
.remoteTimeout(10_000L)
57+
.responseChannelName("serverResponseChannel"))
58+
.get();
59+
flowContext.registration(flow).register();
60+
61+
final String requestReplyOne = echoGateway.send(COMMAND, "127.0.0.1", 5678);
62+
final String requestReplyOneDup = echoGateway.send(COMMAND, "127.0.0.1", 5678);
63+
final String requestReplyTwo = echoGateway.send(COMMAND, "127.0.0.1", 5679);
64+
final String requestReplyThree = echoGateway.send(COMMAND, "127.0.0.1", 5677);
65+
final String requestReplyThreeDup = echoGateway.send(COMMAND, "127.0.0.1", 5677);
66+
final String requestReplyThreeDupDup = echoGateway.send(COMMAND, "127.0.0.1", 5677);
67+
68+
final IntegrationFlowContext.IntegrationFlowRegistration registrationOne = flowContext.getRegistrationById(FLOW_ID_ONE);
69+
final IntegrationFlowContext.IntegrationFlowRegistration registrationTwo = flowContext.getRegistrationById(FLOW_ID_TWO);
70+
final IntegrationFlowContext.IntegrationFlowRegistration registrationThree = flowContext.getRegistrationById(FLOW_ID_THREE);
71+
72+
assertThat(requestReplyOne)
73+
.isEqualTo(String.format("%s: answer", COMMAND))
74+
.isEqualTo(requestReplyOneDup);
75+
assertThat(requestReplyTwo).isEqualTo(String.format("%s: answer", COMMAND));
76+
assertThat(requestReplyThree)
77+
.isEqualTo(String.format("%s: answer", COMMAND))
78+
.isEqualTo(requestReplyThreeDup)
79+
.isEqualTo(requestReplyThreeDupDup);
80+
assertThat(registrationOne).isNotNull();
81+
assertThat(registrationThree).isNotNull();
82+
assertThat(registrationTwo).isNull();
83+
}
84+
85+
@MessagingGateway(defaultRequestChannel = "sendChannel", defaultReplyChannel = "receiveChannel")
86+
private interface SendEchoGateway {
87+
String send(@Payload String command, @Header(name = "host") String host, @Header(name = "port") int port);
88+
}
89+
90+
@Configuration
91+
@EnableIntegration
92+
@IntegrationComponentScan
93+
public static class ClientContextConfiguration {
94+
95+
@Bean
96+
public IntegrationFlow serverResponseFlow() {
97+
return IntegrationFlows.from("serverResponseChannel")
98+
.log()
99+
.transform(Transformers.objectToString())
100+
.channel("receiveChannel")
101+
.get();
102+
}
103+
104+
/// Client config
105+
@Bean
106+
public QueueChannel sendChannel() {
107+
return new QueueChannel();
108+
}
109+
110+
@Bean(name = PollerMetadata.DEFAULT_POLLER)
111+
public PollerMetadata poller() {
112+
return Pollers.fixedRate(500).get();
113+
}
114+
115+
@Bean
116+
public MessageChannel receiveChannel() {
117+
return MessageChannels.publishSubscribe().get();
118+
}
119+
120+
@Bean
121+
public MessageChannel serverResponseChannel() {
122+
return MessageChannels.publishSubscribe().get();
123+
}
124+
}
125+
126+
@Configuration
127+
@EnableIntegration
128+
@IntegrationComponentScan
129+
public static class ServerContextConfigurationOne {
130+
131+
// Server config
132+
@Bean
133+
public TcpNetServerConnectionFactory serverConnectionOne() {
134+
return new TcpNetServerConnectionFactory(5678);
135+
}
136+
137+
@Bean
138+
public TcpReceivingChannelAdapter serverReceiveOne(TcpNetServerConnectionFactory serverConnectionOne) {
139+
final TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
140+
adapter.setConnectionFactory(serverConnectionOne);
141+
adapter.setOutputChannel(serverInputChannelOne());
142+
return adapter;
143+
}
144+
145+
@ServiceActivator(inputChannel = "serverOutChannelOne")
146+
@Bean
147+
public TcpSendingMessageHandler serverSendOne(TcpNetServerConnectionFactory serverConnectionOne) {
148+
final TcpSendingMessageHandler adapter = new TcpSendingMessageHandler();
149+
adapter.setConnectionFactory(serverConnectionOne);
150+
return adapter;
151+
}
152+
153+
@Bean
154+
public MessageChannel serverInputChannelOne() {
155+
return MessageChannels.queue().get();
156+
}
157+
158+
@Bean
159+
public QueueChannel serverOutChannelOne() {
160+
return new QueueChannel();
161+
}
162+
163+
@ServiceActivator(inputChannel = "serverInputChannelOne", outputChannel = "serverOutChannelOne")
164+
public Message<String> processMessage(Message<String> message) {
165+
return MessageBuilder
166+
.withPayload(message.getPayload() + ": answer")
167+
.copyHeaders(message.getHeaders())
168+
.build();
169+
}
170+
171+
}
172+
173+
@Configuration
174+
@EnableIntegration
175+
@IntegrationComponentScan
176+
public static class ServerContextConfigurationThree {
177+
178+
// Server config
179+
@Bean
180+
public TcpNetServerConnectionFactory serverConnectionThree() {
181+
return new TcpNetServerConnectionFactory(5677);
182+
}
183+
184+
@Bean
185+
public TcpReceivingChannelAdapter serverReceiveThree(TcpNetServerConnectionFactory serverConnectionThree) {
186+
final TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
187+
adapter.setConnectionFactory(serverConnectionThree);
188+
adapter.setOutputChannel(serverInputChannelThree());
189+
return adapter;
190+
}
191+
192+
@ServiceActivator(inputChannel = "serverOutChannelThree")
193+
@Bean
194+
public TcpSendingMessageHandler serverSendThree(TcpNetServerConnectionFactory serverConnectionThree) {
195+
final TcpSendingMessageHandler adapter = new TcpSendingMessageHandler();
196+
adapter.setConnectionFactory(serverConnectionThree);
197+
return adapter;
198+
}
199+
200+
@Bean
201+
public MessageChannel serverInputChannelThree() {
202+
return MessageChannels.queue().get();
203+
}
204+
205+
@Bean
206+
public QueueChannel serverOutChannelThree() {
207+
return new QueueChannel();
208+
}
209+
210+
@ServiceActivator(inputChannel = "serverInputChannelThree", outputChannel = "serverOutChannelThree")
211+
public Message<String> processMessage(Message<String> message) {
212+
return MessageBuilder
213+
.withPayload(message.getPayload() + ": answer")
214+
.copyHeaders(message.getHeaders())
215+
.build();
216+
}
217+
218+
}
219+
220+
@Configuration
221+
@EnableIntegration
222+
@IntegrationComponentScan
223+
public static class ServerContextConfigurationTwo {
224+
225+
// Server config
226+
@Bean
227+
public TcpNetServerConnectionFactory serverConnectionTwo() {
228+
return new TcpNetServerConnectionFactory(5679);
229+
}
230+
231+
@Bean
232+
public TcpReceivingChannelAdapter serverReceiveTwo(TcpNetServerConnectionFactory serverConnectionTwo) {
233+
final TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
234+
adapter.setConnectionFactory(serverConnectionTwo);
235+
adapter.setOutputChannel(serverInputChannelTwo());
236+
return adapter;
237+
}
238+
239+
@ServiceActivator(inputChannel = "serverOutChannelTwo")
240+
@Bean
241+
public TcpSendingMessageHandler serverSendTwo(TcpNetServerConnectionFactory serverConnectionTwo) {
242+
final TcpSendingMessageHandler adapter = new TcpSendingMessageHandler();
243+
adapter.setConnectionFactory(serverConnectionTwo);
244+
return adapter;
245+
}
246+
247+
@Bean
248+
public MessageChannel serverInputChannelTwo() {
249+
return MessageChannels.queue().get();
250+
}
251+
252+
@Bean
253+
public QueueChannel serverOutChannelTwo() {
254+
return new QueueChannel();
255+
}
256+
257+
@ServiceActivator(inputChannel = "serverInputChannelTwo", outputChannel = "serverOutChannelTwo")
258+
public Message<String> processMessage(Message<String> message) {
259+
return MessageBuilder
260+
.withPayload(message.getPayload() + ": answer")
261+
.copyHeaders(message.getHeaders())
262+
.build();
263+
}
264+
265+
}
266+
}
267+

0 commit comments

Comments
 (0)
Please sign in to comment.