Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b369463
working on publish delivering
JavaSaBr Dec 1, 2025
7301ada
working on publish delivering
JavaSaBr Dec 2, 2025
c6c7012
Merge remote-tracking branch 'origin/develop' into improve-publish-de…
JavaSaBr Dec 2, 2025
d69e8b9
working on publish delivering
JavaSaBr Dec 2, 2025
abdc101
working on publish delivering
JavaSaBr Dec 2, 2025
19ba642
working on publish delivering
JavaSaBr Dec 2, 2025
0ed5c09
finish first part of upgrading delivering publishes to subscribers
JavaSaBr Dec 3, 2025
9263c8e
working on improving testing
JavaSaBr Dec 4, 2025
bfc3ea8
resolve comments
JavaSaBr Dec 4, 2025
0011896
Merge branch 'improve-publish-delivering-to-subscribers' into improve…
JavaSaBr Dec 4, 2025
f8752d0
continuer add tests
JavaSaBr Dec 5, 2025
94766c6
Merge remote-tracking branch 'origin/develop' into improve-publish-de…
JavaSaBr Dec 5, 2025
1164f82
finish adding tests
JavaSaBr Dec 5, 2025
6c47ae6
update tests for messages
JavaSaBr Dec 5, 2025
bdc6380
fix on code review
JavaSaBr Dec 7, 2025
eaf8d3a
start working on integration ACL
JavaSaBr Dec 8, 2025
dd72fad
Merge branch 'improve-publish-delivering-to-subscribers-part-2' into …
JavaSaBr Dec 8, 2025
49acfdd
Merge remote-tracking branch 'origin/develop' into improve-publish-de…
JavaSaBr Dec 8, 2025
5776855
Merge branch 'improve-publish-delivering-to-subscribers-part-3' into …
JavaSaBr Dec 8, 2025
7cd86f5
working on updating message validation
JavaSaBr Dec 8, 2025
0bbe921
Merge remote-tracking branch 'origin/develop' into improve-publish-de…
JavaSaBr Dec 8, 2025
8f0945b
working on updating message validation
JavaSaBr Dec 8, 2025
08244dd
working on updating message validation
JavaSaBr Dec 8, 2025
b8c4ada
continue updating
JavaSaBr Dec 9, 2025
2266e33
rename ACL service
JavaSaBr Dec 9, 2025
ee68968
Merge branch 'improve-publish-delivering-to-subscribers-part-4' into …
JavaSaBr Dec 9, 2025
e506129
finish refactoring publishes
JavaSaBr Dec 9, 2025
dd99e05
Merge remote-tracking branch 'origin/develop' into feature/69-impleme…
JavaSaBr Dec 10, 2025
4f0fdef
Merge remote-tracking branch 'origin/develop' into feature/69-impleme…
JavaSaBr Dec 10, 2025
aa5051d
Merge remote-tracking branch 'origin/develop' into feature/69-impleme…
JavaSaBr Dec 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import javasabr.mqtt.network.user.NetworkMqttUserFactory;
import javasabr.mqtt.service.AuthorizationService;
import javasabr.mqtt.service.AuthenticationService;
import javasabr.mqtt.service.AuthorizationService;
import javasabr.mqtt.service.ClientIdRegistry;
import javasabr.mqtt.service.ConnectionService;
import javasabr.mqtt.service.CredentialSource;
Expand Down Expand Up @@ -254,24 +255,20 @@ ConnectionService externalMqttConnectionService(Collection<? extends MqttInMessa
}

@Bean
MqttPublishOutMessageHandler qos0MqttPublishOutMessageHandler(
SubscriptionService subscriptionService,
MessageOutFactoryService messageOutFactoryService) {
return new Qos0MqttPublishOutMessageHandler(subscriptionService, messageOutFactoryService);
MqttPublishOutMessageHandler qos0MqttPublishOutMessageHandler(MessageOutFactoryService messageOutFactoryService) {
return new Qos0MqttPublishOutMessageHandler(messageOutFactoryService);
}

@Bean
MqttPublishOutMessageHandler qos1MqttPublishOutMessageHandler(
SubscriptionService subscriptionService,
MessageOutFactoryService messageOutFactoryService) {
return new Qos1MqttPublishOutMessageHandler(subscriptionService, messageOutFactoryService);
return new Qos1MqttPublishOutMessageHandler(messageOutFactoryService);
}

@Bean
MqttPublishOutMessageHandler qos2MqttPublishOutMessageHandler(
SubscriptionService subscriptionService,
MessageOutFactoryService messageOutFactoryService) {
return new Qos2MqttPublishOutMessageHandler(subscriptionService, messageOutFactoryService);
return new Qos2MqttPublishOutMessageHandler(messageOutFactoryService);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@

import javasabr.mqtt.model.publishing.Publish;
import javasabr.mqtt.model.subscriber.SingleSubscriber;
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;

public interface PublishDeliveringService {

PublishHandlingResult startDelivering(Publish publish, SingleSubscriber subscriber);
void startDelivering(Publish publish, SingleSubscriber subscriber);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import javasabr.mqtt.model.subscriber.SingleSubscriber;
import javasabr.mqtt.service.PublishDeliveringService;
import javasabr.mqtt.service.publish.handler.MqttPublishOutMessageHandler;
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
import lombok.AccessLevel;
import lombok.CustomLog;
import lombok.experimental.FieldDefaults;
Expand Down Expand Up @@ -45,13 +44,12 @@ public DefaultPublishDeliveringService(
}

@Override
public PublishHandlingResult startDelivering(Publish publish, SingleSubscriber subscriber) {
public void startDelivering(Publish publish, SingleSubscriber subscriber) {
try {
//noinspection DataFlowIssue
return publishOutMessageHandlers[subscriber.qos().level()].handle(publish, subscriber);
publishOutMessageHandlers[subscriber.qos().level()].handle(publish, subscriber);
} catch (IndexOutOfBoundsException | NullPointerException ex) {
log.warning(publish, "Received not supported publish message:[%s]"::formatted);
return PublishHandlingResult.UNSPECIFIED_ERROR;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ public void processPublish(NetworkMqttUser user, Publish publish) {
log.warning(user.clientId(), publish, "[%s] Received not supported publish:%s"::formatted);
}
}



private static String buildServiceDescription(
@Nullable MqttPublishInMessageHandler[] publishInMessageHandlers) {
var builder = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ protected boolean requireSession() {
}

@Override
public void processValidMessage(MqttConnection connection, MqttInMessage mqttInMessage) {
public final void processValidMessage(MqttConnection connection, MqttInMessage mqttInMessage) {
NetworkMqttUser user = connection.user();
if (!expectedUser.isInstance(user)) {
log.warning(user, "Received not expected user:[%s]"::formatted);
Expand All @@ -60,7 +60,7 @@ public void processValidMessage(MqttConnection connection, MqttInMessage mqttInM
}

@Override
public void processInvalidMessage(MqttConnection connection, MqttInMessage mqttInMessage) {
public final void processInvalidMessage(MqttConnection connection, MqttInMessage mqttInMessage) {
NetworkMqttUser user = connection.user();
if (!expectedUser.isInstance(user)) {
log.warning(user, "Received not expected user:[%s]"::formatted);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,35 +55,35 @@ protected void processMessageWithValidFields(
MqttConnection connection,
ExternalNetworkMqttUser user,
NetworkMqttSession session,
PublishMqttInMessage publishMessage) {
PublishMqttInMessage message) {

TopicName finalTopicName = resolveFinalTopicName(user, session, publishMessage);
TopicName finalTopicName = resolveFinalTopicName(user, session, message);
if (finalTopicName == null) {
return;
} else if (!authorizationService.authorizePublish(user, finalTopicName)) {
handleNotAuthorize(user);
return;
}

byte[] payload = publishMessage.payload();
TopicName responseTopicName = resolveResponseTopic(user, publishMessage);
byte[] payload = message.payload();
TopicName responseTopicName = resolveResponseTopic(user, message);

//noinspection DataFlowIssue everything is already validated
Publish publish = new Publish(
publishMessage.messageId(),
publishMessage.qos(),
message.messageId(),
message.qos(),
finalTopicName,
responseTopicName,
payload,
publishMessage.duplicate(),
publishMessage.retain(),
publishMessage.contentType(),
publishMessage.subscriptionIds(),
publishMessage.correlationData(),
publishMessage.messageExpiryInterval(),
publishMessage.topicAlias(),
publishMessage.payloadFormat(),
publishMessage.userProperties());
message.duplicate(),
message.retain(),
message.contentType(),
message.subscriptionIds(),
message.correlationData(),
message.messageExpiryInterval(),
message.topicAlias(),
message.payloadFormat(),
message.userProperties());

publishReceivingService.processPublish(user, publish);
}
Expand All @@ -92,12 +92,12 @@ protected void processMessageWithValidFields(
private TopicName resolveFinalTopicName(
ExternalNetworkMqttUser user,
NetworkMqttSession session,
PublishMqttInMessage publishMessage) {
PublishMqttInMessage message) {

TopicNameMapping topicNameMapping = session.topicNameMapping();
String rawTopicName = publishMessage.rawTopicName();
String rawTopicName = message.rawTopicName();
boolean providedRawTopicName = !StringUtils.isEmpty(rawTopicName);
int topicAlias = publishMessage.topicAlias();
int topicAlias = message.topicAlias();

TopicName topicNameByAlias;
TopicName finalTopicName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ public interface MqttPublishOutMessageHandler {

QoS qos();

PublishHandlingResult handle(Publish publish, SingleSubscriber subscriber);
void handle(Publish publish, SingleSubscriber subscriber);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public abstract class AbstractMqttPublishInMessageHandler<U extends NetworkMqttU
MessageOutFactoryService messageOutFactoryService;

@Override
public void handle(NetworkMqttUser user, Publish publish) {
public final void handle(NetworkMqttUser user, Publish publish) {
if (!expectedUserType.isInstance(user)) {
log.warning(user.clientId(), user.getClass(), "[%s] Not expected user of type:[%s]"::formatted);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@
import javasabr.mqtt.model.publishing.Publish;
import javasabr.mqtt.model.session.MqttSession;
import javasabr.mqtt.model.subscriber.SingleSubscriber;
import javasabr.mqtt.network.message.out.MqttOutMessage;
import javasabr.mqtt.network.user.NetworkMqttUser;
import javasabr.mqtt.service.MessageOutFactoryService;
import javasabr.mqtt.service.SubscriptionService;
import javasabr.mqtt.service.publish.handler.MqttPublishOutMessageHandler;
import javasabr.mqtt.service.publish.handler.PublishHandlingResult;
import lombok.AccessLevel;
import lombok.CustomLog;
import lombok.RequiredArgsConstructor;
Expand All @@ -23,39 +20,36 @@ public abstract class AbstractMqttPublishOutMessageHandler<U extends NetworkMqtt
implements MqttPublishOutMessageHandler {

Class<U> expectedUserType;
SubscriptionService subscriptionService;
MessageOutFactoryService messageOutFactoryService;

@Override
public PublishHandlingResult handle(Publish publish, SingleSubscriber subscriber) {
public final void handle(Publish publish, SingleSubscriber subscriber) {
MqttUser user = subscriber.resolveUser();
if (!expectedUserType.isInstance(user)) {
log.warning(user.clientId(), user.getClass(), "[%s] Not expected user of type:[%s]"::formatted);
return PublishHandlingResult.NOT_EXPECTED_CLIENT;
return;
}
U expectedUser = expectedUserType.cast(user);
MqttSession session = expectedUser.session();
if (session == null) {
log.warning(user.clientId(), "[%s] Session is already closed"::formatted);
return PublishHandlingResult.SESSION_IS_ALREADY_CLOSED;
return;
}
publish = reconstruct(expectedUser, session, publish);
if (publish == null) {
return PublishHandlingResult.SKIPPED;
Publish publishToSend = reconstruct(expectedUser, session, publish);
if (publishToSend != null) {
handleImpl(expectedUser, session, publishToSend);
}
return handleImpl(expectedUser, session, publish);
}

@Nullable
protected abstract Publish reconstruct(U user, MqttSession session, Publish original);

protected PublishHandlingResult handleImpl(U user, MqttSession session, Publish publish) {
protected void handleImpl(U user, MqttSession session, Publish publish) {
send(user, publish);
return PublishHandlingResult.SUCCESS;
}

protected void send(U user, Publish publish) {
MqttOutMessage outMessage = messageOutFactoryService
user.sendInBackground(messageOutFactoryService
.resolveFactory(user)
.newPublish(
publish.messageId(),
Expand All @@ -68,7 +62,6 @@ protected void send(U user, Publish publish) {
publish.payloadFormat(),
publish.responseTopicName(),
publish.correlationData(),
publish.userProperties());
user.sendInBackground(outMessage);
publish.userProperties()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,13 @@
import javasabr.mqtt.model.session.MqttSession;
import javasabr.mqtt.network.impl.ExternalNetworkMqttUser;
import javasabr.mqtt.service.MessageOutFactoryService;
import javasabr.mqtt.service.SubscriptionService;
import org.jspecify.annotations.Nullable;

public class Qos0MqttPublishOutMessageHandler
extends AbstractMqttPublishOutMessageHandler<ExternalNetworkMqttUser> {

public Qos0MqttPublishOutMessageHandler(
SubscriptionService subscriptionService,
MessageOutFactoryService messageOutFactoryService) {
super(ExternalNetworkMqttUser.class, subscriptionService, messageOutFactoryService);
public Qos0MqttPublishOutMessageHandler(MessageOutFactoryService messageOutFactoryService) {
super(ExternalNetworkMqttUser.class, messageOutFactoryService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,14 @@
import javasabr.mqtt.network.impl.ExternalNetworkMqttUser;
import javasabr.mqtt.network.message.in.PublishAckMqttInMessage;
import javasabr.mqtt.service.MessageOutFactoryService;
import javasabr.mqtt.service.SubscriptionService;
import lombok.CustomLog;
import org.jspecify.annotations.Nullable;

@CustomLog
public class Qos1MqttPublishOutMessageHandler extends TrackableMqttPublishOutMessageHandler {

public Qos1MqttPublishOutMessageHandler(
SubscriptionService subscriptionService,
MessageOutFactoryService messageOutFactoryService) {
super(subscriptionService, messageOutFactoryService);
public Qos1MqttPublishOutMessageHandler(MessageOutFactoryService messageOutFactoryService) {
super(messageOutFactoryService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,22 +143,23 @@ private void handleMessageIdIsInUse(ExternalNetworkMqttUser user, int messageId)
}

private boolean handleReceivedTrackableMessage(MqttUser user, MqttSession session, TrackableMqttMessage message) {
ExternalNetworkMqttUser networkUser = (ExternalNetworkMqttUser) user;
ExternalNetworkMqttUser networkMqttUser = expectedUserType.cast(user);
String clientId = networkMqttUser.clientId();
int messageId = message.messageId();

MessageTacker messageTacker = session.inMessageTracker();
TrackedMessageMeta messageMeta = messageTacker.stored(messageId);
if (messageMeta == null) {
log.warning(networkUser.clientId(), messageId, "[%s] No any stored information for messageId:[%d]"::formatted);
log.warning(clientId, messageId, "[%s] No any stored information for messageId:[%d]"::formatted);
return true;
}

if (messageMeta.messageType() != MqttMessageType.PUBLISH) {
log.warning(networkUser.clientId(), messageMeta, messageId,
log.warning(clientId, messageMeta, messageId,
"[%s] Not expected tracked message meta:[%s] for messageId:[%d]"::formatted);
return true;
} else if (!(message instanceof PublishReleaseMqttInMessage release)) {
log.warning(networkUser.clientId(), message, "[%s] Not expected message:[%s]"::formatted);
log.warning(clientId, message.messageType(), "[%s] Not expected message:[%s]"::formatted);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we consider renaming messageType() -> type()?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@crazyrokr for me it looks logically:
image

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, but message.messageType() doesn't look nice

return true;
}

Expand All @@ -168,10 +169,10 @@ private boolean handleReceivedTrackableMessage(MqttUser user, MqttSession sessio
PublishCompletedReasonCode.SUCCESS);

MqttOutMessage response = messageOutFactoryService
.resolveFactory(networkUser)
.resolveFactory(networkMqttUser)
.newPublishCompleted(message.messageId(), PublishCompletedReasonCode.SUCCESS);

sendFeedback(networkUser, session, response, messageId);
sendFeedback(networkMqttUser, session, response, messageId);
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,14 @@
import javasabr.mqtt.network.message.in.PublishCompleteMqttInMessage;
import javasabr.mqtt.network.message.in.PublishReceivedMqttInMessage;
import javasabr.mqtt.service.MessageOutFactoryService;
import javasabr.mqtt.service.SubscriptionService;
import lombok.CustomLog;
import org.jspecify.annotations.Nullable;

@CustomLog
public class Qos2MqttPublishOutMessageHandler extends TrackableMqttPublishOutMessageHandler {

public Qos2MqttPublishOutMessageHandler(
SubscriptionService subscriptionService,
MessageOutFactoryService messageOutFactoryService) {
super(subscriptionService, messageOutFactoryService);
public Qos2MqttPublishOutMessageHandler(MessageOutFactoryService messageOutFactoryService) {
super(messageOutFactoryService);
}

@Override
Expand Down
Loading
Loading