Skip to content

Commit

Permalink
Store subscription indentifier into subscriptions trie (#804)
Browse files Browse the repository at this point in the history
Spread the SubscriptionIdentifier of a subscription request down to the SubscriptionDirectory.
Updated the CNode 's addSubscriptionto update subscriptions stored also if the subscription identifier changes (by value, removing or adding int).
  • Loading branch information
andsel authored Jan 1, 2024
1 parent f7a64d1 commit 8480e22
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 20 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
Version 0.18-SNAPSHOT:
[feature] subscription identifiers: (issue #801)
- Implements the validation of subscription identifier properties in SUBSCRIBE. (#803)
- Store and retrieve the subscription identifier into the subscriptions directory. (#804)
[feature] shared subscriptions:
- Initial implementation of shared subscription subscribe and publish part. (#796)
- Added unsubscribe of shared subscriptions. (#799)
Expand Down
22 changes: 19 additions & 3 deletions broker/src/main/java/io/moquette/broker/subscriptions/CNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private List<Subscription> sharedSubscriptions() {
// select a subscription randomly
int randIdx = SECURE_RANDOM.nextInt(list.size());
SharedSubscription sub = list.get(randIdx);
selectedSubscriptions.add(new Subscription(sub.clientId(), sub.topicFilter(), sub.requestedQoS(), shareName));
selectedSubscriptions.add(sub.createSubscription());
}
return selectedSubscriptions;
}
Expand Down Expand Up @@ -144,17 +144,33 @@ CNode addSubscription(SubscriptionRequest request) {
if (idx >= 0) {
// Subscription already exists
final Subscription existing = subscriptions.get(idx);
if (existing.getRequestedQos().value() < newSubscription.getRequestedQos().value()) {
if (needsToUpdateExistingSubscription(newSubscription, existing)) {
subscriptions.set(idx, newSubscription);
}
} else {
// insert into the expected index so that the sorting is maintained
this.subscriptions.add(-1 - idx, new Subscription(newSubscription));
this.subscriptions.add(-1 - idx, newSubscription);
}
}
return this;
}

private static boolean needsToUpdateExistingSubscription(Subscription newSubscription, Subscription existing) {
if ((newSubscription.hasSubscriptionIdentifier() && existing.hasSubscriptionIdentifier()) &&
newSubscription.getSubscriptionIdentifier().equals(existing.getSubscriptionIdentifier())
) {
// if subscription identifier hasn't changed,
// then check QoS but don't lower the requested QoS level
return existing.getRequestedQos().value() < newSubscription.getRequestedQos().value();
}

// subscription identifier changed
// TODO need to understand if requestedQoS has to be also replaced or not, if not
// the existing QoS has to be copied. This to avoid that a subscription identifier
// change silently break the rule of existing qos never lowered.
return true;
}

/**
* @return true iff the subscriptions contained in this node are owned by clientId
* AND at least one subscription is actually present for that clientId
Expand Down
44 changes: 40 additions & 4 deletions broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;

public class CTrie {

Expand All @@ -20,11 +22,20 @@ public final static class SubscriptionRequest {

private boolean shared = false;
private ShareName shareName;
private Optional<SubscriptionIdentifier> subscriptionIdOpt;

private SubscriptionRequest(String clientId, Topic topicFilter, MqttQoS requestedQoS, SubscriptionIdentifier subscriptionId) {
this.topicFilter = topicFilter;
this.clientId = clientId;
this.requestedQoS = requestedQoS;
this.subscriptionIdOpt = Optional.of(subscriptionId);
}

private SubscriptionRequest(String clientId, Topic topicFilter, MqttQoS requestedQoS) {
this.topicFilter = topicFilter;
this.clientId = clientId;
this.requestedQoS = requestedQoS;
this.subscriptionIdOpt = Optional.empty();
}

public static SubscriptionRequest buildNonShared(Subscription subscription) {
Expand All @@ -35,12 +46,29 @@ public static SubscriptionRequest buildNonShared(String clientId, Topic topicFil
return new SubscriptionRequest(clientId, topicFilter, requestedQoS);
}

public static SubscriptionRequest buildNonShared(String clientId, Topic topicFilter, MqttQoS requestedQoS,
SubscriptionIdentifier subscriptionId) {
Objects.requireNonNull(subscriptionId, "SubscriptionId param can't be null");
return new SubscriptionRequest(clientId, topicFilter, requestedQoS, subscriptionId);
}

public static SubscriptionRequest buildShared(ShareName shareName, Topic topicFilter, String clientId,
MqttQoS requestedQoS, SubscriptionIdentifier subscriptionId) {
Objects.requireNonNull(subscriptionId, "SubscriptionId param can't be null");
return buildSharedHelper(shareName, topicFilter,
() -> new SubscriptionRequest(clientId, topicFilter, requestedQoS, subscriptionId));
}

public static SubscriptionRequest buildShared(ShareName shareName, Topic topicFilter, String clientId, MqttQoS requestedQoS) {
return buildSharedHelper(shareName, topicFilter,
() -> new SubscriptionRequest(clientId, topicFilter, requestedQoS));
}

private static SubscriptionRequest buildSharedHelper(ShareName shareName, Topic topicFilter, Supplier<SubscriptionRequest> instantiator) {
if (topicFilter.headToken().name().startsWith("$share")) {
throw new IllegalArgumentException("Topic filter of a shared subscription can't contains $share and share name");
}

SubscriptionRequest request = new SubscriptionRequest(clientId, topicFilter, requestedQoS);
SubscriptionRequest request = instantiator.get();
request.shared = true;
request.shareName = shareName;
return request;
Expand All @@ -50,12 +78,20 @@ public Topic getTopicFilter() {
return topicFilter;
}

public MqttQoS getRequestedQoS() {
return requestedQoS;
}

public Subscription subscription() {
return new Subscription(clientId, topicFilter, requestedQoS);
return subscriptionIdOpt
.map(subscriptionIdentifier -> new Subscription(clientId, topicFilter, requestedQoS, subscriptionIdentifier))
.orElseGet(() -> new Subscription(clientId, topicFilter, requestedQoS));
}

public SharedSubscription sharedSubscription() {
return new SharedSubscription(shareName, topicFilter, clientId, requestedQoS);
return subscriptionIdOpt
.map(subId -> new SharedSubscription(shareName, topicFilter, clientId, requestedQoS, subId))
.orElseGet(() -> new SharedSubscription(shareName, topicFilter, clientId, requestedQoS));
}

public boolean isShared() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,31 +103,41 @@ public List<Subscription> matchQosSharpening(Topic topicName) {
@Override
public void add(String clientId, Topic filter, MqttQoS requestedQoS) {
SubscriptionRequest subRequest = SubscriptionRequest.buildNonShared(clientId, filter, requestedQoS);
ctrie.addToTree(subRequest);
subscriptionsRepository.addNewSubscription(subRequest.subscription());
addNonSharedSubscriptionRequest(subRequest);
}

@Override
public void add(String clientId, Topic filter, MqttQoS requestedQoS, SubscriptionIdentifier subscriptionId) {
// TODO implement, save the subscription Id into the ctrie
throw new IllegalStateException("Implement this");
SubscriptionRequest subRequest = SubscriptionRequest.buildNonShared(clientId, filter, requestedQoS, subscriptionId);
addNonSharedSubscriptionRequest(subRequest);
}

private void addNonSharedSubscriptionRequest(SubscriptionRequest subRequest) {
ctrie.addToTree(subRequest);
subscriptionsRepository.addNewSubscription(subRequest.subscription());
}

@Override
public void addShared(String clientId, ShareName name, Topic topicFilter, MqttQoS requestedQoS) {
SubscriptionRequest shareSubRequest = SubscriptionRequest.buildShared(name, topicFilter, clientId, requestedQoS);
addSharedSubscriptionRequest(shareSubRequest);
}

private void addSharedSubscriptionRequest(SubscriptionRequest shareSubRequest) {
ctrie.addToTree(shareSubRequest);

subscriptionsRepository.addNewSharedSubscription(clientId, name, topicFilter, requestedQoS);
subscriptionsRepository.addNewSharedSubscription(shareSubRequest.getClientId(), shareSubRequest.getSharedName(),
shareSubRequest.getTopicFilter(), shareSubRequest.getRequestedQoS());

List<SharedSubscription> sharedSubscriptions = clientSharedSubscriptions.computeIfAbsent(clientId, unused -> new ArrayList<>());
List<SharedSubscription> sharedSubscriptions = clientSharedSubscriptions.computeIfAbsent(shareSubRequest.getClientId(), unused -> new ArrayList<>());
sharedSubscriptions.add(shareSubRequest.sharedSubscription());
}

@Override
public void addShared(String clientId, ShareName name, Topic topicFilter, MqttQoS requestedQoS, SubscriptionIdentifier subscriptionId) {
// TODO implement, save the subscription Id into the ctrie
throw new IllegalStateException("Implement this");
public void addShared(String clientId, ShareName name, Topic topicFilter, MqttQoS requestedQoS,
SubscriptionIdentifier subscriptionId) {
SubscriptionRequest shareSubRequest = SubscriptionRequest.buildShared(name, topicFilter, clientId, requestedQoS, subscriptionId);
addSharedSubscriptionRequest(shareSubRequest);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.netty.handler.codec.mqtt.MqttQoS;

import java.util.Objects;
import java.util.Optional;

/**
* Shared subscription data class.
Expand All @@ -27,13 +28,25 @@ public final class SharedSubscription implements Comparable<SharedSubscription>
private final Topic topicFilter;
private final String clientId;
private final MqttQoS requestedQoS;
private final Optional<SubscriptionIdentifier> subscriptionId;

public SharedSubscription(ShareName shareName, Topic topicFilter, String clientId, MqttQoS requestedQoS) {
Objects.requireNonNull(requestedQoS, "qos parameter can't be null");
this.shareName = shareName;
this.topicFilter = topicFilter;
this.clientId = clientId;
this.requestedQoS = requestedQoS;
this.subscriptionId = Optional.empty();
}

public SharedSubscription(ShareName shareName, Topic topicFilter, String clientId, MqttQoS requestedQoS
, SubscriptionIdentifier subscriptionId) {
Objects.requireNonNull(requestedQoS, "qos parameter can't be null");
this.shareName = shareName;
this.topicFilter = topicFilter;
this.clientId = clientId;
this.requestedQoS = requestedQoS;
this.subscriptionId = Optional.of(subscriptionId);
}

public String clientId() {
Expand All @@ -52,6 +65,18 @@ public ShareName getShareName() {
return shareName;
}

/**
* Create a new Subscription instance from the data present in SharedSubscription
* */
Subscription createSubscription() {
if (subscriptionId.isPresent()) {
return new Subscription(clientId, topicFilter, requestedQoS, shareName.getShareName(), subscriptionId.get());
} else {
return new Subscription(clientId, topicFilter, requestedQoS, shareName.getShareName());
}
}


@Override
public int compareTo(SharedSubscription o) {
return this.clientId.compareTo(o.clientId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;

/**
* Maintain the information about which Topic a certain ClientID is subscribed and at which QoS
Expand All @@ -32,22 +33,36 @@ public final class Subscription implements Serializable, Comparable<Subscription
final Topic topicFilter;
final String shareName;

// TODO remove transient when the subscription identifier has to be persisted
private transient final Optional<SubscriptionIdentifier> subscriptionId;

public Subscription(String clientId, Topic topicFilter, MqttQoS requestedQos) {
this(clientId, topicFilter, requestedQos, "");
this(clientId, topicFilter, requestedQos, "", null);
}

public Subscription(String clientId, Topic topicFilter, MqttQoS requestedQos, SubscriptionIdentifier subscriptionId) {
this(clientId, topicFilter, requestedQos, "", subscriptionId);
}

public Subscription(String clientId, Topic topicFilter, MqttQoS requestedQos, String shareName) {
this(clientId, topicFilter, requestedQos, shareName, null);
}

public Subscription(String clientId, Topic topicFilter, MqttQoS requestedQos, String shareName,
SubscriptionIdentifier subscriptionId) {
this.requestedQos = requestedQos;
this.clientId = clientId;
this.topicFilter = topicFilter;
this.shareName = shareName;
this.subscriptionId = Optional.ofNullable(subscriptionId);
}

public Subscription(Subscription orig) {
this.requestedQos = orig.requestedQos;
this.clientId = orig.clientId;
this.topicFilter = orig.topicFilter;
this.shareName = orig.shareName;
this.subscriptionId = orig.subscriptionId;
}

public String getClientId() {
Expand All @@ -66,6 +81,14 @@ public boolean qosLessThan(Subscription sub) {
return requestedQos.value() < sub.requestedQos.value();
}

public boolean hasSubscriptionIdentifier() {
return subscriptionId.isPresent();
}

public SubscriptionIdentifier getSubscriptionIdentifier() {
return subscriptionId.get();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.moquette.broker.subscriptions;

import java.util.Objects;

/**
* Models the subscription identifier for MQTT5 Subscription.
* */
Expand All @@ -17,5 +19,21 @@ public int value() {
return subscriptionId;
}

@Override
public String toString() {
return "SubscriptionIdentifier: " + subscriptionId;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SubscriptionIdentifier that = (SubscriptionIdentifier) o;
return subscriptionId == that.subscriptionId;
}

@Override
public int hashCode() {
return Objects.hash(subscriptionId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import java.util.List;
import static io.moquette.broker.subscriptions.Topic.asTopic;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class CTrieSharedSubscriptionDirectoryMatchingTest extends CTrieSubscriptionDirectMatchingCommon {

Expand Down Expand Up @@ -65,4 +69,53 @@ public void givenSessionHasMultipleSharedSubscriptionWhenTheClientIsRemovedThenN
List<Subscription> matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("/livingroom"));
assertThat(matchingSubscriptions).isEmpty();
}

@Test
public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionIsProcessedThenSubscriptionIdIsUpdated() {
// subscribe a client on topic with subscription identifier
sut.addShared("client", new ShareName("share_temp"), asTopic("client/test/b"), MqttQoS.AT_MOST_ONCE,
new SubscriptionIdentifier(1));

// verify it contains the subscription identifier
final List<Subscription> matchingSubscriptions = sut.matchQosSharpening(asTopic("client/test/b"));
verifySubscriptionIdentifierIsPresent(matchingSubscriptions, new SubscriptionIdentifier(1), "share_temp");

// update the subscription of same clientId on same topic filter but with different subscription identifier
sut.addShared("client", new ShareName("share_temp"), asTopic("client/test/b"), MqttQoS.AT_MOST_ONCE,
new SubscriptionIdentifier(123));

// verify the subscription identifier is updated
final List<Subscription> reloadedSubscriptions = sut.matchQosSharpening(asTopic("client/test/b"));
verifySubscriptionIdentifierIsPresent(reloadedSubscriptions, new SubscriptionIdentifier(123), "share_temp");
}

private static void verifySubscriptionIdentifierIsPresent(List<Subscription> matchingSubscriptions, SubscriptionIdentifier subscriptionIdentifier, String expectedShareName) {
assertAll("subscription contains the subscription identifier",
() -> assertEquals(1, matchingSubscriptions.size()),
() -> assertEquals(expectedShareName, matchingSubscriptions.iterator().next().shareName),
() -> assertTrue(matchingSubscriptions.iterator().next().hasSubscriptionIdentifier()),
() -> assertEquals(subscriptionIdentifier, matchingSubscriptions.iterator().next().getSubscriptionIdentifier())
);
}

@Test
public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionWithoutSubscriptionIdIsProcessedThenSubscriptionIdIsWiped() {
// subscribe a client on topic with subscription identifier
sut.addShared("client", new ShareName("share_temp"), asTopic("client/test/b"), MqttQoS.AT_MOST_ONCE,
new SubscriptionIdentifier(1));

// verify it contains the subscription identifier
SubscriptionIdentifier expectedSubscriptionId = new SubscriptionIdentifier(1);
verifySubscriptionIdentifierIsPresent(sut.matchQosSharpening(asTopic("client/test/b")), expectedSubscriptionId, "share_temp");

// update the subscription of same clientId on same topic filter but removing subscription identifier
sut.addShared("client", new ShareName("share_temp"), asTopic("client/test/b"), MqttQoS.AT_MOST_ONCE);

// verify the subscription identifier is removed
final List<Subscription> reloadedSubscriptions = sut.matchQosSharpening(asTopic("client/test/b"));
assertAll("subscription doesn't contain subscription identifier",
() -> assertEquals(1, reloadedSubscriptions.size()),
() -> assertFalse(reloadedSubscriptions.iterator().next().hasSubscriptionIdentifier())
);
}
}
Loading

0 comments on commit 8480e22

Please sign in to comment.