Skip to content

Commit 1bc841c

Browse files
committed
🚀 新增清理过期message pipe定时线程
1 parent 947f872 commit 1bc841c

File tree

3 files changed

+39
-6
lines changed

3 files changed

+39
-6
lines changed

message-pipe-server/src/main/java/org/minbox/framework/message/pipe/server/MessagePipe.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,6 @@ public synchronized void handleFirst(Function<Message, MessageProcessStatus> fun
162162
log.debug("The message pipe:{} scheduler thread is woken up, handing first message.", name);
163163
Message current = null;
164164
MessageProcessStatus status = MessageProcessStatus.SEND_SUCCESS;
165-
Long currentTimeMillis = System.currentTimeMillis();
166165
RLock takeLock = redissonClient.getLock(takeLockName);
167166
try {
168167
MessagePipeConfiguration.LockTime lockTime = configuration.getLockTime();
@@ -182,7 +181,7 @@ public synchronized void handleFirst(Function<Message, MessageProcessStatus> fun
182181
} catch (Exception e) {
183182
this.doHandleException(e, status, current);
184183
} finally {
185-
lastProcessTimeMillis.set(currentTimeMillis);
184+
lastProcessTimeMillis.set(System.currentTimeMillis());
186185
transfer = true;
187186
if (takeLock.isLocked() && takeLock.isHeldByCurrentThread()) {
188187
takeLock.unlock();
@@ -218,6 +217,7 @@ public synchronized void handleToLast(Function<Message, MessageProcessStatus> fu
218217
} catch (Exception e) {
219218
this.doHandleException(e, status, current);
220219
} finally {
220+
lastProcessTimeMillis.set(System.currentTimeMillis());
221221
transfer = true;
222222
runningHandleAll = false;
223223
if (takeLock.isLocked() && takeLock.isHeldByCurrentThread()) {

message-pipe-server/src/main/java/org/minbox/framework/message/pipe/server/config/ServerConfiguration.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,14 @@ public class ServerConfiguration {
3838
* The default number of core threads for monitoring and scheduling thread pools
3939
*/
4040
private int coreThreadPoolSize = 20;
41+
/**
42+
* Interval for cleaning up expired message pipe threads, in seconds
43+
*/
44+
private long cleanupExpiredMessagePipeIntervalSeconds = 10;
45+
/**
46+
* The threshold for determining an expired message pipe, in seconds
47+
*/
48+
private long cleanupExpiredMessagePipeThresholdSeconds = 1800;
4149
/**
4250
* Configure the message pipe name to exclude distribution
4351
* <p>

message-pipe-server/src/main/java/org/minbox/framework/message/pipe/server/manager/AbstractMessagePipeManager.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@
1414
import org.springframework.beans.factory.InitializingBean;
1515
import org.springframework.util.ObjectUtils;
1616

17+
import java.util.ArrayList;
18+
import java.util.List;
1719
import java.util.Map;
18-
import java.util.concurrent.ConcurrentHashMap;
19-
import java.util.concurrent.ConcurrentMap;
20-
import java.util.concurrent.ExecutorService;
21-
import java.util.concurrent.Executors;
20+
import java.util.concurrent.*;
2221
import java.util.regex.Matcher;
2322
import java.util.regex.Pattern;
23+
import java.util.stream.Collectors;
2424

2525
/**
2626
* The {@link MessagePipeManager} abstract implementation class
@@ -36,13 +36,15 @@ public abstract class AbstractMessagePipeManager implements MessagePipeManager,
3636
* The Key of the Map is the name of the {@link MessagePipe}
3737
*/
3838
private static final ConcurrentMap<String, MessagePipe> MESSAGE_PIPE_MAP = new ConcurrentHashMap();
39+
private static final int CLEANUP_EXPIRED_CORE_THREADS = 1;
3940
/**
4041
* Create the configuration object used by the {@link MessagePipe}
4142
*/
4243
private MessagePipeConfiguration sharedConfiguration;
4344
private BeanFactory beanFactory;
4445
private static ExecutorService SCHEDULER_SERVICE;
4546
private static ExecutorService MONITOR_SERVICE;
47+
private static ScheduledExecutorService CLEANUP_EXPIRED_SERVICE;
4648
private ServerConfiguration serverConfiguration;
4749
private MessagePipeFactoryBean messagePipeFactoryBean;
4850
private ServiceDiscovery serviceDiscovery;
@@ -146,10 +148,33 @@ public void afterPropertiesSet() throws Exception {
146148
this.serviceDiscovery = beanFactory.getBean(ServiceDiscovery.class);
147149
SCHEDULER_SERVICE = Executors.newFixedThreadPool(serverConfiguration.getCoreThreadPoolSize());
148150
MONITOR_SERVICE = Executors.newFixedThreadPool(serverConfiguration.getCoreThreadPoolSize());
151+
CLEANUP_EXPIRED_SERVICE = Executors.newScheduledThreadPool(CLEANUP_EXPIRED_CORE_THREADS);
152+
this.startCleanupExpiredThread();
149153
log.info("The MessagePipeManager startup successfully,maximum number of message pipes:{}.",
150154
serverConfiguration.getMaxMessagePipeCount());
151155
}
152156

157+
/**
158+
* Start cleanup expired message pipe thread
159+
*/
160+
private void startCleanupExpiredThread() {
161+
CLEANUP_EXPIRED_SERVICE.scheduleAtFixedRate(() -> {
162+
log.debug("Clean up expired message pipes thread is start working...");
163+
long checkTimeMillis = System.currentTimeMillis();
164+
List<MessagePipe> expiredList = MESSAGE_PIPE_MAP.values().stream()
165+
.filter(messagePipe -> {
166+
long diffSeconds = TimeUnit.MILLISECONDS.toSeconds(checkTimeMillis - messagePipe.getLastProcessTimeMillis());
167+
return diffSeconds > serverConfiguration.getExpiredExcludeThresholdSeconds();
168+
}).collect(Collectors.toList());
169+
if (!ObjectUtils.isEmpty(expiredList)) {
170+
expiredList.stream().forEach(expiredMessagePipe ->
171+
MESSAGE_PIPE_MAP.remove(expiredMessagePipe.getName(), expiredMessagePipe));
172+
log.warn("The cleanup of expired message pipes thread is completed, this cleanup: {}.",
173+
expiredList.stream().map(MessagePipe::getName).collect(Collectors.toList()));
174+
}
175+
}, 1, serverConfiguration.getCleanupExpiredMessagePipeIntervalSeconds(), TimeUnit.SECONDS);
176+
}
177+
153178
@Override
154179
public void destroy() throws Exception {
155180
SCHEDULER_SERVICE.shutdown();

0 commit comments

Comments
 (0)