Skip to content

Commit 263dac8

Browse files
authored
Merge pull request #91 from hengboy/master
🚀 新增lpop获取管道阻塞队列内消息事件
2 parents df6c83a + 0c8e0e7 commit 263dac8

File tree

5 files changed

+146
-33
lines changed

5 files changed

+146
-33
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package org.minbox.framework.message.pipe.server.processing;
2+
3+
import org.springframework.context.ApplicationEvent;
4+
import org.springframework.context.ApplicationEventPublisher;
5+
import org.springframework.context.ApplicationEventPublisherAware;
6+
import org.springframework.data.redis.listener.KeyspaceEventMessageListener;
7+
import org.springframework.data.redis.listener.PatternTopic;
8+
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
9+
10+
import java.util.regex.Matcher;
11+
import java.util.regex.Pattern;
12+
13+
import static org.minbox.framework.message.pipe.core.PipeConstants.PIPE_NAME_PATTERN;
14+
15+
/**
16+
* The {@link KeyspaceEventMessageListener} subclass
17+
* <p>
18+
* Encapsulate {@link KeyspaceEventMessageListener} ,
19+
* provide a method for publishing Spring {@link org.springframework.context.ApplicationEvent}
20+
*
21+
* @author 恒宇少年
22+
*/
23+
public abstract class EventPublisherKeyspaceMessageListener extends KeyspaceEventMessageListener implements ApplicationEventPublisherAware {
24+
private ApplicationEventPublisher applicationEventPublisher;
25+
26+
public abstract PatternTopic patternTopicUsed();
27+
28+
public EventPublisherKeyspaceMessageListener(RedisMessageListenerContainer listenerContainer) {
29+
super(listenerContainer);
30+
}
31+
32+
@Override
33+
protected void doRegister(RedisMessageListenerContainer container) {
34+
container.addMessageListener(this, this.patternTopicUsed());
35+
}
36+
37+
@Override
38+
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
39+
this.applicationEventPublisher = applicationEventPublisher;
40+
}
41+
42+
/**
43+
* Publish given {@link ApplicationEvent} instance
44+
*
45+
* @param event The {@link ApplicationEvent} instance
46+
*/
47+
protected void publishEvent(ApplicationEvent event) {
48+
this.applicationEventPublisher.publishEvent(event);
49+
}
50+
51+
/**
52+
* Extract the pipeline name based on the Key in redis
53+
*
54+
* @param redisQueueKey The redis queue key
55+
* example:"test.queue"
56+
* @return The name of message pipe,if the key does not match the expression, it returns null
57+
*/
58+
protected String extractPipeName(String redisQueueKey) {
59+
Pattern pipeKeyPattern = Pattern.compile(PIPE_NAME_PATTERN);
60+
Matcher matcher = pipeKeyPattern.matcher(redisQueueKey);
61+
return matcher.find() ? matcher.group(1) : null;
62+
}
63+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package org.minbox.framework.message.pipe.server.processing.pop;
2+
3+
import lombok.Getter;
4+
import org.minbox.framework.message.pipe.server.MessagePipe;
5+
import org.springframework.context.ApplicationEvent;
6+
7+
/**
8+
* @author 恒宇少年
9+
*/
10+
@Getter
11+
public class PopMessageEvent extends ApplicationEvent {
12+
/**
13+
* The name of {@link MessagePipe}
14+
*/
15+
private String pipeName;
16+
17+
public PopMessageEvent(Object source, String pipeName) {
18+
super(source);
19+
this.pipeName = pipeName;
20+
}
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package org.minbox.framework.message.pipe.server.processing.pop;
2+
3+
import lombok.extern.slf4j.Slf4j;
4+
import org.minbox.framework.message.pipe.server.processing.EventPublisherKeyspaceMessageListener;
5+
import org.springframework.data.redis.connection.Message;
6+
import org.springframework.data.redis.listener.PatternTopic;
7+
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
8+
import org.springframework.util.ObjectUtils;
9+
10+
/**
11+
* Monitor the list in Redis to get data from the left
12+
*
13+
* @author 恒宇少年
14+
*/
15+
@Slf4j
16+
public class PopMessageFromPipeListener extends EventPublisherKeyspaceMessageListener {
17+
/**
18+
* The bean name of {@link PopMessageFromPipeListener}
19+
*/
20+
public static final String BEAN_NAME = "popMessageFromPipeListener";
21+
private static final String LEFT_POP_PATTERN_TOPIC = "__keyevent@*:lpop";
22+
23+
public PopMessageFromPipeListener(RedisMessageListenerContainer listenerContainer) {
24+
super(listenerContainer);
25+
}
26+
27+
@Override
28+
public PatternTopic patternTopicUsed() {
29+
return new PatternTopic(LEFT_POP_PATTERN_TOPIC);
30+
}
31+
32+
@Override
33+
protected void doHandleMessage(Message message) {
34+
String redisQueueKey = message.toString();
35+
String pipeName = extractPipeName(redisQueueKey);
36+
if (ObjectUtils.isEmpty(pipeName)) {
37+
log.warn("The message pipe name was not extracted from Key: {}.", redisQueueKey);
38+
return;
39+
}
40+
// Publish PopMessageEvent
41+
PopMessageEvent popMessageEvent = new PopMessageEvent(this, pipeName);
42+
publishEvent(popMessageEvent);
43+
log.debug("Message Pipe:{},publish PopMessageEvent successfully.", pipeName);
44+
}
45+
}
Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,12 @@
11
package org.minbox.framework.message.pipe.server.processing.push;
22

33
import lombok.extern.slf4j.Slf4j;
4-
import org.springframework.context.ApplicationEventPublisher;
5-
import org.springframework.context.ApplicationEventPublisherAware;
4+
import org.minbox.framework.message.pipe.server.processing.EventPublisherKeyspaceMessageListener;
65
import org.springframework.data.redis.connection.Message;
7-
import org.springframework.data.redis.listener.KeyspaceEventMessageListener;
86
import org.springframework.data.redis.listener.PatternTopic;
97
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
108
import org.springframework.util.ObjectUtils;
119

12-
import java.util.regex.Matcher;
13-
import java.util.regex.Pattern;
14-
15-
import static org.minbox.framework.message.pipe.core.PipeConstants.PIPE_NAME_PATTERN;
16-
1710
/**
1811
* Waiting for the message to be pushed to the listener of the pipeline
1912
* <p>
@@ -22,53 +15,33 @@
2215
* @author 恒宇少年
2316
*/
2417
@Slf4j
25-
public class PushMessageToPipeListener extends KeyspaceEventMessageListener implements ApplicationEventPublisherAware {
18+
public class PushMessageToPipeListener extends EventPublisherKeyspaceMessageListener {
2619
/**
2720
* The bean name of {@link PushMessageToPipeListener}
2821
*/
2922
public static final String BEAN_NAME = "pushMessageListener";
3023
private static final String PUSH_PATTERN_TOPIC = "__keyevent@*__:rpush";
31-
private ApplicationEventPublisher applicationEventPublisher;
3224

3325
public PushMessageToPipeListener(RedisMessageListenerContainer listenerContainer) {
3426
super(listenerContainer);
3527
}
3628

3729
@Override
38-
protected void doRegister(RedisMessageListenerContainer container) {
39-
container.addMessageListener(this, new PatternTopic(PUSH_PATTERN_TOPIC));
30+
public PatternTopic patternTopicUsed() {
31+
return new PatternTopic(PUSH_PATTERN_TOPIC);
4032
}
4133

4234
@Override
4335
protected void doHandleMessage(Message message) {
4436
String redisQueueKey = message.toString();
45-
String pipeName = this.extractPipeName(redisQueueKey);
37+
String pipeName = extractPipeName(redisQueueKey);
4638
if (ObjectUtils.isEmpty(pipeName)) {
4739
log.warn("The message pipe name was not extracted from Key: {}.", redisQueueKey);
4840
return;
4941
}
5042
// Publish PushMessageEvent
5143
PushMessageEvent pushMessageEvent = new PushMessageEvent(this, pipeName);
52-
applicationEventPublisher.publishEvent(pushMessageEvent);
44+
publishEvent(pushMessageEvent);
5345
log.debug("Message Pipe:{},publish PushMessageEvent successfully.", pipeName);
5446
}
55-
56-
57-
@Override
58-
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
59-
this.applicationEventPublisher = applicationEventPublisher;
60-
}
61-
62-
/**
63-
* Extract the pipeline name based on the Key in redis
64-
*
65-
* @param redisQueueKey The redis queue key
66-
* example:"test.queue"
67-
* @return The name of message pipe,if the key does not match the expression, it returns null
68-
*/
69-
private String extractPipeName(String redisQueueKey) {
70-
Pattern pipeKeyPattern = Pattern.compile(PIPE_NAME_PATTERN);
71-
Matcher matcher = pipeKeyPattern.matcher(redisQueueKey);
72-
return matcher.find() ? matcher.group(1) : null;
73-
}
7447
}

message-pipe-spring-context/src/main/java/org/minbox/framework/message/pipe/spring/utils/MessagePipeBeanUtils.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.minbox.framework.message.pipe.server.manager.MessagePipeFactoryBean;
77
import org.minbox.framework.message.pipe.server.manager.MessagePipeLoader;
88
import org.minbox.framework.message.pipe.server.manager.DefaultMessagePipeManager;
9+
import org.minbox.framework.message.pipe.server.processing.pop.PopMessageFromPipeListener;
910
import org.minbox.framework.message.pipe.server.processing.push.PushMessageToPipeListener;
1011
import org.minbox.framework.message.pipe.server.service.discovery.ClientServiceDiscovery;
1112
import org.minbox.framework.util.BeanUtils;
@@ -28,6 +29,7 @@ public static void registerServerBeans(BeanDefinitionRegistry registry) {
2829
registerMessagePipeLoader(registry);
2930
registerClientServiceDiscovery(registry);
3031
registerPushMessageListener(registry);
32+
registerPopMessageFromPipeListener(registry);
3133
}
3234

3335
/**
@@ -112,4 +114,13 @@ private static void registerClientServiceDiscovery(BeanDefinitionRegistry regist
112114
private static void registerPushMessageListener(BeanDefinitionRegistry registry) {
113115
BeanUtils.registerInfrastructureBeanIfAbsent(registry, PushMessageToPipeListener.BEAN_NAME, PushMessageToPipeListener.class);
114116
}
117+
118+
/**
119+
* Register {@link PopMessageFromPipeListener}
120+
*
121+
* @param registry The {@link BeanDefinitionRegistry} instance
122+
*/
123+
private static void registerPopMessageFromPipeListener(BeanDefinitionRegistry registry) {
124+
BeanUtils.registerInfrastructureBeanIfAbsent(registry, PopMessageFromPipeListener.BEAN_NAME, PopMessageFromPipeListener.class);
125+
}
115126
}

0 commit comments

Comments
 (0)