diff --git a/src/PubSubClient.cpp b/src/PubSubClient.cpp index 0dd85400..d77c14a6 100755 --- a/src/PubSubClient.cpp +++ b/src/PubSubClient.cpp @@ -474,6 +474,14 @@ bool PubSubClient::loop() { _client->stop(); pingOutstanding = false; return false; + } else if (_bufferWritePos > 0) { + // There is still data in the buffer to be sent, so send it now instead of a ping + if (flushBuffer() == 0) { + _state = MQTT_CONNECTION_TIMEOUT; + _client->stop(); + pingOutstanding = false; + return false; + } } else { this->buffer[0] = MQTTPINGREQ; this->buffer[1] = 0; @@ -524,7 +532,6 @@ bool PubSubClient::publish(const char* topic, const uint8_t* payload, size_t ple bool PubSubClient::publish(const char* topic, const uint8_t* payload, size_t plength, uint8_t qos, bool retained) { if (beginPublish(topic, plength, qos, retained)) { size_t rc = write(payload, plength); - lastOutActivity = millis(); return endPublish() && (rc == plength); } return false; @@ -544,11 +551,7 @@ bool PubSubClient::publish_P(const char* topic, const uint8_t* payload, size_t p bool PubSubClient::publish_P(const char* topic, const uint8_t* payload, size_t plength, uint8_t qos, bool retained) { if (beginPublish(topic, plength, qos, retained)) { - size_t rc = 0; - for (size_t i = 0; i < plength; i++) { - rc += _client->write((uint8_t)pgm_read_byte_near(payload + i)); - } - lastOutActivity = millis(); + size_t rc = write_P(payload, plength); return endPublish() && (rc == plength); } return false; @@ -588,6 +591,10 @@ bool PubSubClient::beginPublish(const char* topic, size_t plength, uint8_t qos, bool PubSubClient::endPublish() { if (connected()) { + if (_bufferWritePos > 0) { + // still data in the buffer to be sent + if (flushBuffer() == 0) return false; + } return true; } return false; @@ -628,13 +635,21 @@ uint8_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, size_t length) { } size_t PubSubClient::write(uint8_t data) { - lastOutActivity = millis(); - return _client->write(data); + return appendBuffer(data); } size_t PubSubClient::write(const uint8_t* buf, size_t size) { - lastOutActivity = millis(); - return _client->write(buf, size); + for (size_t i = 0; i < size; i++) { + if (appendBuffer(buf[i]) == 0) return i; + } + return size; +} + +size_t PubSubClient::write_P(const uint8_t* buf, size_t size) { + for (size_t i = 0; i < size; i++) { + if (appendBuffer((uint8_t)pgm_read_byte_near(buf + i)) == 0) return i; + } + return size; } /** @@ -646,34 +661,48 @@ size_t PubSubClient::write(const uint8_t* buf, size_t size) { * @return True if successfully sent, otherwise false if buildHeader() failed or buf could not be written. */ bool PubSubClient::write(uint8_t header, uint8_t* buf, size_t length) { - bool result = true; - size_t rc; uint8_t hdrLen = buildHeader(header, buf, length); if (hdrLen == 0) return false; // exit here in case of header generation failure + return writeBuffer(MQTT_MAX_HEADER_SIZE - hdrLen, hdrLen + length); +} + +/** + * @brief Write the internal buffer to the client / MQTT broker. + * + * @param pos Position in the buffer to start writing from. + * @param size Number of bytes to write from the buffer. + * @return Number of bytes written to the client / MQTT broker (0 .. bufferSize). If 0 is returned a write error occurred or buffer index error. + */ +size_t PubSubClient::writeBuffer(size_t pos, size_t size) { + size_t rc = 0; + if (size > 0 && pos + size <= this->bufferSize) { #ifdef MQTT_MAX_TRANSFER_SIZE - uint8_t* writeBuf = buf + (MQTT_MAX_HEADER_SIZE - hdrLen); - size_t bytesRemaining = length + hdrLen; // Match the length type - size_t bytesToWrite; - while ((bytesRemaining > 0) && result) { - yield(); - bytesToWrite = (bytesRemaining > MQTT_MAX_TRANSFER_SIZE) ? MQTT_MAX_TRANSFER_SIZE : bytesRemaining; - rc = _client->write(writeBuf, bytesToWrite); - result = (rc == bytesToWrite); - bytesRemaining -= rc; - writeBuf += rc; - if (result) { - lastOutActivity = millis(); + uint8_t* writeBuf = buffer + pos; + size_t bytesRemaining = size; + bool result = true; + while ((bytesRemaining > 0) && result) { + size_t bytesToWrite = (bytesRemaining > MQTT_MAX_TRANSFER_SIZE) ? MQTT_MAX_TRANSFER_SIZE : bytesRemaining; + size_t bytesWritten = _client->write(writeBuf, bytesToWrite); + result = (bytesWritten == bytesToWrite); + bytesRemaining -= bytesWritten; + writeBuf += bytesWritten; + if (result) { + lastOutActivity = millis(); + } + yield(); } - } + rc = result ? size : 0; // if result is false indicate a write error #else - rc = _client->write(buf + (MQTT_MAX_HEADER_SIZE - hdrLen), length + hdrLen); - result = (rc == length + hdrLen); - if (result) { - lastOutActivity = millis(); - } + rc = _client->write(buffer + pos, size); + if (rc == size) { + lastOutActivity = millis(); + } else { + rc = 0; // indicate a write error + } #endif - return result; + } + return rc; } /** @@ -723,6 +752,35 @@ size_t PubSubClient::writeNextMsgId(uint8_t* buf, size_t pos, size_t size) { return pos; } +/** + * @brief Append a byte to the internal buffer. If the buffer is full it is flushed to the client / MQTT broker. + * + * @param data Byte to append to the buffer. + * @return Number of bytes appended to the buffer (0 or 1). If 0 is returned a write error occurred. + */ +size_t PubSubClient::appendBuffer(uint8_t data) { + buffer[_bufferWritePos++] = data; + if (_bufferWritePos >= bufferSize) { + if (flushBuffer() == 0) return 0; + } + return 1; +} + +/** + * @brief Flush the internal buffer (bytes 0 .. _bufferWritePos) to the client / MQTT broker. + * This is used by endPublish() to flush data written by appendBuffer(). + * + * @return Number of bytes written to the client / MQTT broker (0 .. bufferSize). If 0 is returned a write error occurred or the buffer was empty. + */ +size_t PubSubClient::flushBuffer() { + size_t rc = 0; + if (connected()) { + rc = writeBuffer(0, _bufferWritePos); + } + _bufferWritePos = 0; + return rc; +} + bool PubSubClient::subscribe(const char* topic) { return subscribe(topic, MQTT_QOS0); } diff --git a/src/PubSubClient.h b/src/PubSubClient.h index 01faf141..9b1857c0 100755 --- a/src/PubSubClient.h +++ b/src/PubSubClient.h @@ -178,6 +178,7 @@ class PubSubClient : public Print { Client* _client{}; uint8_t* buffer{}; size_t bufferSize{}; + size_t _bufferWritePos{}; unsigned long keepAliveMillis{}; unsigned long socketTimeoutMillis{}; uint16_t nextMsgId{}; @@ -197,9 +198,14 @@ class PubSubClient : public Print { bool readByte(uint8_t* result, size_t* pos); uint8_t buildHeader(uint8_t header, uint8_t* buf, size_t length); bool write(uint8_t header, uint8_t* buf, size_t length); + size_t writeBuffer(size_t pos, size_t size); size_t writeString(const char* string, uint8_t* buf, size_t pos, size_t size); size_t writeNextMsgId(uint8_t* buf, size_t pos, size_t size); + // Add to buffer and flush if full (only to be used with beginPublish/endPublish) + size_t appendBuffer(uint8_t data); + size_t flushBuffer(); + public: /** * @brief Creates an uninitialised client instance. @@ -637,19 +643,33 @@ class PubSubClient : public Print { /** * @brief Writes a single byte as a component of a publish started with a call to beginPublish. + * For performance reasons, this will be appended to the internal buffer, + * which will be flushed when full or on a call to endPublish(). * @param data A byte to write to the publish payload. - * @return The number of bytes written. + * @return The number of bytes written (0 or 1). If 0 is returned a write error occurred. */ virtual size_t write(uint8_t data); /** * @brief Writes an array of bytes as a component of a publish started with a call to beginPublish. + * For performance reasons, this will be appended to the internal buffer, + * which will be flushed when full or on a call to endPublish(). * @param buf The bytes to write. * @param size The length of the payload to be sent. - * @return The number of bytes written. + * @return The number of bytes written. If return value is != size a write error occurred. */ virtual size_t write(const uint8_t* buf, size_t size); + /** + * @brief Writes an array of progmem bytes as a component of a publish started with a call to beginPublish. + * For performance reasons, this will be appended to the internal buffer, + * which will be flushed when full or on a call to endPublish(). + * @param buf The bytes to write. + * @param size The length of the payload to be sent. + * @return The number of bytes written. If return value is != size a write error occurred. + */ + size_t write_P(const uint8_t* buf, size_t size); + /** * @brief Subscribes to messages published to the specified topic using QoS 0. * @param topic The topic to subscribe to.