Skip to content

Commit

Permalink
Merge pull request #2109 from bosch-io/feature/mqtt-should-resolve-se…
Browse files Browse the repository at this point in the history
…rver-address

#2108 MQTT Connection - add posibility to configure if server address should be resolved by ditto or on demand by mqtt client
  • Loading branch information
thjaeckle authored Feb 3, 2025
2 parents 2ba6fc5 + b4cbc74 commit 156c730
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ final class DefaultMqttConfig implements MqttConfig {
private final int maxQueueSize;
private final int eventLoopThreads;
private final boolean cleanSession;
private final boolean shouldResolveServerAddress;
private final boolean reconnectForRedelivery;
private final Duration reconnectForRedeliveryDelay;
private final SessionExpiryInterval sessionExpiryInterval;
Expand All @@ -54,6 +55,7 @@ final class DefaultMqttConfig implements MqttConfig {
private DefaultMqttConfig(final ScopedConfig config) {
eventLoopThreads = config.getNonNegativeIntOrThrow(MqttConfigValue.EVENT_LOOP_THREADS);
cleanSession = config.getBoolean(MqttConfigValue.CLEAN_SESSION.getConfigPath());
shouldResolveServerAddress = config.getBoolean(MqttConfigValue.SHOULD_RESOLVE_SERVER_ADDRESS.getConfigPath());
reconnectForRedelivery = config.getBoolean(MqttConfigValue.RECONNECT_FOR_REDELIVERY.getConfigPath());
reconnectForRedeliveryDelay =
config.getNonNegativeDurationOrThrow(MqttConfigValue.RECONNECT_FOR_REDELIVERY_DELAY);
Expand Down Expand Up @@ -108,6 +110,11 @@ public boolean isCleanSession() {
return cleanSession;
}

@Override
public boolean shouldResolveServerAddress() {
return shouldResolveServerAddress;
}

@Override
public boolean shouldReconnectForRedelivery() {
return reconnectForRedelivery;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public interface MqttConfig {
*/
boolean isCleanSession();

boolean shouldResolveServerAddress();

/**
* Indicates whether the client should reconnect to enforce a redelivery for a failed acknowledgement.
*
Expand Down Expand Up @@ -136,6 +138,11 @@ enum MqttConfigValue implements KnownConfigValue {
*/
CLEAN_SESSION("clean-session", false),

/**
* Indicates whether the provided connection uri should be resolved in-demand by ditto or on-demand.
*/
SHOULD_RESOLVE_SERVER_ADDRESS("should-resolve-server-address", true),

/**
* Indicates whether the client should reconnect to enforce a redelivery for a failed acknowledgement.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,8 @@
import com.hivemq.client.mqtt.lifecycle.MqttDisconnectSource;
import com.typesafe.config.Config;

import scala.concurrent.ExecutionContextExecutor;

import io.reactivex.disposables.Disposable;
import scala.concurrent.ExecutionContextExecutor;

/**
* Actor for handling connection to an MQTT broker for protocol versions 3 or 5.
Expand All @@ -89,6 +88,7 @@ public final class MqttClientActor extends BaseClientActor {
private final GenericMqttClientFactory genericMqttClientFactory;
@Nullable private GenericMqttClient genericMqttClient;
private final AtomicBoolean automaticReconnect;
private final RetryTimeoutStrategy retryTimeoutStrategy;
@Nullable private ActorRef publishingActorRef;
private final List<ActorRef> mqttConsumerActorRefs;
@Nullable private Disposable unsolicitedPublishesAutoAckSubscription;
Expand All @@ -105,6 +105,8 @@ private MqttClientActor(final Connection connection,
final var connectivityConfig = connectivityConfig();
final var connectionConfig = connectivityConfig.getConnectionConfig();
mqttConfig = connectionConfig.getMqttConfig();
retryTimeoutStrategy = RetryTimeoutStrategy.newDuplicationRetryTimeoutStrategy(
mqttConfig.getReconnectBackOffConfig().getTimeoutConfig());

mqttSpecificConfig = MqttSpecificConfig.fromConnection(connection, mqttConfig);

Expand Down Expand Up @@ -312,7 +314,6 @@ private static String getClientId(final ClientRole clientRole,
private GenericMqttClientDisconnectedListener getClientDisconnectedListener() {
return (context, clientRole) -> {
final var mqttClientReconnector = context.getReconnector();
final var retryTimeoutStrategy = getRetryTimeoutStrategy();

if (0 == mqttClientReconnector.getAttempts()) {
retryTimeoutStrategy.reset();
Expand Down Expand Up @@ -363,11 +364,6 @@ private GenericMqttClientDisconnectedListener getClientDisconnectedListener() {
};
}

private RetryTimeoutStrategy getRetryTimeoutStrategy() {
final var reconnectBackOffConfig = mqttConfig.getReconnectBackOffConfig();
return RetryTimeoutStrategy.newDuplicationRetryTimeoutStrategy(reconnectBackOffConfig.getTimeoutConfig());
}

private static boolean isMqttClientInConnectingState(final MqttClientConfig mqttClientConfig) {
return MqttClientState.CONNECTING == mqttClientConfig.getState();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ private static MqttClientBuilder getGenericMqttClientBuilder(
final var mqttConfig = hiveMqttClientProperties.getMqttConfig();

return MqttClient.builder()
.serverAddress(getInetSocketAddress(getConnectionUri(hiveMqttClientProperties)))
.serverAddress(getInetSocketAddress(getConnectionUri(hiveMqttClientProperties),
hiveMqttClientProperties.getMqttConfig().shouldResolveServerAddress()))
.executorConfig(getMqttClientExecutorConfig(mqttConfig.getEventLoopThreads()))
.sslConfig(getMqttClientSslConfig(hiveMqttClientProperties).orElse(null))
.addConnectedListener(getConnectedListener(
Expand All @@ -134,8 +135,9 @@ private static URI getConnectionUri(final HiveMqttClientProperties hiveMqttClien
return sshTunnelState.getURI(hiveMqttClientProperties.getMqttConnection());
}

private static InetSocketAddress getInetSocketAddress(final URI connectionUri) {
return new InetSocketAddress(connectionUri.getHost(), connectionUri.getPort());
private static InetSocketAddress getInetSocketAddress(final URI connectionUri, final boolean shouldResolveServerAddress) {
return shouldResolveServerAddress ? new InetSocketAddress(connectionUri.getHost(), connectionUri.getPort()) :
InetSocketAddress.createUnresolved(connectionUri.getHost(), connectionUri.getPort());
}

private static MqttClientExecutorConfig getMqttClientExecutorConfig(final int eventLoopThreadNumber) {
Expand Down
6 changes: 6 additions & 0 deletions connectivity/service/src/main/resources/connectivity.conf
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,12 @@ ditto {
clean-session = false
clean-session = ${?CONNECTIVITY_MQTT_CLEAN_SESSION}

# Indicates whether the provided connection uri should be resolved in-demand by ditto or on-demand.
# When true, the address will be resolved before passing it to the used mqtt client.
# When false, the address will be unresolved and will rely on the used mqtt client to resolve it when needed.
should-resolve-server-address = true
should-resolve-server-address = ${?CONNECTIVITY_MQTT_SHOULD_RESOLVE_SERVER_ADDRESS}

# Indicates whether the client should reconnect to enforce a redelivery for a failed acknowledgement.
reconnect-for-redelivery = false
reconnect-for-redelivery = ${?CONNECTIVITY_MQTT_RECONNECT_FOR_REDELIVERY}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public void before() {
Mockito.when(mqttConnection.getPassword()).thenReturn(Optional.of(PASSWORD));

Mockito.when(mqttConfig.getEventLoopThreads()).thenReturn(EVENT_LOOP_THREAD_NUMBER);
Mockito.when(mqttConfig.shouldResolveServerAddress()).thenReturn(Boolean.TRUE);

final var connectionConfig = Mockito.mock(ConnectionConfig.class);
Mockito.when(connectionConfig.getMqttConfig()).thenReturn(mqttConfig);
Expand Down Expand Up @@ -396,4 +397,41 @@ public void getMqtt5ClientWithoutLastWillWithSslReturnsExpected() throws NoMqttC
});
}

@Test
public void getMqttClientWithShouldResolveServerAddressFalseAddressShouldBeUnresolved()
throws NoMqttConnectionException {
Mockito.when(mqttConfig.shouldResolveServerAddress()).thenReturn(Boolean.FALSE);
final var hiveMqttClientProperties = HiveMqttClientProperties.builder()
.withMqttConnection(mqttConnection)
.withConnectivityConfig(connectivityConfig)
.withMqttSpecificConfig(MqttSpecificConfig.fromConnection(mqttConnection, mqttConfig))
.withSshTunnelStateSupplier(sshTunnelStateSupplier)
.withConnectionLogger(connectionLogger)
.withActorUuid(ACTOR_UUID)
.withClientConnectedListener(mqttClientConnectedListener)
.withClientDisconnectedListener(mqttClientDisconnectedListener)
.build();

final var mqtt3ClientUnderTest = HiveMqttClientFactory.getMqtt3Client(hiveMqttClientProperties,
MQTT_CLIENT_IDENTIFIER,
ClientRole.CONSUMER_PUBLISHER);

final var mqtt3ClientConfig = mqtt3ClientUnderTest.getConfig();
softly.assertThat(mqtt3ClientConfig.getTransportConfig())
.as("transport config")
.satisfies(transportConfig -> {
softly.assertThat(transportConfig.getServerAddress().isUnresolved()).isTrue();
});

final var mqtt5ClientUnderTest = HiveMqttClientFactory.getMqtt3Client(hiveMqttClientProperties,
MQTT_CLIENT_IDENTIFIER,
ClientRole.CONSUMER_PUBLISHER);

final var mqtt5ClientConfig = mqtt5ClientUnderTest.getConfig();
softly.assertThat(mqtt5ClientConfig.getTransportConfig())
.as("transport config")
.satisfies(transportConfig -> {
softly.assertThat(transportConfig.getServerAddress().isUnresolved()).isTrue();
});
}
}

0 comments on commit 156c730

Please sign in to comment.