Skip to content

Commit ea6c66b

Browse files
committed
extend API of network module, improve handling invalid received packets
1 parent 51f9518 commit ea6c66b

File tree

8 files changed

+224
-49
lines changed

8 files changed

+224
-49
lines changed

rlib-collections/src/test/java/javasabr/rlib/collections/array/IntArrayTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import static org.assertj.core.api.Assertions.assertThat;
44

5-
import java.util.List;
65
import java.util.stream.Stream;
76
import org.junit.jupiter.api.Assertions;
87
import org.junit.jupiter.api.Test;

rlib-network/src/main/java/javasabr/rlib/network/Network.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ public interface Network<C extends Connection<C>> {
1313

1414
NetworkConfig config();
1515

16+
void inNetworkThread(Runnable task);
17+
1618
/**
1719
* Shutdown this network.
1820
*/

rlib-network/src/main/java/javasabr/rlib/network/NetworkConfig.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package javasabr.rlib.network;
22

33
import java.nio.ByteOrder;
4+
import javasabr.rlib.common.util.GroupThreadFactory.ThreadConstructor;
45
import lombok.Builder;
56
import lombok.Getter;
67
import lombok.experimental.Accessors;
@@ -18,9 +19,13 @@ public interface NetworkConfig {
1819
class SimpleNetworkConfig implements NetworkConfig {
1920

2021
@Builder.Default
21-
private String groupName = "NetworkThread";
22+
private String threadGroupName = "NetworkThread";
2223
@Builder.Default
2324
private ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
25+
@Builder.Default
26+
private ThreadConstructor threadConstructor = Thread::new;
27+
@Builder.Default
28+
private int threadPriority = Thread.NORM_PRIORITY;
2429

2530
@Builder.Default
2631
private int readBufferSize = 2048;
@@ -46,6 +51,22 @@ public String threadGroupName() {
4651
}
4752
};
4853

54+
/**
55+
* Get a thread constructor which should be used to create network threads.
56+
*/
57+
default ThreadConstructor threadConstructor() {
58+
return Thread::new;
59+
}
60+
61+
/**
62+
* Get a priority of network threads.
63+
*
64+
* @return the priority of network threads.
65+
*/
66+
default int threadPriority() {
67+
return Thread.NORM_PRIORITY;
68+
}
69+
4970
/**
5071
* Get a group name of network threads.
5172
*/

rlib-network/src/main/java/javasabr/rlib/network/ServerNetworkConfig.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -82,21 +82,4 @@ default int threadGroupMaxSize() {
8282
default int scheduledThreadGroupSize() {
8383
return 1;
8484
}
85-
86-
87-
/**
88-
* Get a thread constructor which should be used to create network threads.
89-
*/
90-
default ThreadConstructor threadConstructor() {
91-
return Thread::new;
92-
}
93-
94-
/**
95-
* Get a priority of network threads.
96-
*
97-
* @return the priority of network threads.
98-
*/
99-
default int threadPriority() {
100-
return Thread.NORM_PRIORITY;
101-
}
10285
}

rlib-network/src/main/java/javasabr/rlib/network/client/impl/DefaultClientNetwork.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22

33
import java.net.InetSocketAddress;
44
import java.net.SocketAddress;
5+
import java.nio.channels.AsynchronousChannelGroup;
56
import java.nio.channels.AsynchronousSocketChannel;
67
import java.nio.channels.CompletionHandler;
78
import java.util.Optional;
89
import java.util.concurrent.CompletableFuture;
10+
import java.util.concurrent.ExecutorService;
911
import java.util.concurrent.Executors;
1012
import java.util.concurrent.ScheduledExecutorService;
1113
import java.util.concurrent.atomic.AtomicBoolean;
@@ -43,6 +45,8 @@ public class DefaultClientNetwork<C extends Connection<C>> extends AbstractNetwo
4345

4446
@Getter
4547
final ScheduledExecutorService scheduledExecutor;
48+
final ExecutorService networkExecutor;
49+
final AsynchronousChannelGroup channelGroup;
4650

4751
@Nullable
4852
@Getter(AccessLevel.PROTECTED)
@@ -57,11 +61,17 @@ public DefaultClientNetwork(
5761
BiFunction<Network<C>, AsynchronousSocketChannel, C> channelToConnection) {
5862
super(config, channelToConnection);
5963
this.connecting = new AtomicBoolean(false);
60-
this.scheduledExecutor = Executors
61-
.newSingleThreadScheduledExecutor(new GroupThreadFactory(config.scheduledThreadGroupName()));
64+
this.scheduledExecutor = buildScheduledExecutor(config);
65+
this.networkExecutor = buildExecutor(config);
66+
this.channelGroup = Utils.uncheckedGet(networkExecutor, AsynchronousChannelGroup::withThreadPool);
6267
log.info(config, DefaultClientNetwork::buildConfigDescription);
6368
}
6469

70+
@Override
71+
public void inNetworkThread(Runnable task) {
72+
networkExecutor.execute(task);
73+
}
74+
6575
@Override
6676
public C connect(InetSocketAddress serverAddress) {
6777
return connectAsync(serverAddress).join();
@@ -88,7 +98,7 @@ public CompletableFuture<C> connectAsync(InetSocketAddress serverAddress) {
8898
var asyncResult = new CompletableFuture<C>();
8999

90100
@SuppressWarnings("resource")
91-
var channel = Utils.uncheckedGet(AsynchronousSocketChannel::open);
101+
var channel = Utils.uncheckedGet(channelGroup, AsynchronousSocketChannel::open);
92102
channel.connect(serverAddress, this, new CompletionHandler<>() {
93103
@Override
94104
public void completed(@Nullable Void result, DefaultClientNetwork<C> network) {
@@ -136,6 +146,33 @@ public void shutdown() {
136146
if (connection != null) {
137147
Utils.unchecked(connection, C::close);
138148
}
149+
channelGroup.shutdown();
150+
scheduledExecutor.shutdown();
151+
networkExecutor.shutdown();
152+
}
153+
154+
protected ExecutorService buildExecutor(NetworkConfig config) {
155+
var threadFactory = new GroupThreadFactory(
156+
config.threadGroupName(),
157+
config.threadConstructor(),
158+
config.threadPriority(),
159+
false);
160+
ExecutorService executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
161+
// activate the executor
162+
executorService.submit(() -> {});
163+
return executorService;
164+
}
165+
166+
protected ScheduledExecutorService buildScheduledExecutor(NetworkConfig config) {
167+
var threadFactory = new GroupThreadFactory(
168+
config.scheduledThreadGroupName(),
169+
config.threadConstructor(),
170+
config.threadPriority(),
171+
false);
172+
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
173+
// activate the executor
174+
scheduledExecutor.submit(() -> {});
175+
return scheduledExecutor;
139176
}
140177

141178
private static String buildConfigDescription(NetworkConfig conf) {

rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -97,13 +97,13 @@ public void onConnected() {}
9797
@Override
9898
public void onReceiveValidPacket(BiConsumer<C, ? super ReadableNetworkPacket<C>> consumer) {
9999
validPacketSubscribers.add(consumer);
100-
packetReader().startRead();
100+
network.inNetworkThread(() -> packetReader().startRead());
101101
}
102102

103103
@Override
104104
public void onReceiveInvalidPacket(BiConsumer<C, ? super ReadableNetworkPacket<C>> consumer) {
105105
invalidPacketSubscribers.add(consumer);
106-
packetReader().startRead();
106+
network.inNetworkThread(() -> packetReader().startRead());
107107
}
108108

109109
@Override
@@ -129,26 +129,32 @@ protected void registerFluxOnReceivedEvents(
129129
packet, true));
130130
BiConsumer<C, ReadableNetworkPacket<C>> invalidListener =
131131
(connection, packet) -> sink.next(new ReceivedPacketEvent<>(connection,
132-
packet, true));
132+
packet, false));
133133

134134

135-
onReceiveValidPacket(validListener);
136-
onReceiveInvalidPacket(invalidListener);
135+
validPacketSubscribers.add(validListener);
136+
invalidPacketSubscribers.add(invalidListener);
137137

138-
sink.onDispose(() -> validPacketSubscribers.remove(validListener));
139-
sink.onDispose(() -> invalidPacketSubscribers.remove(invalidListener));
138+
sink.onDispose(() -> {
139+
validPacketSubscribers.remove(validListener);
140+
validPacketSubscribers.remove(invalidListener);
141+
});
142+
143+
network.inNetworkThread(() -> packetReader().startRead());
140144
}
141145

142146
protected void registerFluxOnReceivedValidPackets(FluxSink<? super ReadableNetworkPacket<C>> sink) {
143147
BiConsumer<C, ReadableNetworkPacket<C>> listener = (connection, packet) -> sink.next(packet);
144-
onReceiveValidPacket(listener);
148+
validPacketSubscribers.add(listener);
145149
sink.onDispose(() -> validPacketSubscribers.remove(listener));
150+
network.inNetworkThread(() -> packetReader().startRead());
146151
}
147152

148153
protected void registerFluxOnReceivedInvalidPackets(FluxSink<? super ReadableNetworkPacket<C>> sink) {
149154
BiConsumer<C, ReadableNetworkPacket<C>> listener = (connection, packet) -> sink.next(packet);
150-
onReceiveInvalidPacket(listener);
155+
invalidPacketSubscribers.add(listener);
151156
sink.onDispose(() -> invalidPacketSubscribers.remove(listener));
157+
network.inNetworkThread(() -> packetReader().startRead());
152158
}
153159

154160
@Nullable
@@ -222,18 +228,15 @@ public final void send(WritableNetworkPacket<C> packet) {
222228
}
223229

224230
protected void sendImpl(WritableNetworkPacket<C> packet) {
225-
226231
if (closed()) {
227232
return;
228233
}
229-
230234
long stamp = lock.writeLock();
231235
try {
232236
pendingPackets.addLast(packet);
233237
} finally {
234238
lock.unlockWrite(stamp);
235239
}
236-
237240
packetWriter().tryToSendNextPacket();
238241
}
239242

@@ -248,15 +251,11 @@ protected void queueAtFirst(WritableNetworkPacket<C> packet) {
248251

249252
@Override
250253
public CompletableFuture<Boolean> sendWithFeedback(WritableNetworkPacket<C> packet) {
251-
252254
var asyncResult = new CompletableFuture<Boolean>();
253-
254255
sendImpl(new WritablePacketWithFeedback<>(asyncResult, packet));
255-
256256
if (closed()) {
257257
return CompletableFuture.completedFuture(Boolean.FALSE);
258258
}
259-
260259
return asyncResult;
261260
}
262261

@@ -273,11 +272,9 @@ protected void clearWaitPackets() {
273272
}
274273

275274
protected void doClearWaitPackets() {
276-
277275
for (var pendingPacket : pendingPackets) {
278276
handleSentPacket(pendingPacket, false);
279277
}
280-
281278
pendingPackets.clear();
282279
}
283280

rlib-network/src/main/java/javasabr/rlib/network/server/impl/DefaultServerNetwork.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ private interface ServerCompletionHandler<C extends UnsafeConnection<C>> extends
5757
public void completed(AsynchronousSocketChannel channel, DefaultServerNetwork<C> network) {
5858
var connection = network.channelToConnection.apply(network, channel);
5959
log.debug(connection.remoteAddress(), "Accepted new connection:[%s]"::formatted);
60-
network.onAccept(connection);
60+
network.consumeAccepted(connection);
6161
network.acceptNext();
6262
}
6363

@@ -77,18 +77,20 @@ public void failed(Throwable exc, DefaultServerNetwork<C> network) {
7777

7878
@Getter
7979
ScheduledExecutorService scheduledExecutor;
80+
ExecutorService networkExecutor;
8081

81-
AsynchronousChannelGroup group;
82+
AsynchronousChannelGroup channelGroup;
8283
AsynchronousServerSocketChannel channel;
8384
MutableArray<Consumer<? super C>> subscribers;
8485

8586
public DefaultServerNetwork(
8687
ServerNetworkConfig config,
8788
BiFunction<Network<C>, AsynchronousSocketChannel, C> channelToConnection) {
8889
super(config, channelToConnection);
89-
this.group = Utils.uncheckedGet(buildExecutor(config), AsynchronousChannelGroup::withThreadPool);
90+
this.networkExecutor = buildExecutor(config);
91+
this.channelGroup = Utils.uncheckedGet(networkExecutor, AsynchronousChannelGroup::withThreadPool);
9092
this.scheduledExecutor = buildScheduledExecutor(config);
91-
this.channel = Utils.uncheckedGet(group, AsynchronousServerSocketChannel::open);
93+
this.channel = Utils.uncheckedGet(channelGroup, AsynchronousServerSocketChannel::open);
9294
this.subscribers = ArrayFactory.copyOnModifyArray(Consumer.class);
9395
log.info(config, DefaultServerNetwork::buildConfigDescription);
9496
}
@@ -109,18 +111,23 @@ public InetSocketAddress start() {
109111
log.info(address, "Started server socket on address:[%s]"::formatted);
110112

111113
if (!subscribers.isEmpty()) {
112-
acceptNext();
114+
inNetworkThread(this::acceptNext);
113115
}
114116

115117
return address;
116118
}
117119

120+
@Override
121+
public void inNetworkThread(Runnable task) {
122+
networkExecutor.execute(task);
123+
}
124+
118125
@Override
119126
public <S extends ServerNetwork<C>> S start(InetSocketAddress serverAddress) {
120127
Utils.unchecked(channel, serverAddress, AsynchronousServerSocketChannel::bind);
121128
log.info(serverAddress, addr -> "Started server socket on address: " + addr);
122129
if (!subscribers.isEmpty()) {
123-
acceptNext();
130+
inNetworkThread(this::acceptNext);
124131
}
125132
return ClassUtils.unsafeNNCast(this);
126133
}
@@ -135,7 +142,7 @@ protected void acceptNext() {
135142
}
136143
}
137144

138-
protected void onAccept(C connection) {
145+
protected void consumeAccepted(C connection) {
139146
connection.onConnected();
140147
subscribers
141148
.iterations()
@@ -145,7 +152,7 @@ protected void onAccept(C connection) {
145152
@Override
146153
public void onAccept(Consumer<? super C> consumer) {
147154
subscribers.add(consumer);
148-
acceptNext();
155+
inNetworkThread(this::acceptNext);
149156
}
150157

151158
@Override
@@ -162,7 +169,9 @@ protected void registerFluxOnAccepted(FluxSink<C> sink) {
162169
@Override
163170
public void shutdown() {
164171
Utils.unchecked(channel, AsynchronousChannel::close);
165-
group.shutdown();
172+
channelGroup.shutdown();
173+
scheduledExecutor.shutdown();
174+
networkExecutor.shutdown();
166175
}
167176

168177
protected ExecutorService buildExecutor(ServerNetworkConfig config) {

0 commit comments

Comments
 (0)