Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
786247a
[PubSubClient3] Speedup publish large messages using buffer
TD-er Jul 18, 2025
f099948
fix lint errors
hmueller01 Jul 19, 2025
c8379c7
change _bufferWritePos from int to size_t to fix compiler error
hmueller01 Jul 19, 2025
ff4b884
Merge branch 'master' into feature/optimize_large_publish
hmueller01 Aug 28, 2025
19681bb
renamed parameter data to buf to adjust to other functions
hmueller01 Aug 30, 2025
df05c95
renamed parameter data to buf to adjust to other functions, added fun…
hmueller01 Aug 30, 2025
44e19b1
lastOutActivity is ony updated if data are really send, do not ping a…
hmueller01 Aug 30, 2025
4dfc310
do increment of _bufferWritePos inline
hmueller01 Aug 30, 2025
2d3b2f8
check that all data are written to the client
hmueller01 Aug 30, 2025
390fd7b
implement write_P() solves #36
hmueller01 Aug 30, 2025
7a965a7
handle return value of flushBuffer() in endPublish()
hmueller01 Aug 30, 2025
300e6e2
do not use prog_uint8_t (only in compat env defined)
hmueller01 Aug 30, 2025
d146618
do not set lastOutActivity after calls to write() (will automatically…
hmueller01 Aug 30, 2025
40fe6e4
implement MQTT_MAX_TRANSFER_SIZE in flushBuffer()
hmueller01 Aug 30, 2025
5b2c586
refactoring to avoid double implementation of client write methods by…
hmueller01 Sep 2, 2025
99f9ce9
updated comments of write functions return values
hmueller01 Sep 4, 2025
2aea3d9
Merge branch 'master' into feature/optimize_large_publish
hmueller01 Sep 11, 2025
98ef237
use brackets around if && clauses for better clarity
hmueller01 Oct 16, 2025
d98429e
added test_publish_long()
hmueller01 Oct 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 95 additions & 37 deletions src/PubSubClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ size_t PubSubClient::readPacket(uint8_t* hdrLen) {
for (size_t i = start; i < length; i++) {
if (!readByte(&digit)) return 0;
if (this->stream) {
if (isPublish && idx - *hdrLen - 2 > skip) {
if (isPublish && (idx - *hdrLen - 2 > skip)) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also the sum before the compare...

this->stream->write(digit);
}
}
Expand All @@ -351,7 +351,7 @@ size_t PubSubClient::readPacket(uint8_t* hdrLen) {
idx++;
}

if (!this->stream && idx > this->bufferSize) {
if (!this->stream && (idx > this->bufferSize)) {
DEBUG_PSC_PRINTF("readPacket ignoring packet of size %zu exceeding buffer of size %zu\n", length, this->bufferSize);
len = 0; // This will cause the packet to be ignored.
}
Expand Down Expand Up @@ -474,6 +474,14 @@ bool PubSubClient::loop() {
_client->stop();
pingOutstanding = false;
return false;
} else if (_bufferWritePos > 0) {
// There is still data in the buffer to be sent, so send it now instead of a ping
if (flushBuffer() == 0) {
_state = MQTT_CONNECTION_TIMEOUT;
_client->stop();
pingOutstanding = false;
return false;
}
} else {
this->buffer[0] = MQTTPINGREQ;
this->buffer[1] = 0;
Expand Down Expand Up @@ -524,7 +532,6 @@ bool PubSubClient::publish(const char* topic, const uint8_t* payload, size_t ple
bool PubSubClient::publish(const char* topic, const uint8_t* payload, size_t plength, uint8_t qos, bool retained) {
if (beginPublish(topic, plength, qos, retained)) {
size_t rc = write(payload, plength);
lastOutActivity = millis();
return endPublish() && (rc == plength);
}
return false;
Expand All @@ -544,11 +551,7 @@ bool PubSubClient::publish_P(const char* topic, const uint8_t* payload, size_t p

bool PubSubClient::publish_P(const char* topic, const uint8_t* payload, size_t plength, uint8_t qos, bool retained) {
if (beginPublish(topic, plength, qos, retained)) {
size_t rc = 0;
for (size_t i = 0; i < plength; i++) {
rc += _client->write((uint8_t)pgm_read_byte_near(payload + i));
}
lastOutActivity = millis();
size_t rc = write_P(payload, plength);
return endPublish() && (rc == plength);
}
return false;
Expand All @@ -567,7 +570,7 @@ bool PubSubClient::beginPublish(const char* topic, size_t plength, uint8_t qos,
}
const size_t nextMsgLen = (qos > MQTT_QOS0) ? 2 : 0; // add 2 bytes for nextMsgId if QoS > 0
// check if the header, the topic (including 2 length bytes) and nextMsgId fit into the buffer
if (connected() && MQTT_MAX_HEADER_SIZE + strlen(topic) + 2 + nextMsgLen <= this->bufferSize) {
if (connected() && (MQTT_MAX_HEADER_SIZE + strlen(topic) + 2 + nextMsgLen <= this->bufferSize)) {
// first write the topic at the end of the maximal variable header (MQTT_MAX_HEADER_SIZE) to the buffer
size_t topicLen = writeString(topic, this->buffer, MQTT_MAX_HEADER_SIZE, this->bufferSize) - MQTT_MAX_HEADER_SIZE;
if (qos > MQTT_QOS0) {
Expand All @@ -588,6 +591,10 @@ bool PubSubClient::beginPublish(const char* topic, size_t plength, uint8_t qos,

bool PubSubClient::endPublish() {
if (connected()) {
if (_bufferWritePos > 0) {
// still data in the buffer to be sent
if (flushBuffer() == 0) return false;
}
return true;
}
return false;
Expand Down Expand Up @@ -615,7 +622,7 @@ uint8_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, size_t length) {
digit |= 0x80;
}
hdrBuf[hdrLen++] = digit;
} while (len > 0 && hdrLen < MQTT_MAX_HEADER_SIZE - 1);
} while ((len > 0) && (hdrLen < MQTT_MAX_HEADER_SIZE - 1));

if (len > 0) {
ERROR_PSC_PRINTF_P("buildHeader() length too big %zu, left %zu\n", length, len);
Expand All @@ -628,13 +635,21 @@ uint8_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, size_t length) {
}

size_t PubSubClient::write(uint8_t data) {
lastOutActivity = millis();
return _client->write(data);
return appendBuffer(data);
}

size_t PubSubClient::write(const uint8_t* buf, size_t size) {
lastOutActivity = millis();
return _client->write(buf, size);
for (size_t i = 0; i < size; i++) {
if (appendBuffer(buf[i]) == 0) return i;
}
return size;
}

size_t PubSubClient::write_P(const uint8_t* buf, size_t size) {
for (size_t i = 0; i < size; i++) {
if (appendBuffer((uint8_t)pgm_read_byte_near(buf + i)) == 0) return i;
}
return size;
}

/**
Expand All @@ -646,34 +661,48 @@ size_t PubSubClient::write(const uint8_t* buf, size_t size) {
* @return True if successfully sent, otherwise false if buildHeader() failed or buf could not be written.
*/
bool PubSubClient::write(uint8_t header, uint8_t* buf, size_t length) {
bool result = true;
size_t rc;
uint8_t hdrLen = buildHeader(header, buf, length);
if (hdrLen == 0) return false; // exit here in case of header generation failure

return writeBuffer(MQTT_MAX_HEADER_SIZE - hdrLen, hdrLen + length);
}

/**
* @brief Write the internal buffer to the client / MQTT broker.
*
* @param pos Position in the buffer to start writing from.
* @param size Number of bytes to write from the buffer.
* @return Number of bytes written to the client / MQTT broker (0 .. bufferSize). If 0 is returned a write error occurred or buffer index error.
*/
size_t PubSubClient::writeBuffer(size_t pos, size_t size) {
size_t rc = 0;
if ((size > 0) && (pos + size <= this->bufferSize)) {
#ifdef MQTT_MAX_TRANSFER_SIZE
uint8_t* writeBuf = buf + (MQTT_MAX_HEADER_SIZE - hdrLen);
size_t bytesRemaining = length + hdrLen; // Match the length type
size_t bytesToWrite;
while ((bytesRemaining > 0) && result) {
yield();
bytesToWrite = (bytesRemaining > MQTT_MAX_TRANSFER_SIZE) ? MQTT_MAX_TRANSFER_SIZE : bytesRemaining;
rc = _client->write(writeBuf, bytesToWrite);
result = (rc == bytesToWrite);
bytesRemaining -= rc;
writeBuf += rc;
if (result) {
lastOutActivity = millis();
uint8_t* writeBuf = buffer + pos;
size_t bytesRemaining = size;
bool result = true;
while ((bytesRemaining > 0) && result) {
size_t bytesToWrite = (bytesRemaining > MQTT_MAX_TRANSFER_SIZE) ? MQTT_MAX_TRANSFER_SIZE : bytesRemaining;
size_t bytesWritten = _client->write(writeBuf, bytesToWrite);
result = (bytesWritten == bytesToWrite);
bytesRemaining -= bytesWritten;
writeBuf += bytesWritten;
if (result) {
lastOutActivity = millis();
}
yield();
}
}
rc = result ? size : 0; // if result is false indicate a write error
#else
rc = _client->write(buf + (MQTT_MAX_HEADER_SIZE - hdrLen), length + hdrLen);
result = (rc == length + hdrLen);
if (result) {
lastOutActivity = millis();
}
rc = _client->write(buffer + pos, size);
if (rc == size) {
lastOutActivity = millis();
} else {
rc = 0; // indicate a write error
}
#endif
return result;
}
return rc;
}

/**
Expand All @@ -692,7 +721,7 @@ size_t PubSubClient::writeString(const char* string, uint8_t* buf, size_t pos, s
if (!string) return pos;

size_t sLen = strlen(string);
if (pos + 2 + sLen <= size && sLen <= 0xFFFF) {
if ((pos + 2 + sLen <= size) && (sLen <= 0xFFFF)) {
buf[pos++] = (uint8_t)(sLen >> 8);
buf[pos++] = (uint8_t)(sLen & 0xFF);
memcpy(buf + pos, string, sLen);
Expand All @@ -713,7 +742,7 @@ size_t PubSubClient::writeString(const char* string, uint8_t* buf, size_t pos, s
* @return New position in the buffer (pos + 2), or pos if a buffer overrun would occur.
*/
size_t PubSubClient::writeNextMsgId(uint8_t* buf, size_t pos, size_t size) {
if (pos + 2 <= size) {
if ((pos + 2) <= size) {
nextMsgId = (++nextMsgId == 0) ? 1 : nextMsgId; // increment msgId (must not be 0, so start at 1)
buf[pos++] = (uint8_t)(nextMsgId >> 8);
buf[pos++] = (uint8_t)(nextMsgId & 0xFF);
Expand All @@ -723,6 +752,35 @@ size_t PubSubClient::writeNextMsgId(uint8_t* buf, size_t pos, size_t size) {
return pos;
}

/**
* @brief Append a byte to the internal buffer. If the buffer is full it is flushed to the client / MQTT broker.
*
* @param data Byte to append to the buffer.
* @return Number of bytes appended to the buffer (0 or 1). If 0 is returned a write error occurred.
*/
size_t PubSubClient::appendBuffer(uint8_t data) {
buffer[_bufferWritePos++] = data;
if (_bufferWritePos >= bufferSize) {
if (flushBuffer() == 0) return 0;
}
return 1;
}

/**
* @brief Flush the internal buffer (bytes 0 .. _bufferWritePos) to the client / MQTT broker.
* This is used by endPublish() to flush data written by appendBuffer().
*
* @return Number of bytes written to the client / MQTT broker (0 .. bufferSize). If 0 is returned a write error occurred or the buffer was empty.
*/
size_t PubSubClient::flushBuffer() {
size_t rc = 0;
if (connected()) {
rc = writeBuffer(0, _bufferWritePos);
}
_bufferWritePos = 0;
return rc;
}

bool PubSubClient::subscribe(const char* topic) {
return subscribe(topic, MQTT_QOS0);
}
Expand Down
24 changes: 22 additions & 2 deletions src/PubSubClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ class PubSubClient : public Print {
Client* _client{};
uint8_t* buffer{};
size_t bufferSize{};
size_t _bufferWritePos{};
unsigned long keepAliveMillis{};
unsigned long socketTimeoutMillis{};
uint16_t nextMsgId{};
Expand All @@ -197,9 +198,14 @@ class PubSubClient : public Print {
bool readByte(uint8_t* result, size_t* pos);
uint8_t buildHeader(uint8_t header, uint8_t* buf, size_t length);
bool write(uint8_t header, uint8_t* buf, size_t length);
size_t writeBuffer(size_t pos, size_t size);
size_t writeString(const char* string, uint8_t* buf, size_t pos, size_t size);
size_t writeNextMsgId(uint8_t* buf, size_t pos, size_t size);

// Add to buffer and flush if full (only to be used with beginPublish/endPublish)
size_t appendBuffer(uint8_t data);
size_t flushBuffer();

public:
/**
* @brief Creates an uninitialised client instance.
Expand Down Expand Up @@ -637,19 +643,33 @@ class PubSubClient : public Print {

/**
* @brief Writes a single byte as a component of a publish started with a call to beginPublish.
* For performance reasons, this will be appended to the internal buffer,
* which will be flushed when full or on a call to endPublish().
* @param data A byte to write to the publish payload.
* @return The number of bytes written.
* @return The number of bytes written (0 or 1). If 0 is returned a write error occurred.
*/
virtual size_t write(uint8_t data);

/**
* @brief Writes an array of bytes as a component of a publish started with a call to beginPublish.
* For performance reasons, this will be appended to the internal buffer,
* which will be flushed when full or on a call to endPublish().
* @param buf The bytes to write.
* @param size The length of the payload to be sent.
* @return The number of bytes written.
* @return The number of bytes written. If return value is != size a write error occurred.
*/
virtual size_t write(const uint8_t* buf, size_t size);

/**
* @brief Writes an array of progmem bytes as a component of a publish started with a call to beginPublish.
* For performance reasons, this will be appended to the internal buffer,
* which will be flushed when full or on a call to endPublish().
* @param buf The bytes to write.
* @param size The length of the payload to be sent.
* @return The number of bytes written. If return value is != size a write error occurred.
*/
size_t write_P(const uint8_t* buf, size_t size);

/**
* @brief Subscribes to messages published to the specified topic using QoS 0.
* @param topic The topic to subscribe to.
Expand Down