From 2c57034a93d9e9e7d37a625ff3e2e9e50ab0efb3 Mon Sep 17 00:00:00 2001 From: andsel Date: Sat, 23 Nov 2024 18:48:15 +0100 Subject: [PATCH] Fixed publish verification to tests, to subscribe to listened action before triggering the action --- ...ServerIntegrationWithoutClientFixture.java | 46 +++-- .../integration/mqtt5/ConnectTest.java | 72 +++---- .../integration/mqtt5/ContentTypeTest.java | 23 ++- .../mqtt5/MessageExpirationTest.java | 18 +- .../mqtt5/PayloadFormatIndicatorTest.java | 43 ++--- .../mqtt5/RequestResponseTest.java | 67 ++++--- .../mqtt5/SharedSubscriptionTest.java | 179 ++++++++++-------- 7 files changed, 238 insertions(+), 210 deletions(-) diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationWithoutClientFixture.java b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationWithoutClientFixture.java index e8d23526a..0d48821fa 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationWithoutClientFixture.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationWithoutClientFixture.java @@ -148,35 +148,39 @@ protected static void verifyNoPublish(Mqtt5BlockingClient subscriber, Consumer action, MqttQos expectedQos, + protected static void verifyNoPublish(Mqtt5BlockingClient.Mqtt5Publishes publishes, Consumer action, Duration timeout, String message) throws InterruptedException { + action.accept(null); + Optional publishedMessage = publishes.receive(timeout.getSeconds(), TimeUnit.SECONDS); + + // verify no published will in 10 seconds + assertFalse(publishedMessage.isPresent(), message); + } + + protected static void verifyPublishedMessage(Mqtt5BlockingClient.Mqtt5Publishes publishes, Consumer action, MqttQos expectedQos, String expectedPayload, String errorMessage, int timeoutSeconds) throws Exception { - try (Mqtt5BlockingClient.Mqtt5Publishes publishes = client.publishes(MqttGlobalPublishFilter.ALL)) { - action.accept(null); - Optional publishMessage = publishes.receive(timeoutSeconds, TimeUnit.SECONDS); - if (!publishMessage.isPresent()) { - fail("Expected to receive a publish message"); - return; - } - Mqtt5Publish msgPub = publishMessage.get(); - final String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8); - assertEquals(expectedPayload, payload, errorMessage); - assertEquals(expectedQos, msgPub.getQos()); + action.accept(null); + Optional publishMessage = publishes.receive(timeoutSeconds, TimeUnit.SECONDS); + if (!publishMessage.isPresent()) { + fail("Expected to receive a publish message"); + return; } + Mqtt5Publish msgPub = publishMessage.get(); + final String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8); + assertEquals(expectedPayload, payload, errorMessage); + assertEquals(expectedQos, msgPub.getQos()); } static void verifyOfType(MqttMessage received, MqttMessageType mqttMessageType) { assertEquals(mqttMessageType, received.fixedHeader().messageType()); } - static void verifyPublishMessage(Mqtt5BlockingClient subscriber, Consumer assertion) throws InterruptedException { - try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) { - Optional publishMessage = publishes.receive(1, TimeUnit.SECONDS); - if (!publishMessage.isPresent()) { - fail("Expected to receive a publish message"); - return; - } - Mqtt5Publish msgPub = publishMessage.get(); - assertion.accept(msgPub); + static void verifyPublishMessage(Mqtt5BlockingClient.Mqtt5Publishes publishListener, Consumer assertion) throws InterruptedException { + Optional publishMessage = publishListener.receive(1, TimeUnit.SECONDS); + if (!publishMessage.isPresent()) { + fail("Expected to receive a publish message"); + return; } + Mqtt5Publish msgPub = publishMessage.get(); + assertion.accept(msgPub); } } diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java index 3509b1309..2b7633d19 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java @@ -180,30 +180,32 @@ public void avoidToFirePreviouslyScheduledWillWhenSameClientIDReconnects() throw final Mqtt5BlockingClient clientWithWill = createAndConnectClientWithWillTestament(clientId); final Mqtt5BlockingClient testamentSubscriber = createAndConnectClientListeningToTestament(); + try (Mqtt5BlockingClient.Mqtt5Publishes testamentListener = testamentSubscriber.publishes(MqttGlobalPublishFilter.ALL)) { - // client trigger a will message, disconnecting with bad reason code - final Mqtt5Disconnect malformedPacketReason = Mqtt5Disconnect.builder() - .reasonCode(Mqtt5DisconnectReasonCode.MALFORMED_PACKET) - .build(); - clientWithWill.disconnect(malformedPacketReason); + // client trigger a will message, disconnecting with bad reason code + final Mqtt5Disconnect malformedPacketReason = Mqtt5Disconnect.builder() + .reasonCode(Mqtt5DisconnectReasonCode.MALFORMED_PACKET) + .build(); + clientWithWill.disconnect(malformedPacketReason); - // wait no will is published - verifyNoTestamentIsPublished(testamentSubscriber, unused -> { - // reconnect another client with same clientId - final Mqtt5BlockingClient client = MqttClient.builder() - .useMqttVersion5() - .identifier(clientId) - .serverHost("localhost") - .serverPort(1883) - .buildBlocking(); - Mqtt5ConnAck connectAck = client.connect(); - assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, connectAck.getReasonCode(), "Client connected"); - - }, Duration.ofSeconds(10)); + // wait no will is published + verifyNoTestamentIsPublished(testamentListener, unused -> { + // reconnect another client with same clientId + final Mqtt5BlockingClient client = MqttClient.builder() + .useMqttVersion5() + .identifier(clientId) + .serverHost("localhost") + .serverPort(1883) + .buildBlocking(); + Mqtt5ConnAck connectAck = client.connect(); + assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, connectAck.getReasonCode(), "Client connected"); + + }, Duration.ofSeconds(10)); + } } - private static void verifyNoTestamentIsPublished(Mqtt5BlockingClient testamentSubscriber, Consumer action, Duration timeout) throws InterruptedException { - verifyNoPublish(testamentSubscriber, action, timeout, "No will message should be published"); + private static void verifyNoTestamentIsPublished(Mqtt5BlockingClient.Mqtt5Publishes testamentListener, Consumer action, Duration timeout) throws InterruptedException { + verifyNoPublish(testamentListener, action, timeout, "No will message should be published"); } @Test @@ -230,12 +232,14 @@ public void noWillMessageIsFiredOnNormalDisconnection() throws InterruptedExcept final Mqtt5BlockingClient clientWithWill = createAndConnectClientWithWillTestament(clientId, 10, 60); final Mqtt5BlockingClient testamentSubscriber = createAndConnectClientListeningToTestament(); + try (Mqtt5BlockingClient.Mqtt5Publishes testamentListener = testamentSubscriber.publishes(MqttGlobalPublishFilter.ALL)) { - // wait no will is published - verifyNoTestamentIsPublished(testamentSubscriber, unused -> { - // normal session disconnection - clientWithWill.disconnect(Mqtt5Disconnect.builder().build()); - }, Duration.ofSeconds(10)); + // wait no will is published + verifyNoTestamentIsPublished(testamentListener, unused -> { + // normal session disconnection + clientWithWill.disconnect(Mqtt5Disconnect.builder().build()); + }, Duration.ofSeconds(10)); + } } @Test @@ -245,14 +249,16 @@ public void givenClientWithWillThatCleanlyDisconnectsWithWillShouldTriggerTheTes final Mqtt5BlockingClient clientWithWill = createAndConnectClientWithWillTestament(clientId, 10, 60); final Mqtt5BlockingClient testamentSubscriber = createAndConnectClientListeningToTestament(); - - // wait no will is published - verifyNoTestamentIsPublished(testamentSubscriber, unused -> { - // normal session disconnection with will - clientWithWill.disconnect(Mqtt5Disconnect.builder() - .reasonCode(Mqtt5DisconnectReasonCode.DISCONNECT_WITH_WILL_MESSAGE) - .build()); - }, Duration.ofSeconds(10)); + try (Mqtt5BlockingClient.Mqtt5Publishes testamentListener = testamentSubscriber.publishes(MqttGlobalPublishFilter.ALL)) { + + // wait no will is published + verifyNoTestamentIsPublished(testamentListener, unused -> { + // normal session disconnection with will + clientWithWill.disconnect(Mqtt5Disconnect.builder() + .reasonCode(Mqtt5DisconnectReasonCode.DISCONNECT_WITH_WILL_MESSAGE) + .build()); + }, Duration.ofSeconds(10)); + } } @Test diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/ContentTypeTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/ContentTypeTest.java index 267bd7e96..c08da23c4 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/ContentTypeTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/ContentTypeTest.java @@ -18,6 +18,7 @@ package io.moquette.integration.mqtt5; +import com.hivemq.client.mqtt.MqttGlobalPublishFilter; import com.hivemq.client.mqtt.datatypes.MqttQos; import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException; @@ -65,17 +66,19 @@ public void givenAPublishWithContentTypeWhenForwardedToSubscriberThenIsPresent() .send(); Mqtt5BlockingClient publisher = createPublisherClient(); - publisher.publishWith() - .topic("temperature/living") - .payload("{\"max\": 18}".getBytes(StandardCharsets.UTF_8)) - .contentType("application/json") - .qos(MqttQos.AT_MOST_ONCE) - .send(); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) { + publisher.publishWith() + .topic("temperature/living") + .payload("{\"max\": 18}".getBytes(StandardCharsets.UTF_8)) + .contentType("application/json") + .qos(MqttQos.AT_MOST_ONCE) + .send(); - verifyPublishMessage(subscriber, msgPub -> { - assertTrue(msgPub.getContentType().isPresent(), "Content-type MUST be present"); - assertEquals("application/json", msgPub.getContentType().get().toString(), "Content-type MUST be untouched"); - }); + verifyPublishMessage(publishes, msgPub -> { + assertTrue(msgPub.getContentType().isPresent(), "Content-type MUST be present"); + assertEquals("application/json", msgPub.getContentType().get().toString(), "Content-type MUST be untouched"); + }); + } } @Test diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java index 314e8c288..108e6fcf9 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java @@ -18,6 +18,7 @@ package io.moquette.integration.mqtt5; +import com.hivemq.client.mqtt.MqttGlobalPublishFilter; import com.hivemq.client.mqtt.datatypes.MqttQos; import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder; @@ -68,13 +69,16 @@ public void givenPublishWithRetainedAndMessageExpiryWhenTimePassedThenRetainedIs // subscribe to same topic and verify no message Mqtt5BlockingClient subscriber = createSubscriberClient(); - subscriber.subscribeWith() - .topicFilter("temperature/living") - .qos(MqttQos.AT_MOST_ONCE) - .send(); - - verifyNoPublish(subscriber, v -> {}, Duration.ofSeconds(2), - "Subscriber must not receive any retained message"); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) { + subscriber.subscribeWith() + .topicFilter("temperature/living") + .qos(MqttQos.AT_MOST_ONCE) + .send(); + + verifyNoPublish(publishes, v -> { + }, Duration.ofSeconds(2), + "Subscriber must not receive any retained message"); + } } // TODO verify the elapsed diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/PayloadFormatIndicatorTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/PayloadFormatIndicatorTest.java index 54be8ab40..27e32b786 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/PayloadFormatIndicatorTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/PayloadFormatIndicatorTest.java @@ -18,6 +18,7 @@ package io.moquette.integration.mqtt5; +import com.hivemq.client.mqtt.MqttGlobalPublishFilter; import com.hivemq.client.mqtt.datatypes.MqttQos; import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException; @@ -25,8 +26,6 @@ import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator; import com.hivemq.client.mqtt.mqtt5.message.publish.puback.Mqtt5PubAckReasonCode; import com.hivemq.client.mqtt.mqtt5.message.publish.pubrec.Mqtt5PubRecReasonCode; -import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck; -import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode; import io.netty.buffer.Unpooled; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; @@ -54,7 +53,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -import static org.assertj.core.api.Assertions.assertThat; public class PayloadFormatIndicatorTest extends AbstractServerIntegrationTest { @@ -67,30 +65,25 @@ public String clientName() { } @Test - public void givenAPublishWithPayloadFormatIndicatorWhenForwardedToSubscriberThenIsPresent() throws InterruptedException, MqttException { - MqttClient client = new MqttClient("tcp://localhost:1883", "subscriber", new MemoryPersistence()); - client.connect(); - MqttSubscription subscription = new MqttSubscription("temperature/living", 1); - SubscriptionOptionsTest.PublishCollector publishCollector = new SubscriptionOptionsTest.PublishCollector(); - IMqttToken subscribeToken = client.subscribe(new MqttSubscription[]{subscription}, - new IMqttMessageListener[] {publishCollector}); - TestUtils.verifySubscribedSuccessfully(subscribeToken); - - Mqtt5BlockingClient publisher = createPublisherClient(); - publisher.publishWith() - .topic("temperature/living") - .payload("18".getBytes(StandardCharsets.UTF_8)) - .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8) - .qos(MqttQos.AT_LEAST_ONCE) + public void givenAPublishWithPayloadFormatIndicatorWhenForwardedToSubscriberThenIsPresent() throws InterruptedException { + Mqtt5BlockingClient subscriber = createSubscriberClient(); + subscriber.subscribeWith() + .topicFilter("temperature/living") + .qos(MqttQos.AT_MOST_ONCE) .send(); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) { + Mqtt5BlockingClient publisher = createPublisherClient(); + publisher.publishWith() + .topic("temperature/living") + .payload("18".getBytes(StandardCharsets.UTF_8)) + .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); - // Verify the message is also reflected back to the sender - publishCollector.assertReceivedMessageIn(2, TimeUnit.SECONDS); - assertEquals("temperature/living", publishCollector.receivedTopic()); - assertEquals("18", publishCollector.receivedPayload(), "Payload published on topic should match"); - org.eclipse.paho.mqttv5.common.MqttMessage receivedMessage = publishCollector.receivedMessage(); - assertEquals(MqttQos.AT_LEAST_ONCE.getCode(), receivedMessage.getQos()); - assertTrue(receivedMessage.getProperties().getPayloadFormat()); + verifyPublishMessage(publishes, msgPub -> { + assertTrue(msgPub.getPayloadFormatIndicator().isPresent()); + }); + } } @Test diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java index 161a1d3a5..f56a7296d 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java @@ -18,6 +18,7 @@ package io.moquette.integration.mqtt5; +import com.hivemq.client.mqtt.MqttGlobalPublishFilter; import com.hivemq.client.mqtt.datatypes.MqttQos; import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; @@ -52,13 +53,15 @@ public void givenRequestResponseProtocolWhenRequestIsIssueThenTheResponderReply( final Mqtt5BlockingClient responder = createHiveBlockingClient("responder"); - responderRepliesToRequesterPublish(responder, requester, responseTopic); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = requester.publishes(MqttGlobalPublishFilter.ALL)) { + responderRepliesToRequesterPublish(responder, requester, responseTopic); - verifyPublishMessage(requester, msgPub -> { - assertTrue(msgPub.getPayload().isPresent(), "Response payload MUST be present"); - String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8); - assertEquals("OK", payload); - }); + verifyPublishMessage(publishes, msgPub -> { + assertTrue(msgPub.getPayload().isPresent(), "Response payload MUST be present"); + String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8); + assertEquals("OK", payload); + }); + } } private static void responderRepliesToRequesterPublish(Mqtt5BlockingClient responder, Mqtt5BlockingClient requester, String responseTopic) { @@ -126,24 +129,26 @@ public void givenRequestResponseProtocolWhenRequestIsIssueThenTheResponderReplyW }); waitForSubAck(subackFuture); - Mqtt5PublishResult.Mqtt5Qos1Result requestResult = (Mqtt5PublishResult.Mqtt5Qos1Result) requester.publishWith() - .topic("requester/door/open") - .responseTopic(responseTopic) - .correlationData("req-open-door".getBytes(StandardCharsets.UTF_8)) - .payload("Please open the door".getBytes(StandardCharsets.UTF_8)) - .qos(MqttQos.AT_LEAST_ONCE) - .send(); - assertEquals(Mqtt5PubAckReasonCode.SUCCESS, requestResult.getPubAck().getReasonCode(), - "Open door request cannot be published "); - - verifyPublishMessage(requester, msgPub -> { - assertTrue(msgPub.getPayload().isPresent(), "Response payload MUST be present"); - String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8); - assertEquals("OK", payload); - assertTrue(msgPub.getCorrelationData().isPresent(), "Request correlation data MUST defined in response publish"); - final byte[] correlationData = asByteArray(msgPub.getCorrelationData().get()); - assertEquals("req-open-door", new String(correlationData, StandardCharsets.UTF_8)); - }); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = requester.publishes(MqttGlobalPublishFilter.ALL)) { + Mqtt5PublishResult.Mqtt5Qos1Result requestResult = (Mqtt5PublishResult.Mqtt5Qos1Result) requester.publishWith() + .topic("requester/door/open") + .responseTopic(responseTopic) + .correlationData("req-open-door".getBytes(StandardCharsets.UTF_8)) + .payload("Please open the door".getBytes(StandardCharsets.UTF_8)) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + assertEquals(Mqtt5PubAckReasonCode.SUCCESS, requestResult.getPubAck().getReasonCode(), + "Open door request cannot be published "); + + verifyPublishMessage(publishes, msgPub -> { + assertTrue(msgPub.getPayload().isPresent(), "Response payload MUST be present"); + String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8); + assertEquals("OK", payload); + assertTrue(msgPub.getCorrelationData().isPresent(), "Request correlation data MUST defined in response publish"); + final byte[] correlationData = asByteArray(msgPub.getCorrelationData().get()); + assertEquals("req-open-door", new String(correlationData, StandardCharsets.UTF_8)); + }); + } } private static void waitForSubAck(CompletableFuture<@NotNull Mqtt5SubAck> subackFuture) { @@ -174,12 +179,14 @@ public void givenRequestResponseProtocolAndClientIsConnectedWhenRequestIsIssueTh final Mqtt5BlockingClient responder = createHiveBlockingClient("responder"); - responderRepliesToRequesterPublish(responder, requester, responseTopic); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = requester.publishes(MqttGlobalPublishFilter.ALL)) { + responderRepliesToRequesterPublish(responder, requester, responseTopic); - verifyPublishMessage(requester, msgPub -> { - assertTrue(msgPub.getPayload().isPresent(), "Response payload MUST be present"); - String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8); - assertEquals("OK", payload); - }); + verifyPublishMessage(publishes, msgPub -> { + assertTrue(msgPub.getPayload().isPresent(), "Response payload MUST be present"); + String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8); + assertEquals("OK", payload); + }); + } } } diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/SharedSubscriptionTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/SharedSubscriptionTest.java index 877cbf801..47016d496 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/SharedSubscriptionTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/SharedSubscriptionTest.java @@ -1,5 +1,6 @@ package io.moquette.integration.mqtt5; +import com.hivemq.client.mqtt.MqttGlobalPublishFilter; import com.hivemq.client.mqtt.datatypes.MqttQos; import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck; @@ -130,13 +131,14 @@ public void givenASharedSubscriptionClientReceivesANotification() throws Excepti subscriberClient.subscribeWith() .topicFilter("$share/collectors/metric/temperature/#") .send(); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriberClient.publishes(MqttGlobalPublishFilter.ALL)) { + Mqtt5BlockingClient publisherClient = createPublisherClient(); - Mqtt5BlockingClient publisherClient = createPublisherClient(); - - verifyPublishedMessage(subscriberClient, unused -> publisherClient.publishWith() - .topic("metric/temperature/living") - .payload("18".getBytes(StandardCharsets.UTF_8)) - .send(), MqttQos.AT_MOST_ONCE, "18", "Shared message must be received", 10); + verifyPublishedMessage(publishes, unused -> publisherClient.publishWith() + .topic("metric/temperature/living") + .payload("18".getBytes(StandardCharsets.UTF_8)) + .send(), MqttQos.AT_MOST_ONCE, "18", "Shared message must be received", 10); + } } @Test @@ -211,21 +213,22 @@ public void whenAClientSubscribeToASharedTopicThenDoesntReceiveAnyRetainedMessag public void givenSharedSubscriptionWithCertainQoSWhenSameClientWithSameShareSubscribeToSameTopicFilterThenQoSUpdates() throws Exception { final Mqtt5BlockingClient subscriberClient = createSubscriberClient(); subscribe(subscriberClient, "$share/collectors/metric/temperature/living", MqttQos.AT_MOST_ONCE); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriberClient.publishes(MqttGlobalPublishFilter.ALL)) { + Mqtt5BlockingClient publisherClient = createPublisherClient(); - Mqtt5BlockingClient publisherClient = createPublisherClient(); - - verifyPublishedMessage(subscriberClient, - unused -> publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE), - MqttQos.AT_MOST_ONCE, "18", "QoS0 publish message is expected by the subscriber when subscribed with AT_MOST_ONCE", 1); + verifyPublishedMessage(publishes, + unused -> publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE), + MqttQos.AT_MOST_ONCE, "18", "QoS0 publish message is expected by the subscriber when subscribed with AT_MOST_ONCE", 1); - // update QoS for shared subscription - subscribe(subscriberClient, "$share/collectors/metric/temperature/living", MqttQos.AT_LEAST_ONCE); + // update QoS for shared subscription + subscribe(subscriberClient, "$share/collectors/metric/temperature/living", MqttQos.AT_LEAST_ONCE); - // This time the publish reaches the subscription - verifyPublishedMessage(subscriberClient, v -> { - // publish the message again and verify the captured message - publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); - }, MqttQos.AT_LEAST_ONCE, "18", "Shared message must be received", 30); + // This time the publish reaches the subscription + verifyPublishedMessage(publishes, v -> { + // publish the message again and verify the captured message + publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); + }, MqttQos.AT_LEAST_ONCE, "18", "Shared message must be received", 30); + } } private static void publish(Mqtt5BlockingClient publisherClient, String topicName, MqttQos mqttQos) { @@ -254,25 +257,27 @@ public void givenMultipleClientSubscribedToSharedSubscriptionWhenOneUnsubscribeT // subscribe second client to shared subscription final Mqtt5BlockingClient subscriber2 = createHiveBlockingClient("subscriber2"); subscribe(subscriber2, fullSharedSubscriptionTopicFilter, MqttQos.AT_LEAST_ONCE); - - // unsubscribe successfully the first subscriber - Mqtt5UnsubAck result = subscriber1.unsubscribeWith() - .topicFilter(fullSharedSubscriptionTopicFilter) - .send(); - assertTrue(result.getReasonCodes().stream().allMatch(rc -> rc == Mqtt5UnsubAckReasonCode.SUCCESS), - "Unsubscribe of shared subscription must be successful"); - - - // verify it's received from the survivor subscriber2 - Mqtt5BlockingClient publisherClient = createPublisherClient(); - // try 4 times we should hit all the 4 times the subscriber2 - // if the other shared subscription remains active we have 50% of possibility - // to hit the not removed subscriber, so 4 iterations should be enough. - for (int i = 0; i < 4; i++) { - verifyPublishedMessage(subscriber2, v -> { - // push a message to the shared subscription - publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); - }, MqttQos.AT_LEAST_ONCE, "18", "Shared message must be received", 2); + try (Mqtt5BlockingClient.Mqtt5Publishes subscriber2Listener = subscriber2.publishes(MqttGlobalPublishFilter.ALL)) { + + // unsubscribe successfully the first subscriber + Mqtt5UnsubAck result = subscriber1.unsubscribeWith() + .topicFilter(fullSharedSubscriptionTopicFilter) + .send(); + assertTrue(result.getReasonCodes().stream().allMatch(rc -> rc == Mqtt5UnsubAckReasonCode.SUCCESS), + "Unsubscribe of shared subscription must be successful"); + + + // verify it's received from the survivor subscriber2 + Mqtt5BlockingClient publisherClient = createPublisherClient(); + // try 4 times we should hit all the 4 times the subscriber2 + // if the other shared subscription remains active we have 50% of possibility + // to hit the not removed subscriber, so 4 iterations should be enough. + for (int i = 0; i < 4; i++) { + verifyPublishedMessage(subscriber2Listener, v -> { + // push a message to the shared subscription + publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); + }, MqttQos.AT_LEAST_ONCE, "18", "Shared message must be received", 2); + } } } @@ -283,27 +288,29 @@ public void givenASharedSubscriptionWhenLastSubscribedClientUnsubscribeThenTheSh // subscribe client to shared subscription final Mqtt5BlockingClient subscriber = createHiveBlockingClient("subscriber1"); subscribe(subscriber, fullSharedSubscriptionTopicFilter, MqttQos.AT_LEAST_ONCE); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) { - // verify subscribed to the shared receives a message - Mqtt5BlockingClient publisherClient = createPublisherClient(); - verifyPublishedMessage(subscriber, v -> { - // push a message to the shared subscription - publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); - }, MqttQos.AT_LEAST_ONCE, "18", "Shared message must be received", 2); - - // unsubscribe the only shared subscription client - Mqtt5UnsubAck result = subscriber.unsubscribeWith() - .topicFilter(fullSharedSubscriptionTopicFilter) - .send(); - assertTrue(result.getReasonCodes().stream().allMatch(rc -> rc == Mqtt5UnsubAckReasonCode.SUCCESS), - "Unsubscribe of shared subscription must be successful"); - - // verify no publish is propagated by shared subscription - verifyNoPublish(subscriber, v -> { + // verify subscribed to the shared receives a message + Mqtt5BlockingClient publisherClient = createPublisherClient(); + verifyPublishedMessage(publishes, v -> { // push a message to the shared subscription publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); - }, Duration.ofSeconds(2), - "Subscriber must not receive any message from the left shared subscription"); + }, MqttQos.AT_LEAST_ONCE, "18", "Shared message must be received", 2); + + // unsubscribe the only shared subscription client + Mqtt5UnsubAck result = subscriber.unsubscribeWith() + .topicFilter(fullSharedSubscriptionTopicFilter) + .send(); + assertTrue(result.getReasonCodes().stream().allMatch(rc -> rc == Mqtt5UnsubAckReasonCode.SUCCESS), + "Unsubscribe of shared subscription must be successful"); + + // verify no publish is propagated by shared subscription + verifyNoPublish(publishes, v -> { + // push a message to the shared subscription + publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); + }, Duration.ofSeconds(2), + "Subscriber must not receive any message from the left shared subscription"); + } } @Test @@ -313,22 +320,24 @@ public void givenASharedSubscriptionWhenLastSubscribedClientSessionTerminatesThe // subscribe client to shared subscription final Mqtt5BlockingClient subscriber = createCleanStartClient("subscriber1"); subscribe(subscriber, fullSharedSubscriptionTopicFilter, MqttQos.AT_LEAST_ONCE); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) { - // verify subscribed to the shared receives a message - Mqtt5BlockingClient publisherClient = createPublisherClient(); - verifyPublishedMessage(subscriber, v -> { - // push a message to the shared subscription - publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); - }, MqttQos.AT_LEAST_ONCE, "18", "Shared message must be received", 2); - - // disconnect the subscriber, so becuase it's clean, wipe all shared subscriptions - subscriber.disconnect(); - - // verify that a publish on shared topic doesn't have any side effect - verifyNoPublish(subscriber, v -> { - // push a message to the shared subscription - publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); - }, Duration.ofSeconds(2), "Shared message must be received"); + // verify subscribed to the shared receives a message + Mqtt5BlockingClient publisherClient = createPublisherClient(); + verifyPublishedMessage(publishes, v -> { + // push a message to the shared subscription + publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); + }, MqttQos.AT_LEAST_ONCE, "18", "Shared message must be received", 2); + + // disconnect the subscriber, so becuase it's clean, wipe all shared subscriptions + subscriber.disconnect(); + + // verify that a publish on shared topic doesn't have any side effect + verifyNoPublish(publishes, v -> { + // push a message to the shared subscription + publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); + }, Duration.ofSeconds(2), "Shared message must be received"); + } } @Test @@ -338,25 +347,27 @@ public void givenASharedSubscriptionWhenBrokerRestartsAndClientReconnectsThenSha // subscribe client to shared subscription Mqtt5BlockingClient subscriber = createNonCleanStartClient("subscriber"); subscribe(subscriber, fullSharedSubscriptionTopicFilter, MqttQos.AT_LEAST_ONCE); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) { - // verify subscribed to the shared receives a message - final Mqtt5BlockingClient publisherClient = createPublisherClient(); - verifyPublishedMessage(subscriber, v -> { - // push a message to the shared subscription - publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); - }, MqttQos.AT_LEAST_ONCE, "18", "Shared message must be received", 2); - + // verify subscribed to the shared receives a message + final Mqtt5BlockingClient publisherClient = createPublisherClient(); + verifyPublishedMessage(publishes, v -> { + // push a message to the shared subscription + publish(publisherClient, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); + }, MqttQos.AT_LEAST_ONCE, "18", "Shared message must be received", 2); + } // restart the broker restartServerWithSuspension(Duration.ofSeconds(2)); // reconnect subscriber subscriber = createNonCleanStartClient("subscriber"); - - // verify after restart the shared subscription becomes again active - final Mqtt5BlockingClient publisherClientReconnected = createPublisherClient(); - verifyPublishedMessage(subscriber, v -> { - // push a message to the shared subscription - publish(publisherClientReconnected, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); - }, MqttQos.AT_LEAST_ONCE, "18", "Shared message must be received", 2); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) { + // verify after restart the shared subscription becomes again active + final Mqtt5BlockingClient publisherClientReconnected = createPublisherClient(); + verifyPublishedMessage(publishes, v -> { + // push a message to the shared subscription + publish(publisherClientReconnected, "metric/temperature/living", MqttQos.AT_LEAST_ONCE); + }, MqttQos.AT_LEAST_ONCE, "18", "Shared message must be received", 2); + } } }