Skip to content

Commit 8a23e97

Browse files
committed
[broker-30] Extract creation of SingleSubscriber to SubscriptionService
1 parent c7c72d3 commit 8a23e97

File tree

8 files changed

+74
-66
lines changed

8 files changed

+74
-66
lines changed

core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -79,16 +79,17 @@ private SubscribeAckReasonCode addSubscription(MqttUser user, MqttSession sessio
7979
return SubscribeAckReasonCode.WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED;
8080
}
8181
ActiveSubscriptions activeSubscriptions = session.activeSubscriptions();
82-
SingleSubscriber previous = subscriberTree.subscribe(user, subscription);
83-
if (previous != null) {
84-
activeSubscriptions.remove(previous.subscription());
82+
SingleSubscriber newSubscriber = new SingleSubscriber(user, subscription);
83+
SingleSubscriber previousSubscriber = subscriberTree.subscribe(newSubscriber);
84+
if (previousSubscriber != null) {
85+
activeSubscriptions.remove(previousSubscriber.subscription());
8586
}
8687
QoS subscriptionQoS = subscription.qos();
8788
SubscribeRetainHandling retainHandling = subscription.retainHandling();
8889
boolean isRetainHandlingSatisfied =
89-
retainHandling == SEND || (retainHandling == SEND_IF_SUBSCRIPTION_DOES_NOT_EXIST && previous == null);
90+
retainHandling == SEND || (retainHandling == SEND_IF_SUBSCRIPTION_DOES_NOT_EXIST && previousSubscriber == null);
9091
if (subscriptionQoS.isValid() && isRetainHandlingSatisfied) {
91-
sendRetainedMessages(user, subscription);
92+
sendRetainedMessages(newSubscriber);
9293
}
9394
activeSubscriptions.add(subscription);
9495
return subscriptionQoS.subscribeAckReasonCode();
@@ -140,17 +141,17 @@ public void restoreSubscriptions(MqttUser user, MqttSession session) {
140141
.activeSubscriptions()
141142
.subscriptions();
142143
for (Subscription subscription : subscriptions) {
143-
subscriberTree.subscribe(user, subscription);
144+
SingleSubscriber singleSubscriber = new SingleSubscriber(user, subscription);
145+
subscriberTree.subscribe(singleSubscriber);
144146
}
145147
}
146148

147-
private void sendRetainedMessages(MqttUser user, Subscription subscription) {
149+
private void sendRetainedMessages(SingleSubscriber singleSubscriber) {
148150
int count = 0;
149-
String clientId = user.clientId();
150-
PublishHandlingResult errorResult = null;
151-
SingleSubscriber singleSubscriber = new SingleSubscriber(user, subscription);
151+
String clientId = singleSubscriber.user().clientId();
152152
var results = retainMessageService.deliverRetainedMessages(singleSubscriber);
153153
for (PublishHandlingResult result : results) {
154+
PublishHandlingResult errorResult = null;
154155
if (result.error()) {
155156
errorResult = result;
156157
} else if (result == PublishHandlingResult.SUCCESS) {

core-service/src/test/groovy/javasabr/mqtt/service/impl/InMemorySubscriptionServiceTest.groovy

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -360,10 +360,10 @@ class InMemorySubscriptionServiceTest extends IntegrationServiceSpecification {
360360
true,
361361
true))
362362
and:
363-
def publishWithRetain = TestPublishFactory.createPublishWithRetain("topic/filter/1", "payload1")
363+
def publishWithRetain = TestPublishFactory.makePublishWithRetain("topic/filter/1", "payload1")
364364
defaultRetainMessageService.retainMessage(publishWithRetain)
365365
and:
366-
def publishWithoutRetain = TestPublishFactory.createPublishWithoutRetain("topic/filter/1", "payload2")
366+
def publishWithoutRetain = TestPublishFactory.makePublishWithoutRetain("topic/filter/1", "payload2")
367367
defaultRetainMessageService.retainMessage(publishWithoutRetain)
368368
when:
369369
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
@@ -389,7 +389,7 @@ class InMemorySubscriptionServiceTest extends IntegrationServiceSpecification {
389389
true)
390390
def subscriptions = Array.of(subscription)
391391
and:
392-
def publishWithRetain = TestPublishFactory.createPublishWithRetain("topic/filter/1", "payload1")
392+
def publishWithRetain = TestPublishFactory.makePublishWithRetain("topic/filter/1", "payload1")
393393
defaultPublishDeliveringService.startDelivering(publishWithRetain, new SingleSubscriber(mqttUser, subscription))
394394
when:
395395
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
@@ -436,10 +436,10 @@ class InMemorySubscriptionServiceTest extends IntegrationServiceSpecification {
436436
true,
437437
true))
438438
and:
439-
def publishWithRetain = TestPublishFactory.createPublishWithRetain("topic/filter/1", "payload1")
439+
def publishWithRetain = TestPublishFactory.makePublishWithRetain("topic/filter/1", "payload1")
440440
defaultPublishDeliveringService.startDelivering(publishWithRetain, new SingleSubscriber(mqttUser, subscription))
441441
and:
442-
def publishWithoutRetain = TestPublishFactory.createPublishWithoutRetain("topic/filter/1", "payload2")
442+
def publishWithoutRetain = TestPublishFactory.makePublishWithoutRetain("topic/filter/1", "payload2")
443443
defaultPublishDeliveringService.startDelivering(publishWithoutRetain, new SingleSubscriber(mqttUser, subscription))
444444
when:
445445
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
@@ -468,7 +468,7 @@ class InMemorySubscriptionServiceTest extends IntegrationServiceSpecification {
468468
false)
469469
def subscriptions = Array.of(subscription)
470470
and:
471-
def publishWithRetain = TestPublishFactory.createPublishWithRetain("topic/filter/1", "payload1")
471+
def publishWithRetain = TestPublishFactory.makePublishWithRetain("topic/filter/1", "payload1")
472472
defaultRetainMessageService.retainMessage(publishWithRetain)
473473
when:
474474
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
@@ -495,7 +495,7 @@ class InMemorySubscriptionServiceTest extends IntegrationServiceSpecification {
495495
def subscriptions = Array.of(subscription)
496496

497497
when:
498-
def publishWithRetain = TestPublishFactory.createPublishWithRetain("topic/filter/1", "payload1")
498+
def publishWithRetain = TestPublishFactory.makePublishWithRetain("topic/filter/1", "payload1")
499499
defaultRetainMessageService.retainMessage(publishWithRetain)
500500
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
501501
then:
@@ -520,7 +520,7 @@ class InMemorySubscriptionServiceTest extends IntegrationServiceSpecification {
520520
true)
521521
def subscriptions = Array.of(subscription)
522522
and:
523-
def publishWithRetain = TestPublishFactory.createPublishWithRetain("topic/filter/1", "payload1")
523+
def publishWithRetain = TestPublishFactory.makePublishWithRetain("topic/filter/1", "payload1")
524524
defaultPublishDeliveringService.startDelivering(publishWithRetain, new SingleSubscriber(mqttUser, subscription))
525525
when:
526526
defaultSubscriptionService.subscribe(mqttUser, mqttUser.session(), subscriptions)
@@ -580,7 +580,7 @@ class InMemorySubscriptionServiceTest extends IntegrationServiceSpecification {
580580
true)
581581
def subscriptions = Array.of(subscription)
582582
and:
583-
def publishWithRetain = TestPublishFactory.createPublishWithRetain("topic/filter/1", "payload1")
583+
def publishWithRetain = TestPublishFactory.makePublishWithRetain("topic/filter/1", "payload1")
584584
defaultPublishDeliveringService.startDelivering(publishWithRetain, new SingleSubscriber(mqttUser, subscription))
585585
when:
586586
defaultSubscriptionService.subscribe(anotherUser, mqttUser.session(), subscriptions)

model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import javasabr.mqtt.model.MqttUser;
44
import javasabr.mqtt.model.subscriber.SingleSubscriber;
5-
import javasabr.mqtt.model.subscription.Subscription;
65
import javasabr.mqtt.model.topic.TopicFilter;
76
import javasabr.mqtt.model.topic.TopicName;
87
import javasabr.rlib.collections.array.Array;
@@ -22,8 +21,8 @@ public ConcurrentSubscriberTree() {
2221
}
2322

2423
@Nullable
25-
public SingleSubscriber subscribe(MqttUser user, Subscription subscription) {
26-
return rootNode.subscribe(0, user, subscription, subscription.topicFilter());
24+
public SingleSubscriber subscribe(SingleSubscriber subscriber) {
25+
return rootNode.subscribe(0, subscriber, subscriber.subscription().topicFilter());
2726
}
2827

2928
public boolean unsubscribe(MqttUser user, TopicFilter topicFilter) {

model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import javasabr.mqtt.model.MqttUser;
66
import javasabr.mqtt.model.subscriber.SingleSubscriber;
77
import javasabr.mqtt.model.subscriber.Subscriber;
8-
import javasabr.mqtt.model.subscription.Subscription;
98
import javasabr.mqtt.model.topic.TopicFilter;
109
import javasabr.mqtt.model.topic.TopicName;
1110
import javasabr.rlib.collections.array.ArrayFactory;
@@ -40,12 +39,12 @@ protected Supplier<SubscriberNode> getNodeFactory() {
4039
* @return the previous subscription from the same owner
4140
*/
4241
@Nullable
43-
protected SingleSubscriber subscribe(int level, MqttUser owner, Subscription subscription, TopicFilter topicFilter) {
42+
protected SingleSubscriber subscribe(int level, SingleSubscriber subscriber, TopicFilter topicFilter) {
4443
if (level == topicFilter.levelsCount()) {
45-
return addSubscriber(getOrCreateSubscribers(), owner, subscription, topicFilter);
44+
return addSubscriber(getOrCreateSubscribers(), subscriber, topicFilter);
4645
}
4746
SubscriberNode childNode = getOrCreateChildNode(topicFilter.segment(level));
48-
return childNode.subscribe(level + 1, owner, subscription, topicFilter);
47+
return childNode.subscribe(level + 1, subscriber, topicFilter);
4948
}
5049

5150
protected boolean unsubscribe(int level, MqttUser owner, TopicFilter topicFilter) {

model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberTreeBase.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import javasabr.mqtt.model.subscriber.SharedSubscriber;
88
import javasabr.mqtt.model.subscriber.SingleSubscriber;
99
import javasabr.mqtt.model.subscriber.Subscriber;
10-
import javasabr.mqtt.model.subscription.Subscription;
1110
import javasabr.mqtt.model.topic.SharedTopicFilter;
1211
import javasabr.mqtt.model.topic.TopicFilter;
1312
import javasabr.rlib.collections.array.LockableArray;
@@ -27,17 +26,16 @@ abstract class SubscriberTreeBase extends AbstractTrieNode<SubscriberNode> {
2726
@Nullable
2827
protected static SingleSubscriber addSubscriber(
2928
LockableArray<Subscriber> subscribers,
30-
MqttUser user,
31-
Subscription subscription,
29+
SingleSubscriber subscriber,
3230
TopicFilter topicFilter) {
3331
long stamp = subscribers.writeLock();
3432
try {
3533
if (topicFilter instanceof SharedTopicFilter stf) {
36-
addSharedSubscriber(subscribers, user, subscription, stf);
34+
addSharedSubscriber(subscribers, subscriber, stf);
3735
return null;
3836
} else {
39-
SingleSubscriber previous = removePreviousIfExist(subscribers, user);
40-
subscribers.add(new SingleSubscriber(user, subscription));
37+
SingleSubscriber previous = removePreviousIfExist(subscribers, subscriber.user());
38+
subscribers.add(subscriber);
4139
return previous;
4240
}
4341
} finally {
@@ -58,8 +56,7 @@ private static SingleSubscriber removePreviousIfExist(LockableArray<Subscriber>
5856

5957
private static void addSharedSubscriber(
6058
LockableArray<Subscriber> subscribers,
61-
MqttUser user,
62-
Subscription subscription,
59+
SingleSubscriber subscriber,
6360
SharedTopicFilter sharedTopicFilter) {
6461

6562
String group = sharedTopicFilter.shareName();
@@ -72,7 +69,7 @@ private static void addSharedSubscriber(
7269
subscribers.add(sharedSubscriber);
7370
}
7471

75-
sharedSubscriber.addSubscriber(new SingleSubscriber(user, subscription));
72+
sharedSubscriber.addSubscriber(subscriber);
7673
}
7774

7875
protected static void appendSubscribersTo(MutableArray<SingleSubscriber> result, SubscriberNode subscriberNode) {

model/src/test/groovy/javasabr/mqtt/model/topic/tree/RetainedMessageTreeTest.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class RetainedMessageTreeTest extends UnitSpecification {
1212
List<String> expectedMessages) {
1313
given:
1414
ConcurrentRetainedMessageTree retainedMessageTree = new ConcurrentRetainedMessageTree();
15-
messages.collect(TestPublishFactory::createPublish).each(retainedMessageTree::retainMessage)
15+
messages.collect(TestPublishFactory::makePublish).each(retainedMessageTree::retainMessage)
1616
def topicFilter = TopicFilter.valueOf(rawTopicFilter)
1717
when:
1818
def retainedMessages = retainedMessageTree.getRetainedMessage(topicFilter)

0 commit comments

Comments
 (0)