Skip to content

Commit cc78180

Browse files
authored
Connection closing optimization (#697)
1 parent dd9ea01 commit cc78180

File tree

3 files changed

+18
-46
lines changed

3 files changed

+18
-46
lines changed

amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/ProxyConnection.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import static java.nio.charset.StandardCharsets.US_ASCII;
1818
import static java.nio.charset.StandardCharsets.UTF_8;
1919

20-
import com.google.common.collect.Sets;
2120
import io.netty.buffer.ByteBuf;
2221
import io.netty.channel.ChannelHandlerContext;
2322
import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -28,7 +27,6 @@
2827
import io.streamnative.pulsar.handlers.amqp.AmqpProtocolHandler;
2928
import java.util.ArrayList;
3029
import java.util.List;
31-
import java.util.Set;
3230
import java.util.concurrent.CompletableFuture;
3331
import java.util.concurrent.atomic.AtomicInteger;
3432
import lombok.Getter;
@@ -232,17 +230,6 @@ public void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString cap
232230
}
233231
}
234232
vhost = virtualHostStr;
235-
236-
proxyService.getVhostConnectionMap().compute(vhost, (v, set) -> {
237-
if (set == null) {
238-
Set<ProxyConnection> proxyConnectionSet = Sets.newConcurrentHashSet();
239-
proxyConnectionSet.add(this);
240-
return proxyConnectionSet;
241-
} else {
242-
set.add(this);
243-
return set;
244-
}
245-
});
246233
handleConnect(new AtomicInteger(5));
247234
}
248235

amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/ProxyHandler.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
8282
channelFuture.addListener(future -> {
8383
if (!future.isSuccess()) {
8484
// Close the connection if the connection attempt has failed.
85-
clientChannel.close();
85+
proxyConnection.close();
8686
}
8787
});
8888
state = State.Init;
@@ -94,8 +94,8 @@ private class ProxyBackendHandler extends ChannelInboundHandlerAdapter implement
9494
ClientMethodProcessor<ClientChannelMethodProcessor>, FutureListener<Void> {
9595

9696
private ChannelHandlerContext cnx;
97-
private AMQMethodBody connectResponseBody;
98-
private AmqpClientDecoder clientDecoder;
97+
private final AMQMethodBody connectResponseBody;
98+
private final AmqpClientDecoder clientDecoder;
9999

100100
ProxyBackendHandler(AMQMethodBody responseBody) {
101101
this.connectResponseBody = responseBody;
@@ -107,11 +107,11 @@ public void operationComplete(Future future) throws Exception {
107107
// This is invoked when the write operation on the paired connection
108108
// is completed
109109
if (future.isSuccess()) {
110-
brokerChannel.read();
110+
cnx.channel().read();
111111
} else {
112112
log.warn("[{}] [{}] Failed to write on proxy connection. Closing both connections.", clientChannel,
113-
brokerChannel, future.cause());
114-
clientChannel.close();
113+
cnx.channel(), future.cause());
114+
proxyConnection.close();
115115
}
116116

117117
}
@@ -123,8 +123,8 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
123123
super.channelActive(ctx);
124124
for (Object msg : connectMsgList) {
125125
((ByteBuf) msg).retain();
126-
brokerChannel.writeAndFlush(msg).addListener(future -> {
127-
brokerChannel.read();
126+
ctx.channel().writeAndFlush(msg).addListener(future -> {
127+
ctx.channel().read();
128128
});
129129
}
130130
}
@@ -166,6 +166,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
166166
cause.printStackTrace();
167167
log.error("[" + vhost + "] ProxyBackendHandler [exceptionCaught] - msg: " + cause.getMessage(), cause);
168168
state = State.Failed;
169+
proxyConnection.close();
169170
}
170171

171172
@Override

amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/ProxyService.java

Lines changed: 9 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,12 @@
1616
import static com.google.common.base.Preconditions.checkArgument;
1717
import static com.google.common.base.Preconditions.checkNotNull;
1818

19-
import com.google.common.collect.Maps;
2019
import io.netty.bootstrap.ServerBootstrap;
2120
import io.netty.channel.Channel;
2221
import io.netty.channel.EventLoopGroup;
2322
import io.netty.util.concurrent.DefaultThreadFactory;
2423
import java.io.Closeable;
2524
import java.io.IOException;
26-
import java.util.Map;
27-
import java.util.Set;
2825
import lombok.Getter;
2926
import lombok.extern.slf4j.Slf4j;
3027
import org.apache.pulsar.broker.PulsarService;
@@ -37,23 +34,18 @@
3734
public class ProxyService implements Closeable {
3835

3936
@Getter
40-
private ProxyConfiguration proxyConfig;
37+
private final ProxyConfiguration proxyConfig;
4138
@Getter
42-
private PulsarService pulsarService;
39+
private final PulsarService pulsarService;
4340
@Getter
4441
private LookupHandler lookupHandler;
4542

4643
private Channel listenChannel;
47-
private EventLoopGroup acceptorGroup;
44+
private final EventLoopGroup acceptorGroup;
4845
@Getter
49-
private EventLoopGroup workerGroup;
46+
private final EventLoopGroup workerGroup;
5047

51-
private DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("amqp-redirect-acceptor");
52-
private DefaultThreadFactory workerThreadFactory = new DefaultThreadFactory("amqp-redirect-io");
53-
private static final int numThreads = Runtime.getRuntime().availableProcessors();
54-
55-
@Getter
56-
private static final Map<String, Set<ProxyConnection>> vhostConnectionMap = Maps.newConcurrentMap();
48+
private static final int NUM_THREADS = Runtime.getRuntime().availableProcessors();
5749

5850
private String tenant;
5951

@@ -63,8 +55,10 @@ public ProxyService(ProxyConfiguration proxyConfig, PulsarService pulsarService)
6355
this.proxyConfig = proxyConfig;
6456
this.pulsarService = pulsarService;
6557
this.tenant = this.proxyConfig.getAmqpTenant();
66-
acceptorGroup = EventLoopUtil.newEventLoopGroup(1, false, acceptorThreadFactory);
67-
workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, false, workerThreadFactory);
58+
this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, false,
59+
new DefaultThreadFactory("amqp-redirect-acceptor"));
60+
this.workerGroup = EventLoopUtil.newEventLoopGroup(NUM_THREADS, false,
61+
new DefaultThreadFactory("amqp-redirect-io"));
6862
}
6963

7064
private void configValid(ProxyConfiguration proxyConfig) {
@@ -89,16 +83,6 @@ public void start() throws Exception {
8983
this.lookupHandler = new PulsarServiceLookupHandler(proxyConfig, pulsarService);
9084
}
9185

92-
private void releaseConnection(String namespaceName) {
93-
log.info("release connection");
94-
if (vhostConnectionMap.containsKey(namespaceName)) {
95-
Set<ProxyConnection> proxyConnectionSet = vhostConnectionMap.get(namespaceName);
96-
for (ProxyConnection proxyConnection : proxyConnectionSet) {
97-
proxyConnection.close();
98-
}
99-
}
100-
}
101-
10286
@Override
10387
public void close() throws IOException {
10488
if (lookupHandler != null) {

0 commit comments

Comments
 (0)