diff --git a/application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java b/application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java index a4f79d94..f330b24e 100644 --- a/application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java +++ b/application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java @@ -14,6 +14,7 @@ import javasabr.mqtt.network.user.NetworkMqttUserFactory; import javasabr.mqtt.service.AuthorizationService; import javasabr.mqtt.service.AuthenticationService; +import javasabr.mqtt.service.AuthorizationService; import javasabr.mqtt.service.ClientIdRegistry; import javasabr.mqtt.service.ConnectionService; import javasabr.mqtt.service.CredentialSource; @@ -254,24 +255,20 @@ ConnectionService externalMqttConnectionService(Collection expectedUserType; - SubscriptionService subscriptionService; MessageOutFactoryService messageOutFactoryService; @Override - public PublishHandlingResult handle(Publish publish, SingleSubscriber subscriber) { + public final void handle(Publish publish, SingleSubscriber subscriber) { MqttUser user = subscriber.resolveUser(); if (!expectedUserType.isInstance(user)) { log.warning(user.clientId(), user.getClass(), "[%s] Not expected user of type:[%s]"::formatted); - return PublishHandlingResult.NOT_EXPECTED_CLIENT; + return; } U expectedUser = expectedUserType.cast(user); MqttSession session = expectedUser.session(); if (session == null) { log.warning(user.clientId(), "[%s] Session is already closed"::formatted); - return PublishHandlingResult.SESSION_IS_ALREADY_CLOSED; + return; } - publish = reconstruct(expectedUser, session, publish); - if (publish == null) { - return PublishHandlingResult.SKIPPED; + Publish publishToSend = reconstruct(expectedUser, session, publish); + if (publishToSend != null) { + handleImpl(expectedUser, session, publishToSend); } - return handleImpl(expectedUser, session, publish); } @Nullable protected abstract Publish reconstruct(U user, MqttSession session, Publish original); - protected PublishHandlingResult handleImpl(U user, MqttSession session, Publish publish) { + protected void handleImpl(U user, MqttSession session, Publish publish) { send(user, publish); - return PublishHandlingResult.SUCCESS; } protected void send(U user, Publish publish) { - MqttOutMessage outMessage = messageOutFactoryService + user.sendInBackground(messageOutFactoryService .resolveFactory(user) .newPublish( publish.messageId(), @@ -68,7 +62,6 @@ protected void send(U user, Publish publish) { publish.payloadFormat(), publish.responseTopicName(), publish.correlationData(), - publish.userProperties()); - user.sendInBackground(outMessage); + publish.userProperties())); } } diff --git a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos0MqttPublishOutMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos0MqttPublishOutMessageHandler.java index 8803aa3a..b7825bc9 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos0MqttPublishOutMessageHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos0MqttPublishOutMessageHandler.java @@ -6,16 +6,13 @@ import javasabr.mqtt.model.session.MqttSession; import javasabr.mqtt.network.impl.ExternalNetworkMqttUser; import javasabr.mqtt.service.MessageOutFactoryService; -import javasabr.mqtt.service.SubscriptionService; import org.jspecify.annotations.Nullable; public class Qos0MqttPublishOutMessageHandler extends AbstractMqttPublishOutMessageHandler { - public Qos0MqttPublishOutMessageHandler( - SubscriptionService subscriptionService, - MessageOutFactoryService messageOutFactoryService) { - super(ExternalNetworkMqttUser.class, subscriptionService, messageOutFactoryService); + public Qos0MqttPublishOutMessageHandler(MessageOutFactoryService messageOutFactoryService) { + super(ExternalNetworkMqttUser.class, messageOutFactoryService); } @Override diff --git a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos1MqttPublishOutMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos1MqttPublishOutMessageHandler.java index 072f7dca..38a8ff52 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos1MqttPublishOutMessageHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos1MqttPublishOutMessageHandler.java @@ -10,17 +10,14 @@ import javasabr.mqtt.network.impl.ExternalNetworkMqttUser; import javasabr.mqtt.network.message.in.PublishAckMqttInMessage; import javasabr.mqtt.service.MessageOutFactoryService; -import javasabr.mqtt.service.SubscriptionService; import lombok.CustomLog; import org.jspecify.annotations.Nullable; @CustomLog public class Qos1MqttPublishOutMessageHandler extends TrackableMqttPublishOutMessageHandler { - public Qos1MqttPublishOutMessageHandler( - SubscriptionService subscriptionService, - MessageOutFactoryService messageOutFactoryService) { - super(subscriptionService, messageOutFactoryService); + public Qos1MqttPublishOutMessageHandler(MessageOutFactoryService messageOutFactoryService) { + super(messageOutFactoryService); } @Override diff --git a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishInMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishInMessageHandler.java index 532b0b8f..62632168 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishInMessageHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishInMessageHandler.java @@ -143,22 +143,23 @@ private void handleMessageIdIsInUse(ExternalNetworkMqttUser user, int messageId) } private boolean handleReceivedTrackableMessage(MqttUser user, MqttSession session, TrackableMqttMessage message) { - ExternalNetworkMqttUser networkUser = (ExternalNetworkMqttUser) user; + ExternalNetworkMqttUser networkMqttUser = expectedUserType.cast(user); + String clientId = networkMqttUser.clientId(); int messageId = message.messageId(); MessageTacker messageTacker = session.inMessageTracker(); TrackedMessageMeta messageMeta = messageTacker.stored(messageId); if (messageMeta == null) { - log.warning(networkUser.clientId(), messageId, "[%s] No any stored information for messageId:[%d]"::formatted); + log.warning(clientId, messageId, "[%s] No any stored information for messageId:[%d]"::formatted); return true; } if (messageMeta.messageType() != MqttMessageType.PUBLISH) { - log.warning(networkUser.clientId(), messageMeta, messageId, + log.warning(clientId, messageMeta, messageId, "[%s] Not expected tracked message meta:[%s] for messageId:[%d]"::formatted); return true; } else if (!(message instanceof PublishReleaseMqttInMessage release)) { - log.warning(networkUser.clientId(), message, "[%s] Not expected message:[%s]"::formatted); + log.warning(clientId, message.messageType(), "[%s] Not expected message:[%s]"::formatted); return true; } @@ -168,10 +169,10 @@ private boolean handleReceivedTrackableMessage(MqttUser user, MqttSession sessio PublishCompletedReasonCode.SUCCESS); MqttOutMessage response = messageOutFactoryService - .resolveFactory(networkUser) + .resolveFactory(networkMqttUser) .newPublishCompleted(message.messageId(), PublishCompletedReasonCode.SUCCESS); - sendFeedback(networkUser, session, response, messageId); + sendFeedback(networkMqttUser, session, response, messageId); return true; } } diff --git a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishOutMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishOutMessageHandler.java index 040a8721..38c4949c 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishOutMessageHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishOutMessageHandler.java @@ -13,17 +13,14 @@ import javasabr.mqtt.network.message.in.PublishCompleteMqttInMessage; import javasabr.mqtt.network.message.in.PublishReceivedMqttInMessage; import javasabr.mqtt.service.MessageOutFactoryService; -import javasabr.mqtt.service.SubscriptionService; import lombok.CustomLog; import org.jspecify.annotations.Nullable; @CustomLog public class Qos2MqttPublishOutMessageHandler extends TrackableMqttPublishOutMessageHandler { - public Qos2MqttPublishOutMessageHandler( - SubscriptionService subscriptionService, - MessageOutFactoryService messageOutFactoryService) { - super(subscriptionService, messageOutFactoryService); + public Qos2MqttPublishOutMessageHandler(MessageOutFactoryService messageOutFactoryService) { + super(messageOutFactoryService); } @Override diff --git a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/TrackableMqttPublishOutMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/TrackableMqttPublishOutMessageHandler.java index 7977f9a7..2405c913 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/TrackableMqttPublishOutMessageHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/TrackableMqttPublishOutMessageHandler.java @@ -15,9 +15,7 @@ import javasabr.mqtt.model.session.TrackedMessageMeta; import javasabr.mqtt.network.impl.ExternalNetworkMqttUser; import javasabr.mqtt.service.MessageOutFactoryService; -import javasabr.mqtt.service.SubscriptionService; import javasabr.mqtt.service.message.out.factory.MqttMessageOutFactory; -import javasabr.mqtt.service.publish.handler.PublishHandlingResult; import lombok.AccessLevel; import lombok.CustomLog; import lombok.experimental.FieldDefaults; @@ -31,10 +29,8 @@ public abstract class TrackableMqttPublishOutMessageHandler extends TrackableMessageCallback trackableMessageCallback; PublishRetryer publishRetryer; - protected TrackableMqttPublishOutMessageHandler( - SubscriptionService subscriptionService, - MessageOutFactoryService messageOutFactoryService) { - super(ExternalNetworkMqttUser.class, subscriptionService, messageOutFactoryService); + protected TrackableMqttPublishOutMessageHandler(MessageOutFactoryService messageOutFactoryService) { + super(ExternalNetworkMqttUser.class, messageOutFactoryService); this.trackableMessageCallback = this::handleReceivedTrackableMessage; this.publishRetryer = this::retryDelivering; } @@ -51,30 +47,25 @@ protected Publish reconstruct(ExternalNetworkMqttUser user, MqttSession session, } @Override - protected PublishHandlingResult handleImpl(ExternalNetworkMqttUser user, MqttSession session, Publish publish) { + protected final void handleImpl(ExternalNetworkMqttUser user, MqttSession session, Publish publish) { // register message id MessageTacker messageTacker = session.outMessageTracker(); messageTacker.add(publish.messageId(), MqttMessageType.PUBLISH); // register callback and retrier ProcessingPublishes processingPublishes = session.outProcessingPublishes(); processingPublishes.register(publish, trackableMessageCallback, publishRetryer); - return super.handleImpl(user, session, publish); + super.handleImpl(user, session, publish); } - protected boolean handleReceivedTrackableMessage( + protected final boolean handleReceivedTrackableMessage( MqttUser user, MqttSession session, TrackableMqttMessage message) { - + ExternalNetworkMqttUser networkMqttUser = expectedUserType.cast(user); int messageId = message.messageId(); MessageTacker messageTacker = session.outMessageTracker(); TrackedMessageMeta trackedMessageMeta = messageTacker.stored(messageId); - - return handleReceivedTrackableMessageImpl( - expectedUserType.cast(user), - session, - message, - trackedMessageMeta); + return handleReceivedTrackableMessageImpl(networkMqttUser, session, message, trackedMessageMeta); } protected abstract boolean handleReceivedTrackableMessageImpl( @@ -83,24 +74,20 @@ protected abstract boolean handleReceivedTrackableMessageImpl( TrackableMqttMessage message, @Nullable TrackedMessageMeta trackedMessageMeta); - protected void retryDelivering(MqttUser user, MqttSession session, Publish publish) { - retryDeliveringImpl(expectedUserType.cast(user), session, publish); - } - - protected void retryDeliveringImpl(ExternalNetworkMqttUser user, MqttSession session, Publish publish) { - String clientId = user.clientId(); + protected final void retryDelivering(MqttUser user, MqttSession session, Publish publish) { + ExternalNetworkMqttUser networkMqttUser = expectedUserType.cast(user); + String clientId = networkMqttUser.clientId(); int messageId = publish.messageId(); - TrackedMessageMeta messageMeta = session - .outMessageTracker() - .stored(messageId); - if (messageMeta == null) { + MessageTacker outMessageTracker = session.outMessageTracker(); + TrackedMessageMeta trackedMessageMeta = outMessageTracker.stored(messageId); + if (trackedMessageMeta == null) { log.warning(clientId, messageId, "[%s] No any stored information for messageId:[%d]"::formatted); - } else if (messageMeta.messageType() != MqttMessageType.PUBLISH) { - log.warning(clientId, messageMeta, messageId, + } else if (trackedMessageMeta.messageType() != MqttMessageType.PUBLISH) { + log.warning(clientId, trackedMessageMeta, messageId, "[%s] Not expected tracked message meta:[%s] for messageId:[%d]"::formatted); } else { log.debug(clientId, messageId, "[%s] Retry to deliver publish:[%s]"::formatted); - send(user, publish.withDuplicated()); + send(networkMqttUser, publish.withDuplicated()); } } diff --git a/core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryNetworkMqttSession.java b/core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryNetworkMqttSession.java index 9b331f69..080d12ca 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryNetworkMqttSession.java +++ b/core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryNetworkMqttSession.java @@ -14,9 +14,9 @@ import lombok.experimental.FieldDefaults; @CustomLog +@Accessors @ToString(of = "clientId") @EqualsAndHashCode(of = "clientId") -@Accessors(fluent = true, chain = false) @FieldDefaults(level = AccessLevel.PRIVATE) public class InMemoryNetworkMqttSession implements ConfigurableNetworkMqttSession { diff --git a/core-service/src/test/groovy/javasabr/mqtt/service/IntegrationServiceSpecification.groovy b/core-service/src/test/groovy/javasabr/mqtt/service/IntegrationServiceSpecification.groovy index 2da0a9f0..ca195f54 100644 --- a/core-service/src/test/groovy/javasabr/mqtt/service/IntegrationServiceSpecification.groovy +++ b/core-service/src/test/groovy/javasabr/mqtt/service/IntegrationServiceSpecification.groovy @@ -64,9 +64,9 @@ abstract class IntegrationServiceSpecification extends Specification { @Shared def defaultPublishDeliveringService = new DefaultPublishDeliveringService([ - new Qos0MqttPublishOutMessageHandler(defaultSubscriptionService, defaultMessageOutFactoryService), - new Qos1MqttPublishOutMessageHandler(defaultSubscriptionService, defaultMessageOutFactoryService), - new Qos2MqttPublishOutMessageHandler(defaultSubscriptionService, defaultMessageOutFactoryService) + new Qos0MqttPublishOutMessageHandler(defaultMessageOutFactoryService), + new Qos1MqttPublishOutMessageHandler(defaultMessageOutFactoryService), + new Qos2MqttPublishOutMessageHandler(defaultMessageOutFactoryService) ]) @Shared diff --git a/core-service/src/test/groovy/javasabr/mqtt/service/publish/handler/impl/Qos0MqttPublishOutMessageHandlerTest.groovy b/core-service/src/test/groovy/javasabr/mqtt/service/publish/handler/impl/Qos0MqttPublishOutMessageHandlerTest.groovy index 2781da71..3184e8f4 100644 --- a/core-service/src/test/groovy/javasabr/mqtt/service/publish/handler/impl/Qos0MqttPublishOutMessageHandlerTest.groovy +++ b/core-service/src/test/groovy/javasabr/mqtt/service/publish/handler/impl/Qos0MqttPublishOutMessageHandlerTest.groovy @@ -8,15 +8,12 @@ import javasabr.mqtt.model.subscriber.SingleSubscriber import javasabr.mqtt.model.subscription.Subscription import javasabr.mqtt.network.message.out.PublishMqtt5OutMessage import javasabr.mqtt.service.TestExternalNetworkMqttUser -import javasabr.mqtt.service.publish.handler.PublishHandlingResult class Qos0MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandlerTest { def "should deliver publish to subscriber"() { given: - def publishOutHandler = new Qos0MqttPublishOutMessageHandler( - defaultSubscriptionService, - defaultMessageOutFactoryService) + def publishOutHandler = new Qos0MqttPublishOutMessageHandler(defaultMessageOutFactoryService) def connection = mockedExternalConnection(MqttVersion.MQTT_5) def user = connection.user() as TestExternalNetworkMqttUser def testTopicName = defaultTopicService.createTopicName(user, "Qos0MqttPublishOutMessageHandlerTest/1") @@ -27,9 +24,8 @@ class Qos0MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl def testPublish = Publish.minimal(originalMessageId, QoS.EXACTLY_ONCE, testTopicName, testPayload) .withDuplicated() when: - def result = publishOutHandler.handle(testPublish, subscriber) + publishOutHandler.handle(testPublish, subscriber) then: - result == PublishHandlingResult.SUCCESS with(user.nextSentMessage(PublishMqtt5OutMessage)) { qos() == QoS.AT_MOST_ONCE !duplicate() diff --git a/core-service/src/test/groovy/javasabr/mqtt/service/publish/handler/impl/Qos1MqttPublishOutMessageHandlerTest.groovy b/core-service/src/test/groovy/javasabr/mqtt/service/publish/handler/impl/Qos1MqttPublishOutMessageHandlerTest.groovy index d1adf56a..b8f60514 100644 --- a/core-service/src/test/groovy/javasabr/mqtt/service/publish/handler/impl/Qos1MqttPublishOutMessageHandlerTest.groovy +++ b/core-service/src/test/groovy/javasabr/mqtt/service/publish/handler/impl/Qos1MqttPublishOutMessageHandlerTest.groovy @@ -16,15 +16,12 @@ import javasabr.mqtt.network.message.in.PublishReceivedMqttInMessage import javasabr.mqtt.network.message.out.DisconnectMqtt5OutMessage import javasabr.mqtt.network.message.out.PublishMqtt5OutMessage import javasabr.mqtt.service.TestExternalNetworkMqttUser -import javasabr.mqtt.service.publish.handler.PublishHandlingResult class Qos1MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandlerTest { def "should deliver publish to subscriber"() { given: - def publishOutHandler = new Qos1MqttPublishOutMessageHandler( - defaultSubscriptionService, - defaultMessageOutFactoryService) + def publishOutHandler = new Qos1MqttPublishOutMessageHandler(defaultMessageOutFactoryService) def connection = mockedExternalConnection(MqttVersion.MQTT_5) def user = connection.user() as TestExternalNetworkMqttUser def testTopicName = defaultTopicService.createTopicName(user, "Qos1MqttPublishOutMessageHandlerTest/1") @@ -32,12 +29,11 @@ class Qos1MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl def subscription = Subscription.minimal(topicFilter, QoS.AT_MOST_ONCE) def subscriber = new SingleSubscriber(user, subscription) def originalMessageId = 60 - def publish = Publish.minimal(originalMessageId, QoS.EXACTLY_ONCE, testTopicName, testPayload) + def testPublish = Publish.minimal(originalMessageId, QoS.EXACTLY_ONCE, testTopicName, testPayload) .withDuplicated() when: - def result = publishOutHandler.handle(publish, subscriber) + publishOutHandler.handle(testPublish, subscriber) then: - result == PublishHandlingResult.SUCCESS with(user.nextSentMessage(PublishMqtt5OutMessage)) { qos() == QoS.AT_LEAST_ONCE !duplicate() @@ -50,9 +46,7 @@ class Qos1MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl def "should wait for ack response for publish"() { given: - def publishOutHandler = new Qos1MqttPublishOutMessageHandler( - defaultSubscriptionService, - defaultMessageOutFactoryService) + def publishOutHandler = new Qos1MqttPublishOutMessageHandler(defaultMessageOutFactoryService) def connection = mockedExternalConnection(MqttVersion.MQTT_5) def user = connection.user() as TestExternalNetworkMqttUser def session = user.session() @@ -61,15 +55,14 @@ class Qos1MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl def subscription = Subscription.minimal(topicFilter, QoS.AT_MOST_ONCE) def subscriber = new SingleSubscriber(user, subscription) def originalMessageId = 60 - def publish = Publish.minimal(originalMessageId, QoS.EXACTLY_ONCE, testTopicName, testPayload) + def testPublish = Publish.minimal(originalMessageId, QoS.EXACTLY_ONCE, testTopicName, testPayload) .withDuplicated() when: - def result = publishOutHandler.handle(publish, subscriber) - def receivedPublish = user.nextSentMessage(PublishMqtt5OutMessage) + publishOutHandler.handle(testPublish, subscriber) then: - result == PublishHandlingResult.SUCCESS + def publish = user.nextSentMessage(PublishMqtt5OutMessage) with(session.outMessageTracker()) { - with(stored(receivedPublish.messageId())) { + with(stored(publish.messageId())) { messageType() == MqttMessageType.PUBLISH reasonCode() == null } @@ -79,13 +72,13 @@ class Qos1MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl } when: 'send publish ack' def publishAck = PublishAckMqttInMessage - .of(receivedPublish.messageId(), PublishAckReasonCode.SUCCESS) + .of(publish.messageId(), PublishAckReasonCode.SUCCESS) session .outProcessingPublishes() .apply(user, publishAck) then: with(session.outMessageTracker()) { - stored(receivedPublish.messageId()) == null + stored(publish.messageId()) == null } with(session.outProcessingPublishes()) { size() == 0 @@ -98,9 +91,7 @@ class Qos1MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl def "should correctly handle publish ack when no stored trackable meta about the publish"() { given: - def publishOutHandler = new Qos1MqttPublishOutMessageHandler( - defaultSubscriptionService, - defaultMessageOutFactoryService) + def publishOutHandler = new Qos1MqttPublishOutMessageHandler(defaultMessageOutFactoryService) def connection = mockedExternalConnection(MqttVersion.MQTT_5) def user = connection.user() as TestExternalNetworkMqttUser def session = user.session() @@ -109,15 +100,14 @@ class Qos1MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl def subscription = Subscription.minimal(topicFilter, QoS.AT_MOST_ONCE) def subscriber = new SingleSubscriber(user, subscription) def originalMessageId = 60 - def publish = Publish.minimal(originalMessageId, QoS.EXACTLY_ONCE, testTopicName, testPayload) + def testPublish = Publish.minimal(originalMessageId, QoS.EXACTLY_ONCE, testTopicName, testPayload) .withDuplicated() when: - def result = publishOutHandler.handle(publish, subscriber) - def receivedPublish = user.nextSentMessage(PublishMqtt5OutMessage) + publishOutHandler.handle(testPublish, subscriber) then: - result == PublishHandlingResult.SUCCESS + def publish = user.nextSentMessage(PublishMqtt5OutMessage) with(session.outMessageTracker()) { - with(stored(receivedPublish.messageId())) { + with(stored(publish.messageId())) { messageType() == MqttMessageType.PUBLISH reasonCode() == null } @@ -128,15 +118,15 @@ class Qos1MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl when: 'remove trackable info and send publish ack' session .outMessageTracker() - .remove(receivedPublish.messageId()) + .remove(publish.messageId()) def publishAck = PublishAckMqttInMessage - .of(receivedPublish.messageId(), PublishAckReasonCode.SUCCESS) + .of(publish.messageId(), PublishAckReasonCode.SUCCESS) session .outProcessingPublishes() .apply(user, publishAck) then: with(session.outMessageTracker()) { - stored(receivedPublish.messageId()) == null + stored(publish.messageId()) == null } with(session.outProcessingPublishes()) { size() == 0 @@ -149,9 +139,7 @@ class Qos1MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl def "should handle as protocol error receiving unexpected response message"() { given: - def publishOutHandler = new Qos1MqttPublishOutMessageHandler( - defaultSubscriptionService, - defaultMessageOutFactoryService) + def publishOutHandler = new Qos1MqttPublishOutMessageHandler(defaultMessageOutFactoryService) def connection = mockedExternalConnection(MqttVersion.MQTT_5) def user = connection.user() as TestExternalNetworkMqttUser def session = user.session() @@ -160,15 +148,14 @@ class Qos1MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl def subscription = Subscription.minimal(topicFilter, QoS.AT_MOST_ONCE) def subscriber = new SingleSubscriber(user, subscription) def originalMessageId = 60 - def publish = Publish.minimal(originalMessageId, QoS.EXACTLY_ONCE, testTopicName, testPayload) + def testPublish = Publish.minimal(originalMessageId, QoS.EXACTLY_ONCE, testTopicName, testPayload) .withDuplicated() when: - def result = publishOutHandler.handle(publish, subscriber) - def receivedPublish = user.nextSentMessage(PublishMqtt5OutMessage) + publishOutHandler.handle(testPublish, subscriber) then: - result == PublishHandlingResult.SUCCESS + def publish = user.nextSentMessage(PublishMqtt5OutMessage) with(session.outMessageTracker()) { - with(stored(receivedPublish.messageId())) { + with(stored(publish.messageId())) { messageType() == MqttMessageType.PUBLISH reasonCode() == null } @@ -178,7 +165,7 @@ class Qos1MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl } when: 'send unexpected publish receive to get protocol error' def publishReceive = PublishReceivedMqttInMessage - .of(receivedPublish.messageId(), PublishReceivedReasonCode.SUCCESS) + .of(publish.messageId(), PublishReceivedReasonCode.SUCCESS) session .outProcessingPublishes() .apply(user, publishReceive) @@ -191,9 +178,7 @@ class Qos1MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl def "should handle as protocol error for unexpected flow state"() { given: - def publishOutHandler = new Qos1MqttPublishOutMessageHandler( - defaultSubscriptionService, - defaultMessageOutFactoryService) + def publishOutHandler = new Qos1MqttPublishOutMessageHandler(defaultMessageOutFactoryService) def connection = mockedExternalConnection(MqttVersion.MQTT_5) def user = connection.user() as TestExternalNetworkMqttUser def session = user.session() @@ -202,15 +187,14 @@ class Qos1MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl def subscription = Subscription.minimal(topicFilter, QoS.AT_MOST_ONCE) def subscriber = new SingleSubscriber(user, subscription) def originalMessageId = 60 - def publish = Publish.minimal(originalMessageId, QoS.EXACTLY_ONCE, testTopicName, testPayload) + def testPublish = Publish.minimal(originalMessageId, QoS.EXACTLY_ONCE, testTopicName, testPayload) .withDuplicated() when: - def result = publishOutHandler.handle(publish, subscriber) - def receivedPublish = user.nextSentMessage(PublishMqtt5OutMessage) + publishOutHandler.handle(testPublish, subscriber) then: - result == PublishHandlingResult.SUCCESS + def publish = user.nextSentMessage(PublishMqtt5OutMessage) with(session.outMessageTracker()) { - with(stored(receivedPublish.messageId())) { + with(stored(publish.messageId())) { messageType() == MqttMessageType.PUBLISH reasonCode() == null } @@ -221,9 +205,9 @@ class Qos1MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl when: 'change trackable info to publish release and send publish ack' session .outMessageTracker() - .update(receivedPublish.messageId(), MqttMessageType.PUBLISH_RELEASE, PublishReleaseReasonCode.SUCCESS) + .update(publish.messageId(), MqttMessageType.PUBLISH_RELEASE, PublishReleaseReasonCode.SUCCESS) def publishAck = PublishAckMqttInMessage - .of(receivedPublish.messageId(), PublishAckReasonCode.SUCCESS) + .of(publish.messageId(), PublishAckReasonCode.SUCCESS) session .outProcessingPublishes() .apply(user, publishAck) diff --git a/core-service/src/test/groovy/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishOutMessageHandlerTest.groovy b/core-service/src/test/groovy/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishOutMessageHandlerTest.groovy index d6d0ef56..7dbb467b 100644 --- a/core-service/src/test/groovy/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishOutMessageHandlerTest.groovy +++ b/core-service/src/test/groovy/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishOutMessageHandlerTest.groovy @@ -19,15 +19,12 @@ import javasabr.mqtt.network.message.out.DisconnectMqtt5OutMessage import javasabr.mqtt.network.message.out.PublishMqtt5OutMessage import javasabr.mqtt.network.message.out.PublishReleaseMqtt5OutMessage import javasabr.mqtt.service.TestExternalNetworkMqttUser -import javasabr.mqtt.service.publish.handler.PublishHandlingResult class Qos2MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandlerTest { def "should deliver publish to subscriber"() { given: - def publishOutHandler = new Qos2MqttPublishOutMessageHandler( - defaultSubscriptionService, - defaultMessageOutFactoryService) + def publishOutHandler = new Qos2MqttPublishOutMessageHandler(defaultMessageOutFactoryService) def connection = mockedExternalConnection(MqttVersion.MQTT_5) def user = connection.user() as TestExternalNetworkMqttUser def testTopicName = defaultTopicService.createTopicName(user, "Qos2MqttPublishOutMessageHandlerTest/1") @@ -38,9 +35,8 @@ class Qos2MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl def testPublish = Publish.minimal(originalMessageId, QoS.EXACTLY_ONCE, testTopicName, testPayload) .withDuplicated() when: - def result = publishOutHandler.handle(testPublish, subscriber) + publishOutHandler.handle(testPublish, subscriber) then: - result == PublishHandlingResult.SUCCESS with(user.nextSentMessage(PublishMqtt5OutMessage)) { qos() == QoS.EXACTLY_ONCE !duplicate() @@ -53,9 +49,7 @@ class Qos2MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl def "should wait for receive-complete responses for publish"() { given: - def publishOutHandler = new Qos2MqttPublishOutMessageHandler( - defaultSubscriptionService, - defaultMessageOutFactoryService) + def publishOutHandler = new Qos2MqttPublishOutMessageHandler(defaultMessageOutFactoryService) def connection = mockedExternalConnection(MqttVersion.MQTT_5) def user = connection.user() as TestExternalNetworkMqttUser def session = user.session() @@ -67,10 +61,9 @@ class Qos2MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl def testPublish = Publish.minimal(originalMessageId, QoS.EXACTLY_ONCE, testTopicName, testPayload) .withDuplicated() when: - def result = publishOutHandler.handle(testPublish, subscriber) - def publish = user.nextSentMessage(PublishMqtt5OutMessage) + publishOutHandler.handle(testPublish, subscriber) then: - result == PublishHandlingResult.SUCCESS + def publish = user.nextSentMessage(PublishMqtt5OutMessage) with(session.outMessageTracker()) { with(stored(publish.messageId())) { messageType() == MqttMessageType.PUBLISH @@ -124,9 +117,7 @@ class Qos2MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl def "should correctly handle publish receive when no stored trackable meta about the publish"() { given: - def publishOutHandler = new Qos2MqttPublishOutMessageHandler( - defaultSubscriptionService, - defaultMessageOutFactoryService) + def publishOutHandler = new Qos2MqttPublishOutMessageHandler(defaultMessageOutFactoryService) def connection = mockedExternalConnection(MqttVersion.MQTT_5) def user = connection.user() as TestExternalNetworkMqttUser def session = user.session() @@ -138,10 +129,9 @@ class Qos2MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl def testPublish = Publish.minimal(originalMessageId, QoS.EXACTLY_ONCE, testTopicName, testPayload) .withDuplicated() when: - def result = publishOutHandler.handle(testPublish, subscriber) - def publish = user.nextSentMessage(PublishMqtt5OutMessage) + publishOutHandler.handle(testPublish, subscriber) then: - result == PublishHandlingResult.SUCCESS + def publish = user.nextSentMessage(PublishMqtt5OutMessage) with(session.outMessageTracker()) { with(stored(publish.messageId())) { messageType() == MqttMessageType.PUBLISH @@ -176,9 +166,7 @@ class Qos2MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl def "should handle as protocol error receiving unexpected response message for first stage"() { given: - def publishOutHandler = new Qos2MqttPublishOutMessageHandler( - defaultSubscriptionService, - defaultMessageOutFactoryService) + def publishOutHandler = new Qos2MqttPublishOutMessageHandler(defaultMessageOutFactoryService) def connection = mockedExternalConnection(MqttVersion.MQTT_5) def user = connection.user() as TestExternalNetworkMqttUser def session = user.session() @@ -190,10 +178,9 @@ class Qos2MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl def testPublish = Publish.minimal(originalMessageId, QoS.EXACTLY_ONCE, testTopicName, testPayload) .withDuplicated() when: - def result = publishOutHandler.handle(testPublish, subscriber) - def publish = user.nextSentMessage(PublishMqtt5OutMessage) + publishOutHandler.handle(testPublish, subscriber) then: - result == PublishHandlingResult.SUCCESS + def publish = user.nextSentMessage(PublishMqtt5OutMessage) with(session.outMessageTracker()) { with(stored(publish.messageId())) { messageType() == MqttMessageType.PUBLISH @@ -218,9 +205,7 @@ class Qos2MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl def "should handle as protocol error receiving unexpected response message for second stage"() { given: - def publishOutHandler = new Qos2MqttPublishOutMessageHandler( - defaultSubscriptionService, - defaultMessageOutFactoryService) + def publishOutHandler = new Qos2MqttPublishOutMessageHandler(defaultMessageOutFactoryService) def connection = mockedExternalConnection(MqttVersion.MQTT_5) def user = connection.user() as TestExternalNetworkMqttUser def session = user.session() @@ -232,10 +217,9 @@ class Qos2MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl def testPublish = Publish.minimal(originalMessageId, QoS.EXACTLY_ONCE, testTopicName, testPayload) .withDuplicated() when: - def result = publishOutHandler.handle(testPublish, subscriber) - def publish = user.nextSentMessage(PublishMqtt5OutMessage) + publishOutHandler.handle(testPublish, subscriber) then: - result == PublishHandlingResult.SUCCESS + def publish = user.nextSentMessage(PublishMqtt5OutMessage) with(session.outMessageTracker()) { with(stored(publish.messageId())) { messageType() == MqttMessageType.PUBLISH @@ -271,9 +255,7 @@ class Qos2MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl def "should handle as protocol error for unexpected flow state for publish received"() { given: - def publishOutHandler = new Qos2MqttPublishOutMessageHandler( - defaultSubscriptionService, - defaultMessageOutFactoryService) + def publishOutHandler = new Qos2MqttPublishOutMessageHandler(defaultMessageOutFactoryService) def connection = mockedExternalConnection(MqttVersion.MQTT_5) def user = connection.user() as TestExternalNetworkMqttUser def session = user.session() @@ -285,10 +267,9 @@ class Qos2MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl def testPublish = Publish.minimal(originalMessageId, QoS.EXACTLY_ONCE, testTopicName, testPayload) .withDuplicated() when: - def result = publishOutHandler.handle(testPublish, subscriber) - def publish = user.nextSentMessage(PublishMqtt5OutMessage) + publishOutHandler.handle(testPublish, subscriber) then: - result == PublishHandlingResult.SUCCESS + def publish = user.nextSentMessage(PublishMqtt5OutMessage) with(session.outMessageTracker()) { with(stored(publish.messageId())) { messageType() == MqttMessageType.PUBLISH @@ -316,9 +297,7 @@ class Qos2MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl def "should handle as protocol error for unexpected flow state for publish complete"() { given: - def publishOutHandler = new Qos2MqttPublishOutMessageHandler( - defaultSubscriptionService, - defaultMessageOutFactoryService) + def publishOutHandler = new Qos2MqttPublishOutMessageHandler(defaultMessageOutFactoryService) def connection = mockedExternalConnection(MqttVersion.MQTT_5) def user = connection.user() as TestExternalNetworkMqttUser def session = user.session() @@ -330,10 +309,9 @@ class Qos2MqttPublishOutMessageHandlerTest extends QosMqttPublishOutMessageHandl def testPublish = Publish.minimal(originalMessageId, QoS.EXACTLY_ONCE, testTopicName, testPayload) .withDuplicated() when: - def result = publishOutHandler.handle(testPublish, subscriber) - def publish = user.nextSentMessage(PublishMqtt5OutMessage) + publishOutHandler.handle(testPublish, subscriber) then: - result == PublishHandlingResult.SUCCESS + def publish = user.nextSentMessage(PublishMqtt5OutMessage) with(session.outMessageTracker()) { with(stored(publish.messageId())) { messageType() == MqttMessageType.PUBLISH