Skip to content

Upgrade MQTT demos to v5 #1942

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 22 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion demos/defender/defender_demo_json/demo_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
* @note This path is relative from the demo binary created. Update
* ROOT_CA_CERT_PATH to the absolute path if this demo is executed from elsewhere.
*/

#ifndef ROOT_CA_CERT_PATH
#define ROOT_CA_CERT_PATH "certificates/AmazonRootCA1.crt"
#endif
Expand All @@ -97,7 +98,7 @@
* https://docs.aws.amazon.com/iot/latest/developerguide/client-authentication.html
*
* @note This certificate should be PEM-encoded.
*
*m
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should remove any and all changes made in the demo_config.h files unless it is related to the MQTTv5 upgrade.

* #define CLIENT_CERT_PATH "...insert here..."
*/

Expand Down
24 changes: 15 additions & 9 deletions demos/defender/defender_demo_json/mqtt_operations.c
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,10 @@ static void cleanupOutgoingPublishWithPacketID( uint16_t packetId );
*/
static void mqttCallback( MQTTContext_t * pMqttContext,
MQTTPacketInfo_t * pPacketInfo,
MQTTDeserializedInfo_t * pDeserializedInfo );
MQTTDeserializedInfo_t * pDeserializedInfo,
MQTTSuccessFailReasonCode_t * pReasonCode,
MQTTPropBuilder_t * sendPropsBuffer,
MQTTPropBuilder_t * getPropsBuffer );

/**
* @brief Resend the publishes if a session is re-established with the broker.
Expand Down Expand Up @@ -555,7 +558,10 @@ static void cleanupOutgoingPublishWithPacketID( uint16_t packetId )

static void mqttCallback( MQTTContext_t * pMqttContext,
MQTTPacketInfo_t * pPacketInfo,
MQTTDeserializedInfo_t * pDeserializedInfo )
MQTTDeserializedInfo_t * pDeserializedInfo,
MQTTSuccessFailReasonCode_t * pReasonCode,
MQTTPropBuilder_t * sendPropsBuffer,
MQTTPropBuilder_t * getPropsBuffer )
{
uint16_t packetIdentifier;

Expand Down Expand Up @@ -659,7 +665,7 @@ static bool handlePublishResend( MQTTContext_t * pMqttContext )
outgoingPublishPackets[ index ].packetId ) );
mqttStatus = MQTT_Publish( pMqttContext,
&outgoingPublishPackets[ index ].pubInfo,
outgoingPublishPackets[ index ].packetId );
outgoingPublishPackets[ index ].packetId , NULL);

if( mqttStatus != MQTTSuccess )
{
Expand Down Expand Up @@ -799,7 +805,7 @@ bool EstablishMqttSession( MQTTPublishCallback_t publishCallback )
pOutgoingPublishRecords,
OUTGOING_PUBLISH_RECORD_LEN,
pIncomingPublishRecords,
INCOMING_PUBLISH_RECORD_LEN );
INCOMING_PUBLISH_RECORD_LEN, NULL, 0 );

if( mqttStatus != MQTTSuccess )
{
Expand Down Expand Up @@ -842,7 +848,7 @@ bool EstablishMqttSession( MQTTPublishCallback_t publishCallback )
&connectInfo,
NULL,
CONNACK_RECV_TIMEOUT_MS,
&sessionPresent );
&sessionPresent, NULL, NULL );

if( mqttStatus != MQTTSuccess )
{
Expand Down Expand Up @@ -908,7 +914,7 @@ bool DisconnectMqttSession( void )
if( mqttSessionEstablished == true )
{
/* Send DISCONNECT. */
mqttStatus = MQTT_Disconnect( pMqttContext );
mqttStatus = MQTT_Disconnect( pMqttContext , NULL, MQTT_REASON_DISCONNECT_NORMAL_DISCONNECTION);

if( mqttStatus != MQTTSuccess )
{
Expand Down Expand Up @@ -956,7 +962,7 @@ bool SubscribeToTopic( const char * pTopicFilter,
mqttStatus = MQTT_Subscribe( pMqttContext,
pSubscriptionList,
sizeof( pSubscriptionList ) / sizeof( MQTTSubscribeInfo_t ),
globalSubscribePacketIdentifier );
globalSubscribePacketIdentifier, NULL);

if( mqttStatus != MQTTSuccess )
{
Expand Down Expand Up @@ -1012,7 +1018,7 @@ bool UnsubscribeFromTopic( const char * pTopicFilter,
mqttStatus = MQTT_Unsubscribe( pMqttContext,
pSubscriptionList,
sizeof( pSubscriptionList ) / sizeof( MQTTSubscribeInfo_t ),
globalUnsubscribePacketIdentifier );
globalUnsubscribePacketIdentifier, NULL);

if( mqttStatus != MQTTSuccess )
{
Expand Down Expand Up @@ -1080,7 +1086,7 @@ bool PublishToTopic( const char * pTopicFilter,
/* Send PUBLISH packet. */
mqttStatus = MQTT_Publish( pMqttContext,
&outgoingPublishPackets[ publishIndex ].pubInfo,
outgoingPublishPackets[ publishIndex ].packetId );
outgoingPublishPackets[ publishIndex ].packetId , NULL );

if( mqttStatus != MQTTSuccess )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
* @note This path is relative from the demo binary created. Update
* ROOT_CA_CERT_PATH to the absolute path if this demo is executed from elsewhere.
*/

#ifndef ROOT_CA_CERT_PATH
#define ROOT_CA_CERT_PATH "certificates/AmazonRootCA1.crt"
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,10 @@ static void cleanupOutgoingPublishWithPacketID( uint16_t packetId );
*/
static void mqttCallback( MQTTContext_t * pMqttContext,
MQTTPacketInfo_t * pPacketInfo,
MQTTDeserializedInfo_t * pDeserializedInfo );
MQTTDeserializedInfo_t * pDeserializedInfo,
MQTTSuccessFailReasonCode_t * pReasonCode,
MQTTPropBuilder_t * sendPropsBuffer,
MQTTPropBuilder_t * getPropsBuffer );

/**
* @brief Resend the publishes if a session is re-established with the broker.
Expand Down Expand Up @@ -548,7 +551,10 @@ static void cleanupOutgoingPublishWithPacketID( uint16_t packetId )

static void mqttCallback( MQTTContext_t * pMqttContext,
MQTTPacketInfo_t * pPacketInfo,
MQTTDeserializedInfo_t * pDeserializedInfo )
MQTTDeserializedInfo_t * pDeserializedInfo,
MQTTSuccessFailReasonCode_t * pReasonCode,
MQTTPropBuilder_t * sendPropsBuffer,
MQTTPropBuilder_t * getPropsBuffer )
{
uint16_t packetIdentifier;

Expand Down Expand Up @@ -652,7 +658,7 @@ static bool handlePublishResend( MQTTContext_t * pMqttContext )
outgoingPublishPackets[ index ].packetId ) );
mqttStatus = MQTT_Publish( pMqttContext,
&outgoingPublishPackets[ index ].pubInfo,
outgoingPublishPackets[ index ].packetId );
outgoingPublishPackets[ index ].packetId, NULL);

if( mqttStatus != MQTTSuccess )
{
Expand Down Expand Up @@ -798,7 +804,7 @@ bool EstablishMqttSession( MQTTPublishCallback_t publishCallback,
pOutgoingPublishRecords,
OUTGOING_PUBLISH_RECORD_LEN,
pIncomingPublishRecords,
INCOMING_PUBLISH_RECORD_LEN );
INCOMING_PUBLISH_RECORD_LEN, NULL, 0);

if( mqttStatus != MQTTSuccess )
{
Expand Down Expand Up @@ -841,7 +847,7 @@ bool EstablishMqttSession( MQTTPublishCallback_t publishCallback,
&connectInfo,
NULL,
CONNACK_RECV_TIMEOUT_MS,
&sessionPresent );
&sessionPresent, NULL, NULL );

if( mqttStatus != MQTTSuccess )
{
Expand Down Expand Up @@ -908,7 +914,7 @@ bool DisconnectMqttSession( void )
if( mqttSessionEstablished == true )
{
/* Send DISCONNECT. */
mqttStatus = MQTT_Disconnect( pMqttContext );
mqttStatus = MQTT_Disconnect( pMqttContext, NULL, 0 );

if( mqttStatus != MQTTSuccess )
{
Expand Down Expand Up @@ -956,7 +962,7 @@ bool SubscribeToTopic( const char * pTopicFilter,
mqttStatus = MQTT_Subscribe( pMqttContext,
pSubscriptionList,
sizeof( pSubscriptionList ) / sizeof( MQTTSubscribeInfo_t ),
globalSubscribePacketIdentifier );
globalSubscribePacketIdentifier, NULL);

if( mqttStatus != MQTTSuccess )
{
Expand Down Expand Up @@ -1012,7 +1018,7 @@ bool UnsubscribeFromTopic( const char * pTopicFilter,
mqttStatus = MQTT_Unsubscribe( pMqttContext,
pSubscriptionList,
sizeof( pSubscriptionList ) / sizeof( MQTTSubscribeInfo_t ),
globalUnsubscribePacketIdentifier );
globalUnsubscribePacketIdentifier, NULL );

if( mqttStatus != MQTTSuccess )
{
Expand Down Expand Up @@ -1080,7 +1086,7 @@ bool PublishToTopic( const char * pTopicFilter,
/* Send PUBLISH packet. */
mqttStatus = MQTT_Publish( pMqttContext,
&outgoingPublishPackets[ publishIndex ].pubInfo,
outgoingPublishPackets[ publishIndex ].packetId );
outgoingPublishPackets[ publishIndex ].packetId, NULL );

if( mqttStatus != MQTTSuccess )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,10 @@ static void cleanupOutgoingPublishWithPacketID( uint16_t packetId );
*/
static void mqttCallback( MQTTContext_t * pMqttContext,
MQTTPacketInfo_t * pPacketInfo,
MQTTDeserializedInfo_t * pDeserializedInfo );
MQTTDeserializedInfo_t * pDeserializedInfo,
MQTTSuccessFailReasonCode_t * pReasonCode,
MQTTPropBuilder_t * sendPropsBuffer,
MQTTPropBuilder_t * getPropsBuffer );

/**
* @brief Resend the publishes if a session is re-established with the broker.
Expand Down Expand Up @@ -548,7 +551,10 @@ static void cleanupOutgoingPublishWithPacketID( uint16_t packetId )

static void mqttCallback( MQTTContext_t * pMqttContext,
MQTTPacketInfo_t * pPacketInfo,
MQTTDeserializedInfo_t * pDeserializedInfo )
MQTTDeserializedInfo_t * pDeserializedInfo,
MQTTSuccessFailReasonCode_t * pReasonCode,
MQTTPropBuilder_t * sendPropsBuffer,
MQTTPropBuilder_t * getPropsBuffer )
{
uint16_t packetIdentifier;

Expand Down Expand Up @@ -652,7 +658,8 @@ static bool handlePublishResend( MQTTContext_t * pMqttContext )
outgoingPublishPackets[ index ].packetId ) );
mqttStatus = MQTT_Publish( pMqttContext,
&outgoingPublishPackets[ index ].pubInfo,
outgoingPublishPackets[ index ].packetId );
outgoingPublishPackets[ index ].packetId,
NULL );

if( mqttStatus != MQTTSuccess )
{
Expand Down Expand Up @@ -798,7 +805,7 @@ bool EstablishMqttSession( MQTTPublishCallback_t publishCallback,
pOutgoingPublishRecords,
OUTGOING_PUBLISH_RECORD_LEN,
pIncomingPublishRecords,
INCOMING_PUBLISH_RECORD_LEN );
INCOMING_PUBLISH_RECORD_LEN, NULL, 0);

if( mqttStatus != MQTTSuccess )
{
Expand Down Expand Up @@ -841,7 +848,9 @@ bool EstablishMqttSession( MQTTPublishCallback_t publishCallback,
&connectInfo,
NULL,
CONNACK_RECV_TIMEOUT_MS,
&sessionPresent );
&sessionPresent,
NULL,
NULL);

if( mqttStatus != MQTTSuccess )
{
Expand Down Expand Up @@ -908,7 +917,7 @@ bool DisconnectMqttSession( void )
if( mqttSessionEstablished == true )
{
/* Send DISCONNECT. */
mqttStatus = MQTT_Disconnect( pMqttContext );
mqttStatus = MQTT_Disconnect( pMqttContext, NULL, 0 );

if( mqttStatus != MQTTSuccess )
{
Expand Down Expand Up @@ -956,7 +965,8 @@ bool SubscribeToTopic( const char * pTopicFilter,
mqttStatus = MQTT_Subscribe( pMqttContext,
pSubscriptionList,
sizeof( pSubscriptionList ) / sizeof( MQTTSubscribeInfo_t ),
globalSubscribePacketIdentifier );
globalSubscribePacketIdentifier,
NULL );

if( mqttStatus != MQTTSuccess )
{
Expand Down Expand Up @@ -1012,7 +1022,8 @@ bool UnsubscribeFromTopic( const char * pTopicFilter,
mqttStatus = MQTT_Unsubscribe( pMqttContext,
pSubscriptionList,
sizeof( pSubscriptionList ) / sizeof( MQTTSubscribeInfo_t ),
globalUnsubscribePacketIdentifier );
globalUnsubscribePacketIdentifier,
NULL );

if( mqttStatus != MQTTSuccess )
{
Expand Down Expand Up @@ -1080,7 +1091,8 @@ bool PublishToTopic( const char * pTopicFilter,
/* Send PUBLISH packet. */
mqttStatus = MQTT_Publish( pMqttContext,
&outgoingPublishPackets[ publishIndex ].pubInfo,
outgoingPublishPackets[ publishIndex ].packetId );
outgoingPublishPackets[ publishIndex ].packetId,
NULL );

if( mqttStatus != MQTTSuccess )
{
Expand Down
2 changes: 1 addition & 1 deletion demos/greengrass/greengrass_demo_local_auth/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ openssl x509 -req \

Deploy the following components to your Greengrass core:
- aws.greengrass.clientdevices.Auth
- aws.greengrass.clientdevices.mqtt.Moquette
- aws.greengrass.clientdevices.mqtt.EMQX
- aws.greengrass.clientdevices.mqtt.Bridge
- aws.greengrass.clientdevices.IPDetector

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,10 @@ static void handleIncomingPublish( MQTTPublishInfo_t * pPublishInfo,
*/
static void eventCallback( MQTTContext_t * pMqttContext,
MQTTPacketInfo_t * pPacketInfo,
MQTTDeserializedInfo_t * pDeserializedInfo );
MQTTDeserializedInfo_t * pDeserializedInfo,
MQTTSuccessFailReasonCode_t * pReasonCode,
MQTTPropBuilder_t * sendPropsBuffer,
MQTTPropBuilder_t * getPropsBuffer );

/**
* @brief Initializes the MQTT library.
Expand Down Expand Up @@ -433,7 +436,10 @@ static void updateSubAckStatus( MQTTPacketInfo_t * pPacketInfo )

static void eventCallback( MQTTContext_t * pMqttContext,
MQTTPacketInfo_t * pPacketInfo,
MQTTDeserializedInfo_t * pDeserializedInfo )
MQTTDeserializedInfo_t * pDeserializedInfo,
MQTTSuccessFailReasonCode_t * pReasonCode,
MQTTPropBuilder_t * sendPropsBuffer,
MQTTPropBuilder_t * getPropsBuffer )
{
uint16_t packetId;

Expand Down Expand Up @@ -514,7 +520,7 @@ static int establishMqttSession( MQTTContext_t * pMqttContext )
connectInfo.cleanSession = true;

mqttStatus = MQTT_Connect( pMqttContext, &connectInfo, NULL,
CONNACK_RECV_TIMEOUT_MS, &sessionPresent );
CONNACK_RECV_TIMEOUT_MS, &sessionPresent, NULL, NULL );

if( mqttStatus != MQTTSuccess )
{
Expand All @@ -540,7 +546,7 @@ static int disconnectMqttSession( MQTTContext_t * pMqttContext )
assert( pMqttContext != NULL );

/* Send DISCONNECT. */
mqttStatus = MQTT_Disconnect( pMqttContext );
mqttStatus = MQTT_Disconnect( pMqttContext, NULL, 0 );

if( mqttStatus != MQTTSuccess )
{
Expand Down Expand Up @@ -571,7 +577,7 @@ static int subscribeToTopic( MQTTContext_t * pMqttContext )
mqttStatus = MQTT_Subscribe( pMqttContext,
&subInfo,
1U,
MQTT_GetPacketId( pMqttContext ) );
MQTT_GetPacketId( pMqttContext ), NULL );

if( mqttStatus != MQTTSuccess )
{
Expand Down Expand Up @@ -607,7 +613,7 @@ static int unsubscribeFromTopic( MQTTContext_t * pMqttContext )
mqttStatus = MQTT_Unsubscribe( pMqttContext,
&unsubInfo,
1U,
MQTT_GetPacketId( pMqttContext ) );
MQTT_GetPacketId( pMqttContext ), NULL );

if( mqttStatus != MQTTSuccess )
{
Expand Down Expand Up @@ -648,7 +654,7 @@ static int publishToTopic( MQTTContext_t * pMqttContext )

mqttStatus = MQTT_Publish( pMqttContext,
&publish,
packetId );
packetId , NULL);

if( mqttStatus != MQTTSuccess )
{
Expand Down
1 change: 0 additions & 1 deletion demos/mqtt/mqtt_demo_basic_tls/demo_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
*
* In general, port 8883 is for secured MQTT connections.
*/
#define BROKER_PORT ( 8883 )

/**
* @brief Path of the file containing the server's root CA certificate.
Expand Down
Loading
Loading