Skip to content

Commit 2219fd8

Browse files
authored
[ISSUE #9244] Avoid writing dirty data in consumption mode (#9245)
1 parent cee1963 commit 2219fd8

File tree

1 file changed

+14
-0
lines changed

1 file changed

+14
-0
lines changed

broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
3434
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
3535
import org.apache.rocketmq.common.MixAll;
36+
import org.apache.rocketmq.common.TopicConfig;
3637
import org.apache.rocketmq.common.constant.LoggerName;
3738
import org.apache.rocketmq.common.message.MessageQueue;
3839
import org.apache.rocketmq.common.message.MessageQueueAssignment;
@@ -49,6 +50,7 @@
4950
import org.apache.rocketmq.remoting.protocol.body.QueryAssignmentResponseBody;
5051
import org.apache.rocketmq.remoting.protocol.body.SetMessageRequestModeRequestBody;
5152
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
53+
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
5254

5355
public class QueryAssignmentProcessor implements NettyRequestProcessor {
5456
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -314,8 +316,20 @@ private RemotingCommand setMessageRequestMode(ChannelHandlerContext ctx,
314316
response.setRemark("retry topic is not allowed to set mode");
315317
return response;
316318
}
319+
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
320+
if (null == topicConfig) {
321+
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
322+
response.setRemark("topic[" + topic + "] not exist");
323+
return response;
324+
}
317325

318326
final String consumerGroup = requestBody.getConsumerGroup();
327+
SubscriptionGroupConfig groupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(consumerGroup);
328+
if (null == groupConfig) {
329+
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
330+
response.setRemark("subscription group does not exist");
331+
return response;
332+
}
319333

320334
this.messageRequestModeManager.setMessageRequestMode(topic, consumerGroup, requestBody);
321335
this.messageRequestModeManager.persist();

0 commit comments

Comments
 (0)