@@ -45,9 +45,10 @@ class MqttClient implements ClientContract
4545 public const SOCKET_READ_BUFFER_SIZE = 8192 ;
4646 private string $ clientId ;
4747 private ConnectionSettings $ settings ;
48- private string $ buffer = '' ;
49- private bool $ connected = false ;
50- private ?float $ lastPingAt = null ;
48+ private string $ buffer = '' ;
49+ private bool $ connected = false ;
50+ private ?float $ lastPingAt = null ;
51+ private ?float $ pingResponseExpectedUntil = null ;
5152 private MessageProcessor $ messageProcessor ;
5253 private Repository $ repository ;
5354 private LoggerInterface $ logger ;
@@ -489,6 +490,7 @@ public function disconnect(): void
489490 stream_socket_shutdown ($ this ->socket , STREAM_SHUT_WR );
490491 }
491492
493+ $ this ->socket = null ;
492494 $ this ->connected = false ;
493495 }
494496
@@ -650,6 +652,10 @@ public function loopOnce(float $loopStartedAt, bool $allowSleep = false, int $sl
650652 // This includes published messages, subscribe and unsubscribe requests.
651653 $ this ->resendPendingMessages ();
652654
655+ // If the last ping we sent has not been answered within the configured keep alive interval,
656+ // we assume the connection is dead and try to reconnect if configured to do so.
657+ $ this ->handlePingTimeout ();
658+
653659 // If the last message of the broker has been received more seconds ago
654660 // than specified by the keep alive time, we will send a ping to ensure
655661 // the connection is kept alive.
@@ -869,6 +875,14 @@ protected function handleMessage(Message $message): void
869875 $ this ->writeToSocketWithAutoReconnect ($ this ->messageProcessor ->buildPingResponseMessage ());
870876 return ;
871877 }
878+
879+ // PINGRESP
880+ if ($ message ->getType ()->equals (MessageType::PING_RESPONSE ())) {
881+ // Set the ping response expectation to null, so we know the broker responded.
882+ $ this ->logger ->debug ('Received ping response from the broker. ' );
883+ $ this ->pingResponseExpectedUntil = null ;
884+ return ;
885+ }
872886 }
873887
874888 /**
@@ -1020,6 +1034,39 @@ protected function ping(): void
10201034 $ this ->logger ->debug ('Sending ping to the broker to keep the connection alive. ' );
10211035
10221036 $ this ->writeToSocketWithAutoReconnect ($ this ->messageProcessor ->buildPingRequestMessage ());
1037+
1038+ // Set the deadline for receiving a PINGRESP message according to MQTT specification requirements.
1039+ $ this ->pingResponseExpectedUntil = microtime (true ) + $ this ->settings ->getKeepAliveInterval ();
1040+ }
1041+
1042+ /**
1043+ * Handles a ping timeout by closing the socket and trying to reconnect if configured to do so.
1044+ *
1045+ * @throws DataTransferException
1046+ */
1047+ protected function handlePingTimeout (): void
1048+ {
1049+ if ($ this ->pingResponseExpectedUntil === null || microtime (true ) < $ this ->pingResponseExpectedUntil ) {
1050+ return ;
1051+ }
1052+
1053+ $ this ->logger ->warning (
1054+ 'The broker did not respond to our ping request within the configured keep alive interval. Assuming the connection is dead. '
1055+ );
1056+
1057+ $ this ->closeSocket ();
1058+
1059+ $ this ->lastPingAt = null ;
1060+ $ this ->pingResponseExpectedUntil = null ;
1061+
1062+ if (!$ this ->settings ->shouldReconnectAutomatically ()) {
1063+ throw new DataTransferException (
1064+ DataTransferException::EXCEPTION_RX_DATA ,
1065+ 'No ping response received in time. The connection is dead. '
1066+ );
1067+ }
1068+
1069+ $ this ->reconnect ();
10231070 }
10241071
10251072 /**
@@ -1231,6 +1278,7 @@ protected function closeSocket(): void
12311278 ]);
12321279 }
12331280
1234- $ this ->socket = null ;
1281+ $ this ->socket = null ;
1282+ $ this ->connected = false ;
12351283 }
12361284}
0 commit comments