From d961d4371eeaede13d40640335e88ae2bd125aa2 Mon Sep 17 00:00:00 2001 From: andsel Date: Sat, 12 Oct 2024 19:20:39 +0200 Subject: [PATCH] Added test to verify that on reconnect with smaller quota the resending of in flight messages resume correctly without any lose --- .../java/io/moquette/broker/LimitedQuota.java | 2 +- .../main/java/io/moquette/broker/Session.java | 76 ++++++++++++++----- .../mqtt5/AbstractServerIntegrationTest.java | 3 +- .../integration/mqtt5/FlowControlTest.java | 39 +++++++++- .../java/io/moquette/testclient/Client.java | 10 ++- 5 files changed, 105 insertions(+), 25 deletions(-) diff --git a/broker/src/main/java/io/moquette/broker/LimitedQuota.java b/broker/src/main/java/io/moquette/broker/LimitedQuota.java index ed111035c..32931fdcc 100644 --- a/broker/src/main/java/io/moquette/broker/LimitedQuota.java +++ b/broker/src/main/java/io/moquette/broker/LimitedQuota.java @@ -58,7 +58,7 @@ public int getMaximum() { @Override public int availableSlots() { - return receiveMaximum - receivedQuota; + return receivedQuota; } @Override diff --git a/broker/src/main/java/io/moquette/broker/Session.java b/broker/src/main/java/io/moquette/broker/Session.java index 5fa5558ad..e73910e4c 100644 --- a/broker/src/main/java/io/moquette/broker/Session.java +++ b/broker/src/main/java/io/moquette/broker/Session.java @@ -43,6 +43,7 @@ class Session { // session that doesn't expire, it's ~68 years. static final int INFINITE_EXPIRY = Integer.MAX_VALUE; private final boolean resendInflightOnTimeout; + private Collection nonAckPacketIds; static class InFlightPacket implements Delayed { @@ -87,6 +88,7 @@ enum SessionStatus { private final DelayQueue inflightTimeouts = new DelayQueue<>(); private final Map qos2Receiving = new HashMap<>(); private ISessionsRepository.SessionData data; + private boolean resendingNonAcked = false; Session(ISessionsRepository.SessionData data, boolean clean, SessionMessageQueue sessionQueue) { if (sessionQueue == null) { @@ -212,7 +214,11 @@ public void processPubRec(int pubRecPacketId) { MqttMessage pubRel = MQTTConnection.pubrel(pubRecPacketId); mqttConnection.sendIfWritableElseDrop(pubRel); - drainQueueToConnection(); + if (resendingNonAcked) { + resendInflightNotAcked(); + } else { + drainQueueToConnection(); + } } public void processPubComp(int messageID) { @@ -346,14 +352,19 @@ void pubAckReceived(int ackPacketId) { cleanFromInflight(ackPacketId); SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(ackPacketId); if (removed == null) { - LOG.warn("Received a PUBACK with not matching packetId in the inflight cache"); + LOG.warn("Received a PUBACK with not matching packetId({}) in the inflight cache({})", + ackPacketId, inflightWindow.keySet()); return; } removed.release(); mqttConnection.sendQuota().releaseSlot(); LOG.debug("Received PUBACK {} for session {}", ackPacketId, getClientID()); - drainQueueToConnection(); + if (resendingNonAcked) { + resendInflightNotAcked(); + } else { + drainQueueToConnection(); + } } private void cleanFromInflight(int ackPacketId) { @@ -365,22 +376,45 @@ public void flushAllQueuedMessages() { } public void resendInflightNotAcked() { - Collection nonAckPacketIds; - if (resendInflightOnTimeout) { - // MQTT3 behavior, resend on timeout - Collection expired = new ArrayList<>(); - inflightTimeouts.drainTo(expired); - nonAckPacketIds = expired.stream().map(p -> p.packetId).collect(Collectors.toList()); + if (!resendingNonAcked) { + if (resendInflightOnTimeout) { + // MQTT3 behavior, resend on timeout + Collection expired = new ArrayList<>(); + inflightTimeouts.drainTo(expired); + nonAckPacketIds = expired.stream().map(p -> p.packetId).collect(Collectors.toList()); + } else { + // MQTT5 behavior resend only not acked present in reopened session. + // need a copy else removing from the nonAckPacketIds would remove also from inflightWindow + nonAckPacketIds = new ArrayList<>(inflightWindow.keySet()); + } + + debugLogPacketIds(nonAckPacketIds); + } + + // TODO rework here because send quota max can be lower than the inflight window width! + if (nonAckPacketIds.size() > mqttConnection.sendQuota().availableSlots()) { + // send quota is smaller than the inflight messages to resend, split it + resendingNonAcked = true; + List partition = nonAckPacketIds.stream() + .limit(mqttConnection.sendQuota().availableSlots()) + .collect(Collectors.toList()); + + resendNonAckedIdsPartition(partition); + + // clean up the partition sent + for (Integer id : partition) { + nonAckPacketIds.remove(id); + } } else { - // MQTT5 behavior resend only not acked present in reopened session. - nonAckPacketIds = inflightWindow.keySet(); + resendNonAckedIdsPartition(nonAckPacketIds); + resendingNonAcked = false; } - debugLogPacketIds(nonAckPacketIds); + } - // TODO rework here because send quota max can be lower than the inflight window width! - for (Integer notAckPacketId : nonAckPacketIds) { - final SessionRegistry.EnqueuedMessage msg = inflightWindow.get(notAckPacketId); + private void resendNonAckedIdsPartition(Collection packetIdsToResend) { + for (Integer notAckPacketId : packetIdsToResend) { + final EnqueuedMessage msg = inflightWindow.get(notAckPacketId); if (msg == null) { // Already acked... continue; @@ -392,7 +426,7 @@ public void resendInflightNotAcked() { } mqttConnection.sendIfWritableElseDrop(pubRel); } else { - final SessionRegistry.PublishedMessage pubMsg = (SessionRegistry.PublishedMessage) msg; + final PublishedMessage pubMsg = (PublishedMessage) msg; final Topic topic = pubMsg.topic; final MqttQoS qos = pubMsg.publishingQos; final ByteBuf payload = pubMsg.payload; @@ -404,6 +438,8 @@ public void resendInflightNotAcked() { inflightTimeouts.add(new InFlightPacket(notAckPacketId, FLIGHT_BEFORE_RESEND_MS)); } mqttConnection.sendPublish(publishMsg); + + mqttConnection.sendQuota().consumeSlot(); } } } @@ -469,8 +505,12 @@ public void reconnectSession() { LOG.trace("Republishing all saved messages for session {}", this); resendInflightNotAcked(); - // send queued messages while offline - drainQueueToConnection(); + if (!resendingNonAcked) { + // if resend of inflight is bigger than send quota, till it's finished + // do not drain the queue. + // send queued messages while offline + drainQueueToConnection(); + } } public void receivedPublishQos2(int messageID, MqttPublishMessage msg) { diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java index d0fea7546..8dd989d01 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java @@ -1,6 +1,7 @@ package io.moquette.integration.mqtt5; import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; +import io.moquette.BrokerConstants; import io.moquette.testclient.Client; import io.netty.handler.codec.mqtt.*; import org.jetbrains.annotations.NotNull; @@ -51,7 +52,7 @@ void connectLowLevel() { } void connectLowLevel(int keepAliveSecs) { - MqttConnAckMessage connAck = lowLevelClient.connectV5(keepAliveSecs); + MqttConnAckMessage connAck = lowLevelClient.connectV5(keepAliveSecs, BrokerConstants.INFLIGHT_WINDOW_SIZE); assertConnectionAccepted(connAck, "Connection must be accepted"); } diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/FlowControlTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/FlowControlTest.java index 2972d06a2..7df84f543 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/FlowControlTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/FlowControlTest.java @@ -140,8 +140,8 @@ public void givenClientConnectedWithCertainReceiveMaximumWhenInFlightSizeIsSurpa assertTrue(publish.release(), "Reference of publish should be released"); } - private static void fillInFlightWindow(int inflightWindowSize, Mqtt5BlockingClient publisher) { - for (int i = 0; i < inflightWindowSize; i++) { + private static void fillInFlightWindow(int numPublishToSend, Mqtt5BlockingClient publisher) { + for (int i = 0; i < numPublishToSend; i++) { publisher.publishWith() .topic("temperature/living") .payload(Integer.toString(i).getBytes(StandardCharsets.UTF_8)) @@ -149,4 +149,39 @@ private static void fillInFlightWindow(int inflightWindowSize, Mqtt5BlockingClie .send(); } } + + @Test + public void givenClientThatReconnectWithSmallerReceiveMaximumThenForwardCorrectlyTheFullListOfPendingMessagesWithoutAnyLose() throws InterruptedException { + // connect subscriber and published + // publisher send 20 events, 10 should be in the inflight, 10 remains on the queue + connectLowLevel(); + + // subscribe with an identifier + MqttMessage received = lowLevelClient.subscribeWithIdentifier("temperature/living", + MqttQoS.AT_LEAST_ONCE, 123); + verifyOfType(received, MqttMessageType.SUBACK); + + //lowlevel client doesn't ACK any pub, so the in flight window fills up + Mqtt5BlockingClient publisher = createPublisherClient(); + int inflightWindowSize = 10; + // fill the in flight window so that messages starts to be enqueued + fillInFlightWindow(inflightWindowSize + 10, publisher); + + System.out.println("Filled inflight and queue"); + + // disconnect subscriber + lowLevelClient.disconnect(); + lowLevelClient.close(); + + System.out.println("Closed old client, reconnecting"); + + // reconnect the subscriber with smaller received maximum + lowLevelClient = new Client("localhost").clientId(clientName()); + MqttConnAckMessage connAck = lowLevelClient.connectV5WithReceiveMaximum(5); + assertConnectionAccepted(connAck, "Connection must be re-accepted with smaller window size"); + System.out.println("Client reconnected second time"); + + // should receive all the 20 messages + consumesPublishesInflightWindow(inflightWindowSize + 10); + } } diff --git a/broker/src/test/java/io/moquette/testclient/Client.java b/broker/src/test/java/io/moquette/testclient/Client.java index d1ca2e6c0..b406bcf38 100644 --- a/broker/src/test/java/io/moquette/testclient/Client.java +++ b/broker/src/test/java/io/moquette/testclient/Client.java @@ -139,11 +139,15 @@ public void connect() { } public MqttConnAckMessage connectV5() { - return connectV5(2); + return connectV5(2, BrokerConstants.INFLIGHT_WINDOW_SIZE); + } + + public MqttConnAckMessage connectV5WithReceiveMaximum(int receiveMaximumInflight) { + return connectV5(2, receiveMaximumInflight); } @NotNull - public MqttConnAckMessage connectV5(int keepAliveSecs) { + public MqttConnAckMessage connectV5(int keepAliveSecs, int receiveMaximumInflight) { final MqttMessageBuilders.ConnectBuilder builder = MqttMessageBuilders.connect().protocolVersion(MqttVersion.MQTT_5); if (clientId != null) { builder.clientId(clientId); @@ -152,7 +156,7 @@ public MqttConnAckMessage connectV5(int keepAliveSecs) { final MqttProperties connectProperties = new MqttProperties(); MqttProperties.IntegerProperty receiveMaximum = new MqttProperties.IntegerProperty( MqttProperties.MqttPropertyType.RECEIVE_MAXIMUM.value(), - BrokerConstants.INFLIGHT_WINDOW_SIZE); + receiveMaximumInflight); connectProperties.add(receiveMaximum); MqttConnectMessage connectMessage = builder