Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 53 additions & 40 deletions source/core_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -1623,7 +1623,10 @@ static MQTTStatus_t sendPublishAcksWithProperty( MQTTContext_t * pContext,

if( packetTypeByte != 0U )
{
status = MQTT_ValidatePublishAckProperties( &pContext->ackPropsBuffer );
if( pContext->ackPropsBuffer.currentIndex > 0U )
{
status = MQTT_ValidatePublishAckProperties( &pContext->ackPropsBuffer );
}

if( status == MQTTSuccess )
{
Expand Down Expand Up @@ -1823,7 +1826,7 @@ static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext,
MQTTDeserializedInfo_t deserializedInfo;
bool duplicatePublish = false;
MQTTPropBuilder_t propBuffer = { 0 };
MQTTSuccessFailReasonCode_t reasonCode;
MQTTSuccessFailReasonCode_t reasonCode = MQTT_INVALID_REASON_CODE;
bool ackPropsAdded = false;

assert( pContext != NULL );
Expand Down Expand Up @@ -1935,24 +1938,26 @@ static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext,
if( duplicatePublish == false )
{
MQTTPropBuilder_t * pTempPropBuffer = NULL;
MQTTSuccessFailReasonCode_t * pTempReasonCode = NULL;

if( publishInfo.qos > MQTTQoS0 )
{
pTempPropBuffer = &pContext->ackPropsBuffer;
pTempPropBuffer = &( pContext->ackPropsBuffer );
pTempReasonCode = &reasonCode;
}

if( pContext->appCallback( pContext, pIncomingPacket, &deserializedInfo,
&reasonCode, pTempPropBuffer, &propBuffer ) == false )
pTempReasonCode, pTempPropBuffer, &propBuffer ) == false )
{
/* TODO: Figure out whether this should block the library
* from processing any more packets. */
status = MQTTEventCallbackFailed;
}
else if( publishInfo.qos > MQTTQoS0 )
{
if( ( pTempPropBuffer->pBuffer != NULL ) &&
( CHECK_SIZE_T_OVERFLOWS_32BIT( pTempPropBuffer->currentIndex ) ||
( pTempPropBuffer->currentIndex >= MQTT_REMAINING_LENGTH_INVALID ) ) )
if( ( pContext->ackPropsBuffer.pBuffer != NULL ) &&
( CHECK_SIZE_T_OVERFLOWS_32BIT( pContext->ackPropsBuffer.currentIndex ) ||
( pContext->ackPropsBuffer.currentIndex >= MQTT_REMAINING_LENGTH_INVALID ) ) )
{
status = MQTTSendFailed;
LogError( ( "Length of properties to be sent must be less than 268435456." ) );
Expand Down Expand Up @@ -2005,7 +2010,9 @@ static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext,
MQTTEventCallback_t appCallback;
MQTTDeserializedInfo_t deserializedInfo;
MQTTPropBuilder_t propBuffer = { 0 };
MQTTSuccessFailReasonCode_t reasonCode;
MQTTPropBuilder_t * pSendProps;
MQTTSuccessFailReasonCode_t * pSendReasonCode;
MQTTSuccessFailReasonCode_t reasonCode = MQTT_INVALID_REASON_CODE;
bool ackPropsAdded;

MQTTReasonCodeInfo_t incomingReasonCode = { 0 };
Expand Down Expand Up @@ -2071,14 +2078,25 @@ static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext,
deserializedInfo.pPublishInfo = NULL;
deserializedInfo.pReasonCode = &incomingReasonCode;

if( ( ackType == MQTTPuback ) || ( ackType == MQTTPubcomp ) )
{
/* This is the last packet in PUB-ACK sequence - we will set these
* to NULL. */
pSendProps = NULL;
pSendReasonCode = NULL;
}
else
{
/* We need to send a response back to the server. Provide application with
* space to add data. */
pSendProps = &pContext->ackPropsBuffer;
pSendReasonCode = &reasonCode;
}

/* Invoke application callback to hand the buffer over to application
* before sending acks. */

reasonCode = MQTT_INVALID_REASON_CODE;

if( appCallback( pContext, pIncomingPacket, &deserializedInfo, &reasonCode,
&pContext->ackPropsBuffer, &propBuffer ) == false )
if( appCallback( pContext, pIncomingPacket, &deserializedInfo, pSendReasonCode,
pSendProps, &propBuffer ) == false )
{
/* TODO: verify whether this should block the recv thread? */
status = MQTTEventCallbackFailed;
Expand Down Expand Up @@ -2130,13 +2148,7 @@ static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext,
uint16_t packetIdentifier = MQTT_PACKET_ID_INVALID;
MQTTDeserializedInfo_t deserializedInfo;

/* We should always invoke the app callback unless we receive a PINGRESP
* and are managing keep alive, or if we receive an unknown packet. We
* initialize this to false since the callback must be invoked before
* sending any PUBREL or PUBCOMP. However, for other cases, we invoke it
* at the end to reduce the complexity of this function. */
bool invokeAppCallback = false;
MQTTEventCallback_t appCallback = NULL;
MQTTEventCallback_t appCallback;

assert( pContext != NULL );
assert( pIncomingPacket != NULL );
Expand Down Expand Up @@ -2166,11 +2178,26 @@ static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext,
NULL,
NULL,
&pContext->connectionProperties );
invokeAppCallback = ( status == MQTTSuccess ) && !manageKeepAlive;

if( ( status == MQTTSuccess ) && ( manageKeepAlive == true ) )
if( status == MQTTSuccess )
{
pContext->waitingForPingResp = false;
if( manageKeepAlive == true )
{
pContext->waitingForPingResp = false;
}
else
{
/* Set fields of deserialized struct. */
deserializedInfo.packetIdentifier = packetIdentifier;
deserializedInfo.deserializationResult = status;
deserializedInfo.pPublishInfo = NULL;

if( appCallback( pContext, pIncomingPacket, &deserializedInfo, NULL,
NULL, NULL ) == false )
{
status = MQTTEventCallbackFailed;
}
}
}

break;
Expand All @@ -2189,20 +2216,6 @@ static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext,
break;
}

if( invokeAppCallback == true )
{
/* Set fields of deserialized struct. */
deserializedInfo.packetIdentifier = packetIdentifier;
deserializedInfo.deserializationResult = status;
deserializedInfo.pPublishInfo = NULL;

if( appCallback( pContext, pIncomingPacket, &deserializedInfo, NULL,
&pContext->ackPropsBuffer, NULL ) == false )
{
status = MQTTEventCallbackFailed;
}
}

return status;
}

Expand Down Expand Up @@ -2230,7 +2243,7 @@ static MQTTStatus_t handleIncomingDisconnect( MQTTContext_t * pContext,
deserializedInfo.pReasonCode = &reasonCode;

if( pContext->appCallback( pContext, pIncomingPacket, &deserializedInfo,
NULL, &pContext->ackPropsBuffer, &propBuffer ) == false )
NULL, NULL, &propBuffer ) == false )
{
status = MQTTEventCallbackFailed;
}
Expand Down Expand Up @@ -3525,7 +3538,7 @@ static MQTTStatus_t receiveConnack( MQTTContext_t * pContext,
deserializedInfo.deserializationResult = status;

if( pContext->appCallback( pContext, pIncomingPacket, &deserializedInfo,
NULL, &pContext->ackPropsBuffer, &propBuffer ) == false )
NULL, NULL, &propBuffer ) == false )
{
status = MQTTEventCallbackFailed;
}
Expand Down Expand Up @@ -3946,7 +3959,7 @@ static MQTTStatus_t handleSubUnsubAck( MQTTContext_t * pContext,

/* Invoke application callback to hand the buffer over to application */
if( appCallback( pContext, pIncomingPacket, &deserializedInfo, NULL,
&pContext->ackPropsBuffer, &propBuffer ) == false )
NULL, &propBuffer ) == false )
{
status = MQTTEventCallbackFailed;
}
Expand Down
Loading
Loading