Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ public MQTTPublishManager(MQTTSession session, boolean closeMqttConnectionOnPubl
this.closeMqttConnectionOnPublishAuthorizationFailure = closeMqttConnectionOnPublishAuthorizationFailure;
}

public static SimpleString getQoS2ManagementAddressName(SimpleString clientId) {
return SimpleString.of(MQTTUtil.QOS2_MANAGEMENT_QUEUE_PREFIX + clientId);
}

synchronized void start() {
this.state = session.getState();
this.outboundStore = state.getOutboundStore();
Expand Down Expand Up @@ -315,7 +319,7 @@ void handlePubRec(int messageId) throws Exception {
*/
private void initQos2Resources() throws Exception {
if (qos2ManagementAddress == null) {
qos2ManagementAddress = SimpleString.of(MQTTUtil.QOS2_MANAGEMENT_QUEUE_PREFIX + session.getState().getClientId());
qos2ManagementAddress = MQTTPublishManager.getQoS2ManagementAddressName(SimpleString.of(session.getState().getClientId()));
}
if (qos2ManagementQueue == null) {
qos2ManagementQueue = session.getServer().createQueue(QueueConfiguration.of(qos2ManagementAddress)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.StorageManager;
Expand Down Expand Up @@ -112,8 +113,12 @@ public void scanSessions() {
MQTTSessionState state = entry.getValue();
logger.debug("Inspecting session: {}", state);
int sessionExpiryInterval = state.getClientSessionExpiryInterval();
if (!state.isAttached() && sessionExpiryInterval > 0 && state.getDisconnectedTime() + (sessionExpiryInterval * 1000) < System.currentTimeMillis()) {
toRemove.add(entry.getKey());
if (!state.isAttached()) {
if (sessionExpiryInterval == 0) {
Copy link
Contributor

@tabish121 tabish121 Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like it could be flattened into one if since it could be an Or on the next condition either its zero or its a positive value and trips the time limit

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm investigating...

toRemove.add(entry.getKey());
} else if (sessionExpiryInterval > 0 && state.getDisconnectedTime() + (sessionExpiryInterval * 1000) < System.currentTimeMillis()) {
toRemove.add(entry.getKey());
}
}
if (state.isWill() && !state.isAttached() && state.isFailed() && state.getWillDelayInterval() > 0 && state.getDisconnectedTime() + (state.getWillDelayInterval() * 1000) < System.currentTimeMillis()) {
state.getSession().sendWillMessage();
Expand All @@ -127,7 +132,19 @@ public void scanSessions() {
if (state.isWill() && !state.isAttached() && state.isFailed()) {
state.getSession().sendWillMessage();
}
state.getSession().clean(false);
MQTTSession session = state.getSession();
if (session != null) {
session.clean(false);
} else {
// if the in-memory session doesn't exist, then we need to ensure that any other state is cleaned up
for (MqttTopicSubscription mqttTopicSubscription : state.getSubscriptions()) {
MQTTSubscriptionManager.cleanSubscriptionQueue(mqttTopicSubscription.topicFilter(), state.getClientId(), server, (q) -> server.destroyQueue(q, null, true, false, true));
}
Queue qos2ManagementQueue = server.locateQueue(MQTTPublishManager.getQoS2ManagementAddressName(SimpleString.of(state.getClientId())));
if (qos2ManagementQueue != null) {
qos2ManagementQueue.deleteQueue();
}
}
}
} catch (Exception e) {
MQTTLogger.LOGGER.failedToRemoveSessionState(key, e);
Expand Down Expand Up @@ -183,6 +200,14 @@ public void storeDurableSubscriptionState(MQTTSessionState state) throws Excepti
}
}

public long getDurableSubscriptionStateCount() {
if (subscriptionPersistenceEnabled) {
return sessionStore.getMessageCount();
} else {
return 0;
}
}

public static CoreMessage serializeState(MQTTSessionState state, long messageID) {
CoreMessage message = new CoreMessage().initBuffer(50).setMessageID(messageID);
message.setAddress(MQTTUtil.MQTT_SESSION_STORE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
Expand Down Expand Up @@ -265,15 +266,7 @@ short[] removeSubscriptions(List<String> topics, boolean enforceSecurity) throws
consumerQoSLevels.remove(removed.getID());
}

SimpleString internalQueueName = SimpleString.of(MQTTUtil.getCoreQueueFromMqttTopic(topics.get(i), state.getClientId(), session.getServer().getConfiguration().getWildcardConfiguration()));
Queue queue = session.getServer().locateQueue(internalQueueName);
if (queue != null) {
if (queue.isConfigurationManaged()) {
queue.deleteAllReferences();
} else if (!MQTTUtil.isSharedSubscription(topics.get(i)) || (MQTTUtil.isSharedSubscription(topics.get(i)) && queue.getConsumerCount() == 0)) {
session.getServerSession().deleteQueue(internalQueueName, enforceSecurity);
}
}
cleanSubscriptionQueue(topics.get(i), state.getClientId(), session.getServer(), (q) -> session.getServerSession().deleteQueue(q, enforceSecurity));
} catch (Exception e) {
MQTTLogger.LOGGER.errorRemovingSubscription(e);
reasonCode = MQTTReasonCodes.UNSPECIFIED_ERROR;
Expand All @@ -295,6 +288,18 @@ short[] removeSubscriptions(List<String> topics, boolean enforceSecurity) throws
return reasonCodes;
}

public static void cleanSubscriptionQueue(String topic, String clientId, ActiveMQServer server, SubscriptionQueueDeleter<SimpleString> deleter) throws Exception {
SimpleString internalQueueName = SimpleString.of(MQTTUtil.getCoreQueueFromMqttTopic(topic, clientId, server.getConfiguration().getWildcardConfiguration()));
Queue queue = server.locateQueue(internalQueueName);
if (queue != null) {
if (queue.isConfigurationManaged()) {
queue.deleteAllReferences();
} else if (!MQTTUtil.isSharedSubscription(topic) || (MQTTUtil.isSharedSubscription(topic) && queue.getConsumerCount() == 0)) {
deleter.delete(internalQueueName);
}
}
}

/**
* As per MQTT Spec. Subscribes this client to a number of MQTT topics.
*
Expand Down Expand Up @@ -355,4 +360,9 @@ void clean(boolean enforceSecurity) throws Exception {
}
removeSubscriptions(topics, enforceSecurity);
}

@FunctionalInterface
public interface SubscriptionQueueDeleter<SimpleString> {
void delete(SimpleString q) throws Exception;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeTestAccessor;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTPublishManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionAccessor;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionState;
Expand Down Expand Up @@ -380,6 +381,38 @@ public void testQueueCleanOnRestart() throws Exception {
org.apache.activemq.artemis.tests.util.Wait.assertTrue(() -> getSubscriptionQueue(topic, clientId) != null, 3000, 10);
}

@Test
@Timeout(DEFAULT_TIMEOUT_SEC)
public void testCleanupOnRestart() throws Exception {
String topic = RandomUtil.randomUUIDString();
String clientId = RandomUtil.randomUUIDString();
CountDownLatch latch = new CountDownLatch(1);

MqttClient client = createPahoClient(clientId);
client.setCallback(new LatchedMqttCallback(latch));
MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
.sessionExpiryInterval(0L)
.build();
client.connect(options);
client.subscribe(topic, EXACTLY_ONCE);
client.publish(topic, new byte[0], EXACTLY_ONCE, true);
assertTrue(latch.await(2, TimeUnit.SECONDS));
assertNotNull(server.locateQueue(MQTTPublishManager.getQoS2ManagementAddressName(SimpleString.of(clientId))));
assertEquals(1, getProtocolManager().getStateManager().getDurableSubscriptionStateCount());
server.stop();
try {
client.disconnect();
} catch (MqttException e) {
// ignore
}
client.close();
server.start();
scanSessions();
assertNull(getSubscriptionQueue(topic, clientId));
assertNull(server.locateQueue(MQTTPublishManager.getQoS2ManagementAddressName(SimpleString.of(clientId))));
assertEquals(0, getProtocolManager().getStateManager().getDurableSubscriptionStateCount());
}

@Test
@Timeout(DEFAULT_TIMEOUT_SEC)
public void testRecursiveWill() throws Exception {
Expand Down