Skip to content

Commit a990200

Browse files
authored
#69 Improve publish delivering to subscribers part 5 (#139)
1 parent e2d36d1 commit a990200

19 files changed

+126
-203
lines changed

application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import javasabr.mqtt.network.user.NetworkMqttUserFactory;
1515
import javasabr.mqtt.service.AuthorizationService;
1616
import javasabr.mqtt.service.AuthenticationService;
17+
import javasabr.mqtt.service.AuthorizationService;
1718
import javasabr.mqtt.service.ClientIdRegistry;
1819
import javasabr.mqtt.service.ConnectionService;
1920
import javasabr.mqtt.service.CredentialSource;
@@ -254,24 +255,20 @@ ConnectionService externalMqttConnectionService(Collection<? extends MqttInMessa
254255
}
255256

256257
@Bean
257-
MqttPublishOutMessageHandler qos0MqttPublishOutMessageHandler(
258-
SubscriptionService subscriptionService,
259-
MessageOutFactoryService messageOutFactoryService) {
260-
return new Qos0MqttPublishOutMessageHandler(subscriptionService, messageOutFactoryService);
258+
MqttPublishOutMessageHandler qos0MqttPublishOutMessageHandler(MessageOutFactoryService messageOutFactoryService) {
259+
return new Qos0MqttPublishOutMessageHandler(messageOutFactoryService);
261260
}
262261

263262
@Bean
264263
MqttPublishOutMessageHandler qos1MqttPublishOutMessageHandler(
265-
SubscriptionService subscriptionService,
266264
MessageOutFactoryService messageOutFactoryService) {
267-
return new Qos1MqttPublishOutMessageHandler(subscriptionService, messageOutFactoryService);
265+
return new Qos1MqttPublishOutMessageHandler(messageOutFactoryService);
268266
}
269267

270268
@Bean
271269
MqttPublishOutMessageHandler qos2MqttPublishOutMessageHandler(
272-
SubscriptionService subscriptionService,
273270
MessageOutFactoryService messageOutFactoryService) {
274-
return new Qos2MqttPublishOutMessageHandler(subscriptionService, messageOutFactoryService);
271+
return new Qos2MqttPublishOutMessageHandler(messageOutFactoryService);
275272
}
276273

277274
@Bean

core-service/src/main/java/javasabr/mqtt/service/PublishDeliveringService.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,8 @@
22

33
import javasabr.mqtt.model.publishing.Publish;
44
import javasabr.mqtt.model.subscriber.SingleSubscriber;
5-
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
65

76
public interface PublishDeliveringService {
87

9-
PublishHandlingResult startDelivering(Publish publish, SingleSubscriber subscriber);
8+
void startDelivering(Publish publish, SingleSubscriber subscriber);
109
}

core-service/src/main/java/javasabr/mqtt/service/impl/DefaultPublishDeliveringService.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import javasabr.mqtt.model.subscriber.SingleSubscriber;
77
import javasabr.mqtt.service.PublishDeliveringService;
88
import javasabr.mqtt.service.publish.handler.MqttPublishOutMessageHandler;
9-
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
109
import lombok.AccessLevel;
1110
import lombok.CustomLog;
1211
import lombok.experimental.FieldDefaults;
@@ -45,13 +44,12 @@ public DefaultPublishDeliveringService(
4544
}
4645

4746
@Override
48-
public PublishHandlingResult startDelivering(Publish publish, SingleSubscriber subscriber) {
47+
public void startDelivering(Publish publish, SingleSubscriber subscriber) {
4948
try {
5049
//noinspection DataFlowIssue
51-
return publishOutMessageHandlers[subscriber.qos().level()].handle(publish, subscriber);
50+
publishOutMessageHandlers[subscriber.qos().level()].handle(publish, subscriber);
5251
} catch (IndexOutOfBoundsException | NullPointerException ex) {
5352
log.warning(publish, "Received not supported publish message:[%s]"::formatted);
54-
return PublishHandlingResult.UNSPECIFIED_ERROR;
5553
}
5654
}
5755

core-service/src/main/java/javasabr/mqtt/service/impl/DefaultPublishReceivingService.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,7 @@ public void processPublish(NetworkMqttUser user, Publish publish) {
5454
log.warning(user.clientId(), publish, "[%s] Received not supported publish:%s"::formatted);
5555
}
5656
}
57-
58-
57+
5958
private static String buildServiceDescription(
6059
@Nullable MqttPublishInMessageHandler[] publishInMessageHandlers) {
6160
var builder = new StringBuilder();

core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/AbstractMqttInMessageHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ protected boolean requireSession() {
3535
}
3636

3737
@Override
38-
public void processValidMessage(MqttConnection connection, MqttInMessage mqttInMessage) {
38+
public final void processValidMessage(MqttConnection connection, MqttInMessage mqttInMessage) {
3939
NetworkMqttUser user = connection.user();
4040
if (!expectedUser.isInstance(user)) {
4141
log.warning(user, "Received not expected user:[%s]"::formatted);
@@ -60,7 +60,7 @@ public void processValidMessage(MqttConnection connection, MqttInMessage mqttInM
6060
}
6161

6262
@Override
63-
public void processInvalidMessage(MqttConnection connection, MqttInMessage mqttInMessage) {
63+
public final void processInvalidMessage(MqttConnection connection, MqttInMessage mqttInMessage) {
6464
NetworkMqttUser user = connection.user();
6565
if (!expectedUser.isInstance(user)) {
6666
log.warning(user, "Received not expected user:[%s]"::formatted);

core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandler.java

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -55,35 +55,35 @@ protected void processMessageWithValidFields(
5555
MqttConnection connection,
5656
ExternalNetworkMqttUser user,
5757
NetworkMqttSession session,
58-
PublishMqttInMessage publishMessage) {
58+
PublishMqttInMessage message) {
5959

60-
TopicName finalTopicName = resolveFinalTopicName(user, session, publishMessage);
60+
TopicName finalTopicName = resolveFinalTopicName(user, session, message);
6161
if (finalTopicName == null) {
6262
return;
6363
} else if (!authorizationService.authorizePublish(user, finalTopicName)) {
6464
handleNotAuthorize(user);
6565
return;
6666
}
6767

68-
byte[] payload = publishMessage.payload();
69-
TopicName responseTopicName = resolveResponseTopic(user, publishMessage);
68+
byte[] payload = message.payload();
69+
TopicName responseTopicName = resolveResponseTopic(user, message);
7070

7171
//noinspection DataFlowIssue everything is already validated
7272
Publish publish = new Publish(
73-
publishMessage.messageId(),
74-
publishMessage.qos(),
73+
message.messageId(),
74+
message.qos(),
7575
finalTopicName,
7676
responseTopicName,
7777
payload,
78-
publishMessage.duplicate(),
79-
publishMessage.retain(),
80-
publishMessage.contentType(),
81-
publishMessage.subscriptionIds(),
82-
publishMessage.correlationData(),
83-
publishMessage.messageExpiryInterval(),
84-
publishMessage.topicAlias(),
85-
publishMessage.payloadFormat(),
86-
publishMessage.userProperties());
78+
message.duplicate(),
79+
message.retain(),
80+
message.contentType(),
81+
message.subscriptionIds(),
82+
message.correlationData(),
83+
message.messageExpiryInterval(),
84+
message.topicAlias(),
85+
message.payloadFormat(),
86+
message.userProperties());
8787

8888
publishReceivingService.processPublish(user, publish);
8989
}
@@ -92,12 +92,12 @@ protected void processMessageWithValidFields(
9292
private TopicName resolveFinalTopicName(
9393
ExternalNetworkMqttUser user,
9494
NetworkMqttSession session,
95-
PublishMqttInMessage publishMessage) {
95+
PublishMqttInMessage message) {
9696

9797
TopicNameMapping topicNameMapping = session.topicNameMapping();
98-
String rawTopicName = publishMessage.rawTopicName();
98+
String rawTopicName = message.rawTopicName();
9999
boolean providedRawTopicName = !StringUtils.isEmpty(rawTopicName);
100-
int topicAlias = publishMessage.topicAlias();
100+
int topicAlias = message.topicAlias();
101101

102102
TopicName topicNameByAlias;
103103
TopicName finalTopicName;

core-service/src/main/java/javasabr/mqtt/service/publish/handler/MqttPublishOutMessageHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ public interface MqttPublishOutMessageHandler {
88

99
QoS qos();
1010

11-
PublishHandlingResult handle(Publish publish, SingleSubscriber subscriber);
11+
void handle(Publish publish, SingleSubscriber subscriber);
1212
}

core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/AbstractMqttPublishInMessageHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public abstract class AbstractMqttPublishInMessageHandler<U extends NetworkMqttU
3131
MessageOutFactoryService messageOutFactoryService;
3232

3333
@Override
34-
public void handle(NetworkMqttUser user, Publish publish) {
34+
public final void handle(NetworkMqttUser user, Publish publish) {
3535
if (!expectedUserType.isInstance(user)) {
3636
log.warning(user.clientId(), user.getClass(), "[%s] Not expected user of type:[%s]"::formatted);
3737
return;

core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/AbstractMqttPublishOutMessageHandler.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,9 @@
44
import javasabr.mqtt.model.publishing.Publish;
55
import javasabr.mqtt.model.session.MqttSession;
66
import javasabr.mqtt.model.subscriber.SingleSubscriber;
7-
import javasabr.mqtt.network.message.out.MqttOutMessage;
87
import javasabr.mqtt.network.user.NetworkMqttUser;
98
import javasabr.mqtt.service.MessageOutFactoryService;
10-
import javasabr.mqtt.service.SubscriptionService;
119
import javasabr.mqtt.service.publish.handler.MqttPublishOutMessageHandler;
12-
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
1310
import lombok.AccessLevel;
1411
import lombok.CustomLog;
1512
import lombok.RequiredArgsConstructor;
@@ -23,39 +20,36 @@ public abstract class AbstractMqttPublishOutMessageHandler<U extends NetworkMqtt
2320
implements MqttPublishOutMessageHandler {
2421

2522
Class<U> expectedUserType;
26-
SubscriptionService subscriptionService;
2723
MessageOutFactoryService messageOutFactoryService;
2824

2925
@Override
30-
public PublishHandlingResult handle(Publish publish, SingleSubscriber subscriber) {
26+
public final void handle(Publish publish, SingleSubscriber subscriber) {
3127
MqttUser user = subscriber.resolveUser();
3228
if (!expectedUserType.isInstance(user)) {
3329
log.warning(user.clientId(), user.getClass(), "[%s] Not expected user of type:[%s]"::formatted);
34-
return PublishHandlingResult.NOT_EXPECTED_CLIENT;
30+
return;
3531
}
3632
U expectedUser = expectedUserType.cast(user);
3733
MqttSession session = expectedUser.session();
3834
if (session == null) {
3935
log.warning(user.clientId(), "[%s] Session is already closed"::formatted);
40-
return PublishHandlingResult.SESSION_IS_ALREADY_CLOSED;
36+
return;
4137
}
42-
publish = reconstruct(expectedUser, session, publish);
43-
if (publish == null) {
44-
return PublishHandlingResult.SKIPPED;
38+
Publish publishToSend = reconstruct(expectedUser, session, publish);
39+
if (publishToSend != null) {
40+
handleImpl(expectedUser, session, publishToSend);
4541
}
46-
return handleImpl(expectedUser, session, publish);
4742
}
4843

4944
@Nullable
5045
protected abstract Publish reconstruct(U user, MqttSession session, Publish original);
5146

52-
protected PublishHandlingResult handleImpl(U user, MqttSession session, Publish publish) {
47+
protected void handleImpl(U user, MqttSession session, Publish publish) {
5348
send(user, publish);
54-
return PublishHandlingResult.SUCCESS;
5549
}
5650

5751
protected void send(U user, Publish publish) {
58-
MqttOutMessage outMessage = messageOutFactoryService
52+
user.sendInBackground(messageOutFactoryService
5953
.resolveFactory(user)
6054
.newPublish(
6155
publish.messageId(),
@@ -68,7 +62,6 @@ protected void send(U user, Publish publish) {
6862
publish.payloadFormat(),
6963
publish.responseTopicName(),
7064
publish.correlationData(),
71-
publish.userProperties());
72-
user.sendInBackground(outMessage);
65+
publish.userProperties()));
7366
}
7467
}

core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos0MqttPublishOutMessageHandler.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,13 @@
66
import javasabr.mqtt.model.session.MqttSession;
77
import javasabr.mqtt.network.impl.ExternalNetworkMqttUser;
88
import javasabr.mqtt.service.MessageOutFactoryService;
9-
import javasabr.mqtt.service.SubscriptionService;
109
import org.jspecify.annotations.Nullable;
1110

1211
public class Qos0MqttPublishOutMessageHandler
1312
extends AbstractMqttPublishOutMessageHandler<ExternalNetworkMqttUser> {
1413

15-
public Qos0MqttPublishOutMessageHandler(
16-
SubscriptionService subscriptionService,
17-
MessageOutFactoryService messageOutFactoryService) {
18-
super(ExternalNetworkMqttUser.class, subscriptionService, messageOutFactoryService);
14+
public Qos0MqttPublishOutMessageHandler(MessageOutFactoryService messageOutFactoryService) {
15+
super(ExternalNetworkMqttUser.class, messageOutFactoryService);
1916
}
2017

2118
@Override

0 commit comments

Comments
 (0)