Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
26129f5
[broker-30] Implement delivering retained messages
crazyrokr Nov 19, 2025
c6ada4d
[broker-30] Rewrite retained messages collecting
crazyrokr Nov 19, 2025
94b32db
Merge branch 'develop' into feature-broker-30
crazyrokr Nov 19, 2025
01bac5b
[broker-30] Cleanup code after merge
crazyrokr Nov 19, 2025
c903510
[broker-30] Fix corner cases in retained messages
crazyrokr Nov 20, 2025
e1f5c2a
[broker-30] Handle SubscribeRetainHandling
crazyrokr Nov 20, 2025
a8829ac
Merge branch 'develop' into feature-broker-30
crazyrokr Dec 1, 2025
a1b87dd
[broker-30] Fix build
crazyrokr Dec 1, 2025
fd848eb
Merge branch 'develop' into feature-broker-30
crazyrokr Dec 1, 2025
be8070b
[broker-30] SubscriberNode refactoring
crazyrokr Dec 2, 2025
0c219dd
[broker-30] Revert wrong access level modifier
crazyrokr Dec 2, 2025
62900ca
[broker-30] Avoid tree branch locking
crazyrokr Dec 3, 2025
dc1d443
Merge branch 'develop' into feature-broker-30
crazyrokr Dec 6, 2025
7314ed1
[broker-30] Introduce AbstractTrieNode
crazyrokr Dec 7, 2025
901e865
[broker-30] Remove redundant code from SubscriberNode
crazyrokr Dec 7, 2025
cc91db2
[broker-30] Revert redundant changes
crazyrokr Dec 7, 2025
42af014
[broker-30] Improve AbstractTrieNode
crazyrokr Dec 7, 2025
b1aa456
[broker-30] Implement retainAsPublished support
crazyrokr Dec 7, 2025
f2f2f67
[broker-30] Update debug message
crazyrokr Dec 7, 2025
bf837eb
[broker-30] Small refactoring
crazyrokr Dec 7, 2025
cc08339
[broker-30] Avoid double message retaining
crazyrokr Dec 7, 2025
bacefb7
[broker-30] Add more tests
crazyrokr Dec 8, 2025
c2ac2ad
Merge branch 'develop' into feature-broker-30
crazyrokr Dec 8, 2025
1d37242
[broker-30] Fix tests
crazyrokr Dec 8, 2025
c3ab5c5
[broker-30] Fix retainAsPublished logic
crazyrokr Dec 8, 2025
b829090
[broker-30] Add tests
crazyrokr Dec 8, 2025
26f45dc
[broker-30] Introduce RetainMessageService
crazyrokr Dec 8, 2025
6c16962
[broker-30] Rework retainAsPublished handling
crazyrokr Dec 9, 2025
12f8069
Merge branch 'develop' into feature-broker-30
crazyrokr Dec 9, 2025
31ff664
[broker-30] Fix formatting
crazyrokr Dec 9, 2025
8191bc8
[broker-30] Improve RetainedMessageTreeTest
crazyrokr Dec 9, 2025
c7c72d3
[broker-30] Improve code readability
crazyrokr Dec 9, 2025
8a23e97
[broker-30] Extract creation of SingleSubscriber to SubscriptionService
crazyrokr Dec 9, 2025
57eeb77
[broker-30] Revert unnecessary changes
crazyrokr Dec 9, 2025
28e73a5
[broker-30] Fix QoS comparison
crazyrokr Dec 9, 2025
2fb16a5
Merge branch 'develop' into feature-broker-30
crazyrokr Dec 10, 2025
f4a6727
[broker-30] Update tests
crazyrokr Dec 10, 2025
751624f
[broker-30] Introduce setRetainedMessage and clearRetainedMessage
crazyrokr Dec 10, 2025
649f57e
[broker-30] Move publish.retained() check to caller
crazyrokr Dec 10, 2025
b4e1232
[broker-30] Extract isRetainHandlingSatisfied() method
crazyrokr Dec 10, 2025
1f3d50a
[broker-30] Revert formatting
crazyrokr Dec 10, 2025
588aae4
[broker-30] Revert SubscriberNode
crazyrokr Dec 10, 2025
07ce54d
[broker-30] Reduce memory allocation of RetainedMessageNode
crazyrokr Dec 11, 2025
3163a8a
Merge branch 'develop' into feature-broker-30
crazyrokr Dec 12, 2025
17359d3
[broker-30] Refactoring
crazyrokr Dec 12, 2025
ffb1c83
[broker-30] Update tests
crazyrokr Dec 12, 2025
a568d4f
Merge branch 'develop' into feature-broker-30
crazyrokr Dec 14, 2025
94b7b17
[broker-30] Move logic from subscription service to subscription handler
crazyrokr Dec 14, 2025
e7c01a9
[broker-30] Separate addRetainedMessage and removeRetainedMessage API
crazyrokr Dec 15, 2025
d6f66eb
[broker-30] Use ArrayBuilder in getRetainedMessages API
crazyrokr Dec 15, 2025
bc83b99
Merge remote-tracking branch 'origin/feature-broker-30' into feature-…
crazyrokr Dec 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import javasabr.mqtt.model.MqttProperties;
import javasabr.mqtt.model.MqttServerConnectionConfig;
import javasabr.mqtt.model.QoS;
import javasabr.mqtt.model.topic.tree.ConcurrentRetainedMessageTree;
import javasabr.mqtt.network.MqttClientFactory;
import javasabr.mqtt.network.MqttConnection;
import javasabr.mqtt.network.MqttConnectionFactory;
Expand Down Expand Up @@ -178,8 +179,9 @@ MqttInMessageHandler disconnectMqttInMessageHandler(MessageOutFactoryService mes
MqttInMessageHandler subscribeMqttInMessageHandler(
SubscriptionService subscriptionService,
MessageOutFactoryService messageOutFactoryService,
TopicService topicService) {
return new SubscribeMqttInMessageHandler(subscriptionService, messageOutFactoryService, topicService);
TopicService topicService,
PublishDeliveringService publishDeliveringService) {
return new SubscribeMqttInMessageHandler(subscriptionService, messageOutFactoryService, topicService, publishDeliveringService);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package javasabr.mqtt.model.topic.tree;
package javasabr.mqtt.model.subscribtion.tree;

import javasabr.mqtt.model.subscriber.SingleSubscriber;
import javasabr.mqtt.model.subscribtion.Subscription;
Expand All @@ -13,12 +13,12 @@
import org.jspecify.annotations.Nullable;

@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
public class ConcurrentTopicTree implements ThreadSafe {
public class ConcurrentSubscriptionTree implements ThreadSafe {

TopicNode rootNode;
TopicFilterNode rootNode;

public ConcurrentTopicTree() {
this.rootNode = new TopicNode();
public ConcurrentSubscriptionTree() {
this.rootNode = new TopicFilterNode();
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package javasabr.mqtt.model.topic.tree;
package javasabr.mqtt.model.subscribtion.tree;

import java.util.function.Supplier;
import javasabr.mqtt.base.util.DebugUtils;
Expand All @@ -22,16 +22,16 @@
@Getter(AccessLevel.PACKAGE)
@Accessors(fluent = true, chain = false)
@FieldDefaults(level = AccessLevel.PRIVATE)
class TopicNode extends TopicTreeBase {
class TopicFilterNode extends TopicFilterTreeBase {

private final static Supplier<TopicNode> TOPIC_NODE_FACTORY = TopicNode::new;
private final static Supplier<TopicFilterNode> TOPIC_NODE_FACTORY = TopicFilterNode::new;

static {
DebugUtils.registerIncludedFields("childNodes", "subscribers");
}

@Nullable
volatile LockableRefToRefDictionary<String, TopicNode> childNodes;
volatile LockableRefToRefDictionary<String, TopicFilterNode> childNodes;
@Nullable
volatile LockableArray<Subscriber> subscribers;

Expand All @@ -43,15 +43,15 @@ public SingleSubscriber subscribe(int level, SubscriptionOwner owner, Subscripti
if (level == topicFilter.levelsCount()) {
return addSubscriber(getOrCreateSubscribers(), owner, subscription, topicFilter);
}
TopicNode childNode = getOrCreateChildNode(topicFilter.segment(level));
TopicFilterNode childNode = getOrCreateChildNode(topicFilter.segment(level));
return childNode.subscribe(level + 1, owner, subscription, topicFilter);
}

public boolean unsubscribe(int level, SubscriptionOwner owner, TopicFilter topicFilter) {
if (level == topicFilter.levelsCount()) {
return removeSubscriber(subscribers(), owner, topicFilter);
}
TopicNode childNode = getOrCreateChildNode(topicFilter.segment(level));
TopicFilterNode childNode = getOrCreateChildNode(topicFilter.segment(level));
return childNode.unsubscribe(level + 1, owner, topicFilter);
}

Expand All @@ -67,14 +67,14 @@ private void exactlyTopicMatch(
int lastLevel,
MutableArray<SingleSubscriber> result) {
String segment = topicName.segment(level);
TopicNode topicNode = childNode(segment);
if (topicNode == null) {
TopicFilterNode topicFilterNode = childNode(segment);
if (topicFilterNode == null) {
return;
}
if (level == lastLevel) {
appendSubscribersTo(result, topicNode);
appendSubscribersTo(result, topicFilterNode);
} else if (level < lastLevel) {
topicNode.matchesTo(level + 1, topicName, lastLevel, result);
topicFilterNode.matchesTo(level + 1, topicName, lastLevel, result);
}
}

Expand All @@ -83,31 +83,31 @@ private void singleWildcardTopicMatch(
TopicName topicName,
int lastLevel,
MutableArray<SingleSubscriber> result) {
TopicNode topicNode = childNode(TopicFilter.SINGLE_LEVEL_WILDCARD);
if (topicNode == null) {
TopicFilterNode topicFilterNode = childNode(TopicFilter.SINGLE_LEVEL_WILDCARD);
if (topicFilterNode == null) {
return;
}
if (level == lastLevel) {
appendSubscribersTo(result, topicNode);
appendSubscribersTo(result, topicFilterNode);
} else if (level < lastLevel) {
topicNode.matchesTo(level + 1, topicName, lastLevel, result);
topicFilterNode.matchesTo(level + 1, topicName, lastLevel, result);
}
}

private void multiWildcardTopicMatch(MutableArray<SingleSubscriber> result) {
TopicNode topicNode = childNode(TopicFilter.MULTI_LEVEL_WILDCARD);
if (topicNode != null) {
appendSubscribersTo(result, topicNode);
TopicFilterNode topicFilterNode = childNode(TopicFilter.MULTI_LEVEL_WILDCARD);
if (topicFilterNode != null) {
appendSubscribersTo(result, topicFilterNode);
}
}

private TopicNode getOrCreateChildNode(String segment) {
LockableRefToRefDictionary<String, TopicNode> childNodes = getOrCreateChildNodes();
private TopicFilterNode getOrCreateChildNode(String segment) {
LockableRefToRefDictionary<String, TopicFilterNode> childNodes = getOrCreateChildNodes();
long stamp = childNodes.readLock();
try {
TopicNode topicNode = childNodes.get(segment);
if (topicNode != null) {
return topicNode;
TopicFilterNode topicFilterNode = childNodes.get(segment);
if (topicFilterNode != null) {
return topicFilterNode;
}
} finally {
childNodes.readUnlock(stamp);
Expand All @@ -122,8 +122,8 @@ private TopicNode getOrCreateChildNode(String segment) {
}

@Nullable
private TopicNode childNode(String segment) {
LockableRefToRefDictionary<String, TopicNode> childNodes = childNodes();
private TopicFilterNode childNode(String segment) {
LockableRefToRefDictionary<String, TopicFilterNode> childNodes = childNodes();
if (childNodes == null) {
return null;
}
Expand All @@ -135,7 +135,7 @@ private TopicNode childNode(String segment) {
}
}

private LockableRefToRefDictionary<String, TopicNode> getOrCreateChildNodes() {
private LockableRefToRefDictionary<String, TopicFilterNode> getOrCreateChildNodes() {
if (childNodes == null) {
synchronized (this) {
if (childNodes == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package javasabr.mqtt.model.topic.tree;
package javasabr.mqtt.model.subscribtion.tree;

import java.util.Objects;
import javasabr.mqtt.model.QoS;
Expand All @@ -18,7 +18,7 @@

@RequiredArgsConstructor
@FieldDefaults(level = AccessLevel.PROTECTED, makeFinal = true)
abstract class TopicTreeBase {
abstract class TopicFilterTreeBase {

/**
* @return previous subscriber with the same owner
Expand Down Expand Up @@ -66,7 +66,7 @@ private static void addSharedSubscriber(
String group = sharedTopicFilter.shareName();
SharedSubscriber sharedSubscriber = (SharedSubscriber) subscribers
.iterations()
.findAny(group, TopicTreeBase::isSharedSubscriberWithGroup);
.findAny(group, TopicFilterTreeBase::isSharedSubscriberWithGroup);

if (sharedSubscriber == null) {
sharedSubscriber = new SharedSubscriber(sharedTopicFilter);
Expand All @@ -76,8 +76,8 @@ private static void addSharedSubscriber(
sharedSubscriber.addSubscriber(new SingleSubscriber(owner, subscription));
}

protected static void appendSubscribersTo(MutableArray<SingleSubscriber> result, TopicNode topicNode) {
LockableArray<Subscriber> subscribers = topicNode.subscribers();
protected static void appendSubscribersTo(MutableArray<SingleSubscriber> result, TopicFilterNode topicFilterNode) {
LockableArray<Subscriber> subscribers = topicFilterNode.subscribers();
if (subscribers == null) {
return;
}
Expand Down Expand Up @@ -125,7 +125,7 @@ private static boolean removeSharedSubscriber(
String group = sharedTopicFilter.shareName();
SharedSubscriber sharedSubscriber = (SharedSubscriber) subscribers
.iterations()
.findAny(group, TopicTreeBase::isSharedSubscriberWithGroup);
.findAny(group, TopicFilterTreeBase::isSharedSubscriberWithGroup);
if (sharedSubscriber != null) {
boolean removed = sharedSubscriber.removeSubscriberWithOwner(owner);
if (sharedSubscriber.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
@NullMarked
package javasabr.mqtt.model.subscribtion.tree;

import org.jspecify.annotations.NullMarked;
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
public class TopicFilter extends AbstractTopic {

public static final String MULTI_LEVEL_WILDCARD = "#";
public static final char MULTI_LEVEL_WILDCARD_CHAR = '#';
public static final char MULTI_LEVEL_WILDCARD_CHAR = MULTI_LEVEL_WILDCARD.charAt(0);
public static final String SINGLE_LEVEL_WILDCARD = "+";
public static final char SINGLE_LEVEL_WILDCARD_CHAR = '+';
public static final char SINGLE_LEVEL_WILDCARD_CHAR = SINGLE_LEVEL_WILDCARD.charAt(0);
public static final String SPECIAL = "$";

public static final TopicFilter INVALID_TOPIC_FILTER = new TopicFilter("$invalid$") {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package javasabr.mqtt.model.topic.tree;

import javasabr.mqtt.model.publishing.Publish;
import javasabr.mqtt.model.topic.TopicFilter;
import javasabr.mqtt.model.topic.TopicName;
import javasabr.rlib.common.ThreadSafe;
import lombok.AccessLevel;
import lombok.experimental.FieldDefaults;
import org.jspecify.annotations.Nullable;

@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
public class ConcurrentRetainedMessageTree implements ThreadSafe {

TopicMessageNode rootNode;

public ConcurrentRetainedMessageTree() {
this.rootNode = new TopicMessageNode();
}

public void retainMessage(Publish message) {
rootNode.retainMessage(0, message, message.topicName());
}

public @Nullable Publish getRetainedMessage(TopicName topicName) {
return rootNode.getRetainedMessage(0, topicName);
}

public @Nullable Publish getRetainedMessage(TopicFilter topicFilter) {
return rootNode.getRetainedMessage(0, topicFilter);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package javasabr.mqtt.model.topic.tree;

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javasabr.mqtt.base.util.DebugUtils;
import javasabr.mqtt.model.publishing.Publish;
import javasabr.mqtt.model.topic.TopicFilter;
import javasabr.mqtt.model.topic.TopicName;
import javasabr.rlib.collections.dictionary.DictionaryFactory;
import javasabr.rlib.collections.dictionary.LockableRefToRefDictionary;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.experimental.Accessors;
import lombok.experimental.FieldDefaults;
import org.jspecify.annotations.Nullable;

@Getter(AccessLevel.PACKAGE)
@Accessors(fluent = true, chain = false)
@FieldDefaults(level = AccessLevel.PRIVATE)
class TopicMessageNode {

private final static Supplier<TopicMessageNode> TOPIC_NODE_FACTORY = TopicMessageNode::new;

static {
DebugUtils.registerIncludedFields("childNodes", "retainedMessage");
}

@Nullable
volatile LockableRefToRefDictionary<String, TopicMessageNode> childNodes;
final AtomicReference<@Nullable Publish> retainedMessage = new AtomicReference<>();

public void retainMessage(int level, Publish message, TopicName topicFilter) {
if (level + 1 == topicFilter.levelsCount()) {
retainedMessage.set(message);
return;
}
TopicMessageNode childNode = getOrCreateChildNode(topicFilter.segment(level));
childNode.retainMessage(level + 1, message, topicFilter);
}

@Nullable
public Publish getRetainedMessage(int level, TopicName topicName) {
if (level + 1 == topicName.levelsCount()) {
return retainedMessage.get();
}
TopicMessageNode childNode = getOrCreateChildNode(topicName.segment(level));
return childNode.getRetainedMessage(level + 1, topicName);
}

@Nullable
public Publish getRetainedMessage(int level, TopicFilter topicName) {
if (level + 1 == topicName.levelsCount()) {
return retainedMessage.get();
}
TopicMessageNode childNode = getOrCreateChildNode(topicName.segment(level));
return childNode.getRetainedMessage(level + 1, topicName);
}

private TopicMessageNode getOrCreateChildNode(String segment) {
LockableRefToRefDictionary<String, TopicMessageNode> childNodes = getOrCreateChildNodes();
long stamp = childNodes.readLock();
try {
TopicMessageNode topicFilterNode = childNodes.get(segment);
if (topicFilterNode != null) {
return topicFilterNode;
}
} finally {
childNodes.readUnlock(stamp);
}
stamp = childNodes.writeLock();
try {
return childNodes.getOrCompute(segment, TOPIC_NODE_FACTORY);
} finally {
childNodes.writeUnlock(stamp);
}
}

private LockableRefToRefDictionary<String, TopicMessageNode> getOrCreateChildNodes() {
if (childNodes == null) {
synchronized (this) {
if (childNodes == null) {
childNodes = DictionaryFactory.stampedLockBasedRefToRefDictionary();
}
}
}
//noinspection ConstantConditions
return childNodes;
}

@Override
public String toString() {
return DebugUtils.toJsonString(this);
}
}
Loading
Loading