diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java index 6005e63e9..161a1d3a5 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java @@ -81,17 +81,7 @@ private static void responderRepliesToRequesterPublish(Mqtt5BlockingClient respo }); // wait for the SUBACK in 1 second, else if PUB is sent before the client is fully subscribed, then it's lost - try { - Mqtt5SubAck mqtt5SubAck = subackFuture.get(1, TimeUnit.SECONDS); - assertEquals(1, mqtt5SubAck.getReasonCodes().size()); - assertEquals(Mqtt5SubAckReasonCode.GRANTED_QOS_1, mqtt5SubAck.getReasonCodes().iterator().next()); - } catch (InterruptedException e) { - fail("Sub ack waiting interrupted before 1 sec expires"); - } catch (ExecutionException e) { - throw new RuntimeException(e); - } catch (TimeoutException e) { - fail("Sub ack didn't arrive in 1 second timeout"); - } + waitForSubAck(subackFuture); Mqtt5PublishResult.Mqtt5Qos1Result requestResult = (Mqtt5PublishResult.Mqtt5Qos1Result) requester.publishWith() .topic("requester/door/open") @@ -123,7 +113,7 @@ public void givenRequestResponseProtocolWhenRequestIsIssueThenTheResponderReplyW .topicFilter("requester/door/open") .qos(MqttQos.AT_LEAST_ONCE) .build(); - responder.toAsync().subscribe(subscribeToRequest, + CompletableFuture<@NotNull Mqtt5SubAck> subackFuture = responder.toAsync().subscribe(subscribeToRequest, (Mqtt5Publish pub) -> { assertTrue(pub.getResponseTopic().isPresent(), "Response topic MUST defined in request publish"); assertTrue(pub.getCorrelationData().isPresent(), "Correlation data MUST defined in request publish"); @@ -134,6 +124,7 @@ public void givenRequestResponseProtocolWhenRequestIsIssueThenTheResponderReplyW .send(); assertFalse(responseResult.getError().isPresent(), "Open door response cannot be published "); }); + waitForSubAck(subackFuture); Mqtt5PublishResult.Mqtt5Qos1Result requestResult = (Mqtt5PublishResult.Mqtt5Qos1Result) requester.publishWith() .topic("requester/door/open") @@ -155,6 +146,20 @@ public void givenRequestResponseProtocolWhenRequestIsIssueThenTheResponderReplyW }); } + private static void waitForSubAck(CompletableFuture<@NotNull Mqtt5SubAck> subackFuture) { + try { + Mqtt5SubAck mqtt5SubAck = subackFuture.get(1, TimeUnit.SECONDS); + assertEquals(1, mqtt5SubAck.getReasonCodes().size()); + assertEquals(Mqtt5SubAckReasonCode.GRANTED_QOS_1, mqtt5SubAck.getReasonCodes().iterator().next()); + } catch (InterruptedException e) { + fail("Sub ack waiting interrupted before 1 sec expires"); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } catch (TimeoutException e) { + fail("Sub ack didn't arrive in 1 second timeout"); + } + } + private byte[] asByteArray(ByteBuffer byteBuffer) { byte[] arr = new byte[byteBuffer.remaining()]; byteBuffer.get(arr);