Skip to content

Commit

Permalink
[Test] Added wait for SUBACK also to the other test
Browse files Browse the repository at this point in the history
  • Loading branch information
andsel committed Nov 17, 2024
1 parent e4934c3 commit 90a0151
Showing 1 changed file with 17 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,7 @@ private static void responderRepliesToRequesterPublish(Mqtt5BlockingClient respo
});

// wait for the SUBACK in 1 second, else if PUB is sent before the client is fully subscribed, then it's lost
try {
Mqtt5SubAck mqtt5SubAck = subackFuture.get(1, TimeUnit.SECONDS);
assertEquals(1, mqtt5SubAck.getReasonCodes().size());
assertEquals(Mqtt5SubAckReasonCode.GRANTED_QOS_1, mqtt5SubAck.getReasonCodes().iterator().next());
} catch (InterruptedException e) {
fail("Sub ack waiting interrupted before 1 sec expires");
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
fail("Sub ack didn't arrive in 1 second timeout");
}
waitForSubAck(subackFuture);

Mqtt5PublishResult.Mqtt5Qos1Result requestResult = (Mqtt5PublishResult.Mqtt5Qos1Result) requester.publishWith()
.topic("requester/door/open")
Expand Down Expand Up @@ -123,7 +113,7 @@ public void givenRequestResponseProtocolWhenRequestIsIssueThenTheResponderReplyW
.topicFilter("requester/door/open")
.qos(MqttQos.AT_LEAST_ONCE)
.build();
responder.toAsync().subscribe(subscribeToRequest,
CompletableFuture<@NotNull Mqtt5SubAck> subackFuture = responder.toAsync().subscribe(subscribeToRequest,
(Mqtt5Publish pub) -> {
assertTrue(pub.getResponseTopic().isPresent(), "Response topic MUST defined in request publish");
assertTrue(pub.getCorrelationData().isPresent(), "Correlation data MUST defined in request publish");
Expand All @@ -134,6 +124,7 @@ public void givenRequestResponseProtocolWhenRequestIsIssueThenTheResponderReplyW
.send();
assertFalse(responseResult.getError().isPresent(), "Open door response cannot be published ");
});
waitForSubAck(subackFuture);

Mqtt5PublishResult.Mqtt5Qos1Result requestResult = (Mqtt5PublishResult.Mqtt5Qos1Result) requester.publishWith()
.topic("requester/door/open")
Expand All @@ -155,6 +146,20 @@ public void givenRequestResponseProtocolWhenRequestIsIssueThenTheResponderReplyW
});
}

private static void waitForSubAck(CompletableFuture<@NotNull Mqtt5SubAck> subackFuture) {
try {
Mqtt5SubAck mqtt5SubAck = subackFuture.get(1, TimeUnit.SECONDS);
assertEquals(1, mqtt5SubAck.getReasonCodes().size());
assertEquals(Mqtt5SubAckReasonCode.GRANTED_QOS_1, mqtt5SubAck.getReasonCodes().iterator().next());
} catch (InterruptedException e) {
fail("Sub ack waiting interrupted before 1 sec expires");
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
fail("Sub ack didn't arrive in 1 second timeout");
}
}

private byte[] asByteArray(ByteBuffer byteBuffer) {
byte[] arr = new byte[byteBuffer.remaining()];
byteBuffer.get(arr);
Expand Down

0 comments on commit 90a0151

Please sign in to comment.