Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 89 additions & 31 deletions src/PubSubClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,14 @@ bool PubSubClient::loop() {
_client->stop();
pingOutstanding = false;
return false;
} else if (_bufferWritePos > 0) {
// There is still data in the buffer to be sent, so send it now instead of a ping
if (flushBuffer() == 0) {
_state = MQTT_CONNECTION_TIMEOUT;
_client->stop();
pingOutstanding = false;
return false;
}
} else {
this->buffer[0] = MQTTPINGREQ;
this->buffer[1] = 0;
Expand Down Expand Up @@ -524,7 +532,6 @@ bool PubSubClient::publish(const char* topic, const uint8_t* payload, size_t ple
bool PubSubClient::publish(const char* topic, const uint8_t* payload, size_t plength, uint8_t qos, bool retained) {
if (beginPublish(topic, plength, qos, retained)) {
size_t rc = write(payload, plength);
lastOutActivity = millis();
return endPublish() && (rc == plength);
}
return false;
Expand All @@ -544,11 +551,7 @@ bool PubSubClient::publish_P(const char* topic, const uint8_t* payload, size_t p

bool PubSubClient::publish_P(const char* topic, const uint8_t* payload, size_t plength, uint8_t qos, bool retained) {
if (beginPublish(topic, plength, qos, retained)) {
size_t rc = 0;
for (size_t i = 0; i < plength; i++) {
rc += _client->write((uint8_t)pgm_read_byte_near(payload + i));
}
lastOutActivity = millis();
size_t rc = write_P(payload, plength);
return endPublish() && (rc == plength);
}
return false;
Expand Down Expand Up @@ -588,6 +591,10 @@ bool PubSubClient::beginPublish(const char* topic, size_t plength, uint8_t qos,

bool PubSubClient::endPublish() {
if (connected()) {
if (_bufferWritePos > 0) {
// still data in the buffer to be sent
if (flushBuffer() == 0) return false;
}
return true;
}
return false;
Expand Down Expand Up @@ -628,13 +635,21 @@ uint8_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, size_t length) {
}

size_t PubSubClient::write(uint8_t data) {
lastOutActivity = millis();
return _client->write(data);
return appendBuffer(data);
}

size_t PubSubClient::write(const uint8_t* buf, size_t size) {
lastOutActivity = millis();
return _client->write(buf, size);
for (size_t i = 0; i < size; i++) {
if (appendBuffer(buf[i]) == 0) return i;
}
return size;
}

size_t PubSubClient::write_P(const uint8_t* buf, size_t size) {
for (size_t i = 0; i < size; i++) {
if (appendBuffer((uint8_t)pgm_read_byte_near(buf + i)) == 0) return i;
}
return size;
}

/**
Expand All @@ -646,34 +661,48 @@ size_t PubSubClient::write(const uint8_t* buf, size_t size) {
* @return True if successfully sent, otherwise false if buildHeader() failed or buf could not be written.
*/
bool PubSubClient::write(uint8_t header, uint8_t* buf, size_t length) {
bool result = true;
size_t rc;
uint8_t hdrLen = buildHeader(header, buf, length);
if (hdrLen == 0) return false; // exit here in case of header generation failure

return writeBuffer(MQTT_MAX_HEADER_SIZE - hdrLen, hdrLen + length);
}

/**
* @brief Write the internal buffer to the client / MQTT broker.
*
* @param pos Position in the buffer to start writing from.
* @param size Number of bytes to write from the buffer.
* @return Number of bytes written to the client / MQTT broker (0 .. bufferSize). If 0 is returned a write error occurred or buffer index error.
*/
size_t PubSubClient::writeBuffer(size_t pos, size_t size) {
size_t rc = 0;
if (size > 0 && pos + size <= this->bufferSize) {
#ifdef MQTT_MAX_TRANSFER_SIZE
uint8_t* writeBuf = buf + (MQTT_MAX_HEADER_SIZE - hdrLen);
size_t bytesRemaining = length + hdrLen; // Match the length type
size_t bytesToWrite;
while ((bytesRemaining > 0) && result) {
yield();
bytesToWrite = (bytesRemaining > MQTT_MAX_TRANSFER_SIZE) ? MQTT_MAX_TRANSFER_SIZE : bytesRemaining;
rc = _client->write(writeBuf, bytesToWrite);
result = (rc == bytesToWrite);
bytesRemaining -= rc;
writeBuf += rc;
if (result) {
lastOutActivity = millis();
uint8_t* writeBuf = buffer + pos;
size_t bytesRemaining = size;
bool result = true;
while ((bytesRemaining > 0) && result) {
size_t bytesToWrite = (bytesRemaining > MQTT_MAX_TRANSFER_SIZE) ? MQTT_MAX_TRANSFER_SIZE : bytesRemaining;
size_t bytesWritten = _client->write(writeBuf, bytesToWrite);
result = (bytesWritten == bytesToWrite);
bytesRemaining -= bytesWritten;
writeBuf += bytesWritten;
if (result) {
lastOutActivity = millis();
}
yield();
}
}
rc = result ? size : 0; // if result is false indicate a write error
#else
rc = _client->write(buf + (MQTT_MAX_HEADER_SIZE - hdrLen), length + hdrLen);
result = (rc == length + hdrLen);
if (result) {
lastOutActivity = millis();
}
rc = _client->write(buffer + pos, size);
if (rc == size) {
lastOutActivity = millis();
} else {
rc = 0; // indicate a write error
}
#endif
return result;
}
return rc;
}

/**
Expand Down Expand Up @@ -723,6 +752,35 @@ size_t PubSubClient::writeNextMsgId(uint8_t* buf, size_t pos, size_t size) {
return pos;
}

/**
* @brief Append a byte to the internal buffer. If the buffer is full it is flushed to the client / MQTT broker.
*
* @param data Byte to append to the buffer.
* @return Number of bytes appended to the buffer (0 or 1). If 0 is returned a write error occurred.
*/
size_t PubSubClient::appendBuffer(uint8_t data) {
buffer[_bufferWritePos++] = data;
if (_bufferWritePos >= bufferSize) {
if (flushBuffer() == 0) return 0;
}
return 1;
}

/**
* @brief Flush the internal buffer (bytes 0 .. _bufferWritePos) to the client / MQTT broker.
* This is used by endPublish() to flush data written by appendBuffer().
*
* @return Number of bytes written to the client / MQTT broker (0 .. bufferSize). If 0 is returned a write error occurred or the buffer was empty.
*/
size_t PubSubClient::flushBuffer() {
size_t rc = 0;
if (connected()) {
rc = writeBuffer(0, _bufferWritePos);
}
_bufferWritePos = 0;
return rc;
}

bool PubSubClient::subscribe(const char* topic) {
return subscribe(topic, MQTT_QOS0);
}
Expand Down
24 changes: 22 additions & 2 deletions src/PubSubClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ class PubSubClient : public Print {
Client* _client{};
uint8_t* buffer{};
size_t bufferSize{};
size_t _bufferWritePos{};
unsigned long keepAliveMillis{};
unsigned long socketTimeoutMillis{};
uint16_t nextMsgId{};
Expand All @@ -197,9 +198,14 @@ class PubSubClient : public Print {
bool readByte(uint8_t* result, size_t* pos);
uint8_t buildHeader(uint8_t header, uint8_t* buf, size_t length);
bool write(uint8_t header, uint8_t* buf, size_t length);
size_t writeBuffer(size_t pos, size_t size);
size_t writeString(const char* string, uint8_t* buf, size_t pos, size_t size);
size_t writeNextMsgId(uint8_t* buf, size_t pos, size_t size);

// Add to buffer and flush if full (only to be used with beginPublish/endPublish)
size_t appendBuffer(uint8_t data);
size_t flushBuffer();

public:
/**
* @brief Creates an uninitialised client instance.
Expand Down Expand Up @@ -637,19 +643,33 @@ class PubSubClient : public Print {

/**
* @brief Writes a single byte as a component of a publish started with a call to beginPublish.
* For performance reasons, this will be appended to the internal buffer,
* which will be flushed when full or on a call to endPublish().
* @param data A byte to write to the publish payload.
* @return The number of bytes written.
* @return The number of bytes written (0 or 1). If 0 is returned a write error occurred.
*/
virtual size_t write(uint8_t data);

/**
* @brief Writes an array of bytes as a component of a publish started with a call to beginPublish.
* For performance reasons, this will be appended to the internal buffer,
* which will be flushed when full or on a call to endPublish().
* @param buf The bytes to write.
* @param size The length of the payload to be sent.
* @return The number of bytes written.
* @return The number of bytes written. If return value is != size a write error occurred.
*/
virtual size_t write(const uint8_t* buf, size_t size);

/**
* @brief Writes an array of progmem bytes as a component of a publish started with a call to beginPublish.
* For performance reasons, this will be appended to the internal buffer,
* which will be flushed when full or on a call to endPublish().
* @param buf The bytes to write.
* @param size The length of the payload to be sent.
* @return The number of bytes written. If return value is != size a write error occurred.
*/
size_t write_P(const uint8_t* buf, size_t size);

/**
* @brief Subscribes to messages published to the specified topic using QoS 0.
* @param topic The topic to subscribe to.
Expand Down