Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public class ReconnectStrategy {
public static void main(final String[] args) throws InterruptedException {
// defaultReconnect();
// customizedReconnect();
completelyCustom();
// completelyCustom();
gracefulDisconnectExample();
}

public static void defaultReconnect() {
Expand Down Expand Up @@ -109,4 +110,52 @@ private static CompletableFuture<byte[]> getOAuthToken() {
return new byte[] {1, 2, 3};
});
}

/**
* Demonstrates graceful disconnect functionality.
* This example shows how to use disconnectGracefully() to cleanly shut down
* a client even when automatic reconnection is enabled and the client is
* in a reconnecting state.
*/
public static void gracefulDisconnectExample() throws InterruptedException {
System.out.println("=== Graceful Disconnect Example ===");

final Mqtt5BlockingClient client = Mqtt5Client.builder()
.serverHost("broker.hivemq.com")
.automaticReconnect()
.initialDelay(1, TimeUnit.SECONDS)
.maxDelay(2, TimeUnit.SECONDS)
.applyAutomaticReconnect()
.addConnectedListener(context -> System.out.println("Connected: " + LocalTime.now()))
.addDisconnectedListener(context -> System.out.println("Disconnected: " + LocalTime.now() +
" (Source: " + context.getSource() + ")"))
.buildBlocking();

try {
// Connect the client
System.out.println("Connecting...");
client.connect();
System.out.println("Connected successfully!");

// Simulate network issues by turning off network (in real scenario)
System.out.println("Simulating network issues...");
System.out.println("Client state: " + client.getState());

// Wait a bit to let reconnection attempts start
TimeUnit.SECONDS.sleep(3);
System.out.println("Client state after network issues: " + client.getState());

// Now demonstrate graceful disconnect
System.out.println("Calling disconnectGracefully()...");
client.disconnectGracefully();
System.out.println("Graceful disconnect completed!");
System.out.println("Final client state: " + client.getState());

} catch (final Exception e) {
System.err.println("Error during graceful disconnect example: " + e.getMessage());
e.printStackTrace();
}

System.out.println("=== End Graceful Disconnect Example ===");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,11 @@ public void publishes(
return new MqttDisconnectBuilder.Send<>(this::disconnect);
}

@Override
public @NotNull CompletableFuture<Void> disconnectGracefully() {
return RxFutureConverter.toFuture(delegate.disconnectGracefullyUnsafe());
}

@Override
public @NotNull MqttClientConfig getConfig() {
return delegate.getConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,15 @@ public void disconnect(final @NotNull Mqtt5Disconnect disconnect) {
return new MqttDisconnectBuilder.SendVoid(this::disconnect);
}

@Override
public void disconnectGracefully() {
try {
delegate.disconnectGracefullyUnsafe().blockingAwait();
} catch (final RuntimeException e) {
throw AsyncRuntimeException.fillInStackTrace(e);
}
}

@Override
public @NotNull MqttClientConfig getConfig() {
return delegate.getConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.hivemq.client.internal.mqtt.handler.auth.MqttReAuthCompletable;
import com.hivemq.client.internal.mqtt.handler.connect.MqttConnAckSingle;
import com.hivemq.client.internal.mqtt.handler.disconnect.MqttDisconnectCompletable;
import com.hivemq.client.internal.mqtt.handler.disconnect.MqttDisconnectGracefulCompletable;
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttGlobalIncomingPublishFlowable;
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttSubscribedPublishFlowable;
import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckFlowable;
Expand Down Expand Up @@ -260,6 +261,10 @@ public MqttRxClient(final @NotNull MqttClientConfig clientConfig) {
return new MqttDisconnectCompletable(clientConfig, disconnect);
}

@NotNull Completable disconnectGracefullyUnsafe() {
return new MqttDisconnectGracefulCompletable(clientConfig);
}

@Override
public MqttDisconnectBuilder.@NotNull Nested<Completable> disconnectWith() {
return new MqttDisconnectBuilder.Nested<>(this::disconnect);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,11 @@ private static void reconnect(
}
}

if (reconnector.isReconnect()) {
if (reconnector.isReconnect() && clientConfig.getRawState().get() != DISCONNECTING_GRACEFULLY) {
clientConfig.getRawState().set(DISCONNECTED_RECONNECT);
eventLoop.schedule(() -> {
reconnector.getFuture().whenComplete((ignored, throwable) -> {
if (reconnector.isReconnect()) {
if (reconnector.isReconnect() && clientConfig.getRawState().get() != DISCONNECTING_GRACEFULLY) {
if (clientConfig.getRawState().compareAndSet(DISCONNECTED_RECONNECT, CONNECTING_RECONNECT)) {

clientConfig.setCurrentTransportConfig(reconnector.getTransportConfig());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2018-present HiveMQ and the HiveMQ Community
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hivemq.client.internal.mqtt.handler.disconnect;

import com.hivemq.client.internal.mqtt.MqttClientConfig;
import com.hivemq.client.internal.mqtt.MqttClientConnectionConfig;
import com.hivemq.client.internal.mqtt.exceptions.MqttClientStateExceptions;
import com.hivemq.client.internal.rx.CompletableFlow;
import io.netty.channel.Channel;
import io.reactivex.Completable;
import io.reactivex.CompletableObserver;
import io.reactivex.internal.disposables.EmptyDisposable;
import org.jetbrains.annotations.NotNull;

/**
* Completable for gracefully disconnecting the client, canceling any ongoing reconnection attempts.
*
* @author Silvio Giebl
* @since 1.4.0
*/
public class MqttDisconnectGracefulCompletable extends Completable {

private final @NotNull MqttClientConfig clientConfig;

public MqttDisconnectGracefulCompletable(final @NotNull MqttClientConfig clientConfig) {
this.clientConfig = clientConfig;
}

@Override
protected void subscribeActual(final @NotNull CompletableObserver s) {
final MqttClientConnectionConfig connectionConfig = clientConfig.getRawConnectionConfig();
if (connectionConfig == null) {
// If not connected, just complete successfully for graceful disconnect
EmptyDisposable.complete(s);
return;
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

connectionConfig will be null when a connection was never established due to an unavailable server or wrong configuration. In this case, the disconnection attempt is abandoned here and the auto-reconnect loop will still continue endlessly.

Tested with Mqtt3AsyncClient

final Channel channel = connectionConfig.getChannel();
final MqttDisconnectHandler disconnectHandler =
(MqttDisconnectHandler) channel.pipeline().get(MqttDisconnectHandler.NAME);
if (disconnectHandler == null) {
// If no disconnect handler, just complete successfully for graceful disconnect
EmptyDisposable.complete(s);
return;
}
final CompletableFlow flow = new CompletableFlow(s);
s.onSubscribe(flow);
disconnectHandler.disconnectGracefully(flow);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import com.hivemq.client.internal.mqtt.message.connect.MqttConnectRestrictions;
import com.hivemq.client.internal.mqtt.message.connect.connack.MqttConnAck;
import com.hivemq.client.internal.mqtt.message.disconnect.MqttDisconnect;
import com.hivemq.client.internal.mqtt.message.disconnect.MqttDisconnectBuilder;
import com.hivemq.client.internal.rx.CompletableFlow;
import com.hivemq.client.mqtt.MqttClientState;
import com.hivemq.client.mqtt.MqttVersion;
import com.hivemq.client.mqtt.exceptions.ConnectionClosedException;
import com.hivemq.client.mqtt.lifecycle.MqttDisconnectSource;
Expand Down Expand Up @@ -139,6 +141,14 @@ public void disconnect(final @NotNull MqttDisconnect disconnect, final @NotNull
}
}

public void disconnectGracefully(final @NotNull CompletableFlow flow) {
if (!clientConfig.executeInEventLoop(() -> writeGracefulDisconnect(flow))) {
// If no event loop is available, just complete the flow successfully
// This handles the case where the client is not connected or in an invalid state
flow.onComplete();
}
}

private void writeDisconnect(final @NotNull MqttDisconnect disconnect, final @NotNull CompletableFlow flow) {
final ChannelHandlerContext ctx = this.ctx;
if ((ctx != null) && (state == null)) {
Expand All @@ -149,6 +159,27 @@ private void writeDisconnect(final @NotNull MqttDisconnect disconnect, final @No
}
}

private void writeGracefulDisconnect(final @NotNull CompletableFlow flow) {
// Set the client state to DISCONNECTING_GRACEFULLY to prevent reconnection
clientConfig.getRawState().set(MqttClientState.DISCONNECTING_GRACEFULLY);

final ChannelHandlerContext ctx = this.ctx;
if (ctx != null) {
// If we have an active connection, send a disconnect message
if (state == null) {
state = STATE_CLOSED;
final MqttDisconnect disconnect = new MqttDisconnectBuilder.Default().build();
fireDisconnectEvent(ctx.channel(), new MqttDisconnectEvent.ByUser(disconnect, flow));
} else {
// If already disconnected, just complete the flow
flow.onComplete();
}
} else {
// No active connection, just complete the flow
flow.onComplete();
}
}

@Override
protected void onDisconnectEvent(
final @NotNull ChannelHandlerContext ctx, final @NotNull MqttDisconnectEvent disconnectEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,19 @@ public void publishes(
return future;
}

@Override
public @NotNull CompletableFuture<Void> disconnectGracefully() {
final CompletableFuture<Void> future = new CompletableFuture<>();
delegate.disconnectGracefully().whenComplete((ignored, throwable) -> {
if (throwable != null) {
future.completeExceptionally(Mqtt3ExceptionFactory.map(throwable));
} else {
future.complete(null);
}
});
return future;
}

@Override
public @NotNull Mqtt3ClientConfig getConfig() {
return clientConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,15 @@ public void disconnect() {
}
}

@Override
public void disconnectGracefully() {
try {
delegate.disconnectGracefully();
} catch (final RuntimeException e) {
throw Mqtt3ExceptionFactory.mapWithStackTrace(e);
}
}

@Override
public @NotNull Mqtt3ClientConfig getConfig() {
return clientConfig;
Expand Down
9 changes: 8 additions & 1 deletion src/main/java/com/hivemq/client/mqtt/MqttClientState.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,14 @@ public enum MqttClientState {
* This means the client was {@link #DISCONNECTED_RECONNECT}, a Connect message is sent, but the ConnAck message is
* not received yet.
*/
CONNECTING_RECONNECT;
CONNECTING_RECONNECT,
/**
* The client is gracefully disconnecting, canceling any ongoing reconnection attempts.
* <p>
* This state is used when {@code disconnectGracefully()} is called to ensure the client
* transitions to a clean {@link #DISCONNECTED} state.
*/
DISCONNECTING_GRACEFULLY;

private static final @NotNull EnumSet<MqttClientState> CONNECTED_OR_RECONNECT =
EnumSet.of(CONNECTED, DISCONNECTED_RECONNECT, CONNECTING_RECONNECT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,22 @@ void publishes(
*/
@NotNull CompletableFuture<Void> disconnect();

/**
* Gracefully disconnects the client, canceling any ongoing reconnection attempts.
* <p>
* This method can be called from any client state and will ensure the client
* transitions to a clean {@link com.hivemq.client.mqtt.MqttClientState#DISCONNECTED DISCONNECTED} state.
* Unlike the regular {@link #disconnect()} method, this will not throw an exception
* when the client is in a reconnecting state.
* <p>
* This is particularly useful when automatic reconnection is enabled and you need
* to cleanly shut down the client regardless of its current connection state.
*
* @return the {@link CompletableFuture} which completes when the client is fully disconnected
* @since 1.4.0
*/
@NotNull CompletableFuture<Void> disconnectGracefully();

@Override
@CheckReturnValue
default @NotNull Mqtt3AsyncClient toAsync() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,21 @@ public interface Mqtt3BlockingClient extends Mqtt3Client {
*/
void disconnect();

/**
* Gracefully disconnects the client, canceling any ongoing reconnection attempts.
* <p>
* This method can be called from any client state and will ensure the client
* transitions to a clean {@link com.hivemq.client.mqtt.MqttClientState#DISCONNECTED DISCONNECTED} state.
* Unlike the regular {@link #disconnect()} method, this will not throw an exception
* when the client is in a reconnecting state.
* <p>
* This is particularly useful when automatic reconnection is enabled and you need
* to cleanly shut down the client regardless of its current connection state.
*
* @since 1.4.0
*/
void disconnectGracefully();

@Override
@CheckReturnValue
default @NotNull Mqtt3BlockingClient toBlocking() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,22 @@ void publishes(
@CheckReturnValue
Mqtt5DisconnectBuilder.@NotNull Send<CompletableFuture<Void>> disconnectWith();

/**
* Gracefully disconnects the client, canceling any ongoing reconnection attempts.
* <p>
* This method can be called from any client state and will ensure the client
* transitions to a clean {@link com.hivemq.client.mqtt.MqttClientState#DISCONNECTED DISCONNECTED} state.
* Unlike the regular {@link #disconnect()} method, this will not throw an exception
* when the client is in a reconnecting state.
* <p>
* This is particularly useful when automatic reconnection is enabled and you need
* to cleanly shut down the client regardless of its current connection state.
*
* @return the {@link CompletableFuture} which completes when the client is fully disconnected
* @since 1.4.0
*/
@NotNull CompletableFuture<Void> disconnectGracefully();

@Override
@CheckReturnValue
default @NotNull Mqtt5AsyncClient toAsync() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,21 @@ public interface Mqtt5BlockingClient extends Mqtt5Client {
@CheckReturnValue
Mqtt5DisconnectBuilder.@NotNull SendVoid disconnectWith();

/**
* Gracefully disconnects the client, canceling any ongoing reconnection attempts.
* <p>
* This method can be called from any client state and will ensure the client
* transitions to a clean {@link com.hivemq.client.mqtt.MqttClientState#DISCONNECTED DISCONNECTED} state.
* Unlike the regular {@link #disconnect()} method, this will not throw an exception
* when the client is in a reconnecting state.
* <p>
* This is particularly useful when automatic reconnection is enabled and you need
* to cleanly shut down the client regardless of its current connection state.
*
* @since 1.4.0
*/
void disconnectGracefully();

@Override
@CheckReturnValue
default @NotNull Mqtt5BlockingClient toBlocking() {
Expand Down
Loading