Skip to content

Commit bfaace4

Browse files
committed
+ initial commit
0 parents  commit bfaace4

File tree

7 files changed

+360
-0
lines changed

7 files changed

+360
-0
lines changed

dynamic-tcp-autoconfigure/pom.xml

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
<groupId>org.spring.integration.dynamic.tcp.autoconfigure</groupId>
6+
<artifactId>dynamic-tcp-autoconfigure</artifactId>
7+
<version>0.0.1-SNAPSHOT</version>
8+
<name>dynamic-tcp-autoconfigure</name>
9+
<description>Autoconfigure for dynamic tcp</description>
10+
11+
<properties>
12+
<java.version>1.8</java.version>
13+
<maven.compiler.source>${java.version}</maven.compiler.source>
14+
<maven.compiler.target>${java.version}</maven.compiler.target>
15+
<spring-boot.version>2.3.1.RELEASE</spring-boot.version>
16+
<dynamic-tcp.version>0.0.1-SNAPSHOT</dynamic-tcp.version>
17+
</properties>
18+
19+
<dependencies>
20+
<dependency>
21+
<groupId>org.springframework.boot</groupId>
22+
<artifactId>spring-boot</artifactId>
23+
<version>${spring-boot.version}</version>
24+
</dependency>
25+
26+
<dependency>
27+
<groupId>org.springframework.boot</groupId>
28+
<artifactId>spring-boot-autoconfigure</artifactId>
29+
<version>${spring-boot.version}</version>
30+
</dependency>
31+
32+
<dependency>
33+
<groupId>org.springframework.boot</groupId>
34+
<artifactId>spring-boot-configuration-processor</artifactId>
35+
<version>2.2.1.RELEASE</version>
36+
<optional>true</optional>
37+
</dependency>
38+
39+
<dependency>
40+
<groupId>org.spring.integration.dynamic</groupId>
41+
<artifactId>dynamic-tcp</artifactId>
42+
<version>${dynamic-tcp.version}</version>
43+
<optional>true</optional>
44+
</dependency>
45+
46+
<dependency>
47+
<groupId>org.springframework.boot</groupId>
48+
<artifactId>spring-boot-starter-test</artifactId>
49+
<version>${spring-boot.version}</version>
50+
<scope>test</scope>
51+
</dependency>
52+
</dependencies>
53+
54+
<build>
55+
<plugins>
56+
<plugin>
57+
<groupId>org.apache.maven.plugins</groupId>
58+
<artifactId>maven-compiler-plugin</artifactId>
59+
<version>3.8.1</version>
60+
<configuration>
61+
<source>${java.version}</source>
62+
<target>${java.version}</target>
63+
</configuration>
64+
</plugin>
65+
</plugins>
66+
</build>
67+
68+
69+
70+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.spring.integration.dynamic.tcp.autoconfigure;
2+
3+
import org.spring.integration.dynamic.tcp.spec.DynamicTcpOutboundGatewaySpec;
4+
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
5+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
6+
import org.springframework.context.annotation.Bean;
7+
import org.springframework.context.annotation.Configuration;
8+
import org.springframework.integration.dsl.context.IntegrationFlowContext;
9+
10+
@Configuration
11+
@ConditionalOnClass(DynamicTcpOutboundGatewaySpec.class)
12+
public class DynamicTcpAutoConfiguration {
13+
14+
@Bean
15+
@ConditionalOnMissingBean
16+
public DynamicTcpOutboundGatewaySpec tcpOutboundGatewaySpec(IntegrationFlowContext flowContext){
17+
return new DynamicTcpOutboundGatewaySpec(flowContext);
18+
}
19+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.spring.integration.dynamic.tcp.autoconfigure.DynamicTcpAutoConfiguration

dynamic-tcp/pom.xml

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
<groupId>org.spring.integration.dynamic</groupId>
6+
<artifactId>dynamic-tcp</artifactId>
7+
<version>0.0.1-SNAPSHOT</version>
8+
<name>dynamic-tcp</name>
9+
<description>Dynamic tcp implementation</description>
10+
11+
<properties>
12+
<java.version>1.8</java.version>
13+
<maven.compiler.source>${java.version}</maven.compiler.source>
14+
<maven.compiler.target>${java.version}</maven.compiler.target>
15+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
16+
</properties>
17+
18+
<dependencies>
19+
20+
<dependency>
21+
<groupId>org.springframework.integration</groupId>
22+
<artifactId>spring-integration-core</artifactId>
23+
<version>5.3.1.RELEASE</version>
24+
</dependency>
25+
<dependency>
26+
<groupId>org.springframework.integration</groupId>
27+
<artifactId>spring-integration-ip</artifactId>
28+
<version>5.2.3.RELEASE</version>
29+
</dependency>
30+
</dependencies>
31+
32+
<build>
33+
<plugins>
34+
<plugin>
35+
<groupId>org.springframework.boot</groupId>
36+
<artifactId>spring-boot-maven-plugin</artifactId>
37+
</plugin>
38+
</plugins>
39+
</build>
40+
41+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
package org.spring.integration.dynamic.tcp;
2+
3+
import org.springframework.core.serializer.Serializer;
4+
import org.springframework.expression.EvaluationContext;
5+
import org.springframework.expression.Expression;
6+
import org.springframework.expression.spel.support.StandardEvaluationContext;
7+
import org.springframework.integration.dsl.IntegrationFlow;
8+
import org.springframework.integration.dsl.context.IntegrationFlowContext;
9+
import org.springframework.integration.expression.ValueExpression;
10+
import org.springframework.integration.handler.MessageProcessor;
11+
import org.springframework.integration.ip.dsl.Tcp;
12+
import org.springframework.integration.ip.dsl.TcpClientConnectionFactorySpec;
13+
import org.springframework.integration.ip.dsl.TcpOutboundGatewaySpec;
14+
import org.springframework.integration.ip.tcp.serializer.AbstractByteArraySerializer;
15+
import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer;
16+
import org.springframework.messaging.Message;
17+
import org.springframework.messaging.MessageChannel;
18+
import org.springframework.util.Assert;
19+
20+
import java.util.Map;
21+
import java.util.concurrent.ConcurrentHashMap;
22+
import java.util.concurrent.Executors;
23+
24+
public class DynamicTcpOutboundGateway implements MessageProcessor<MessageChannel> {
25+
public static final String DEFAULT_RESPONSE_CHANNEL_NAME = "serverResponseChannel";
26+
private final Map<String, IntegrationFlowContext.IntegrationFlowRegistration> flowCache = new ConcurrentHashMap<>();
27+
private final IntegrationFlowContext flowContext;
28+
private final EvaluationContext evaluationContext = new StandardEvaluationContext();
29+
private String responseChannel = DEFAULT_RESPONSE_CHANNEL_NAME;
30+
private AbstractByteArraySerializer deserializer = new ByteArrayCrLfSerializer();
31+
private Serializer<?> serializer;
32+
private Expression hostExpression;
33+
private Expression portExpression;
34+
private Expression cacheableExpression = new ValueExpression<>(Boolean.FALSE);
35+
36+
public DynamicTcpOutboundGateway(IntegrationFlowContext flowContext) {
37+
this.flowContext = flowContext;
38+
39+
}
40+
41+
private static String getFlowId(String host, Integer port) {
42+
return String.format("flow_%s_%d.chl", host, port);
43+
}
44+
45+
@Override
46+
public MessageChannel processMessage(Message<?> message) {
47+
final String host = getHost(message);
48+
final int port = getPort(message);
49+
final boolean cacheable = isCacheable(message);
50+
final String flowId = getFlowId(host, port) + (cacheable ? "" : System.nanoTime());
51+
52+
final TcpClientConnectionFactorySpec connectionFactorySpec = Tcp.netClient(host, port);
53+
final TcpOutboundGatewaySpec gatewaySpec = getMessageHandlerSpec(connectionFactorySpec);
54+
IntegrationFlowContext.IntegrationFlowRegistration registration;
55+
if (!cacheable) {
56+
registration = createNonCacheableConnectionFlow(flowId, gatewaySpec);
57+
} else {
58+
registration = flowCache.computeIfAbsent(flowId, id -> createConnectionFlow(id, gatewaySpec));
59+
60+
}
61+
return registration.getInputChannel();
62+
}
63+
64+
private IntegrationFlowContext.IntegrationFlowRegistration createConnectionFlow(String flowId, TcpOutboundGatewaySpec gatewaySpec) {
65+
final IntegrationFlow flow = f -> f
66+
.handle(gatewaySpec)
67+
.channel(responseChannel);
68+
return flowContext
69+
.registration(flow)
70+
.id(flowId)
71+
.register();
72+
}
73+
74+
private IntegrationFlowContext.IntegrationFlowRegistration createNonCacheableConnectionFlow(String flowId, TcpOutboundGatewaySpec gatewaySpec) {
75+
final IntegrationFlow flow = f -> f
76+
.handle(gatewaySpec)
77+
.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s.subscribe(subFlow -> subFlow.channel(responseChannel)))
78+
.handle(message -> flowContext.remove(flowId));
79+
return flowContext
80+
.registration(flow)
81+
.id(flowId)
82+
.register();
83+
}
84+
85+
private TcpOutboundGatewaySpec getMessageHandlerSpec(TcpClientConnectionFactorySpec connectionFactorySpec) {
86+
final TcpClientConnectionFactorySpec connectionFactory = connectionFactorySpec
87+
.singleUseConnections(true);
88+
if (serializer != null) {
89+
connectionFactory.serializer(serializer);
90+
}
91+
if (deserializer != null) {
92+
connectionFactory.deserializer(deserializer);
93+
}
94+
return Tcp.outboundGateway(connectionFactory);
95+
}
96+
97+
private boolean isCacheable(Message<?> requestMessage) {
98+
if (this.cacheableExpression != null) {
99+
final Boolean value = this.cacheableExpression.getValue(this.evaluationContext, requestMessage, Boolean.class);
100+
if (value != null) {
101+
return value;
102+
}
103+
}
104+
return true;
105+
}
106+
107+
private String getHost(Message<?> requestMessage) {
108+
Assert.state(this.hostExpression != null, "host expression is missing");
109+
final String host = this.hostExpression.getValue(this.evaluationContext, requestMessage, String.class);
110+
Assert.state(host != null, "host is missing");
111+
return host;
112+
}
113+
114+
private int getPort(Message<?> requestMessage) {
115+
Assert.state(this.portExpression != null, "port expression is missing");
116+
final Integer port = this.portExpression.getValue(this.evaluationContext, requestMessage, Integer.class);
117+
Assert.state(port != null, "port is missing");
118+
return port;
119+
}
120+
121+
public void setDeserializer(AbstractByteArraySerializer deserializer) {
122+
this.deserializer = deserializer;
123+
}
124+
125+
public void setResponseChannel(String responseChannel) {
126+
this.responseChannel = responseChannel;
127+
}
128+
129+
public void setHostExpression(Expression hostExpression) {
130+
this.hostExpression = hostExpression;
131+
}
132+
133+
public void setPortExpression(Expression portExpression) {
134+
this.portExpression = portExpression;
135+
}
136+
137+
public void setSerializer(Serializer<?> serializer) {
138+
this.serializer = serializer;
139+
}
140+
141+
public void setCacheableExpression(Expression cacheableExpression) {
142+
this.cacheableExpression = cacheableExpression;
143+
}
144+
145+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package org.spring.integration.dynamic.tcp.spec;
2+
3+
import org.spring.integration.dynamic.tcp.DynamicTcpOutboundGateway;
4+
import org.springframework.core.serializer.Serializer;
5+
import org.springframework.expression.spel.standard.SpelExpressionParser;
6+
import org.springframework.integration.dsl.MessageProcessorSpec;
7+
import org.springframework.integration.dsl.context.IntegrationFlowContext;
8+
import org.springframework.integration.expression.FunctionExpression;
9+
import org.springframework.integration.ip.tcp.serializer.AbstractByteArraySerializer;
10+
import org.springframework.messaging.Message;
11+
12+
import java.util.function.Function;
13+
14+
public class DynamicTcpOutboundGatewaySpec extends MessageProcessorSpec<DynamicTcpOutboundGatewaySpec> {
15+
private static final SpelExpressionParser PARSER = new SpelExpressionParser();
16+
17+
public DynamicTcpOutboundGatewaySpec(IntegrationFlowContext flowContext) {
18+
this.target = new DynamicTcpOutboundGateway(flowContext);
19+
}
20+
21+
public DynamicTcpOutboundGatewaySpec deserializer(AbstractByteArraySerializer deserializer) {
22+
getRequestFlow().setDeserializer(deserializer);
23+
return _this();
24+
}
25+
26+
public DynamicTcpOutboundGatewaySpec serializer(Serializer<?> serializer) {
27+
getRequestFlow().setSerializer(serializer);
28+
return _this();
29+
}
30+
31+
public DynamicTcpOutboundGatewaySpec responseChannelName(String responseChannelName) {
32+
getRequestFlow().setResponseChannel(responseChannelName);
33+
return _this();
34+
}
35+
36+
public <P> DynamicTcpOutboundGatewaySpec host(Function<Message<P>, ?> hostExpression) {
37+
getRequestFlow().setHostExpression(new FunctionExpression<>(hostExpression));
38+
return _this();
39+
}
40+
41+
public <P> DynamicTcpOutboundGatewaySpec port(Function<Message<P>, ?> portExpression) {
42+
getRequestFlow().setPortExpression(new FunctionExpression<>(portExpression));
43+
return _this();
44+
}
45+
46+
public <P> DynamicTcpOutboundGatewaySpec cacheable(Function<Message<P>, ?> cacheableExpression) {
47+
getRequestFlow().setCacheableExpression(new FunctionExpression<>(cacheableExpression));
48+
return _this();
49+
}
50+
51+
public DynamicTcpOutboundGatewaySpec host(String expression) {
52+
getRequestFlow().setHostExpression(PARSER.parseExpression(expression));
53+
return _this();
54+
}
55+
56+
public DynamicTcpOutboundGatewaySpec port(String expression) {
57+
getRequestFlow().setPortExpression(PARSER.parseExpression(expression));
58+
return _this();
59+
}
60+
61+
public DynamicTcpOutboundGatewaySpec cacheable(String expression) {
62+
getRequestFlow().setCacheableExpression(PARSER.parseExpression(expression));
63+
return _this();
64+
}
65+
66+
private DynamicTcpOutboundGateway getRequestFlow() {
67+
return (DynamicTcpOutboundGateway) this.target;
68+
}
69+
}

pom.xml

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
<artifactId>dynamic-tcp-starter</artifactId>
6+
<version>0.0.1-SNAPSHOT</version>
7+
<name>spring-boot-dynamic-tcp-starter</name>
8+
<packaging>pom</packaging>
9+
10+
<modules>
11+
<module>dynamic-tcp</module>
12+
<module>dynamic-tcp-autoconfigure</module>
13+
</modules>
14+
15+
</project>

0 commit comments

Comments
 (0)