From c9b47406369622a1c6d428b57ad73fa9cd977ea6 Mon Sep 17 00:00:00 2001 From: Andrea Selva Date: Wed, 18 Dec 2024 17:42:22 +0100 Subject: [PATCH] Implemented server keep alive setting and usage in connect (#883) Updates the configuration harness to expose a duration setting name server_keep_alive. Updated the connAck properties creation to include server keep alive if the broker configuration has selected it. Updates the connect processing to leverage the server keep alive selected keep alive instead of the client one. --- ChangeLog.txt | 1 + .../moquette/broker/BrokerConfiguration.java | 10 ++++++ .../io/moquette/broker/MQTTConnection.java | 11 +++++++ .../moquette/broker/config/FluentConfig.java | 8 +++++ .../io/moquette/broker/config/IConfig.java | 1 + .../integration/mqtt5/ConnectTest.java | 31 +++++++++++++++++++ distribution/src/main/resources/moquette.conf | 9 ++++++ 7 files changed, 71 insertions(+) diff --git a/ChangeLog.txt b/ChangeLog.txt index 6c7741633..1c036e81c 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,4 +1,5 @@ Version 0.18-SNAPSHOT: + [feature] Server keep alive: added configuration setting so that the broker can specify a keep alive other then the one selected by clients. (#789) [feature] User properties: covered publish with user properties with tests and fixed publish of will messages. (#877) [feature] Topic alias: implemented handling of topic alias received by publishers. (#873) [feature] Flow-control: Handled client's receive maximum property to configure the inflight window through the client. (#858) diff --git a/broker/src/main/java/io/moquette/broker/BrokerConfiguration.java b/broker/src/main/java/io/moquette/broker/BrokerConfiguration.java index 421ad5fbc..dfbc14363 100644 --- a/broker/src/main/java/io/moquette/broker/BrokerConfiguration.java +++ b/broker/src/main/java/io/moquette/broker/BrokerConfiguration.java @@ -19,6 +19,7 @@ import io.moquette.broker.config.IConfig; import java.util.Locale; +import java.util.Optional; class BrokerConfiguration { @@ -30,6 +31,7 @@ class BrokerConfiguration { private final int topicAliasMaximum; // integer max value means that the property is unset private int receiveMaximum; + private Optional serverKeepAlive = Optional.empty(); BrokerConfiguration(IConfig props) { allowAnonymous = props.boolProp(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, true); @@ -70,6 +72,10 @@ class BrokerConfiguration { receiveMaximum = props.intProp(IConfig.RECEIVE_MAXIMUM, BrokerConstants.RECEIVE_MAXIMUM); topicAliasMaximum = props.intProp(IConfig.TOPIC_ALIAS_MAXIMUM_PROPERTY_NAME, BrokerConstants.DISABLED_TOPIC_ALIAS); + + if (props.getProperty(IConfig.SERVER_KEEP_ALIVE_PROPERTY_NAME) != null) { + serverKeepAlive = Optional.of((int) props.durationProp(IConfig.SERVER_KEEP_ALIVE_PROPERTY_NAME).toMillis() / 1_000); + } } // test method @@ -133,4 +139,8 @@ public int receiveMaximum() { public int topicAliasMaximum() { return topicAliasMaximum; } + + public Optional getServerKeepAlive() { + return serverKeepAlive; + } } diff --git a/broker/src/main/java/io/moquette/broker/MQTTConnection.java b/broker/src/main/java/io/moquette/broker/MQTTConnection.java index 74085c400..a7930c860 100644 --- a/broker/src/main/java/io/moquette/broker/MQTTConnection.java +++ b/broker/src/main/java/io/moquette/broker/MQTTConnection.java @@ -358,6 +358,10 @@ private void executeConnect(MqttConnectMessage msg, String clientId, boolean ser connAckPropertiesBuilder.topicAliasMaximum(topicAliasMaximum); } + if (brokerConfig.getServerKeepAlive().isPresent()) { + connAckPropertiesBuilder.serverKeepAlive(brokerConfig.getServerKeepAlive().get()); + } + final MqttProperties ackProperties = connAckPropertiesBuilder.build(); connAckBuilder.properties(ackProperties); } @@ -464,6 +468,13 @@ private void setupInflightResender(Channel channel) { private void initializeKeepAliveTimeout(Channel channel, MqttConnectMessage msg, String clientId) { int keepAlive = msg.variableHeader().keepAliveTimeSeconds(); + + // force server keep alive if configured + if (brokerConfig.getServerKeepAlive().isPresent()) { + int serverKeepAlive = brokerConfig.getServerKeepAlive().get(); + LOG.info("Forcing server keep alive ({}) over client selection ({})", serverKeepAlive, keepAlive); + keepAlive = serverKeepAlive; + } NettyUtils.keepAlive(channel, keepAlive); NettyUtils.cleanSession(channel, msg.variableHeader().isCleanSession()); NettyUtils.clientID(channel, clientId); diff --git a/broker/src/main/java/io/moquette/broker/config/FluentConfig.java b/broker/src/main/java/io/moquette/broker/config/FluentConfig.java index 93d219fb1..46f296e9c 100644 --- a/broker/src/main/java/io/moquette/broker/config/FluentConfig.java +++ b/broker/src/main/java/io/moquette/broker/config/FluentConfig.java @@ -5,6 +5,7 @@ import java.io.IOException; import java.nio.file.Path; +import java.time.Duration; import java.util.Locale; import java.util.Properties; import java.util.function.Consumer; @@ -30,6 +31,7 @@ import static io.moquette.broker.config.IConfig.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.PORT_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.RECEIVE_MAXIMUM; +import static io.moquette.broker.config.IConfig.SERVER_KEEP_ALIVE_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.SESSION_QUEUE_SIZE; import static io.moquette.broker.config.IConfig.SSL_PORT_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.SSL_PROVIDER; @@ -215,6 +217,12 @@ public FluentConfig topicAliasMaximum(int topicAliasMaximum) { return this; } + public FluentConfig serverKeepAlive(Duration keepAliveSeconds) { + int seconds = (int) keepAliveSeconds.toMillis() / 1_000; + configAccumulator.put(SERVER_KEEP_ALIVE_PROPERTY_NAME, seconds + "s"); + return this; + } + public class TLSConfig { private SSLProvider providerType; diff --git a/broker/src/main/java/io/moquette/broker/config/IConfig.java b/broker/src/main/java/io/moquette/broker/config/IConfig.java index ca2ef181d..f73fdad32 100644 --- a/broker/src/main/java/io/moquette/broker/config/IConfig.java +++ b/broker/src/main/java/io/moquette/broker/config/IConfig.java @@ -68,6 +68,7 @@ public abstract class IConfig { public static final String MAX_SERVER_GRANTED_QOS_PROPERTY_NAME = "max_server_granted_qos"; public static final int DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE = 8092; public static final String TOPIC_ALIAS_MAXIMUM_PROPERTY_NAME = "topic_alias_maximum"; + public static final String SERVER_KEEP_ALIVE_PROPERTY_NAME = "server_keep_alive"; public abstract void setProperty(String name, String value); diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java index a977e7693..906a83970 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java @@ -13,6 +13,8 @@ import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5WillPublishBuilder; +import io.moquette.broker.config.FluentConfig; +import io.moquette.broker.config.IConfig; import io.moquette.testclient.Client; import io.netty.handler.codec.mqtt.*; import org.awaitility.Awaitility; @@ -70,6 +72,35 @@ public void simpleConnect() { client.disconnect(); } + @Test + public void givenServerKeepAliveConfiguredThenConnectAckMustRespectIt() throws IOException { + stopServer(); + IConfig config = new FluentConfig() + .dataPath(dbPath) + .enablePersistence() + .port(1883) + .disableTelemetry() + .persistentQueueType(FluentConfig.PersistentQueueType.SEGMENTED) + .serverKeepAlive(Duration.ofSeconds(12)) + .build(); + startServer(config); + + Mqtt5BlockingClient client = MqttClient.builder() + .useMqttVersion5() + .identifier("simple_connect_test") + .serverHost("localhost") + .serverPort(1883) + .buildBlocking(); + final Mqtt5ConnAck connectAck = client.connect(); + assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, connectAck.getReasonCode(), "Accept plain connection"); + assertTrue(connectAck.getServerKeepAlive().isPresent()); + connectAck.getServerKeepAlive().ifPresent(serverKeepAlive -> { + assertEquals(12 ,serverKeepAlive); + }); + + client.disconnect(); + } + @Test public void sendConnectOnDisconnectedConnection() throws InterruptedException { MqttConnAckMessage connAck = lowLevelClient.connectV5(); diff --git a/distribution/src/main/resources/moquette.conf b/distribution/src/main/resources/moquette.conf index b863cced4..b13faad63 100644 --- a/distribution/src/main/resources/moquette.conf +++ b/distribution/src/main/resources/moquette.conf @@ -234,3 +234,12 @@ password_file config/password_file.conf # default: 0 (disabled) #********************************************************************* # topic_alias_maximum 16 + +#********************************************************************* +# Keep alive provided by server, only for MQTT5. +# +# server_keep_alive: +# Option used to configure a server preferred keep alive that the client must respect. +# default: empty (disabled) +#********************************************************************* +# server_keep_alive 2s