Skip to content

Commit

Permalink
Created model class for subscription identifier and implemented decod…
Browse files Browse the repository at this point in the history
…e and verification in SUBSCRIBE
  • Loading branch information
andsel committed Dec 30, 2023
1 parent 3baad12 commit 88ce042
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 34 deletions.
3 changes: 2 additions & 1 deletion broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ private ConnAckPropertiesBuilder prepareConnAckPropertiesBuilder(boolean serverG
.receiveMaximum(INFLIGHT_WINDOW_SIZE)
.retainAvailable(true)
.wildcardSubscriptionAvailable(true)
.subscriptionIdentifiersAvailable(false)
.subscriptionIdentifiersAvailable(false) // TODO make it true
.sharedSubscriptionAvailable(true);
return builder;
}
Expand Down Expand Up @@ -858,6 +858,7 @@ void brokerDisconnect() {
}

void brokerDisconnect(MqttReasonCodes.Disconnect reasonCode) {
this.connected = false;
final MqttMessage disconnectMsg = MqttMessageBuilders.disconnect()
.reasonCode(reasonCode.byteValue())
.build();
Expand Down
60 changes: 38 additions & 22 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,17 @@
package io.moquette.broker;

import io.moquette.broker.scheduler.ScheduledExpirationService;
import io.moquette.broker.subscriptions.ShareName;
import io.moquette.broker.subscriptions.*;
import io.moquette.interception.BrokerInterceptor;
import io.moquette.broker.subscriptions.ISubscriptionsDirectory;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.handler.codec.mqtt.*;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -302,6 +284,7 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S
final Session session = sessionRegistry.retrieve(clientID);

final List<SharedSubscriptionData> sharedSubscriptions;
final Optional<SubscriptionIdentifier> subscritionIdOpt;

if (mqttConnection.isProtocolVersion5()) {
sharedSubscriptions = msg.payload().topicSubscriptions().stream()
Expand All @@ -318,8 +301,16 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S
session.disconnectFromBroker();
return;
}

try {
subscritionIdOpt = verifyAndExtractMessageIdentifier(msg);
} catch (IllegalArgumentException ex) {
session.disconnectFromBroker();
return;
}
} else {
sharedSubscriptions = Collections.emptyList();
subscritionIdOpt = Optional.empty();
}

List<MqttTopicSubscription> ackTopics;
Expand All @@ -340,7 +331,11 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S
}).collect(Collectors.toList());

for (Subscription subscription : newSubscriptions) {
subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subscription.getRequestedQos());
if (subscritionIdOpt.isPresent()) {
subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subscription.getRequestedQos(), subscritionIdOpt.get());
} else {
subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subscription.getRequestedQos());
}
}

for (SharedSubscriptionData sharedSubData : sharedSubscriptions) {
Expand All @@ -360,6 +355,27 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S
}
}

private static Optional<SubscriptionIdentifier> verifyAndExtractMessageIdentifier(MqttSubscribeMessage msg) {
final List<MqttProperties.MqttProperty<Integer>> subscriptionIdentifierProperties =
(List<MqttProperties.MqttProperty<Integer>>) msg.idAndPropertiesVariableHeader().properties()
.getProperties(MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value());
if (subscriptionIdentifierProperties.size() > 1) {
// more than 1 SUBSCRIPTION_IDENTIFIER property during subscribe is a protocol error
LOG.warn("Received a Subscribe with more than one subscription identifier property ({})", subscriptionIdentifierProperties.size());
throw new IllegalArgumentException("More than one subscription identifier properties");
}
if (subscriptionIdentifierProperties.isEmpty()) {
return Optional.empty();
}
Integer value = subscriptionIdentifierProperties.iterator().next().value();
try {
return Optional.of(new SubscriptionIdentifier(value));
} catch (IllegalArgumentException ex) {
LOG.warn("Received a Subscribe with SubscriptionIdentifier value {} out of range 1..268435455", value);
throw ex;
}
}

private void publishRetainedMessagesForSubscriptions(String clientID, List<Subscription> newSubscriptions) {
Session targetSession = this.sessionRegistry.retrieve(clientID);
for (Subscription subscription : newSubscriptions) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.moquette.broker.subscriptions;

/**
* Models the subscription identifier for MQTT5 Subscription.
* */
public final class SubscriptionIdentifier {
private final int subscriptionId;

public SubscriptionIdentifier(int value) {
if (value <= 0 || value > 268435455) {
throw new IllegalArgumentException("Value MUST be > 0 and <= 268435455");
}
subscriptionId = value;
}

public int value() {
return subscriptionId;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import io.netty.channel.Channel;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.mqtt.*;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand All @@ -40,13 +42,11 @@
import static io.moquette.BrokerConstants.NO_BUFFER_FLUSH;
import static io.moquette.broker.PostOfficePublishTest.ALLOW_ANONYMOUS_AND_ZERO_BYTES_CLID;
import static io.moquette.broker.PostOfficePublishTest.SUBSCRIBER_ID;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.EXACTLY_ONCE;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -114,14 +114,14 @@ private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel
return new MQTTConnection(channel, config, mockAuthenticator, sessionRegistry, sut);
}

protected void connect() {
MqttConnectMessage connectMessage = MqttMessageBuilders.connect()
.clientId(FAKE_CLIENT_ID)
.build();
connection.processConnect(connectMessage);
MqttConnAckMessage connAck = channel.readOutbound();
assertEquals(CONNECTION_ACCEPTED, connAck.variableHeader().connectReturnCode(), "Connect must be accepted");
}
// protected void connect() {
// MqttConnectMessage connectMessage = MqttMessageBuilders.connect()
// .clientId(FAKE_CLIENT_ID)
// .build();
// connection.processConnect(connectMessage);
// MqttConnAckMessage connAck = channel.readOutbound();
// assertEquals(CONNECTION_ACCEPTED, connAck.variableHeader().connectReturnCode(), "Connect must be accepted");
// }

@Test
public void testSubscribe() throws ExecutionException, InterruptedException {
Expand Down Expand Up @@ -357,4 +357,62 @@ public void testExtractFilterFromShared() {
assertEquals("measures/+/1", SharedSubscriptionUtils.extractFilterFromShared("$share/myShared/measures/+/1"));
assertEquals("measures/+/1", SharedSubscriptionUtils.extractFilterFromShared("$share/#/measures/+/1"));
}

@Test
public void givenConnectedMQTT5ClientWhenSubscribeWithMultipleSubscriptionIdentifiersThenTheSessionIsDisconnected() throws ExecutionException, InterruptedException {
connectAsMqtt5(connection);

MqttSubscribeMessage subscribe = createSubscribeWithSubscriptionIdentifiers(123, 456);

sut.subscribeClientToTopics(subscribe, FAKE_CLIENT_ID, "", connection);

Assertions.assertAll("Session and connection move in close state",
() -> {
MqttMessage disconnect = channel.readOutbound();
assertEquals(MqttReasonCodes.Disconnect.MALFORMED_PACKET.byteValue(), ((MqttReasonCodeAndPropertiesVariableHeader) disconnect.variableHeader()).reasonCode());
},
() -> assertFalse(channel.isActive()),
() -> assertFalse(connection.isConnected()));
}

@Test
public void givenConnectedMQTT5ClientWhenSubscribeWithInvalidSubscriptionIdentifierThenTheSessionIsDisconnected() throws ExecutionException, InterruptedException {
connectAsMqtt5(connection);

MqttSubscribeMessage subscribe = createSubscribeWithSubscriptionIdentifiers(-1);

sut.subscribeClientToTopics(subscribe, FAKE_CLIENT_ID, "", connection);

Assertions.assertAll("Session and connection move in close state",
() -> {
MqttMessage disconnect = channel.readOutbound();
assertEquals(MqttReasonCodes.Disconnect.MALFORMED_PACKET.byteValue(), ((MqttReasonCodeAndPropertiesVariableHeader) disconnect.variableHeader()).reasonCode());
},
() -> assertFalse(channel.isActive()),
() -> assertFalse(connection.isConnected()));
}

@NotNull
private static MqttSubscribeMessage createSubscribeWithSubscriptionIdentifiers(int... subscriptionIdentifiers) {
final MqttProperties subscribeProperties = new MqttProperties();
for (int subscriptionId : subscriptionIdentifiers) {
subscribeProperties.add(new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value(), subscriptionId));
}

return MqttMessageBuilders.subscribe()
.addSubscription(AT_MOST_ONCE, NEWS_TOPIC)
.messageId(1)
.properties(subscribeProperties)
.build();
}

private void connectAsMqtt5(MQTTConnection connection) throws InterruptedException, ExecutionException {
MqttConnectMessage connectMessage = MqttMessageBuilders.connect()
.clientId(FAKE_CLIENT_ID)
.protocolVersion(MqttVersion.MQTT_5)
.build();

connection.processConnect(connectMessage).completableFuture().get();
ConnectionTestUtils.assertConnectAccepted((EmbeddedChannel) connection.channel);
}
}

0 comments on commit 88ce042

Please sign in to comment.