Skip to content

Commit

Permalink
Added test to verify that on reconnect with smaller quota the resendi…
Browse files Browse the repository at this point in the history
…ng of in flight messages resume correctly without any lose
  • Loading branch information
andsel committed Oct 12, 2024
1 parent 5bdceb5 commit d961d43
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 25 deletions.
2 changes: 1 addition & 1 deletion broker/src/main/java/io/moquette/broker/LimitedQuota.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public int getMaximum() {

@Override
public int availableSlots() {
return receiveMaximum - receivedQuota;
return receivedQuota;
}

@Override
Expand Down
76 changes: 58 additions & 18 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class Session {
// session that doesn't expire, it's ~68 years.
static final int INFINITE_EXPIRY = Integer.MAX_VALUE;
private final boolean resendInflightOnTimeout;
private Collection<Integer> nonAckPacketIds;

static class InFlightPacket implements Delayed {

Expand Down Expand Up @@ -87,6 +88,7 @@ enum SessionStatus {
private final DelayQueue<InFlightPacket> inflightTimeouts = new DelayQueue<>();
private final Map<Integer, MqttPublishMessage> qos2Receiving = new HashMap<>();
private ISessionsRepository.SessionData data;
private boolean resendingNonAcked = false;

Session(ISessionsRepository.SessionData data, boolean clean, SessionMessageQueue<SessionRegistry.EnqueuedMessage> sessionQueue) {
if (sessionQueue == null) {
Expand Down Expand Up @@ -212,7 +214,11 @@ public void processPubRec(int pubRecPacketId) {
MqttMessage pubRel = MQTTConnection.pubrel(pubRecPacketId);
mqttConnection.sendIfWritableElseDrop(pubRel);

drainQueueToConnection();
if (resendingNonAcked) {
resendInflightNotAcked();
} else {
drainQueueToConnection();
}
}

public void processPubComp(int messageID) {
Expand Down Expand Up @@ -346,14 +352,19 @@ void pubAckReceived(int ackPacketId) {
cleanFromInflight(ackPacketId);
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(ackPacketId);
if (removed == null) {
LOG.warn("Received a PUBACK with not matching packetId in the inflight cache");
LOG.warn("Received a PUBACK with not matching packetId({}) in the inflight cache({})",
ackPacketId, inflightWindow.keySet());
return;
}
removed.release();

mqttConnection.sendQuota().releaseSlot();
LOG.debug("Received PUBACK {} for session {}", ackPacketId, getClientID());
drainQueueToConnection();
if (resendingNonAcked) {
resendInflightNotAcked();
} else {
drainQueueToConnection();
}
}

private void cleanFromInflight(int ackPacketId) {
Expand All @@ -365,22 +376,45 @@ public void flushAllQueuedMessages() {
}

public void resendInflightNotAcked() {
Collection<Integer> nonAckPacketIds;
if (resendInflightOnTimeout) {
// MQTT3 behavior, resend on timeout
Collection<InFlightPacket> expired = new ArrayList<>();
inflightTimeouts.drainTo(expired);
nonAckPacketIds = expired.stream().map(p -> p.packetId).collect(Collectors.toList());
if (!resendingNonAcked) {
if (resendInflightOnTimeout) {
// MQTT3 behavior, resend on timeout
Collection<InFlightPacket> expired = new ArrayList<>();
inflightTimeouts.drainTo(expired);
nonAckPacketIds = expired.stream().map(p -> p.packetId).collect(Collectors.toList());
} else {
// MQTT5 behavior resend only not acked present in reopened session.
// need a copy else removing from the nonAckPacketIds would remove also from inflightWindow
nonAckPacketIds = new ArrayList<>(inflightWindow.keySet());
}

debugLogPacketIds(nonAckPacketIds);
}

// TODO rework here because send quota max can be lower than the inflight window width!
if (nonAckPacketIds.size() > mqttConnection.sendQuota().availableSlots()) {
// send quota is smaller than the inflight messages to resend, split it
resendingNonAcked = true;
List<Integer> partition = nonAckPacketIds.stream()
.limit(mqttConnection.sendQuota().availableSlots())
.collect(Collectors.toList());

resendNonAckedIdsPartition(partition);

// clean up the partition sent
for (Integer id : partition) {
nonAckPacketIds.remove(id);
}
} else {
// MQTT5 behavior resend only not acked present in reopened session.
nonAckPacketIds = inflightWindow.keySet();
resendNonAckedIdsPartition(nonAckPacketIds);
resendingNonAcked = false;
}

debugLogPacketIds(nonAckPacketIds);
}

// TODO rework here because send quota max can be lower than the inflight window width!
for (Integer notAckPacketId : nonAckPacketIds) {
final SessionRegistry.EnqueuedMessage msg = inflightWindow.get(notAckPacketId);
private void resendNonAckedIdsPartition(Collection<Integer> packetIdsToResend) {
for (Integer notAckPacketId : packetIdsToResend) {
final EnqueuedMessage msg = inflightWindow.get(notAckPacketId);
if (msg == null) {
// Already acked...
continue;
Expand All @@ -392,7 +426,7 @@ public void resendInflightNotAcked() {
}
mqttConnection.sendIfWritableElseDrop(pubRel);
} else {
final SessionRegistry.PublishedMessage pubMsg = (SessionRegistry.PublishedMessage) msg;
final PublishedMessage pubMsg = (PublishedMessage) msg;
final Topic topic = pubMsg.topic;
final MqttQoS qos = pubMsg.publishingQos;
final ByteBuf payload = pubMsg.payload;
Expand All @@ -404,6 +438,8 @@ public void resendInflightNotAcked() {
inflightTimeouts.add(new InFlightPacket(notAckPacketId, FLIGHT_BEFORE_RESEND_MS));
}
mqttConnection.sendPublish(publishMsg);

mqttConnection.sendQuota().consumeSlot();
}
}
}
Expand Down Expand Up @@ -469,8 +505,12 @@ public void reconnectSession() {
LOG.trace("Republishing all saved messages for session {}", this);
resendInflightNotAcked();

// send queued messages while offline
drainQueueToConnection();
if (!resendingNonAcked) {
// if resend of inflight is bigger than send quota, till it's finished
// do not drain the queue.
// send queued messages while offline
drainQueueToConnection();
}
}

public void receivedPublishQos2(int messageID, MqttPublishMessage msg) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.moquette.integration.mqtt5;

import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import io.moquette.BrokerConstants;
import io.moquette.testclient.Client;
import io.netty.handler.codec.mqtt.*;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -51,7 +52,7 @@ void connectLowLevel() {
}

void connectLowLevel(int keepAliveSecs) {
MqttConnAckMessage connAck = lowLevelClient.connectV5(keepAliveSecs);
MqttConnAckMessage connAck = lowLevelClient.connectV5(keepAliveSecs, BrokerConstants.INFLIGHT_WINDOW_SIZE);
assertConnectionAccepted(connAck, "Connection must be accepted");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,48 @@ public void givenClientConnectedWithCertainReceiveMaximumWhenInFlightSizeIsSurpa
assertTrue(publish.release(), "Reference of publish should be released");
}

private static void fillInFlightWindow(int inflightWindowSize, Mqtt5BlockingClient publisher) {
for (int i = 0; i < inflightWindowSize; i++) {
private static void fillInFlightWindow(int numPublishToSend, Mqtt5BlockingClient publisher) {
for (int i = 0; i < numPublishToSend; i++) {
publisher.publishWith()
.topic("temperature/living")
.payload(Integer.toString(i).getBytes(StandardCharsets.UTF_8))
.qos(MqttQos.AT_LEAST_ONCE)
.send();
}
}

@Test
public void givenClientThatReconnectWithSmallerReceiveMaximumThenForwardCorrectlyTheFullListOfPendingMessagesWithoutAnyLose() throws InterruptedException {
// connect subscriber and published
// publisher send 20 events, 10 should be in the inflight, 10 remains on the queue
connectLowLevel();

// subscribe with an identifier
MqttMessage received = lowLevelClient.subscribeWithIdentifier("temperature/living",
MqttQoS.AT_LEAST_ONCE, 123);
verifyOfType(received, MqttMessageType.SUBACK);

//lowlevel client doesn't ACK any pub, so the in flight window fills up
Mqtt5BlockingClient publisher = createPublisherClient();
int inflightWindowSize = 10;
// fill the in flight window so that messages starts to be enqueued
fillInFlightWindow(inflightWindowSize + 10, publisher);

System.out.println("Filled inflight and queue");

// disconnect subscriber
lowLevelClient.disconnect();
lowLevelClient.close();

System.out.println("Closed old client, reconnecting");

// reconnect the subscriber with smaller received maximum
lowLevelClient = new Client("localhost").clientId(clientName());
MqttConnAckMessage connAck = lowLevelClient.connectV5WithReceiveMaximum(5);
assertConnectionAccepted(connAck, "Connection must be re-accepted with smaller window size");
System.out.println("Client reconnected second time");

// should receive all the 20 messages
consumesPublishesInflightWindow(inflightWindowSize + 10);
}
}
10 changes: 7 additions & 3 deletions broker/src/test/java/io/moquette/testclient/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,15 @@ public void connect() {
}

public MqttConnAckMessage connectV5() {
return connectV5(2);
return connectV5(2, BrokerConstants.INFLIGHT_WINDOW_SIZE);
}

public MqttConnAckMessage connectV5WithReceiveMaximum(int receiveMaximumInflight) {
return connectV5(2, receiveMaximumInflight);
}

@NotNull
public MqttConnAckMessage connectV5(int keepAliveSecs) {
public MqttConnAckMessage connectV5(int keepAliveSecs, int receiveMaximumInflight) {
final MqttMessageBuilders.ConnectBuilder builder = MqttMessageBuilders.connect().protocolVersion(MqttVersion.MQTT_5);
if (clientId != null) {
builder.clientId(clientId);
Expand All @@ -152,7 +156,7 @@ public MqttConnAckMessage connectV5(int keepAliveSecs) {
final MqttProperties connectProperties = new MqttProperties();
MqttProperties.IntegerProperty receiveMaximum = new MqttProperties.IntegerProperty(
MqttProperties.MqttPropertyType.RECEIVE_MAXIMUM.value(),
BrokerConstants.INFLIGHT_WINDOW_SIZE);
receiveMaximumInflight);
connectProperties.add(receiveMaximum);

MqttConnectMessage connectMessage = builder
Expand Down

0 comments on commit d961d43

Please sign in to comment.