@@ -137,6 +137,12 @@ public async Task<MqttClientConnectResult> ConnectAsync(MqttClientOptions option
137
137
}
138
138
}
139
139
140
+ if ( connectResult . ResultCode != MqttClientConnectResultCode . Success )
141
+ {
142
+ _logger . Warning ( "Connecting failed: {0}" , connectResult . ResultCode ) ;
143
+ return connectResult ;
144
+ }
145
+
140
146
_lastPacketSentTimestamp = DateTime . UtcNow ;
141
147
142
148
var keepAliveInterval = Options . KeepAlivePeriod ;
@@ -434,8 +440,7 @@ async Task<MqttClientConnectResult> Authenticate(IMqttChannelAdapter channelAdap
434
440
435
441
if ( receivedPacket is MqttConnAckPacket connAckPacket )
436
442
{
437
- var clientConnectResultFactory = new MqttClientConnectResultFactory ( ) ;
438
- result = clientConnectResultFactory . Create ( connAckPacket , channelAdapter . PacketFormatterAdapter . ProtocolVersion ) ;
443
+ result = MqttClientResultFactory . ConnectResult . Create ( connAckPacket , _adapter . PacketFormatterAdapter . ProtocolVersion ) ;
439
444
}
440
445
else
441
446
{
@@ -447,9 +452,18 @@ async Task<MqttClientConnectResult> Authenticate(IMqttChannelAdapter channelAdap
447
452
throw new MqttConnectingFailedException ( $ "Error while authenticating. { exception . Message } ", exception , null ) ;
448
453
}
449
454
450
- if ( result . ResultCode != MqttClientConnectResultCode . Success )
455
+ // This is no feature. It is basically a backward compatibility option and should be removed in the future.
456
+ // The client should not throw any exception if the transport layer connection was successful and the server
457
+ // did send a proper ACK packet with a non success response.
458
+ if ( options . ThrowOnNonSuccessfulConnectResponse )
451
459
{
452
- throw new MqttConnectingFailedException ( $ "Connecting with MQTT server failed ({ result . ResultCode } ).", null , result ) ;
460
+ _logger . Warning (
461
+ "Client will now throw an _MqttConnectingFailedException_. This is obsolete and will be removed in the future. Consider setting _ThrowOnNonSuccessfulResponseFromServer=False_ in client options." ) ;
462
+
463
+ if ( result . ResultCode != MqttClientConnectResultCode . Success )
464
+ {
465
+ throw new MqttConnectingFailedException ( $ "Connecting with MQTT server failed ({ result . ResultCode } ).", null , result ) ;
466
+ }
453
467
}
454
468
455
469
_logger . Verbose ( "Authenticated MQTT connection with server established." ) ;
@@ -498,9 +512,11 @@ async Task<MqttClientConnectResult> ConnectInternal(IMqttChannelAdapter channelA
498
512
_publishPacketReceiverQueue = new AsyncQueue < MqttPublishPacket > ( ) ;
499
513
500
514
var connectResult = await Authenticate ( channelAdapter , Options , effectiveCancellationToken . Token ) . ConfigureAwait ( false ) ;
501
-
502
- _publishPacketReceiverTask = Task . Run ( ( ) => ProcessReceivedPublishPackets ( backgroundCancellationToken ) , backgroundCancellationToken ) ;
503
- _packetReceiverTask = Task . Run ( ( ) => ReceivePacketsLoop ( backgroundCancellationToken ) , backgroundCancellationToken ) ;
515
+ if ( connectResult . ResultCode == MqttClientConnectResultCode . Success )
516
+ {
517
+ _publishPacketReceiverTask = Task . Run ( ( ) => ProcessReceivedPublishPackets ( backgroundCancellationToken ) , backgroundCancellationToken ) ;
518
+ _packetReceiverTask = Task . Run ( ( ) => ReceivePacketsLoop ( backgroundCancellationToken ) , backgroundCancellationToken ) ;
519
+ }
504
520
505
521
return connectResult ;
506
522
}
@@ -754,6 +770,65 @@ async Task<MqttPacket> Receive(CancellationToken cancellationToken)
754
770
return packet ;
755
771
}
756
772
773
+ async Task ReceivePacketsLoop ( CancellationToken cancellationToken )
774
+ {
775
+ try
776
+ {
777
+ _logger . Verbose ( "Start receiving packets." ) ;
778
+
779
+ while ( ! cancellationToken . IsCancellationRequested )
780
+ {
781
+ var packet = await Receive ( cancellationToken ) . ConfigureAwait ( false ) ;
782
+
783
+ if ( cancellationToken . IsCancellationRequested )
784
+ {
785
+ return ;
786
+ }
787
+
788
+ if ( packet == null )
789
+ {
790
+ await DisconnectInternal ( _packetReceiverTask , null , null ) . ConfigureAwait ( false ) ;
791
+
792
+ return ;
793
+ }
794
+
795
+ await TryProcessReceivedPacket ( packet , cancellationToken ) . ConfigureAwait ( false ) ;
796
+ }
797
+ }
798
+ catch ( Exception exception )
799
+ {
800
+ if ( _cleanDisconnectInitiated )
801
+ {
802
+ return ;
803
+ }
804
+
805
+ if ( exception is AggregateException aggregateException )
806
+ {
807
+ exception = aggregateException . GetBaseException ( ) ;
808
+ }
809
+
810
+ if ( exception is OperationCanceledException )
811
+ {
812
+ }
813
+ else if ( exception is MqttCommunicationException )
814
+ {
815
+ _logger . Warning ( exception , "Communication error while receiving packets." ) ;
816
+ }
817
+ else
818
+ {
819
+ _logger . Error ( exception , "Error while receiving packets." ) ;
820
+ }
821
+
822
+ _packetDispatcher . FailAll ( exception ) ;
823
+
824
+ await DisconnectInternal ( _packetReceiverTask , exception , null ) . ConfigureAwait ( false ) ;
825
+ }
826
+ finally
827
+ {
828
+ _logger . Verbose ( "Stopped receiving packets." ) ;
829
+ }
830
+ }
831
+
757
832
async Task < TResponsePacket > Request < TResponsePacket > ( MqttPacket requestPacket , CancellationToken cancellationToken ) where TResponsePacket : MqttPacket
758
833
{
759
834
cancellationToken . ThrowIfCancellationRequested ( ) ;
@@ -920,65 +995,6 @@ async Task TryProcessReceivedPacket(MqttPacket packet, CancellationToken cancell
920
995
}
921
996
}
922
997
923
- async Task ReceivePacketsLoop ( CancellationToken cancellationToken )
924
- {
925
- try
926
- {
927
- _logger . Verbose ( "Start receiving packets." ) ;
928
-
929
- while ( ! cancellationToken . IsCancellationRequested )
930
- {
931
- var packet = await Receive ( cancellationToken ) . ConfigureAwait ( false ) ;
932
-
933
- if ( cancellationToken . IsCancellationRequested )
934
- {
935
- return ;
936
- }
937
-
938
- if ( packet == null )
939
- {
940
- await DisconnectInternal ( _packetReceiverTask , null , null ) . ConfigureAwait ( false ) ;
941
-
942
- return ;
943
- }
944
-
945
- await TryProcessReceivedPacket ( packet , cancellationToken ) . ConfigureAwait ( false ) ;
946
- }
947
- }
948
- catch ( Exception exception )
949
- {
950
- if ( _cleanDisconnectInitiated )
951
- {
952
- return ;
953
- }
954
-
955
- if ( exception is AggregateException aggregateException )
956
- {
957
- exception = aggregateException . GetBaseException ( ) ;
958
- }
959
-
960
- if ( exception is OperationCanceledException )
961
- {
962
- }
963
- else if ( exception is MqttCommunicationException )
964
- {
965
- _logger . Warning ( exception , "Communication error while receiving packets." ) ;
966
- }
967
- else
968
- {
969
- _logger . Error ( exception , "Error while receiving packets." ) ;
970
- }
971
-
972
- _packetDispatcher . FailAll ( exception ) ;
973
-
974
- await DisconnectInternal ( _packetReceiverTask , exception , null ) . ConfigureAwait ( false ) ;
975
- }
976
- finally
977
- {
978
- _logger . Verbose ( "Stopped receiving packets." ) ;
979
- }
980
- }
981
-
982
998
async Task TrySendKeepAliveMessages ( CancellationToken cancellationToken )
983
999
{
984
1000
try
0 commit comments