@@ -138,6 +138,36 @@ function onMessageArrived(message) {
138
138
DISCONNECT : 14
139
139
} ;
140
140
141
+ var PROPERTY_TYPE = {
142
+ PayloadFormatIndicator : 1 ,
143
+ MessageExpiryInterval : 2 ,
144
+ ContentType : 3 ,
145
+ ResponseTopic : 8 ,
146
+ CorrelationData : 9 ,
147
+ SubscriptionIdentifier : 11 ,
148
+ SessionExpiryInterval : 17 ,
149
+ AssignedClientIdentifier : 18 ,
150
+ ServerKeepAlive : 19 ,
151
+ AuthenticationMethod : 21 ,
152
+ AuthenticationData : 22 ,
153
+ RequestProblemInformation : 23 ,
154
+ WillDelayInterval : 24 ,
155
+ RequestResponseInformation : 25 ,
156
+ ResponseInformation : 26 ,
157
+ ServerReference : 28 ,
158
+ ReasonString : 31 ,
159
+ ReceiveMaximum : 33 ,
160
+ TopicAliasMaximum : 34 ,
161
+ TopicAlias : 35 ,
162
+ MaximumQoS : 36 ,
163
+ RetainAvailable : 37 ,
164
+ UserProperty : 38 ,
165
+ MaximumPacketSize : 39 ,
166
+ WildcardSubscriptionAvailable : 40 ,
167
+ SubscriptionIdentifierAvailable : 41 ,
168
+ SharedSubscriptionAvailable : 42
169
+ } ;
170
+
141
171
// Collection of utility methods used to simplify module code
142
172
// and promote the DRY pattern.
143
173
@@ -247,6 +277,8 @@ function onMessageArrived(message) {
247
277
var MqttProtoIdentifierv3 = [ 0x00 , 0x06 , 0x4d , 0x51 , 0x49 , 0x73 , 0x64 , 0x70 , 0x03 ] ;
248
278
//MQTT proto/version for 311 4 M Q T T 4
249
279
var MqttProtoIdentifierv4 = [ 0x00 , 0x04 , 0x4d , 0x51 , 0x54 , 0x54 , 0x04 ] ;
280
+ //MQTT proto/version for 5
281
+ var MqttProtoIdentifierv5 = [ 0x00 , 0x04 , 0x4d , 0x51 , 0x54 , 0x54 , 0x05 ] ;
250
282
251
283
/**
252
284
* Construct an MQTT wire protocol message.
@@ -309,6 +341,9 @@ function onMessageArrived(message) {
309
341
case 4 :
310
342
remLength += MqttProtoIdentifierv4 . length + 3 ;
311
343
break ;
344
+ case 5 :
345
+ remLength += MqttProtoIdentifierv5 . length + 4 ; // property byte
346
+ break ;
312
347
}
313
348
314
349
remLength += UTF8Length ( this . clientId ) + 2 ;
@@ -335,6 +370,7 @@ function onMessageArrived(message) {
335
370
}
336
371
remLength += this . requestedQos . length ; // 1 byte for each topic's Qos
337
372
// QoS on Subscribe only
373
+ if ( this . mqttVersion == 5 ) remLength += 1 ; // property byte
338
374
break ;
339
375
340
376
case MESSAGE_TYPE . UNSUBSCRIBE :
@@ -343,6 +379,7 @@ function onMessageArrived(message) {
343
379
topicStrLength [ i ] = UTF8Length ( this . topics [ i ] ) ;
344
380
remLength += topicStrLength [ i ] + 2 ;
345
381
}
382
+ if ( this . mqttVersion == 5 ) remLength += 1 ; // property byte
346
383
break ;
347
384
348
385
case MESSAGE_TYPE . PUBREL :
@@ -355,6 +392,7 @@ function onMessageArrived(message) {
355
392
if ( this . payloadMessage . retained ) first |= 0x01 ;
356
393
destinationNameLength = UTF8Length ( this . payloadMessage . destinationName ) ;
357
394
remLength += destinationNameLength + 2 ;
395
+ if ( this . mqttVersion == 5 ) remLength += 1 ; // property length (0)
358
396
var payloadBytes = this . payloadMessage . payloadBytes ;
359
397
remLength += payloadBytes . byteLength ;
360
398
if ( payloadBytes instanceof ArrayBuffer )
@@ -382,8 +420,12 @@ function onMessageArrived(message) {
382
420
byteStream . set ( mbi , 1 ) ;
383
421
384
422
// If this is a PUBLISH then the variable header starts with a topic
385
- if ( this . type == MESSAGE_TYPE . PUBLISH )
423
+ if ( this . type == MESSAGE_TYPE . PUBLISH ) {
386
424
pos = writeString ( this . payloadMessage . destinationName , destinationNameLength , byteStream , pos ) ;
425
+ if ( this . mqttVersion == 5 ) {
426
+ byteStream [ pos ++ ] = 0 ; // no properties
427
+ }
428
+ }
387
429
// If this is a CONNECT then the variable header contains the protocol name/version, flags and keepalive time
388
430
389
431
else if ( this . type == MESSAGE_TYPE . CONNECT ) {
@@ -396,6 +438,10 @@ function onMessageArrived(message) {
396
438
byteStream . set ( MqttProtoIdentifierv4 , pos ) ;
397
439
pos += MqttProtoIdentifierv4 . length ;
398
440
break ;
441
+ case 5 :
442
+ byteStream . set ( MqttProtoIdentifierv5 , pos ) ;
443
+ pos += MqttProtoIdentifierv5 . length ;
444
+ break ;
399
445
}
400
446
var connectFlags = 0 ;
401
447
if ( this . cleanSession )
@@ -413,6 +459,9 @@ function onMessageArrived(message) {
413
459
connectFlags |= 0x40 ;
414
460
byteStream [ pos ++ ] = connectFlags ;
415
461
pos = writeUint16 ( this . keepAliveInterval , byteStream , pos ) ;
462
+ if ( this . mqttVersion == 5 ) {
463
+ byteStream [ pos ++ ] = 0 ; // no properties
464
+ }
416
465
}
417
466
418
467
// Output the messageIdentifier - if there is one
@@ -447,6 +496,9 @@ function onMessageArrived(message) {
447
496
// break;
448
497
449
498
case MESSAGE_TYPE . SUBSCRIBE :
499
+ if ( this . mqttVersion == 5 ) {
500
+ byteStream [ pos ++ ] = 0 ; // no properties
501
+ }
450
502
// SUBSCRIBE has a list of topic strings and request QoS
451
503
for ( var i = 0 ; i < this . topics . length ; i ++ ) {
452
504
pos = writeString ( this . topics [ i ] , topicStrLength [ i ] , byteStream , pos ) ;
@@ -455,6 +507,9 @@ function onMessageArrived(message) {
455
507
break ;
456
508
457
509
case MESSAGE_TYPE . UNSUBSCRIBE :
510
+ if ( this . mqttVersion == 5 ) {
511
+ byteStream [ pos ++ ] = 0 ; // no properties
512
+ }
458
513
// UNSUBSCRIBE has a list of topic strings
459
514
for ( var i = 0 ; i < this . topics . length ; i ++ )
460
515
pos = writeString ( this . topics [ i ] , topicStrLength [ i ] , byteStream , pos ) ;
@@ -467,7 +522,71 @@ function onMessageArrived(message) {
467
522
return buffer ;
468
523
} ;
469
524
470
- function decodeMessage ( input , pos ) {
525
+ function parseProperties ( input , pos , len ) {
526
+ var endingPos = pos + len ;
527
+ var propContainer = { } ;
528
+ propContainer . userProperties = { } ;
529
+ while ( pos < endingPos ) {
530
+ var propId = input [ pos ++ ] ;
531
+ switch ( propId ) {
532
+ case PROPERTY_TYPE . PayloadFormatIndicator :
533
+ case PROPERTY_TYPE . RequestProblemInformation :
534
+ case PROPERTY_TYPE . RequestResponseInformation :
535
+ case PROPERTY_TYPE . MaximumQoS :
536
+ case PROPERTY_TYPE . RetainAvailable :
537
+ case PROPERTY_TYPE . WildcardSubscriptionAvailable :
538
+ case PROPERTY_TYPE . SubscriptionIdentifierAvailable :
539
+ case PROPERTY_TYPE . SharedSubscriptionAvailable :
540
+ // Byte
541
+ pos ++ ;
542
+ break ;
543
+ case PROPERTY_TYPE . ServerKeepAlive :
544
+ case PROPERTY_TYPE . ReceiveMaximum :
545
+ case PROPERTY_TYPE . TopicAliasMaximum :
546
+ case PROPERTY_TYPE . TopicAlias :
547
+ // Two Byte Integer
548
+ pos += 2 ;
549
+ break ;
550
+ case PROPERTY_TYPE . MessageExpiryInterval :
551
+ case PROPERTY_TYPE . SessionExpiryInterval :
552
+ case PROPERTY_TYPE . WillDelayInterval :
553
+ case PROPERTY_TYPE . MaximumPacketSize :
554
+ // Four Byte Integer
555
+ pos += 4 ;
556
+ break ;
557
+ case PROPERTY_TYPE . ContentType :
558
+ case PROPERTY_TYPE . ResponseTopic :
559
+ case PROPERTY_TYPE . AssignedClientIdentifier :
560
+ case PROPERTY_TYPE . AuthenticationMethod :
561
+ case PROPERTY_TYPE . ResponseInformation :
562
+ case PROPERTY_TYPE . ServerReference :
563
+ case PROPERTY_TYPE . ReasonString :
564
+ // UTF-8 Encoded String
565
+ var strLen = readUint16 ( input , pos ) ;
566
+ pos += 2 ;
567
+ pos += strLen ;
568
+ break ;
569
+ case PROPERTY_TYPE . UserProperty :
570
+ // UTF-8 String Pair
571
+ var keyLen = readUint16 ( input , pos ) ;
572
+ pos += 2 ;
573
+ var key = parseUTF8 ( input , pos , keyLen ) ;
574
+ pos += keyLen ;
575
+ var valLen = readUint16 ( input , pos ) ;
576
+ pos += 2 ;
577
+ var val = parseUTF8 ( input , pos , valLen ) ;
578
+ pos += valLen ;
579
+ propContainer . userProperties [ key ] = val ;
580
+ break ;
581
+ default :
582
+ pos = endingPos ;
583
+ break ;
584
+ }
585
+ }
586
+ return propContainer ;
587
+ } ;
588
+
589
+ function decodeMessage ( input , pos , mqttVersion ) {
471
590
var startingPos = pos ;
472
591
var first = input [ pos ] ;
473
592
var type = first >> 4 ;
@@ -515,6 +634,12 @@ function onMessageArrived(message) {
515
634
wireMessage . messageIdentifier = readUint16 ( input , pos ) ;
516
635
pos += 2 ;
517
636
}
637
+ var properties = { } ;
638
+ if ( mqttVersion == 5 ) {
639
+ var proplen = input [ pos ++ ] ;
640
+ if ( proplen > 0 ) properties = parseProperties ( input , pos , proplen ) ;
641
+ pos += proplen ;
642
+ }
518
643
519
644
var message = new Message ( input . subarray ( pos , endPos ) ) ;
520
645
if ( ( messageInfo & 0x01 ) == 0x01 )
@@ -523,6 +648,7 @@ function onMessageArrived(message) {
523
648
message . duplicate = true ;
524
649
message . qos = qos ;
525
650
message . destinationName = topicName ;
651
+ message . properties = properties ;
526
652
wireMessage . payloadMessage = message ;
527
653
break ;
528
654
@@ -896,6 +1022,7 @@ function onMessageArrived(message) {
896
1022
throw new Error ( format ( ERROR . INVALID_STATE , [ "not connected" ] ) ) ;
897
1023
898
1024
var wireMessage = new WireMessage ( MESSAGE_TYPE . SUBSCRIBE ) ;
1025
+ wireMessage . mqttVersion = this . connectOptions . mqttVersion ;
899
1026
wireMessage . topics = filter . constructor === Array ? filter : [ filter ] ;
900
1027
if ( subscribeOptions . qos === undefined )
901
1028
subscribeOptions . qos = 0 ;
@@ -953,6 +1080,7 @@ function onMessageArrived(message) {
953
1080
954
1081
var wireMessage = new WireMessage ( MESSAGE_TYPE . PUBLISH ) ;
955
1082
wireMessage . payloadMessage = message ;
1083
+ wireMessage . mqttVersion = this . connectOptions . mqttVersion ;
956
1084
957
1085
if ( this . connected ) {
958
1086
// Mark qos 1 & 2 message as "ACK required"
@@ -1239,7 +1367,7 @@ function onMessageArrived(message) {
1239
1367
try {
1240
1368
var offset = 0 ;
1241
1369
while ( offset < byteArray . length ) {
1242
- var result = decodeMessage ( byteArray , offset ) ;
1370
+ var result = decodeMessage ( byteArray , offset , this . connectOptions . mqttVersion ) ;
1243
1371
var wireMessage = result [ 0 ] ;
1244
1372
offset = result [ 1 ] ;
1245
1373
if ( wireMessage !== null ) {
@@ -1606,8 +1734,17 @@ function onMessageArrived(message) {
1606
1734
return ;
1607
1735
}
1608
1736
} else {
1609
- // Otherwise we never had a connection, so indicate that the connect has failed.
1610
- if ( this . connectOptions . mqttVersion === 4 && this . connectOptions . mqttVersionExplicit === false ) {
1737
+ // Otherwise we never had a connection, so indicate that the connect has failed.
1738
+ if ( this . connectOptions . mqttVersion === 5 && this . connectOptions . mqttVersionExplicit === false ) {
1739
+ this . _trace ( "Failed to connect V5, dropping back to V4" ) ;
1740
+ this . connectOptions . mqttVersion = 4 ;
1741
+ if ( this . connectOptions . uris ) {
1742
+ this . hostIndex = 0 ;
1743
+ this . _doConnect ( this . connectOptions . uris [ 0 ] ) ;
1744
+ } else {
1745
+ this . _doConnect ( this . uri ) ;
1746
+ }
1747
+ } else if ( this . connectOptions . mqttVersion === 4 && this . connectOptions . mqttVersionExplicit === false ) {
1611
1748
this . _trace ( "Failed to connect V4, dropping back to V3" ) ;
1612
1749
this . connectOptions . mqttVersion = 3 ;
1613
1750
if ( this . connectOptions . uris ) {
@@ -1951,13 +2088,13 @@ function onMessageArrived(message) {
1951
2088
if ( connectOptions . keepAliveInterval === undefined )
1952
2089
connectOptions . keepAliveInterval = 60 ;
1953
2090
1954
- if ( connectOptions . mqttVersion > 4 || connectOptions . mqttVersion < 3 ) {
2091
+ if ( connectOptions . mqttVersion > 5 || connectOptions . mqttVersion < 3 ) {
1955
2092
throw new Error ( format ( ERROR . INVALID_ARGUMENT , [ connectOptions . mqttVersion , "connectOptions.mqttVersion" ] ) ) ;
1956
2093
}
1957
2094
1958
2095
if ( connectOptions . mqttVersion === undefined ) {
1959
2096
connectOptions . mqttVersionExplicit = false ;
1960
- connectOptions . mqttVersion = 4 ;
2097
+ connectOptions . mqttVersion = 5 ;
1961
2098
} else {
1962
2099
connectOptions . mqttVersionExplicit = true ;
1963
2100
}
@@ -2380,6 +2517,11 @@ function onMessageArrived(message) {
2380
2517
enumerable : true ,
2381
2518
get : function ( ) { return duplicate ; } ,
2382
2519
set : function ( newDuplicate ) { duplicate = newDuplicate ; }
2520
+ } ,
2521
+ "properties" :{
2522
+ enumerable : true ,
2523
+ get : function ( ) { return properties ; } ,
2524
+ set : function ( newProperties ) { properties = newProperties ; }
2383
2525
}
2384
2526
} ) ;
2385
2527
} ;
0 commit comments