Skip to content

Commit

Permalink
Fixed publish verification to tests, to subscribe to listened action …
Browse files Browse the repository at this point in the history
…before triggering the action
  • Loading branch information
andsel committed Nov 23, 2024
1 parent 254a845 commit 249967a
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,35 +148,39 @@ protected static void verifyNoPublish(Mqtt5BlockingClient subscriber, Consumer<V
}
}

protected static void verifyPublishedMessage(Mqtt5BlockingClient client, Consumer<Void> action, MqttQos expectedQos,
protected static void verifyNoPublish(Mqtt5BlockingClient.Mqtt5Publishes publishes, Consumer<Void> action, Duration timeout, String message) throws InterruptedException {
action.accept(null);
Optional<Mqtt5Publish> publishedMessage = publishes.receive(timeout.getSeconds(), TimeUnit.SECONDS);

// verify no published will in 10 seconds
assertFalse(publishedMessage.isPresent(), message);
}

protected static void verifyPublishedMessage(Mqtt5BlockingClient.Mqtt5Publishes publishes, Consumer<Void> action, MqttQos expectedQos,
String expectedPayload, String errorMessage, int timeoutSeconds) throws Exception {
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = client.publishes(MqttGlobalPublishFilter.ALL)) {
action.accept(null);
Optional<Mqtt5Publish> publishMessage = publishes.receive(timeoutSeconds, TimeUnit.SECONDS);
if (!publishMessage.isPresent()) {
fail("Expected to receive a publish message");
return;
}
Mqtt5Publish msgPub = publishMessage.get();
final String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8);
assertEquals(expectedPayload, payload, errorMessage);
assertEquals(expectedQos, msgPub.getQos());
action.accept(null);
Optional<Mqtt5Publish> publishMessage = publishes.receive(timeoutSeconds, TimeUnit.SECONDS);
if (!publishMessage.isPresent()) {
fail("Expected to receive a publish message");
return;
}
Mqtt5Publish msgPub = publishMessage.get();
final String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8);
assertEquals(expectedPayload, payload, errorMessage);
assertEquals(expectedQos, msgPub.getQos());
}

static void verifyOfType(MqttMessage received, MqttMessageType mqttMessageType) {
assertEquals(mqttMessageType, received.fixedHeader().messageType());
}

static void verifyPublishMessage(Mqtt5BlockingClient subscriber, Consumer<Mqtt5Publish> assertion) throws InterruptedException {
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {
Optional<Mqtt5Publish> publishMessage = publishes.receive(1, TimeUnit.SECONDS);
if (!publishMessage.isPresent()) {
fail("Expected to receive a publish message");
return;
}
Mqtt5Publish msgPub = publishMessage.get();
assertion.accept(msgPub);
static void verifyPublishMessage(Mqtt5BlockingClient.Mqtt5Publishes publishListener, Consumer<Mqtt5Publish> assertion) throws InterruptedException {
Optional<Mqtt5Publish> publishMessage = publishListener.receive(1, TimeUnit.SECONDS);
if (!publishMessage.isPresent()) {
fail("Expected to receive a publish message");
return;
}
Mqtt5Publish msgPub = publishMessage.get();
assertion.accept(msgPub);
}
}
72 changes: 39 additions & 33 deletions broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -180,30 +180,32 @@ public void avoidToFirePreviouslyScheduledWillWhenSameClientIDReconnects() throw
final Mqtt5BlockingClient clientWithWill = createAndConnectClientWithWillTestament(clientId);

final Mqtt5BlockingClient testamentSubscriber = createAndConnectClientListeningToTestament();
try (Mqtt5BlockingClient.Mqtt5Publishes testamentListener = testamentSubscriber.publishes(MqttGlobalPublishFilter.ALL)) {

// client trigger a will message, disconnecting with bad reason code
final Mqtt5Disconnect malformedPacketReason = Mqtt5Disconnect.builder()
.reasonCode(Mqtt5DisconnectReasonCode.MALFORMED_PACKET)
.build();
clientWithWill.disconnect(malformedPacketReason);
// client trigger a will message, disconnecting with bad reason code
final Mqtt5Disconnect malformedPacketReason = Mqtt5Disconnect.builder()
.reasonCode(Mqtt5DisconnectReasonCode.MALFORMED_PACKET)
.build();
clientWithWill.disconnect(malformedPacketReason);

// wait no will is published
verifyNoTestamentIsPublished(testamentSubscriber, unused -> {
// reconnect another client with same clientId
final Mqtt5BlockingClient client = MqttClient.builder()
.useMqttVersion5()
.identifier(clientId)
.serverHost("localhost")
.serverPort(1883)
.buildBlocking();
Mqtt5ConnAck connectAck = client.connect();
assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, connectAck.getReasonCode(), "Client connected");

}, Duration.ofSeconds(10));
// wait no will is published
verifyNoTestamentIsPublished(testamentListener, unused -> {
// reconnect another client with same clientId
final Mqtt5BlockingClient client = MqttClient.builder()
.useMqttVersion5()
.identifier(clientId)
.serverHost("localhost")
.serverPort(1883)
.buildBlocking();
Mqtt5ConnAck connectAck = client.connect();
assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, connectAck.getReasonCode(), "Client connected");

}, Duration.ofSeconds(10));
}
}

private static void verifyNoTestamentIsPublished(Mqtt5BlockingClient testamentSubscriber, Consumer<Void> action, Duration timeout) throws InterruptedException {
verifyNoPublish(testamentSubscriber, action, timeout, "No will message should be published");
private static void verifyNoTestamentIsPublished(Mqtt5BlockingClient.Mqtt5Publishes testamentListener, Consumer<Void> action, Duration timeout) throws InterruptedException {
verifyNoPublish(testamentListener, action, timeout, "No will message should be published");
}

@Test
Expand All @@ -230,12 +232,14 @@ public void noWillMessageIsFiredOnNormalDisconnection() throws InterruptedExcept
final Mqtt5BlockingClient clientWithWill = createAndConnectClientWithWillTestament(clientId, 10, 60);

final Mqtt5BlockingClient testamentSubscriber = createAndConnectClientListeningToTestament();
try (Mqtt5BlockingClient.Mqtt5Publishes testamentListener = testamentSubscriber.publishes(MqttGlobalPublishFilter.ALL)) {

// wait no will is published
verifyNoTestamentIsPublished(testamentSubscriber, unused -> {
// normal session disconnection
clientWithWill.disconnect(Mqtt5Disconnect.builder().build());
}, Duration.ofSeconds(10));
// wait no will is published
verifyNoTestamentIsPublished(testamentListener, unused -> {
// normal session disconnection
clientWithWill.disconnect(Mqtt5Disconnect.builder().build());
}, Duration.ofSeconds(10));
}
}

@Test
Expand All @@ -245,14 +249,16 @@ public void givenClientWithWillThatCleanlyDisconnectsWithWillShouldTriggerTheTes
final Mqtt5BlockingClient clientWithWill = createAndConnectClientWithWillTestament(clientId, 10, 60);

final Mqtt5BlockingClient testamentSubscriber = createAndConnectClientListeningToTestament();

// wait no will is published
verifyNoTestamentIsPublished(testamentSubscriber, unused -> {
// normal session disconnection with will
clientWithWill.disconnect(Mqtt5Disconnect.builder()
.reasonCode(Mqtt5DisconnectReasonCode.DISCONNECT_WITH_WILL_MESSAGE)
.build());
}, Duration.ofSeconds(10));
try (Mqtt5BlockingClient.Mqtt5Publishes testamentListener = testamentSubscriber.publishes(MqttGlobalPublishFilter.ALL)) {

// wait no will is published
verifyNoTestamentIsPublished(testamentListener, unused -> {
// normal session disconnection with will
clientWithWill.disconnect(Mqtt5Disconnect.builder()
.reasonCode(Mqtt5DisconnectReasonCode.DISCONNECT_WITH_WILL_MESSAGE)
.build());
}, Duration.ofSeconds(10));
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package io.moquette.integration.mqtt5;

import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException;
Expand Down Expand Up @@ -65,17 +66,19 @@ public void givenAPublishWithContentTypeWhenForwardedToSubscriberThenIsPresent()
.send();

Mqtt5BlockingClient publisher = createPublisherClient();
publisher.publishWith()
.topic("temperature/living")
.payload("{\"max\": 18}".getBytes(StandardCharsets.UTF_8))
.contentType("application/json")
.qos(MqttQos.AT_MOST_ONCE)
.send();
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {
publisher.publishWith()
.topic("temperature/living")
.payload("{\"max\": 18}".getBytes(StandardCharsets.UTF_8))
.contentType("application/json")
.qos(MqttQos.AT_MOST_ONCE)
.send();

verifyPublishMessage(subscriber, msgPub -> {
assertTrue(msgPub.getContentType().isPresent(), "Content-type MUST be present");
assertEquals("application/json", msgPub.getContentType().get().toString(), "Content-type MUST be untouched");
});
verifyPublishMessage(publishes, msgPub -> {
assertTrue(msgPub.getContentType().isPresent(), "Content-type MUST be present");
assertEquals("application/json", msgPub.getContentType().get().toString(), "Content-type MUST be untouched");
});
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package io.moquette.integration.mqtt5;

import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder;
Expand Down Expand Up @@ -68,13 +69,16 @@ public void givenPublishWithRetainedAndMessageExpiryWhenTimePassedThenRetainedIs

// subscribe to same topic and verify no message
Mqtt5BlockingClient subscriber = createSubscriberClient();
subscriber.subscribeWith()
.topicFilter("temperature/living")
.qos(MqttQos.AT_MOST_ONCE)
.send();

verifyNoPublish(subscriber, v -> {}, Duration.ofSeconds(2),
"Subscriber must not receive any retained message");
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {
subscriber.subscribeWith()
.topicFilter("temperature/living")
.qos(MqttQos.AT_MOST_ONCE)
.send();

verifyNoPublish(publishes, v -> {
}, Duration.ofSeconds(2),
"Subscriber must not receive any retained message");
}
}

// TODO verify the elapsed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@

package io.moquette.integration.mqtt5;

import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubRecException;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator;
import com.hivemq.client.mqtt.mqtt5.message.publish.puback.Mqtt5PubAckReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.publish.pubrec.Mqtt5PubRecReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
Expand Down Expand Up @@ -54,7 +53,6 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.assertj.core.api.Assertions.assertThat;

public class PayloadFormatIndicatorTest extends AbstractServerIntegrationTest {

Expand All @@ -67,30 +65,25 @@ public String clientName() {
}

@Test
public void givenAPublishWithPayloadFormatIndicatorWhenForwardedToSubscriberThenIsPresent() throws InterruptedException, MqttException {
MqttClient client = new MqttClient("tcp://localhost:1883", "subscriber", new MemoryPersistence());
client.connect();
MqttSubscription subscription = new MqttSubscription("temperature/living", 1);
SubscriptionOptionsTest.PublishCollector publishCollector = new SubscriptionOptionsTest.PublishCollector();
IMqttToken subscribeToken = client.subscribe(new MqttSubscription[]{subscription},
new IMqttMessageListener[] {publishCollector});
TestUtils.verifySubscribedSuccessfully(subscribeToken);

Mqtt5BlockingClient publisher = createPublisherClient();
publisher.publishWith()
.topic("temperature/living")
.payload("18".getBytes(StandardCharsets.UTF_8))
.payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8)
.qos(MqttQos.AT_LEAST_ONCE)
public void givenAPublishWithPayloadFormatIndicatorWhenForwardedToSubscriberThenIsPresent() throws InterruptedException {
Mqtt5BlockingClient subscriber = createSubscriberClient();
subscriber.subscribeWith()
.topicFilter("temperature/living")
.qos(MqttQos.AT_MOST_ONCE)
.send();
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {
Mqtt5BlockingClient publisher = createPublisherClient();
publisher.publishWith()
.topic("temperature/living")
.payload("18".getBytes(StandardCharsets.UTF_8))
.payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8)
.qos(MqttQos.AT_MOST_ONCE)
.send();

// Verify the message is also reflected back to the sender
publishCollector.assertReceivedMessageIn(2, TimeUnit.SECONDS);
assertEquals("temperature/living", publishCollector.receivedTopic());
assertEquals("18", publishCollector.receivedPayload(), "Payload published on topic should match");
org.eclipse.paho.mqttv5.common.MqttMessage receivedMessage = publishCollector.receivedMessage();
assertEquals(MqttQos.AT_LEAST_ONCE.getCode(), receivedMessage.getQos());
assertTrue(receivedMessage.getProperties().getPayloadFormat());
verifyPublishMessage(publishes, msgPub -> {
assertTrue(msgPub.getPayloadFormatIndicator().isPresent());
});
}
}

@Test
Expand Down
Loading

0 comments on commit 249967a

Please sign in to comment.