From 0d22c0dbd897e82e186f328bc60785af6eeaa6b0 Mon Sep 17 00:00:00 2001 From: Andrea Selva Date: Sat, 26 Oct 2024 18:19:20 +0200 Subject: [PATCH] Handled client's receive maximum property to avoid publish flooding (#858) Handle client's receive maximum property to configure the inflight window through the client. The main change contained in this PR is usage of a `sendQuota` in the `MQTTConnection` to keep the inflight zone size instead of `Session`'s `inflightSlots`. The method used to resend inflight messages has been updated to respect the send quota (aka `receive_maximum` property ) expressed by the client on CONNECT message. If the inflight space is bigger than the actual send quota, maybe because the client squeezed it on a reconnection, it partition the inflight to fill up the send quota. The resend of inflight take precedence over queue drain both on processing of QoS1 PUBACK and QoS2 PURBEC. If client connects through MQTT 3 the hardcoded limit (10) is used for inflight zone. If it connects with MQTT5 it reads the `receive_maximum` property or set to default 65535 default value if not specified. --- ChangeLog.txt | 1 + .../java/io/moquette/BrokerConstants.java | 1 + .../moquette/broker/BrokerConfiguration.java | 2 +- .../io/moquette/broker/InflightResender.java | 2 +- .../java/io/moquette/broker/LimitedQuota.java | 21 ++-- .../io/moquette/broker/MQTTConnection.java | 56 ++++++++--- .../main/java/io/moquette/broker/Quota.java | 8 +- .../main/java/io/moquette/broker/Session.java | 99 +++++++++++++------ .../io/moquette/broker/SessionRegistry.java | 1 + .../io/moquette/broker/UnlimitedQuota.java | 52 ---------- .../java/io/moquette/broker/SessionTest.java | 4 +- .../mqtt5/AbstractServerIntegrationTest.java | 42 +++++++- .../integration/mqtt5/ConnectAckTest.java | 1 - .../integration/mqtt5/FlowControlTest.java | 87 ++++++++++++++++ .../mqtt5/MessageExpirationTest.java | 76 ++++++-------- .../java/io/moquette/testclient/Client.java | 20 +++- 16 files changed, 309 insertions(+), 164 deletions(-) delete mode 100644 broker/src/main/java/io/moquette/broker/UnlimitedQuota.java diff --git a/ChangeLog.txt b/ChangeLog.txt index f9e059eb1..a9ee74ba8 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,4 +1,5 @@ Version 0.18-SNAPSHOT: + [feature] Flow-control: Handled client's receive maximum property to configure the inflight window through the client. (#858) [feature] Generate correct MANIFEST.MF with bnd-maven-plugin. (#848) [feature] Flow-control: implemented publish's quota management on the server side. (#852) [fix] Incorrect reference used in compareAndSet in CTrie.cleanTomb. (#841) diff --git a/broker/src/main/java/io/moquette/BrokerConstants.java b/broker/src/main/java/io/moquette/BrokerConstants.java index e61558d2d..2b05980dd 100644 --- a/broker/src/main/java/io/moquette/BrokerConstants.java +++ b/broker/src/main/java/io/moquette/BrokerConstants.java @@ -132,6 +132,7 @@ public final class BrokerConstants { public static final int FLIGHT_BEFORE_RESEND_MS = 5_000; public static final int INFLIGHT_WINDOW_SIZE = 10; public static final int INFINITE_SESSION_EXPIRY = 0xFFFFFFFF; + public static final int RECEIVE_MAXIMUM = 65 * 1024; private BrokerConstants() { } diff --git a/broker/src/main/java/io/moquette/broker/BrokerConfiguration.java b/broker/src/main/java/io/moquette/broker/BrokerConfiguration.java index 210f5b498..2fcced636 100644 --- a/broker/src/main/java/io/moquette/broker/BrokerConfiguration.java +++ b/broker/src/main/java/io/moquette/broker/BrokerConfiguration.java @@ -66,7 +66,7 @@ class BrokerConfiguration { } } - receiveMaximum = props.intProp(IConfig.RECEIVE_MAXIMUM, Integer.MAX_VALUE); + receiveMaximum = props.intProp(IConfig.RECEIVE_MAXIMUM, BrokerConstants.RECEIVE_MAXIMUM); } // test method diff --git a/broker/src/main/java/io/moquette/broker/InflightResender.java b/broker/src/main/java/io/moquette/broker/InflightResender.java index 514ef5ee6..369e9c42d 100644 --- a/broker/src/main/java/io/moquette/broker/InflightResender.java +++ b/broker/src/main/java/io/moquette/broker/InflightResender.java @@ -55,7 +55,7 @@ public void run() { // Writer is idle - set a new timeout and notify the callback. resenderTimeout = ctx.executor().schedule(this, resenderTimeNanos, TimeUnit.NANOSECONDS); try { - resendNotAcked(ctx/* , event */); + resendNotAcked(ctx); } catch (Throwable t) { ctx.fireExceptionCaught(t); } diff --git a/broker/src/main/java/io/moquette/broker/LimitedQuota.java b/broker/src/main/java/io/moquette/broker/LimitedQuota.java index 4f556ee05..32931fdcc 100644 --- a/broker/src/main/java/io/moquette/broker/LimitedQuota.java +++ b/broker/src/main/java/io/moquette/broker/LimitedQuota.java @@ -18,8 +18,10 @@ package io.moquette.broker; +import io.moquette.BrokerConstants; + class LimitedQuota implements Quota { - private int receiveMaximum; + private final int receiveMaximum; private int receivedQuota; public LimitedQuota(int receiveMaximum) { @@ -29,24 +31,24 @@ public LimitedQuota(int receiveMaximum) { @Override public boolean hasLimit() { - return true; + return receiveMaximum != BrokerConstants.RECEIVE_MAXIMUM; } @Override - public void decrement() { + public void consumeSlot() { assert receivedQuota > 0; receivedQuota--; } @Override - public void increment() { + public void releaseSlot() { receivedQuota++; assert receivedQuota <= receiveMaximum; } @Override - public boolean isConsumed() { - return receivedQuota == 0; + public boolean hasFreeSlots() { + return receivedQuota != 0; } @Override @@ -54,8 +56,13 @@ public int getMaximum() { return receiveMaximum; } + @Override + public int availableSlots() { + return receivedQuota; + } + @Override public String toString() { - return "limited quota to " + receivedQuota; + return "limited quota to " + receivedQuota + " max slots: " + receiveMaximum; } } diff --git a/broker/src/main/java/io/moquette/broker/MQTTConnection.java b/broker/src/main/java/io/moquette/broker/MQTTConnection.java index ceef48ff1..7043840e7 100644 --- a/broker/src/main/java/io/moquette/broker/MQTTConnection.java +++ b/broker/src/main/java/io/moquette/broker/MQTTConnection.java @@ -48,7 +48,6 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLPeerUnverifiedException; -import static io.moquette.BrokerConstants.INFLIGHT_WINDOW_SIZE; import static io.netty.channel.ChannelFutureListener.CLOSE_ON_FAILURE; import static io.netty.channel.ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE; import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*; @@ -73,6 +72,7 @@ final class MQTTConnection { private Session bindedSession; private int protocolVersion; private Quota receivedQuota; + private Quota sendQuota; MQTTConnection(Channel channel, BrokerConfiguration brokerConfig, IAuthenticator authenticator, SessionRegistry sessionRegistry, PostOffice postOffice) { @@ -167,11 +167,11 @@ private void processPubAck(MqttMessage msg) { * Factory method * */ public static Quota createQuota(int receiveMaximum) { - if (receiveMaximum == Integer.MAX_VALUE) { - return new UnlimitedQuota(); - } else { - return new LimitedQuota(receiveMaximum); - } + return new LimitedQuota(receiveMaximum); + } + + Quota sendQuota() { + return sendQuota; } PostOffice.RouteResult processConnect(MqttConnectMessage msg) { @@ -190,7 +190,7 @@ PostOffice.RouteResult processConnect(MqttConnectMessage msg) { } final boolean cleanSession = msg.variableHeader().isCleanSession(); final boolean serverGeneratedClientId; - if (clientId == null || clientId.length() == 0) { + if (clientId == null || clientId.isEmpty()) { if (isNotProtocolVersion(msg, MqttVersion.MQTT_5)) { if (!brokerConfig.isAllowZeroByteClientId()) { LOG.info("Broker doesn't permit MQTT empty client ID. Username: {}", username); @@ -227,6 +227,8 @@ PostOffice.RouteResult processConnect(MqttConnectMessage msg) { } receivedQuota = createQuota(brokerConfig.receiveMaximum()); + sendQuota = retrieveSendQuota(msg); + final String sessionId = clientId; protocolVersion = msg.variableHeader().version(); return postOffice.routeCommand(clientId, "CONN", () -> { @@ -236,6 +238,28 @@ PostOffice.RouteResult processConnect(MqttConnectMessage msg) { }); } + private Quota retrieveSendQuota(MqttConnectMessage msg) { + if (isProtocolVersion(msg, MqttVersion.MQTT_3_1) || isProtocolVersion(msg, MqttVersion.MQTT_3_1_1)) { + // for protocol versions that didn't define explicit + // the receiver maximum and without specification of flow control + // define one by the default. + return createQuota(BrokerConstants.INFLIGHT_WINDOW_SIZE); + } + + MqttProperties.IntegerProperty receiveMaximumProperty = (MqttProperties.IntegerProperty) msg.variableHeader() + .properties() + .getProperty(MqttProperties.MqttPropertyType.RECEIVE_MAXIMUM.value()); + if (receiveMaximumProperty == null) { + return createQuota(BrokerConstants.RECEIVE_MAXIMUM); + } + return createQuota(receiveMaximumProperty.value()); + } + + // only for test + protected void assignSendQuota(Quota quota) { + this.sendQuota = quota; + } + private void checkMatchSessionLoop(String clientId) { if (!sessionLoopDebug) { return; @@ -342,6 +366,7 @@ public void operationComplete(ChannelFuture future) throws Exception { LOG.trace("dispatch connection: {}", msg); } } else { + // here the session should still be in CONNECTING state sessionRegistry.connectionClosed(bindedSession); LOG.error("CONNACK send failed, cleanup session and close the connection", future.cause()); channel.close(); @@ -395,7 +420,6 @@ private ConnAckPropertiesBuilder prepareConnAckPropertiesBuilder(boolean serverG builder .sessionExpiryInterval(BrokerConstants.INFINITE_SESSION_EXPIRY) - .receiveMaximum(INFLIGHT_WINDOW_SIZE) .retainAvailable(true) .wildcardSubscriptionAvailable(true) .subscriptionIdentifiersAvailable(true) @@ -670,8 +694,8 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) { return null; }).ifFailed(msg::release); case AT_LEAST_ONCE: - if (receivedQuota.isConsumed()) { - LOG.info("Client {} exceeded the quota {} processing QoS1, disconnecting it", clientId, receivedQuota); + if (!receivedQuota.hasFreeSlots()) { + LOG.warn("Client {} exceeded the quota {} processing QoS1, disconnecting it", clientId, receivedQuota); brokerDisconnect(MqttReasonCodes.Disconnect.RECEIVE_MAXIMUM_EXCEEDED); disconnectSession(); dropConnection(); @@ -682,16 +706,16 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) { checkMatchSessionLoop(clientId); if (!isBoundToSession()) return null; - receivedQuota.decrement(); + receivedQuota.consumeSlot(); postOffice.receivedPublishQos1(this, username, messageID, msg, expiry) .completableFuture().thenRun(() -> { - receivedQuota.increment(); + receivedQuota.releaseSlot(); }); return null; }).ifFailed(msg::release); case EXACTLY_ONCE: { - if (receivedQuota.isConsumed()) { - LOG.info("Client {} exceeded the quota {} processing QoS2, disconnecting it", clientId, receivedQuota); + if (!receivedQuota.hasFreeSlots()) { + LOG.warn("Client {} exceeded the quota {} processing QoS2, disconnecting it", clientId, receivedQuota); brokerDisconnect(MqttReasonCodes.Disconnect.RECEIVE_MAXIMUM_EXCEEDED); disconnectSession(); dropConnection(); @@ -703,7 +727,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) { if (!isBoundToSession()) return null; bindedSession.receivedPublishQos2(messageID, msg); - receivedQuota.decrement(); + receivedQuota.consumeSlot(); return null; }); if (!firstStepResult.isSuccess()) { @@ -761,7 +785,7 @@ private void processPubRel(MqttMessage msg) { checkMatchSessionLoop(clientID); executePubRel(messageID); // increment send quota after send PUBCOMP to the client - receivedQuota.increment(); + receivedQuota.releaseSlot(); return null; }); } diff --git a/broker/src/main/java/io/moquette/broker/Quota.java b/broker/src/main/java/io/moquette/broker/Quota.java index 3c8f836da..fcfd99e12 100644 --- a/broker/src/main/java/io/moquette/broker/Quota.java +++ b/broker/src/main/java/io/moquette/broker/Quota.java @@ -22,11 +22,13 @@ interface Quota { boolean hasLimit(); - void decrement(); + void consumeSlot(); - void increment(); + void releaseSlot(); - boolean isConsumed(); + boolean hasFreeSlots(); int getMaximum(); + + int availableSlots(); } diff --git a/broker/src/main/java/io/moquette/broker/Session.java b/broker/src/main/java/io/moquette/broker/Session.java index df631e63f..bd979dfd4 100644 --- a/broker/src/main/java/io/moquette/broker/Session.java +++ b/broker/src/main/java/io/moquette/broker/Session.java @@ -16,7 +16,7 @@ package io.moquette.broker; import static io.moquette.BrokerConstants.FLIGHT_BEFORE_RESEND_MS; -import static io.moquette.BrokerConstants.INFLIGHT_WINDOW_SIZE; + import io.moquette.broker.SessionRegistry.EnqueuedMessage; import io.moquette.broker.SessionRegistry.PublishedMessage; import io.moquette.broker.subscriptions.Subscription; @@ -33,7 +33,6 @@ import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -44,6 +43,7 @@ class Session { // session that doesn't expire, it's ~68 years. static final int INFINITE_EXPIRY = Integer.MAX_VALUE; private final boolean resendInflightOnTimeout; + private Collection nonAckPacketIds; static class InFlightPacket implements Delayed { @@ -87,8 +87,8 @@ enum SessionStatus { // used only in MQTT3 where resends are done on timeout of ACKs. private final DelayQueue inflightTimeouts = new DelayQueue<>(); private final Map qos2Receiving = new HashMap<>(); - private final AtomicInteger inflightSlots = new AtomicInteger(INFLIGHT_WINDOW_SIZE); // this should be configurable private ISessionsRepository.SessionData data; + private boolean resendingNonAcked = false; Session(ISessionsRepository.SessionData data, boolean clean, SessionMessageQueue sessionQueue) { if (sessionQueue == null) { @@ -214,7 +214,11 @@ public void processPubRec(int pubRecPacketId) { MqttMessage pubRel = MQTTConnection.pubrel(pubRecPacketId); mqttConnection.sendIfWritableElseDrop(pubRel); - drainQueueToConnection(); + if (resendingNonAcked) { + resendInflightNotAcked(); + } else { + drainQueueToConnection(); + } } public void processPubComp(int messageID) { @@ -222,11 +226,11 @@ public void processPubComp(int messageID) { cleanFromInflight(messageID); SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(messageID); if (removed == null) { - LOG.warn("Received a PUBCOMP with not matching packetId"); + LOG.warn("Received a PUBCOMP with not matching packetId in the inflight cache"); return; } removed.release(); - inflightSlots.incrementAndGet(); + mqttConnection.sendQuota().releaseSlot(); drainQueueToConnection(); // TODO notify the interceptor @@ -301,7 +305,7 @@ private void sendPublishInFlightWindowOrQueueing(MQTTConnection localMqttConnect publishRequest.retain(); if (canSkipQueue(localMqttConnectionRef)) { - inflightSlots.decrementAndGet(); + mqttConnection.sendQuota().consumeSlot(); int packetId = localMqttConnectionRef.nextPacketId(); LOG.debug("Adding into inflight for session {} at QoS {}", getClientID(), publishRequest.getPublishingQos()); @@ -310,7 +314,7 @@ private void sendPublishInFlightWindowOrQueueing(MQTTConnection localMqttConnect // If there already was something, release it. if (old != null) { old.release(); - inflightSlots.incrementAndGet(); + mqttConnection.sendQuota().releaseSlot(); } if (resendInflightOnTimeout) { inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS)); @@ -332,13 +336,13 @@ private void sendPublishInFlightWindowOrQueueing(MQTTConnection localMqttConnect private boolean canSkipQueue(MQTTConnection localMqttConnectionRef) { return localMqttConnectionRef != null && sessionQueue.isEmpty() && - inflightSlots.get() > 0 && + mqttConnection.sendQuota().hasFreeSlots() && connected() && localMqttConnectionRef.channel.isWritable(); } private boolean inflightHasSlotsAndConnectionIsUp() { - return inflightSlots.get() > 0 && + return mqttConnection.sendQuota().hasFreeSlots() && connected() && mqttConnection.channel.isWritable(); } @@ -348,14 +352,19 @@ void pubAckReceived(int ackPacketId) { cleanFromInflight(ackPacketId); SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(ackPacketId); if (removed == null) { - LOG.warn("Received a PUBACK with not matching packetId"); + LOG.warn("Received a PUBACK with not matching packetId({}) in the inflight cache({})", + ackPacketId, inflightWindow.keySet()); return; } removed.release(); - inflightSlots.incrementAndGet(); + mqttConnection.sendQuota().releaseSlot(); LOG.debug("Received PUBACK {} for session {}", ackPacketId, getClientID()); - drainQueueToConnection(); + if (resendingNonAcked) { + resendInflightNotAcked(); + } else { + drainQueueToConnection(); + } } private void cleanFromInflight(int ackPacketId) { @@ -367,21 +376,45 @@ public void flushAllQueuedMessages() { } public void resendInflightNotAcked() { - Collection nonAckPacketIds; - if (resendInflightOnTimeout) { - // MQTT3 behavior, resend on timeout - Collection expired = new ArrayList<>(INFLIGHT_WINDOW_SIZE); - inflightTimeouts.drainTo(expired); - nonAckPacketIds = expired.stream().map(p -> p.packetId).collect(Collectors.toList()); + if (!resendingNonAcked) { + if (resendInflightOnTimeout) { + // MQTT3 behavior, resend on timeout + Collection expired = new ArrayList<>(); + inflightTimeouts.drainTo(expired); + nonAckPacketIds = expired.stream().map(p -> p.packetId).collect(Collectors.toList()); + } else { + // MQTT5 behavior resend only not acked present in reopened session. + // need a copy else removing from the nonAckPacketIds would remove also from inflightWindow + nonAckPacketIds = new ArrayList<>(inflightWindow.keySet()); + } + + debugLogPacketIds(nonAckPacketIds); + } + + if (nonAckPacketIds.size() > mqttConnection.sendQuota().availableSlots()) { + // Send quota is smaller than the inflight messages to resend, split it. + // Next partition will be sent on PUBREC or PUBACK reception to continue flushing the in flight. + resendingNonAcked = true; + List partition = nonAckPacketIds.stream() + .limit(mqttConnection.sendQuota().availableSlots()) + .collect(Collectors.toList()); + + resendNonAckedIdsPartition(partition); + + // clean up the partition sent + for (Integer id : partition) { + nonAckPacketIds.remove(id); + } } else { - // MQTT5 behavior resend only not acked present in reopened session. - nonAckPacketIds = inflightWindow.keySet(); + resendNonAckedIdsPartition(nonAckPacketIds); + resendingNonAcked = false; } - debugLogPacketIds(nonAckPacketIds); + } - for (Integer notAckPacketId : nonAckPacketIds) { - final SessionRegistry.EnqueuedMessage msg = inflightWindow.get(notAckPacketId); + private void resendNonAckedIdsPartition(Collection packetIdsToResend) { + for (Integer notAckPacketId : packetIdsToResend) { + final EnqueuedMessage msg = inflightWindow.get(notAckPacketId); if (msg == null) { // Already acked... continue; @@ -393,7 +426,7 @@ public void resendInflightNotAcked() { } mqttConnection.sendIfWritableElseDrop(pubRel); } else { - final SessionRegistry.PublishedMessage pubMsg = (SessionRegistry.PublishedMessage) msg; + final PublishedMessage pubMsg = (PublishedMessage) msg; final Topic topic = pubMsg.topic; final MqttQoS qos = pubMsg.publishingQos; final ByteBuf payload = pubMsg.payload; @@ -405,6 +438,8 @@ public void resendInflightNotAcked() { inflightTimeouts.add(new InFlightPacket(notAckPacketId, FLIGHT_BEFORE_RESEND_MS)); } mqttConnection.sendPublish(publishMsg); + + mqttConnection.sendQuota().consumeSlot(); } } } @@ -435,14 +470,14 @@ private void drainQueueToConnection() { return; } - inflightSlots.decrementAndGet(); + mqttConnection.sendQuota().consumeSlot(); int sendPacketId = mqttConnection.nextPacketId(); // Putting it in a map, but the retain is cancelled out by the below release. EnqueuedMessage old = inflightWindow.put(sendPacketId, msg); if (old != null) { old.release(); - inflightSlots.incrementAndGet(); + mqttConnection.sendQuota().releaseSlot(); } if (resendInflightOnTimeout) { inflightTimeouts.add(new InFlightPacket(sendPacketId, FLIGHT_BEFORE_RESEND_MS)); @@ -470,8 +505,12 @@ public void reconnectSession() { LOG.trace("Republishing all saved messages for session {}", this); resendInflightNotAcked(); - // send queued messages while offline - drainQueueToConnection(); + if (!resendingNonAcked) { + // if resend of inflight is bigger than send quota, till it's finished + // do not drain the queue. + // send queued messages while offline + drainQueueToConnection(); + } } public void receivedPublishQos2(int messageID, MqttPublishMessage msg) { @@ -529,7 +568,7 @@ public String toString() { "clientId='" + data.clientId() + '\'' + ", clean=" + clean + ", status=" + status + - ", inflightSlots=" + inflightSlots + + ", inflightSlots=" + mqttConnection.sendQuota().availableSlots() + '}'; } } diff --git a/broker/src/main/java/io/moquette/broker/SessionRegistry.java b/broker/src/main/java/io/moquette/broker/SessionRegistry.java index e6520971d..b1049a7da 100644 --- a/broker/src/main/java/io/moquette/broker/SessionRegistry.java +++ b/broker/src/main/java/io/moquette/broker/SessionRegistry.java @@ -38,6 +38,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; +import static io.moquette.BrokerConstants.INFLIGHT_WINDOW_SIZE; import static io.moquette.broker.Session.INFINITE_EXPIRY; public class SessionRegistry { diff --git a/broker/src/main/java/io/moquette/broker/UnlimitedQuota.java b/broker/src/main/java/io/moquette/broker/UnlimitedQuota.java deleted file mode 100644 index 42ec1f259..000000000 --- a/broker/src/main/java/io/moquette/broker/UnlimitedQuota.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Copyright (c) 2012-2024 The original author or authors - * ------------------------------------------------------ - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Apache License v2.0 which accompanies this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * The Apache License v2.0 is available at - * http://www.opensource.org/licenses/apache2.0.php - * - * You may elect to redistribute this code under either of these licenses. - * - */ - -package io.moquette.broker; - -class UnlimitedQuota implements Quota { - - @Override - public boolean hasLimit() { - return false; - } - - @Override - public void decrement() { - // do nothing for unlimited quota - } - - @Override - public void increment() { - // do nothing for unlimited quota - } - - @Override - public boolean isConsumed() { - return false; - } - - @Override - public int getMaximum() { - return Integer.MAX_VALUE; - } - - @Override - public String toString() { - return "unlimited quota"; - } -} diff --git a/broker/src/test/java/io/moquette/broker/SessionTest.java b/broker/src/test/java/io/moquette/broker/SessionTest.java index cf512c67d..99fc6b229 100644 --- a/broker/src/test/java/io/moquette/broker/SessionTest.java +++ b/broker/src/test/java/io/moquette/broker/SessionTest.java @@ -11,7 +11,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import static io.moquette.BrokerConstants.FLIGHT_BEFORE_RESEND_MS; import io.moquette.broker.subscriptions.Subscription; import java.time.Clock; @@ -19,10 +18,10 @@ import java.util.Arrays; import org.assertj.core.api.Assertions; +import static io.moquette.BrokerConstants.*; import static io.moquette.broker.Session.INFINITE_EXPIRY; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -import static io.moquette.BrokerConstants.NO_BUFFER_FLUSH; public class SessionTest { @@ -128,6 +127,7 @@ public void testRemoveSubscription() { private void createConnection(Session client) { BrokerConfiguration brokerConfiguration = new BrokerConfiguration(true, false, false, NO_BUFFER_FLUSH); MQTTConnection mqttConnection = new MQTTConnection(testChannel, brokerConfiguration, null, null, null); + mqttConnection.assignSendQuota(new LimitedQuota(INFLIGHT_WINDOW_SIZE)); client.markConnecting(); client.bind(mqttConnection); client.completeConnection(); diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java index e18709f39..8dd989d01 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java @@ -1,18 +1,30 @@ package io.moquette.integration.mqtt5; import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; +import io.moquette.BrokerConstants; import io.moquette.testclient.Client; -import io.netty.handler.codec.mqtt.MqttConnAckMessage; +import io.netty.handler.codec.mqtt.*; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import java.nio.charset.StandardCharsets; +import java.time.Duration; + import static io.moquette.integration.mqtt5.TestUtils.assertConnectionAccepted; +import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; public abstract class AbstractServerIntegrationTest extends AbstractServerIntegrationWithoutClientFixture { Client lowLevelClient; + static void sleepSeconds(int secondsInterval) throws InterruptedException { + Thread.sleep(Duration.ofSeconds(secondsInterval).toMillis()); + } + @NotNull Mqtt5BlockingClient createSubscriberClient() { String clientId = clientName(); @@ -40,7 +52,33 @@ void connectLowLevel() { } void connectLowLevel(int keepAliveSecs) { - MqttConnAckMessage connAck = lowLevelClient.connectV5(keepAliveSecs); + MqttConnAckMessage connAck = lowLevelClient.connectV5(keepAliveSecs, BrokerConstants.INFLIGHT_WINDOW_SIZE); assertConnectionAccepted(connAck, "Connection must be accepted"); } + + protected void consumesPublishesInflightWindow(int inflightWindowSize) throws InterruptedException { + for (int i = 0; i < inflightWindowSize; i++) { + consumePendingPublishAndAcknowledge(Integer.toString(i)); + } + } + + protected void consumePendingPublishAndAcknowledge(String expectedPayload) throws InterruptedException { + MqttMessage mqttMessage = lowLevelClient.receiveNextMessage(Duration.ofMillis(20000)); + assertNotNull(mqttMessage, "A message MUST be received"); + + assertEquals(MqttMessageType.PUBLISH, mqttMessage.fixedHeader().messageType(), "Message received should MqttPublishMessage"); + MqttPublishMessage publish = (MqttPublishMessage) mqttMessage; + assertEquals(expectedPayload, publish.payload().toString(StandardCharsets.UTF_8)); + int packetId = publish.variableHeader().packetId(); + assertTrue(publish.release(), "Reference of publish should be released"); + + acknowledge(packetId); + } + + private void acknowledge(int packetId) { + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, AT_MOST_ONCE, + false, 0); + MqttPubAckMessage pubAck = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(packetId)); + lowLevelClient.sendMessage(pubAck); + } } diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/ConnectAckTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/ConnectAckTest.java index 24be72386..12b3479c1 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/ConnectAckTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/ConnectAckTest.java @@ -45,7 +45,6 @@ private void verifyNotSet(MqttPropertyType propertyType, MqttProperties props, S public void testAckResponseProperties() { final MqttProperties ackProps = connAck.variableHeader().properties(); verifyProperty(MqttPropertyType.SESSION_EXPIRY_INTERVAL, ackProps, BrokerConstants.INFINITE_SESSION_EXPIRY, "Session expiry is infinite"); - verifyProperty(MqttPropertyType.RECEIVE_MAXIMUM, ackProps, INFLIGHT_WINDOW_SIZE, "Receive maximum property must equals flight window size"); verifyNotSet(MqttPropertyType.MAXIMUM_QOS, ackProps, "Maximum QoS is not set => QoS 2 ready"); verifyProperty(MqttPropertyType.RETAIN_AVAILABLE, ackProps, 1, "Retain feature is available"); verifyNotSet(MqttPropertyType.MAXIMUM_PACKET_SIZE, ackProps, "Maximum packet size is the one defined by specs"); diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/FlowControlTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/FlowControlTest.java index 78f9f129e..7df84f543 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/FlowControlTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/FlowControlTest.java @@ -18,6 +18,8 @@ package io.moquette.integration.mqtt5; +import com.hivemq.client.mqtt.datatypes.MqttQos; +import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; import io.moquette.broker.config.FluentConfig; import io.moquette.broker.config.IConfig; import io.moquette.testclient.Client; @@ -28,11 +30,13 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.concurrent.TimeUnit; import static io.moquette.integration.mqtt5.TestUtils.assertConnectionAccepted; import static io.netty.handler.codec.mqtt.MqttQoS.EXACTLY_ONCE; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; public class FlowControlTest extends AbstractServerIntegrationTest { @@ -97,4 +101,87 @@ private void sendQoS2Publish() { public String clientName() { return "sender"; } + + + @Test + public void givenClientConnectedWithCertainReceiveMaximumWhenInFlightSizeIsSurpassedThenTheServerEnqueueAndDontFloodTheClient() throws InterruptedException { + connectLowLevel(); + + // subscribe with an identifier + MqttMessage received = lowLevelClient.subscribeWithIdentifier("temperature/living", + MqttQoS.AT_LEAST_ONCE, 123); + verifyOfType(received, MqttMessageType.SUBACK); + + //lowlevel client doesn't ACK any pub, so the in flight window fills up + Mqtt5BlockingClient publisher = createPublisherClient(); + int inflightWindowSize = 10; + // fill the in flight window so that messages starts to be enqueued + fillInFlightWindow(inflightWindowSize, publisher); + + // send another message, which is enqueued and has an expiry of messageExpiryInterval seconds + publisher.publishWith() + .topic("temperature/living") + .payload(("Enqueued").getBytes(StandardCharsets.UTF_8)) + .qos(MqttQos.AT_LEAST_ONCE) // Broker enqueues only QoS1 and QoS2001 + .send(); + + // after sending more publish the receive maximum limit is not passed and the connection remain open + assertTrue(lowLevelClient.isConnected()); + + // now subscriber consumes messages, shouldn't receive any message in the form "Enqueued-" + consumesPublishesInflightWindow(inflightWindowSize); + + MqttMessage mqttMessage = lowLevelClient.receiveNextMessage(Duration.ofMillis(100)); + assertNotNull(mqttMessage, "A message MUST be received"); + + assertEquals(MqttMessageType.PUBLISH, mqttMessage.fixedHeader().messageType(), "Message received should MqttPublishMessage"); + MqttPublishMessage publish = (MqttPublishMessage) mqttMessage; + assertEquals("Enqueued", publish.payload().toString(StandardCharsets.UTF_8)); + assertTrue(publish.release(), "Reference of publish should be released"); + } + + private static void fillInFlightWindow(int numPublishToSend, Mqtt5BlockingClient publisher) { + for (int i = 0; i < numPublishToSend; i++) { + publisher.publishWith() + .topic("temperature/living") + .payload(Integer.toString(i).getBytes(StandardCharsets.UTF_8)) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + } + } + + @Test + public void givenClientThatReconnectWithSmallerReceiveMaximumThenForwardCorrectlyTheFullListOfPendingMessagesWithoutAnyLose() throws InterruptedException { + // connect subscriber and published + // publisher send 20 events, 10 should be in the inflight, 10 remains on the queue + connectLowLevel(); + + // subscribe with an identifier + MqttMessage received = lowLevelClient.subscribeWithIdentifier("temperature/living", + MqttQoS.AT_LEAST_ONCE, 123); + verifyOfType(received, MqttMessageType.SUBACK); + + //lowlevel client doesn't ACK any pub, so the in flight window fills up + Mqtt5BlockingClient publisher = createPublisherClient(); + int inflightWindowSize = 10; + // fill the in flight window so that messages starts to be enqueued + fillInFlightWindow(inflightWindowSize + 10, publisher); + + System.out.println("Filled inflight and queue"); + + // disconnect subscriber + lowLevelClient.disconnect(); + lowLevelClient.close(); + + System.out.println("Closed old client, reconnecting"); + + // reconnect the subscriber with smaller received maximum + lowLevelClient = new Client("localhost").clientId(clientName()); + MqttConnAckMessage connAck = lowLevelClient.connectV5WithReceiveMaximum(5); + assertConnectionAccepted(connAck, "Connection must be re-accepted with smaller window size"); + System.out.println("Client reconnected second time"); + + // should receive all the 20 messages + consumesPublishesInflightWindow(inflightWindowSize + 10); + } } diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java index 20715f10b..14ab71198 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java @@ -147,21 +147,21 @@ public void givenPublishMessageWithExpiryWhenForwarderToSubscriberStillContainsT } @Test - public void givenPublishedMessageWithExpiryWhenMessageRemainInBrokerForMoreThanTheExipiryIsNotPublished() throws InterruptedException { - int messageExpiryInterval = 2; // seconds + public void givenPublishWithMessageExpiryPropertyWhenItsForwardedToSubscriberThenExpiryValueHasToBeDeducedByTheTimeSpentInBroker() throws InterruptedException { + int messageExpiryInterval = 10; // seconds // avoid the keep alive period could disconnect - connectLowLevel(messageExpiryInterval * 2); + connectLowLevel((int) (messageExpiryInterval * 1.5)); // subscribe with an identifier MqttMessage received = lowLevelClient.subscribeWithIdentifier("temperature/living", - MqttQoS.AT_LEAST_ONCE, 123); + MqttQoS.AT_LEAST_ONCE, 123, 500, TimeUnit.MILLISECONDS); verifyOfType(received, MqttMessageType.SUBACK); //lowlevel client doesn't ACK any pub, so the in flight window fills up Mqtt5BlockingClient publisher = createPublisherClient(); int inflightWindowSize = 10; // fill the in flight window so that messages starts to be enqueued - fillInFlightWindow(inflightWindowSize, publisher, messageExpiryInterval); + fillInFlightWindow(inflightWindowSize, publisher, Integer.MIN_VALUE); // send another message, which is enqueued and has an expiry of messageExpiryInterval seconds publisher.publishWith() @@ -172,31 +172,44 @@ public void givenPublishedMessageWithExpiryWhenMessageRemainInBrokerForMoreThanT .send(); // let time flow so that the message in queue passes its expiry time - Thread.sleep(Duration.ofSeconds(messageExpiryInterval + 1).toMillis()); + long sleepMillis = Duration.ofSeconds(messageExpiryInterval / 2).toMillis(); + Thread.sleep(sleepMillis); // now subscriber consumes messages, shouldn't receive any message in the form "Enqueued-" consumesPublishesInflightWindow(inflightWindowSize); + MqttMessage mqttMessage = lowLevelClient.receiveNextMessage(Duration.ofMillis(1000)); - MqttMessage mqttMessage = lowLevelClient.receiveNextMessage(Duration.ofMillis(100)); - assertNull(mqttMessage, "No other messages MUST be received after consuming the in flight window"); + assertNotNull(mqttMessage, "A publish out of the queue has to be received"); + assertEquals(MqttMessageType.PUBLISH, mqttMessage.fixedHeader().messageType(), "Expected a publish message"); + MqttPublishMessage publishMessage = (MqttPublishMessage) mqttMessage; + + // extract message expiry property + MqttProperties.MqttProperty expiryProp = publishMessage.variableHeader() + .properties() + .getProperty(MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value()); + assertNotNull(expiryProp, "Publication expiry property can't be null"); + Integer expirySeconds = ((MqttProperties.IntegerProperty) expiryProp).value(); + + assertTrue(expirySeconds < messageExpiryInterval, "Publish's expiry has to be updated"); + assertTrue(publishMessage.release(), "Last reference of publish should be released"); } @Test - public void givenPublishWithMessageExpiryPropertyWhenItsForwardedToSubscriberThenExpiryValueHasToBeDeducedByTheTimeSpentInBroker() throws InterruptedException { - int messageExpiryInterval = 10; // seconds + public void givenPublishedMessageWithExpiryWhenMessageRemainInBrokerForMoreThanTheExpiryIsNotPublished() throws InterruptedException { + int messageExpiryInterval = 2; // seconds // avoid the keep alive period could disconnect - connectLowLevel((int) (messageExpiryInterval * 1.5)); + connectLowLevel(messageExpiryInterval * 2); // subscribe with an identifier MqttMessage received = lowLevelClient.subscribeWithIdentifier("temperature/living", - MqttQoS.AT_LEAST_ONCE, 123, 500, TimeUnit.MILLISECONDS); + MqttQoS.AT_LEAST_ONCE, 123); verifyOfType(received, MqttMessageType.SUBACK); //lowlevel client doesn't ACK any pub, so the in flight window fills up Mqtt5BlockingClient publisher = createPublisherClient(); int inflightWindowSize = 10; // fill the in flight window so that messages starts to be enqueued - fillInFlightWindow(inflightWindowSize, publisher, Integer.MIN_VALUE); + fillInFlightWindow(inflightWindowSize, publisher, messageExpiryInterval); // send another message, which is enqueued and has an expiry of messageExpiryInterval seconds publisher.publishWith() @@ -207,44 +220,13 @@ public void givenPublishWithMessageExpiryPropertyWhenItsForwardedToSubscriberThe .send(); // let time flow so that the message in queue passes its expiry time - long sleepMillis = Duration.ofSeconds(messageExpiryInterval / 2).toMillis(); - Thread.sleep(sleepMillis); + sleepSeconds(messageExpiryInterval + 1); // now subscriber consumes messages, shouldn't receive any message in the form "Enqueued-" consumesPublishesInflightWindow(inflightWindowSize); - MqttMessage mqttMessage = lowLevelClient.receiveNextMessage(Duration.ofMillis(1000)); - assertNotNull(mqttMessage, "A publish out of the queue has to be received"); - assertEquals(MqttMessageType.PUBLISH, mqttMessage.fixedHeader().messageType(), "Expected a publish message"); - MqttPublishMessage publishMessage = (MqttPublishMessage) mqttMessage; - - // extract message expiry property - MqttProperties.MqttProperty expiryProp = publishMessage.variableHeader() - .properties() - .getProperty(MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value()); - assertNotNull(expiryProp, "Publication expiry property can't be null"); - Integer expirySeconds = ((MqttProperties.IntegerProperty) expiryProp).value(); - - assertTrue(expirySeconds < messageExpiryInterval, "Publish's expiry has to be updated"); - assertTrue(publishMessage.release(), "Last reference of publish should be released"); - } - - private void consumesPublishesInflightWindow(int inflightWindowSize) throws InterruptedException { - for (int i = 0; i < inflightWindowSize; i++) { - MqttMessage mqttMessage = lowLevelClient.receiveNextMessage(Duration.ofMillis(20000)); - assertNotNull(mqttMessage, "A message MUST be received"); - - assertEquals(MqttMessageType.PUBLISH, mqttMessage.fixedHeader().messageType(), "Message received should MqttPublishMessage"); - MqttPublishMessage publish = (MqttPublishMessage) mqttMessage; - assertEquals(Integer.toString(i), publish.payload().toString(StandardCharsets.UTF_8)); - int packetId = publish.variableHeader().packetId(); - assertTrue(publish.release(), "Reference of publish should be released"); - - MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, AT_MOST_ONCE, - false, 0); - MqttPubAckMessage pubAck = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(packetId)); - lowLevelClient.sendMessage(pubAck); - } + MqttMessage mqttMessage = lowLevelClient.receiveNextMessage(Duration.ofMillis(100)); + assertNull(mqttMessage, "No other messages MUST be received after consuming the in flight window"); } private static void fillInFlightWindow(int inflightWindowSize, Mqtt5BlockingClient publisher, int messageExpiryInterval) { diff --git a/broker/src/test/java/io/moquette/testclient/Client.java b/broker/src/test/java/io/moquette/testclient/Client.java index ba8e8427a..b406bcf38 100644 --- a/broker/src/test/java/io/moquette/testclient/Client.java +++ b/broker/src/test/java/io/moquette/testclient/Client.java @@ -139,19 +139,31 @@ public void connect() { } public MqttConnAckMessage connectV5() { - return connectV5(2); + return connectV5(2, BrokerConstants.INFLIGHT_WINDOW_SIZE); + } + + public MqttConnAckMessage connectV5WithReceiveMaximum(int receiveMaximumInflight) { + return connectV5(2, receiveMaximumInflight); } @NotNull - public MqttConnAckMessage connectV5(int keepAliveSecs) { + public MqttConnAckMessage connectV5(int keepAliveSecs, int receiveMaximumInflight) { final MqttMessageBuilders.ConnectBuilder builder = MqttMessageBuilders.connect().protocolVersion(MqttVersion.MQTT_5); if (clientId != null) { builder.clientId(clientId); } + + final MqttProperties connectProperties = new MqttProperties(); + MqttProperties.IntegerProperty receiveMaximum = new MqttProperties.IntegerProperty( + MqttProperties.MqttPropertyType.RECEIVE_MAXIMUM.value(), + receiveMaximumInflight); + connectProperties.add(receiveMaximum); + MqttConnectMessage connectMessage = builder .keepAlive(keepAliveSecs) // secs .willFlag(false) .willQoS(MqttQoS.AT_MOST_ONCE) + .properties(connectProperties) .build(); return doConnect(connectMessage); @@ -367,4 +379,8 @@ public void close() throws InterruptedException { } workerGroup.shutdownGracefully(); } + + public boolean isConnected() { + return m_channel.isActive(); + } }