Skip to content

Commit 48a755a

Browse files
author
duanlinlin
committed
feat[consumer]: support custom thread pool, providing the possibility for different consumers to reuse thread pools
1 parent b638d4c commit 48a755a

File tree

3 files changed

+94
-27
lines changed

3 files changed

+94
-27
lines changed

client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

+40
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
*/
1717
package org.apache.rocketmq.client.consumer;
1818

19+
import java.util.concurrent.ExecutorService;
20+
import java.util.concurrent.ScheduledExecutorService;
21+
import java.util.concurrent.ThreadPoolExecutor;
1922
import org.apache.rocketmq.client.ClientConfig;
2023
import org.apache.rocketmq.client.QueryResult;
2124
import org.apache.rocketmq.client.consumer.listener.MessageListener;
@@ -170,6 +173,19 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
170173
*/
171174
private long adjustThreadPoolNumsThreshold = 100000;
172175

176+
/**
177+
* Concurrently max span offset.it has no effect on sequential consumption
178+
*/
179+
private ScheduledExecutorService consumeMessageScheduledExecutor;
180+
/**
181+
* Thread pool for handling expired messages
182+
*/
183+
private ScheduledExecutorService cleanExpireMsgScheduledExecutor;
184+
/**
185+
* Thread pool for handling normal messages
186+
*/
187+
private ExecutorService consumeExecutor;
188+
173189
/**
174190
* Concurrently max span offset.it has no effect on sequential consumption
175191
*/
@@ -927,6 +943,30 @@ public void setAdjustThreadPoolNumsThreshold(long adjustThreadPoolNumsThreshold)
927943
this.adjustThreadPoolNumsThreshold = adjustThreadPoolNumsThreshold;
928944
}
929945

946+
public ScheduledExecutorService getConsumeMessageScheduledExecutor() {
947+
return consumeMessageScheduledExecutor;
948+
}
949+
950+
public void setConsumeMessageScheduledExecutor(ScheduledExecutorService consumeMessageScheduledExecutor) {
951+
this.consumeMessageScheduledExecutor = consumeMessageScheduledExecutor;
952+
}
953+
954+
public ScheduledExecutorService getCleanExpireMsgScheduledExecutor() {
955+
return cleanExpireMsgScheduledExecutor;
956+
}
957+
958+
public void setCleanExpireMsgScheduledExecutor(ScheduledExecutorService cleanExpireMsgScheduledExecutor) {
959+
this.cleanExpireMsgScheduledExecutor = cleanExpireMsgScheduledExecutor;
960+
}
961+
962+
public ExecutorService getConsumeExecutor() {
963+
return consumeExecutor;
964+
}
965+
966+
public void setConsumeExecutor(ExecutorService consumeExecutor) {
967+
this.consumeExecutor = consumeExecutor;
968+
}
969+
930970
public int getMaxReconsumeTimes() {
931971
return maxReconsumeTimes;
932972
}

client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java

+32-14
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.List;
2424
import java.util.Map;
2525
import java.util.concurrent.BlockingQueue;
26+
import java.util.concurrent.ExecutorService;
2627
import java.util.concurrent.Executors;
2728
import java.util.concurrent.LinkedBlockingQueue;
2829
import java.util.concurrent.RejectedExecutionException;
@@ -54,7 +55,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
5455
private final DefaultMQPushConsumer defaultMQPushConsumer;
5556
private final MessageListenerConcurrently messageListener;
5657
private final BlockingQueue<Runnable> consumeRequestQueue;
57-
private final ThreadPoolExecutor consumeExecutor;
58+
private final ExecutorService consumeExecutor;
5859
private final String consumerGroup;
5960

6061
private final ScheduledExecutorService scheduledExecutorService;
@@ -68,18 +69,30 @@ public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPush
6869
this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
6970
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
7071
this.consumeRequestQueue = new LinkedBlockingQueue<>();
71-
7272
String consumerGroupTag = (consumerGroup.length() > 100 ? consumerGroup.substring(0, 100) : consumerGroup) + "_";
73-
this.consumeExecutor = new ThreadPoolExecutor(
74-
this.defaultMQPushConsumer.getConsumeThreadMin(),
75-
this.defaultMQPushConsumer.getConsumeThreadMax(),
76-
1000 * 60,
77-
TimeUnit.MILLISECONDS,
78-
this.consumeRequestQueue,
79-
new ThreadFactoryImpl("ConsumeMessageThread_" + consumerGroupTag));
80-
81-
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_" + consumerGroupTag));
82-
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_" + consumerGroupTag));
73+
if (this.defaultMQPushConsumer.getConsumeExecutor() != null) {
74+
this.consumeExecutor = this.defaultMQPushConsumer.getConsumeExecutor();
75+
} else {
76+
this.consumeExecutor = new ThreadPoolExecutor(
77+
this.defaultMQPushConsumer.getConsumeThreadMin(),
78+
this.defaultMQPushConsumer.getConsumeThreadMax(),
79+
1000 * 60,
80+
TimeUnit.MILLISECONDS,
81+
this.consumeRequestQueue,
82+
new ThreadFactoryImpl("ConsumeMessageThread_" + consumerGroupTag));
83+
}
84+
85+
if (this.defaultMQPushConsumer.getConsumeMessageScheduledExecutor() != null) {
86+
this.scheduledExecutorService = this.defaultMQPushConsumer.getConsumeMessageScheduledExecutor();
87+
} else {
88+
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_" + consumerGroupTag));
89+
}
90+
91+
if (this.defaultMQPushConsumer.getCleanExpireMsgScheduledExecutor() != null) {
92+
this.cleanExpireMsgExecutors = this.defaultMQPushConsumer.getCleanExpireMsgScheduledExecutor();
93+
} else {
94+
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_" + consumerGroupTag));
95+
}
8396
}
8497

8598
public void start() {
@@ -108,7 +121,9 @@ public void updateCorePoolSize(int corePoolSize) {
108121
if (corePoolSize > 0
109122
&& corePoolSize <= Short.MAX_VALUE
110123
&& corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
111-
this.consumeExecutor.setCorePoolSize(corePoolSize);
124+
if (consumeExecutor instanceof ThreadPoolExecutor) {
125+
((ThreadPoolExecutor)this.consumeExecutor).setCorePoolSize(corePoolSize);
126+
}
112127
}
113128
}
114129

@@ -124,7 +139,10 @@ public void decCorePoolSize() {
124139

125140
@Override
126141
public int getCorePoolSize() {
127-
return this.consumeExecutor.getCorePoolSize();
142+
if (consumeExecutor instanceof ThreadPoolExecutor) {
143+
return ((ThreadPoolExecutor)this.consumeExecutor).getCorePoolSize();
144+
}
145+
return -1;
128146
}
129147

130148
@Override

client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java

+22-13
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.HashMap;
2222
import java.util.List;
2323
import java.util.concurrent.BlockingQueue;
24+
import java.util.concurrent.ExecutorService;
2425
import java.util.concurrent.Executors;
2526
import java.util.concurrent.LinkedBlockingQueue;
2627
import java.util.concurrent.ScheduledExecutorService;
@@ -58,7 +59,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
5859
private final DefaultMQPushConsumer defaultMQPushConsumer;
5960
private final MessageListenerOrderly messageListener;
6061
private final BlockingQueue<Runnable> consumeRequestQueue;
61-
private final ThreadPoolExecutor consumeExecutor;
62+
private final ExecutorService consumeExecutor;
6263
private final String consumerGroup;
6364
private final MessageQueueLock messageQueueLock = new MessageQueueLock();
6465
private final ScheduledExecutorService scheduledExecutorService;
@@ -72,17 +73,24 @@ public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsu
7273
this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
7374
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
7475
this.consumeRequestQueue = new LinkedBlockingQueue<>();
75-
7676
String consumerGroupTag = (consumerGroup.length() > 100 ? consumerGroup.substring(0, 100) : consumerGroup) + "_";
77-
this.consumeExecutor = new ThreadPoolExecutor(
78-
this.defaultMQPushConsumer.getConsumeThreadMin(),
79-
this.defaultMQPushConsumer.getConsumeThreadMax(),
80-
1000 * 60,
81-
TimeUnit.MILLISECONDS,
82-
this.consumeRequestQueue,
83-
new ThreadFactoryImpl("ConsumeMessageThread_" + consumerGroupTag));
84-
85-
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_" + consumerGroupTag));
77+
if (this.defaultMQPushConsumer.getConsumeExecutor() != null) {
78+
this.consumeExecutor = this.defaultMQPushConsumer.getConsumeExecutor();
79+
} else {
80+
this.consumeExecutor = new ThreadPoolExecutor(
81+
this.defaultMQPushConsumer.getConsumeThreadMin(),
82+
this.defaultMQPushConsumer.getConsumeThreadMax(),
83+
1000 * 60,
84+
TimeUnit.MILLISECONDS,
85+
this.consumeRequestQueue,
86+
new ThreadFactoryImpl("ConsumeMessageThread_" + consumerGroupTag));
87+
}
88+
89+
if (this.defaultMQPushConsumer.getConsumeMessageScheduledExecutor() != null) {
90+
this.scheduledExecutorService = this.defaultMQPushConsumer.getConsumeMessageScheduledExecutor();
91+
} else {
92+
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_" + consumerGroupTag));
93+
}
8694
}
8795

8896
@Override
@@ -120,8 +128,9 @@ public void updateCorePoolSize(int corePoolSize) {
120128
if (corePoolSize > 0
121129
&& corePoolSize <= Short.MAX_VALUE
122130
&& corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
123-
this.consumeExecutor.setCorePoolSize(corePoolSize);
124-
}
131+
if (consumeExecutor instanceof ThreadPoolExecutor) {
132+
((ThreadPoolExecutor)this.consumeExecutor).setCorePoolSize(corePoolSize);
133+
} }
125134
}
126135

127136
@Override

0 commit comments

Comments
 (0)