Result of a code analysis #267

Open
DeJusten opened this issue Jan 22, 2025 · 3 comments

@DeJusten

In my research project, I analyzed the MQTT stack for buffer overflow errors. The analysis is based on a code review. In particular, it was checked whether all pointers point to a valid area. I found no errors regarding a buffer overflow, so attackers cannot use the protocol as a gateway into a system.
However, there are errors in the code, e.g.

  • Errors when sending data when it cannot be sent in one go
  • Sloppy handling of error return values (when setting the return values and when checking them)
  • Determination of the packet size consisting of FixedHeader+VariableHeader+Payload is incorrect, which leads to a 'drop' of an actually 'suitable' frame
  • Filter condition is not SPEC compliant
  • Sloppy handling of data type size_t, which leads to problems in systems with sizeof(int)!=sizeof(size_t)
  • Sloppy handling of Publish&Co with QoS > 0 (e.g. MessageID is not checked)
  • Some code optimizations

Unfortunately, I don't have the time to correct these errors. However, if you are interested, I would be happy to send you a detailed description here.
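
The first item in the list (data that cannot be sent in one go) comes down to offering only the unsent remainder on every write. A minimal sketch, assuming a transport callback that may accept fewer bytes than requested; `send_all`, `write_fn` and `mock_write` are hypothetical names for illustration, not part of the library:

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical transport callback: writes up to len bytes and returns the
 * number of bytes accepted, or a negative value on error. */
typedef int (*write_fn)(void *ctx, const unsigned char *data, size_t len);

/* Send the whole buffer, retrying with the unsent remainder each time.
 * Returns 0 on success, -1 on a write error or a zero-byte write. */
static int send_all(write_fn writer, void *ctx,
                    const unsigned char *buf, size_t length)
{
    size_t sent = 0;

    while (sent < length) {
        /* Key point: pass length - sent, not length. */
        int rc = writer(ctx, buf + sent, length - sent);
        if (rc <= 0)
            return -1;
        sent += (size_t)rc;
    }
    return 0;
}

/* Mock transport for illustration: accepts at most 4 bytes per call and
 * appends them to a capture buffer. */
struct mock_sink {
    unsigned char data[64];
    size_t used;
    int calls;
};

static int mock_write(void *ctx, const unsigned char *data, size_t len)
{
    struct mock_sink *sink = ctx;
    size_t chunk = len < 4 ? len : 4;

    if (sink->used + chunk > sizeof sink->data)
        return -1;
    memcpy(sink->data + sink->used, data, chunk);
    sink->used += chunk;
    sink->calls++;
    return (int)chunk;
}
```

With the mock above, a 10-byte buffer goes out in three writes of 4, 4 and 2 bytes. Treating a zero-byte write as an error is a choice made for this sketch so the loop cannot spin forever; a real client would more likely rely on its timer.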

@NeroReflex

I am interested in pretty much all of them. The sloppy handling of error return codes is something I have spotted too. I plan to enhance the code over time... One PR at a time.

@DeJusten

DeJusten commented Feb 3, 2025

The error description is attached.
I have noted possible solutions for the 'simple' errors. Since I don't have a test environment, I find it difficult to commit them myself.
Two errors require a few more code changes (global static variable, missing error check when calling MQTTPacket_decode())
If QoS>0, the acknowledged PacketID should be compared with the sent PacketID. These changes (especially if QoS=2) require more effort. Instead of fixing the error, the problem could also be named in the foreword.
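
The two larger changes mentioned above (no global static pointer, and an error result that callers can check) could be combined in a decoder along the following lines. This is only a sketch under assumptions: `remlen_decode` and the `REMLEN_*` codes are hypothetical names, and the function works on a bounded buffer instead of the `getcharfn` callback used by the library.

```c
#include <stddef.h>

#define REMLEN_MAX_BYTES 4        /* MQTT 3.1.1 allows at most 4 length bytes */
#define REMLEN_ERR_TRUNCATED (-1) /* buffer ended before the field did */
#define REMLEN_ERR_MALFORMED (-2) /* continuation bit still set on the 4th byte */

/* Decode an MQTT "remaining length" field from buf[0..buflen).
 * On success stores the decoded value in *value and returns the number of
 * bytes consumed (1..4). On failure returns a negative error code and
 * leaves *value untouched. No global or static state is used, so several
 * connections can decode independently. */
static int remlen_decode(const unsigned char *buf, size_t buflen, long *value)
{
    long result = 0;
    long multiplier = 1;
    size_t i;

    for (i = 0; i < REMLEN_MAX_BYTES; ++i) {
        if (i >= buflen)
            return REMLEN_ERR_TRUNCATED;
        result += (long)(buf[i] & 127) * multiplier;
        if ((buf[i] & 128) == 0) {
            *value = result;
            return (int)(i + 1);
        }
        multiplier *= 128;
    }
    return REMLEN_ERR_MALFORMED;
}
```

A caller would treat any negative return as a failed deserialization before advancing its read pointer, which is the check that is currently missing in the routines below.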

If you have any questions, please ask

For the sake of simplicity, I have attached all routines with errors. I have added the errors themselves as comments marked with my initials DJ. I hope it is OK that the error description was originally written in German.

/*******************************************************************************
  MQTTUnsubscribeClient.c
 *******************************************************************************/

int MQTTDeserialize_unsuback(unsigned short* packetid, unsigned char* buf, int32_t buflen)
{
	unsigned char type = 0;
	unsigned char dup = 0;
	int rc = 0;

	FUNC_ENTRY;
	rc = MQTTDeserialize_ack(&type, &dup, packetid, buf, buflen);
//DJ rc should be checked for validity! Currently MQTTDeserialize_unsuback()
//   returns 1 whenever type == UNSUBACK,
//   even if MQTTDeserialize_ack() itself reported a failure
//   if ((MQTTDeserialize_ack(..) == 1) && (type == UNSUBACK)) {rc=1;}
	if (type == UNSUBACK) {
		rc = 1;
    } 
    else {
    }

	FUNC_EXIT_RC(rc);
	return rc;
}

/*******************************************************************************
  MQTTConnectClient.c
 *******************************************************************************/
int MQTTSerialize_connectLength(MQTTPacket_connectData* options)
{
	int32_t len = 0;

	FUNC_ENTRY;

	if (options->MQTTVersion == 3)
		len = 12; /* variable depending on MQTT or MQIsdp */
	else if (options->MQTTVersion == 4)
		len = 10; 
//DJ If MQTTVersion is neither 3 nor 4, len is not set!
//   Simplest fix: remove the else if() as in MQTTSerialize_connect(),
//   or add an else if() to MQTTSerialize_connect() as done here
//   and report an error to the caller through a further else

	len += MQTTstrlen(options->clientID)+2;
	if (options->willFlag)
		len += MQTTstrlen(options->will.topicName)+2 + MQTTstrlen(options->will.message)+2;
	if (options->username.cstring || options->username.lenstring.data)
		len += MQTTstrlen(options->username)+2;
	if (options->password.cstring || options->password.lenstring.data)
		len += MQTTstrlen(options->password)+2;

	FUNC_EXIT_RC(len);
	return len;
}

int MQTTSerialize_connect(unsigned char* buf, int32_t buflen, MQTTPacket_connectData* options)
{
	unsigned char *ptr = buf;
	MQTTHeader header = {0};
	MQTTConnectFlags flags = {0};
	int32_t len = 0;
	int rc = -1;

	FUNC_ENTRY;
	if (MQTTPacket_len(len = MQTTSerialize_connectLength(options)) > buflen)
	{
		rc = MQTTPACKET_BUFFER_TOO_SHORT;
		goto exit;
	}

	header.byte = 0;
	header.bits.type = CONNECT;
	writeChar(&ptr, header.byte); /* write header */

	ptr += MQTTPacket_encode(ptr, len); /* write remaining length */

	if (options->MQTTVersion == 4)
	{
		writeCString(&ptr, "MQTT");
		writeChar(&ptr, (char) 4);
	}
	else /* DJ: if (options->MQTTVersion == 3), so that this is consistent with MQTTSerialize_connectLength() */
	{ 
		writeCString(&ptr, "MQIsdp");
		writeChar(&ptr, (char) 3);
	}
//DJ see MQTTSerialize_connectLength()
//  else
//  {
//      rc=...;
//      goto exit;
//  }

	flags.all = 0;
	flags.bits.cleansession = options->cleansession;
	flags.bits.will = (options->willFlag) ? 1 : 0;
	if (flags.bits.will)
	{
		flags.bits.willQoS = options->will.qos;
		flags.bits.willRetain = options->will.retained;
	}

	if (options->username.cstring || options->username.lenstring.data)
		flags.bits.username = 1;
	if (options->password.cstring || options->password.lenstring.data)
		flags.bits.password = 1;

	writeChar(&ptr, flags.all);    
	writeInt(&ptr, options->keepAliveInterval);
	writeMQTTString(&ptr, options->clientID);
	if (options->willFlag)
	{
		writeMQTTString(&ptr, options->will.topicName);
		writeMQTTString(&ptr, options->will.message);
	}
	if (flags.bits.username)
		writeMQTTString(&ptr, options->username);
	if (flags.bits.password)
		writeMQTTString(&ptr, options->password);

	rc = ptr - buf;

	exit: FUNC_EXIT_RC(rc);
	return rc;
}

int MQTTDeserialize_connack(unsigned char* sessionPresent, unsigned char* connack_rc, unsigned char* buf, int32_t buflen)
{
	MQTTHeader header = {0};
	unsigned char* curdata = buf;
	unsigned char* enddata = NULL;
	int32_t rc = 0;
	int mylen;
	MQTTConnackFlags flags = {0};

	FUNC_ENTRY;
	header.byte = readChar(&curdata);
	if (header.bits.type != CONNACK)
		goto exit;

	curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */
	enddata = curdata + mylen;
	if (enddata - curdata < 2) {
//DJ: set rc. Currently rc holds the return value of MQTTPacket_decodeBuf() and may therefore be 1
//      rc = 0;
		goto exit;
    }

	flags.all = readChar(&curdata);
	*sessionPresent = flags.bits.sessionpresent;
	*connack_rc = readChar(&curdata);
	rc = 1;
exit:
	FUNC_EXIT_RC(rc);
	return rc;
}

/*******************************************************************************
  MQTTDeserializePublish.c
 *******************************************************************************/
int MQTTDeserialize_publish(unsigned char* dup, int* qos, unsigned char* retained, unsigned short* packetid, MQTTString* topicName,
		unsigned char** payload, int* payloadlen, unsigned char* buf, int32_t buflen)
{
	MQTTHeader header = {0};
	unsigned char* curdata = buf;
	unsigned char* enddata = NULL;
	int32_t rc = 0;
	int mylen = 0;

	FUNC_ENTRY;
	header.byte = readChar(&curdata);
	if (header.bits.type != PUBLISH)
		goto exit;
	*dup = header.bits.dup;
	*qos = header.bits.qos;
	*retained = header.bits.retain;

	curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */
	enddata = curdata + mylen;

//DJ (enddata - curdata < 0) makes no sense at this point (especially combined with ||);
//   the check would be better placed further down
	if (!readMQTTLenString(topicName, &curdata, enddata) ||
		enddata - curdata < 0) /* do we have enough data to read the protocol version byte? */
		goto exit;
  
	if (*qos > 0)
//DJ a check would be sensible here: if ((enddata - curdata) >= 2)
		*packetid = readInt(&curdata);

//DJ a check would be sensible here: if ((enddata - curdata) >= 0)
	*payloadlen = enddata - curdata;
	*payload = curdata;
	rc = 1;
    
exit:
	FUNC_EXIT_RC(rc);
	return (int) rc;
}

int MQTTDeserialize_ack(unsigned char* packettype, unsigned char* dup, unsigned short* packetid, 
                       unsigned char* buf, int32_t buflen)
{
	MQTTHeader header = {0};
	unsigned char* curdata = buf;
	unsigned char* enddata = NULL;
	int32_t rc = 0;
	int mylen;

	FUNC_ENTRY;

	header.byte = readChar(&curdata);
	*dup = header.bits.dup;
	*packettype = header.bits.type;

	curdata += (rc = MQTTPacket_decodeBuf(curdata, &mylen)); /* read remaining length */
	enddata = curdata + mylen;

	if (enddata - curdata < 2) {
//DJ: set rc. Currently rc holds the return value of MQTTPacket_decodeBuf()
//   and may therefore be 1 (no error)
//      rc = 0;
		goto exit;
    }
	*packetid = readInt(&curdata);

	rc = 1;
exit:
	FUNC_EXIT_RC(rc);
	return (int) rc;
}


/*******************************************************************************
  MQTTPacket.c
 *******************************************************************************/

/**
 * Decodes the message length according to the MQTT algorithm
 * @param getcharfn pointer to function to read the next character from the data source
 * @param value the decoded length returned
 * @return the number of bytes read from the socket
 */
int32_t MQTTPacket_decode(int (*getcharfn)(unsigned char*, int), int* value)
{
	unsigned char c;
	int multiplier = 1;
	int32_t len = 0;
    const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;

	FUNC_ENTRY;
	*value = 0;
	do
	{
		int rc = MQTTPACKET_READ_ERROR;

		if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
		{
			rc = MQTTPACKET_READ_ERROR;	/* bad data */
//DJ rc is out of scope outside the do while() loop, so setting rc here has no effect
//   Declare rc at function scope rather than inside the do while() loop
			goto exit;
		}
		rc = (*getcharfn)(&c, 1);
		if (rc != 1)
//DJ set an error code
			goto exit;
		*value += (c & 127) * multiplier;
		multiplier *= 128;
	} while ((c & 128) != 0);
exit:
	FUNC_EXIT_RC(len);
	return len;
//DJ none of the functions calling MQTTPacket_decode() / MQTTPacket_decodeBuf()
//   check for errors. They would need to be updated accordingly
}

int32_t MQTTPacket_len(int32_t rem_len)
{
//DJ: The header length should be added at the end, not at the beginning. As it stands, the
//    send buffer can be judged too small although it would be large enough!
	rem_len += 1; /* header byte */  

	/* now remaining_length field */
	if (rem_len < NUM_OF_DIGITS_1)
		rem_len += 1;
	else if (rem_len < NUM_OF_DIGITS_2)
		rem_len += 2;
	else if (rem_len < NUM_OF_DIGITS_3)
		rem_len += 3;
	else
		rem_len += 4;
		
//DJ: rem_len += 1;		

	return rem_len;
}

//DJ: Because of this global variable, two MQTT channels cannot be used in parallel
//    Therefore move this variable into the MQTTClient data structure and pass that
//    data structure through to MQTTPacket_decodeBuf()
static unsigned char* bufptr;

int bufchar(unsigned char* c, int count)
{
	int i;

	for (i = 0; i < count; ++i)
		*c = *bufptr++;
	return count;
}


int32_t MQTTPacket_decodeBuf(unsigned char* buf, int* value)
{
	bufptr = buf;
	return MQTTPacket_decode(bufchar, value);
}

char readChar(unsigned char** pptr)
{
	char c = **pptr;  //DJ Cast  char c = (char)**pptr;
	(*pptr)++;
	return c;
}


void writeMQTTString(unsigned char** pptr, MQTTString mqttstring)
{
//DJ: lenstring takes precedence here, whereas MQTTstrlen() gives precedence to cstring!
//    Use the same priority wherever the MQTTString data structure is used
	if (mqttstring.lenstring.len > 0)
	{
		writeInt(pptr, mqttstring.lenstring.len);
		memcpy(*pptr, mqttstring.lenstring.data, mqttstring.lenstring.len);
		*pptr += mqttstring.lenstring.len;
	}
	else if (mqttstring.cstring)
		writeCString(pptr, mqttstring.cstring);
	else
		writeInt(pptr, 0);
}

int readMQTTLenString(MQTTString* mqttstring, unsigned char** pptr, unsigned char* enddata)
{
	int rc = 0;
	FUNC_ENTRY;
	/* the first two bytes are the length of the string */
	if (enddata - (*pptr) > 1) /* enough length to read the integer? */
	{        
		mqttstring->lenstring.len = readInt(pptr); /* increments pptr to point past length */
		if (&(*pptr)[mqttstring->lenstring.len] <= enddata)
		{
			mqttstring->lenstring.data = (char*)*pptr;
			*pptr += mqttstring->lenstring.len;
			rc = 1;
		}
	}
//DJ On error it would be good to also set lenstring to 0
//  mqttstring->lenstring.len = 0;
	mqttstring->cstring = NULL;
	FUNC_EXIT_RC(rc);
	return rc;
}

/*******************************************************************************
  MQTTClient.c
 *******************************************************************************/
static int sendPacket(MQTTClient* c, int32_t length/*, Timer* timer*/)
{
    int rc = FAILURE,
        sent = 0;

    while (sent < length /*&& !TimerIsExpired(timer)*/)
    {
//DJ If length exceeds one Ethernet frame, the second frame to be sent would again be length bytes long!
        rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, TimerLeftMS(timer));
//DJ                                                          length-sent
        if (rc < 0) {  // there was an error writing the data
            break;
        }            
        sent += rc;
    }
    if (sent == length)
    {
        TimerCountdown(&c->last_sent, c->keepAliveInterval); // record the fact that we have successfully sent the packet
        rc = SUCCESS;
    }
    else
        rc = FAILURE;
    return rc;
}


static int decodePacket(MQTTClient* c, int* value /*, int timeout*/)
{
    unsigned char i;
    int multiplier = 1;
    int32_t len = 0;
    const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;

    *value = 0;
    do
    {
        int rc = MQTTPACKET_READ_ERROR;

        if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
        {
//DJ: rc is no longer in scope outside the loop!!!
//    see MQTTPacket_decode()
            rc = MQTTPACKET_READ_ERROR; /* bad data */
            goto exit;
        }
        rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout);
        if (rc != 1)
//DJ: An error report would be helpful here,
//    because a remaining length > 268,435,455 is now possible as well
//    see MQTTPacket_decode()
            goto exit;
        *value += (i & 127) * multiplier;
        multiplier *= 128;
    } while ((i & 128) != 0);
exit:
    return len;
}

static int readPacket(MQTTClient* c/*, Timer* timer*/)
{
    MQTTHeader header = {0};
    int32_t len = 0;
    int rem_len = 0;

    /* 1. read the header byte.  This has the packet type in it */
    int rc = c->ipstack->mqttread(c->ipstack, c->readbuf, 1, TimerLeftMS(timer));
    if (rc != 1)
        goto exit;

    len = 1;
    /* 2. read the remaining length.  This is variable in itself */
    decodePacket(c, &rem_len/*, TimerLeftMS(timer)*/);
//DJ: Check the return value??? (which does not exist yet)
    len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */

    if (rem_len > (c->readbuf_size - len))
    {
        rc = BUFFER_OVERFLOW;
        goto exit;
    }

    /* 3. read the rest of the buffer using a callback to supply the rest of the data */
    if (rem_len > 0 && (rc = c->ipstack->mqttread(c->ipstack, c->readbuf + len, rem_len, TimerLeftMS(timer)) != rem_len)) {
        rc = 0;  
        goto exit;
    }

    header.byte = c->readbuf[0];
    rc = header.bits.type;
    if (c->keepAliveInterval > 0)
        TimerCountdown(&c->last_received, c->keepAliveInterval); // record the fact that we have successfully received a packet
exit:
    return rc;
}

// assume topic filter and name is in correct format
// # can only be at end
// + and # can only be next to separator
static char isTopicMatched(char* topicFilter, MQTTString* topicName)
{
//DJ: Filter "aa/AA/#" and name "aa/AA" are not matched
//DJ: Filter "sport/+" and name "sport/" are not matched
//DJ: Filter "sport+" is invalid according to the spec, but here it yields
//                     a 'match' against name "sport44"
//DJ: Filter "+/+" and name "/finance" are not matched
    char* curf = topicFilter;
//DJ: For topicName the string could alternatively be held in topicName->cstring,
//    although in this use case it is always held in lenstring
    char* curn = topicName->lenstring.data;
    char* curn_end = curn + topicName->lenstring.len;

    while (*curf && curn < curn_end)
    {
        if (*curn == '/' && *curf != '/')
            break;
        if (*curf != '+' && *curf != '#' && *curf != *curn)
            break;
        if (*curf == '+')
        {   // skip until we meet the next separator, or end of string
            char* nextpos = curn + 1;
            while (nextpos < curn_end && *nextpos != '/')
                nextpos = ++curn + 1;
        }
        else if (*curf == '#')
            curn = curn_end - 1;    // skip until end of string
        curf++;
        curn++;
    };
    return (curn == curn_end) && (*curf == '\0');
}


int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message)
{
    int i;
    int rc = FAILURE;

    // we have to find the right message handler - indexed by topic
    for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
    {
//DJ: MQTTPacket_equals() would not be needed; it is already covered by isTopicMatched()
        if (c->messageHandlers[i].topicFilter != 0 && 
		      (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) ||
                isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName)))
        {
            if (c->messageHandlers[i].fp != NULL)
            {
                MessageData md;
                NewMessageData(&md, topicName, message);
                c->messageHandlers[i].fp(&md);
                rc = SUCCESS;
            }
        }
    }
    if (rc == FAILURE && c->defaultMessageHandler != NULL)
    {
        MessageData md;
        NewMessageData(&md, topicName, message);
        c->defaultMessageHandler(&md);
        rc = SUCCESS;
    }

    return rc;
}

int keepalive(MQTTClient* c)
{
    int rc = SUCCESS;

    if (c->keepAliveInterval == 0)
        goto exit;

    if (TimerIsExpired(&c->last_sent) || TimerIsExpired(&c->last_received))
    {
//DJ: If we get here because last_sent expired, sendPacket() 'resets' last_sent.
//    A missing PINGRESP is then only acted on once last_received expires or
//    last_sent expires again (i.e. after twice the interval)!
//DJ: If we get here because last_received expired, last_received is not reset.
//    If the peer does not answer immediately, this condition still holds
//    on the next cycle() pass, which then ends the session
        if (c->ping_outstanding) {
            rc = FAILURE; /* PINGRESP not received in keepalive interval */
        }
        else
        {
            Timer timer;
            TimerInit(&timer);
            TimerCountdownMS(&timer, 1000);
            int32_t len = MQTTSerialize_pingreq(c->buf, c->buf_size);
            if (len > 0 && (rc = sendPacket(c, len/*, &timer*/)) == SUCCESS) // send the ping packet
                c->ping_outstanding = 1;
        }
    }

exit:
    return rc;
}


int cycle(MQTTClient* c/*, Timer* timer*/)
{
    int32_t len = 0,
    rc = SUCCESS;
    int packet_type = readPacket(c/*, timer*/);     /* read the socket, see what work is due */

if(packet_type>0) debug(c->readbuf,c->readbuf_size,"Cycle() readbuf[] after readPacket()",0);

    switch (packet_type)
    {
        default:
            /* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */
            rc = packet_type;
            goto exit;
        case 0: /* timed out reading packet */
            break;
        case CONNACK:   
        case PUBACK:    
        case SUBACK:    
        case UNSUBACK:  
            break;
        case PUBLISH:
        {
            MQTTString topicName;
            MQTTMessage msg;
            int intQoS;
            msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */

            if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName,
               (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1)
                goto exit;
            msg.qos = (enum QoS)intQoS;
//DJ Error: msg.payloadlen is of type size_t. The address of this variable is passed to
//           MQTTDeserialize_publish(), which only fills an int.
//           On 64-bit systems size_t would be 64 bits and int 32 bits, so the
//           size_t value is only partly set / is invalid

//DJ Per the spec the DUP flag would have to be checked.
//   With QoS=1 and DUP=1, check whether a packet with the same PacketID (msg.id) was received recently.
//   If so, this packet must not be processed again.
//   With QoS=2 and DUP=1, this packet must not be delivered if no PUBREL packet has been received yet
            deliverMessage(c, &topicName, &msg);
            
            if (msg.qos != QOS0)
            {
                if (msg.qos == QOS1)
                    len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id);
                else if (msg.qos == QOS2)
                    len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id);
                if (len <= 0)
                    rc = FAILURE;
                else
                    rc = sendPacket(c, len/*, timer*/);
                if (rc == FAILURE)
                    goto exit; // there was a problem
            }            
            break;
        }
        case PUBREC:
        case PUBREL:
        {
            unsigned short mypacketid;
            unsigned char dup, type;         
            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) {
                rc = FAILURE;
            }
            else if ((len = MQTTSerialize_ack(c->buf, c->buf_size,
                (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0) {
                rc = FAILURE;
            }
            else if ((rc = sendPacket(c, len/*, timer*/)) != SUCCESS) { // send the PUBREL packet
                rc = FAILURE; // there was a problem
            }
            if (rc == FAILURE)
                goto exit; // there was a problem

            break;
        }

        case PUBCOMP:  
//DJ Formally, on receiving PUBCOMP the received PacketID would have to be compared with the sent PacketID
//   in order to release this ID for reuse
            break;     
        case PINGRESP:
            c->ping_outstanding = 0;
            break;
    }

    if (keepalive(c) != SUCCESS) {
      //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT
      rc = FAILURE;
    }

exit:
    if (rc == SUCCESS)
        rc = packet_type;
    else if (c->isconnected)
        MQTTCloseSession(c);
    return rc;
}


int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, messageHandler messageHandler)
{
    int rc = FAILURE;
    int i = -1;

    /* first check for an existing matching slot */
    for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
    {
        if (c->messageHandlers[i].topicFilter != NULL && strcmp(c->messageHandlers[i].topicFilter, topicFilter) == 0)
        {
//DJ As no change to the existing entry is intended in this case, this check should
//   be made ahead of the for loop!
            if (messageHandler == NULL) /* remove existing */
            {
                c->messageHandlers[i].topicFilter = NULL;
                c->messageHandlers[i].fp = NULL;
            }
            rc = SUCCESS; /* return i when adding new subscription */
            break;
        }
    }
    /* if no existing, look for empty slot (unless we are removing) */
    if (messageHandler != NULL) {
        if (rc == FAILURE)
        {
            for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
            {
                if (c->messageHandlers[i].topicFilter == NULL)
                {
                    rc = SUCCESS;
                    break;
                }
            }
        }
        if (i < MAX_MESSAGE_HANDLERS)
        {
//DJ why is this not handled in the previous if condition?
            c->messageHandlers[i].topicFilter = topicFilter;
            c->messageHandlers[i].fp = messageHandler;
        }
    }
    return rc;
}

int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qos,
       messageHandler messageHandler, MQTTSubackData* data)
{
    int rc = FAILURE;
    Timer timer;
    int32_t len = 0;
    MQTTString topic = MQTTString_initializer;
    topic.cstring = (char *)topicFilter;

#if defined(MQTT_TASK)
	  MutexLock(&c->mutex);
#endif
	if (!c->isconnected) {
	    goto exit;
    }
    TimerInit(&timer);
    TimerCountdownMS(&timer, c->command_timeout_ms);

    int _qos = qos;    
    len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, &_qos);
    if (len <= 0)
        goto exit;
    if ((rc = sendPacket(c, len/*, &timer*/)) != SUCCESS) // send the subscribe packet
        goto exit;             // there was a problem

    if (waitfor(c, SUBACK/*, &timer*/) == SUBACK)      // wait for suback
    {
        int count = 0;
        unsigned short mypacketid;
        int grantedQoS = QOS0;
        int retval = MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, c->readbuf, c->readbuf_size);
        data->grantedQoS = grantedQoS;
//DJ grantedQoS signals an error with 0x80. This should be reported back to the user
        data->grantedQoS = QOS0;   //DJ why is this overwritten?!
//DJ mypacketid should be compared with the ID obtained from getNextPacketId()
        if (retval == 1)
        {
            if (data->grantedQoS != 0x80)  //DJ check is pointless because grantedQoS was just set to QOS0!
                rc = MQTTSetMessageHandler(c, topicFilter, messageHandler);
        }
    }
    else
        rc = FAILURE;

exit:
    if (rc == FAILURE)
        MQTTCloseSession(c);

#if defined(MQTT_TASK)
	  MutexUnlock(&c->mutex);
#endif
    return rc;
}


int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter)
{
    int rc = FAILURE;
    Timer timer;
    MQTTString topic = MQTTString_initializer;
    topic.cstring = (char *)topicFilter;
    int32_t len = 0;

#if defined(MQTT_TASK)
	  MutexLock(&c->mutex);
#endif
    if (!c->isconnected) {
		  goto exit;
      }

    TimerInit(&timer);
    TimerCountdownMS(&timer, c->command_timeout_ms);

    if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0)
        goto exit;
    if ((rc = sendPacket(c, len/*, &timer*/)) != SUCCESS) // send the unsubscribe packet
        goto exit; // there was a problem

    if (waitfor(c, UNSUBACK/*, &timer*/) == UNSUBACK)
    {
        unsigned short mypacketid;  // should be the same as the packetid above
        if (MQTTDeserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1)
        {
            /* remove the subscription message handler associated with this topic, if there is one */
            MQTTSetMessageHandler(c, topicFilter, NULL);
        }
//DJ  mypacketid is not checked against the ID obtained from getNextPacketId()
    }
    else
        rc = FAILURE;

exit:
    if (rc == FAILURE)
        MQTTCloseSession(c);
#if defined(MQTT_TASK)
	  MutexUnlock(&c->mutex);
#endif
    return rc;
}

int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)
{
    int rc = FAILURE;
    Timer timer;
    MQTTString topic = MQTTString_initializer;
    topic.cstring = (char *)topicName;
    int32_t len = 0;

#if defined(MQTT_TASK)
	  MutexLock(&c->mutex);
#endif
	  if (!c->isconnected) {
		    goto exit;
      }

    TimerInit(&timer);
    TimerCountdownMS(&timer, c->command_timeout_ms);

    if (message->qos == QOS1 || message->qos == QOS2)
        message->id = getNextPacketId(c);

    len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
              topic, (unsigned char*)message->payload, message->payloadlen);
    if (len <= 0)
        goto exit;        
    if ((rc = sendPacket(c, len/*, &timer*/)) != SUCCESS) // send the publish packet
        goto exit; // there was a problem

    if (message->qos == QOS1)
    {
//DJ: Per the spec there is no need to wait exclusively; further
//    PUBLISH packets may be sent in the meantime.
//    This is not easy to solve in this implementation, however.
        if (waitfor(c, PUBACK/*, &timer*/) == PUBACK)
        {
            unsigned short mypacketid;
            unsigned char dup, type;
            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) {
                rc = FAILURE;
            }
//DJ: At the very least, mypacketid should be checked against message->id here!!!
        }
        else
            rc = FAILURE;
//DJ: If the PUBACK fails to arrive, I believe the packet should be sent again (DUP=1).
//    Instead the connection is closed here. If the new connection is then started with
//    CleanSession=0, problems could arise when a PUBLISH uses a PacketId that has
//    not yet been acknowledged
    }
    else if (message->qos == QOS2)
    {
        if (waitfor(c, PUBCOMP/*, &timer*/) == PUBCOMP)
        {
            unsigned short mypacketid;
            unsigned char dup, type;
            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) {
                rc = FAILURE;
            }                
//DJ: At the very least, mypacketid should be checked against message->id here!!!
        }
        else
            rc = FAILURE;
    }

exit:
    if (rc == FAILURE)
        MQTTCloseSession(c);

#if defined(MQTT_TASK)
	  MutexUnlock(&c->mutex);
#endif
    return rc;
}
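
The filter cases flagged in isTopicMatched() above can be exercised with a matcher that compares topic levels one at a time. The following is a sketch based on my reading of the MQTT 3.1.1 topic filter rules, not a drop-in replacement: `topic_matches` and `level_len` are hypothetical names, the topic name is passed as pointer plus length (as in lenstring), and the special handling of topics starting with `$` is left out.

```c
#include <stddef.h>
#include <string.h>

/* Length of the topic level starting at s (up to the next '/' or end). */
static size_t level_len(const char *s, const char *end)
{
    const char *p = s;

    while (p < end && *p != '/')
        ++p;
    return (size_t)(p - s);
}

/* Returns 1 if name[0..name_len) matches filter, 0 otherwise.
 * Filters with a misplaced wildcard (e.g. "sport+", "a/#/b") never match. */
static int topic_matches(const char *filter, const char *name, size_t name_len)
{
    const char *f = filter, *f_end = filter + strlen(filter);
    const char *n = name, *n_end = name + name_len;

    for (;;) {
        size_t fl = level_len(f, f_end);
        size_t nl = level_len(n, n_end);

        if (fl == 1 && *f == '#')
            return f + 1 == f_end;      /* '#' is only valid as last level */

        if (!(fl == 1 && *f == '+')) {  /* literal level */
            if (memchr(f, '+', fl) || memchr(f, '#', fl))
                return 0;               /* wildcard mixed into a level */
            if (fl != nl || memcmp(f, n, fl) != 0)
                return 0;
        }                               /* '+' accepts any single level, even an empty one */

        f += fl;
        n += nl;
        if (f == f_end)
            return n == n_end;          /* both exhausted: match */
        if (n == n_end)                 /* name ended: only a trailing "/#" may remain */
            return (f_end - f == 2) && f[1] == '#';
        ++f;                            /* skip the '/' separators */
        ++n;
    }
}
```

With this sketch, "aa/AA/#" matches "aa/AA", "sport/+" matches "sport/", "+/+" matches "/finance", and "sport+" is rejected instead of matching "sport44".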

@gerhil

gerhil commented Mar 12, 2025

Great.

There is an error in Line 259 in MQTTFormat.c.
