diff --git a/include/mqtt.h b/include/mqtt.h index c667e4f..36ec79e 100644 --- a/include/mqtt.h +++ b/include/mqtt.h @@ -3,6 +3,11 @@ #include "globals.h" #include +#include + +// Size of buffer to serialize status messages into, can also be used +// as size of the preceeding JsonDocument +#define STATUS_BUFSIZE 512 namespace mqtt { typedef void (*calibrateCo2SensorCallback_t)(uint16_t); @@ -26,6 +31,8 @@ namespace mqtt { void publishSensors(uint16_t mask); void publishConfiguration(); + void sendStatus(DynamicJsonDocument *status); + void sendStatusMsg(const char *statusmsg); void mqttLoop(void* pvParameters); diff --git a/src/mqtt.cpp b/src/mqtt.cpp index 3819fe6..d38e976 100644 --- a/src/mqtt.cpp +++ b/src/mqtt.cpp @@ -24,10 +24,14 @@ namespace mqtt { struct MqttMessage { uint8_t cmd; uint16_t mask; + // Only for X_CMD_PUBLISH_STATUS, pointer to status doc + // owned by MqttMessage, will be freed after use. + DynamicJsonDocument *status; }; const uint8_t X_CMD_PUBLISH_SENSORS = bit(0); const uint8_t X_CMD_PUBLISH_CONFIGURATION = bit(1); + const uint8_t X_CMD_PUBLISH_STATUS = bit(2); TaskHandle_t mqttTask; QueueHandle_t mqttQueue; @@ -45,6 +49,7 @@ namespace mqtt { getSPS30StatusCallback_t getSPS30StatusCallback; uint32_t lastReconnectAttempt = 0; + uint32_t connectionAttempts = 0; void publishSensors(uint16_t mask) { if (!WiFi.isConnected() || !mqtt_client->connected()) return; @@ -255,12 +260,58 @@ namespace mqtt { } } + // Queue a status message with provided document. + // This takes ownership (and will free) the provided document after serialization. + // If the document does not contain a key name 'online', it will be added and set + // to true. + // The document is serialized into a buffer of STATUS_BUFSIZE bytes, so you should + // make sure to keep your document below that. + void sendStatus(DynamicJsonDocument *status) { + MqttMessage msg; + msg.cmd = X_CMD_PUBLISH_STATUS; + msg.status = status; + if (mqttQueue) xQueueSendToBack(mqttQueue, (void*)&msg, pdMS_TO_TICKS(100)); + } + + // Helper for sending a simple status message. Must be shorter than + // STATUS_BUFSZIZE - 48 (e.g. 464 chars by default) to allow for JSON overhead. + void sendStatusMsg(const char *msg) { + DynamicJsonDocument *status = new DynamicJsonDocument(STATUS_BUFSIZE); + (*status)["status"] = msg; + if (status->overflowed()) { + ESP_LOGW(TAG, "Status overflowed buffer, may not be sent: %s", msg); + } + sendStatus(status); + } + + void publishStatusInternal(DynamicJsonDocument *status) { + if (!mqtt_client->connected()) { + ESP_LOGE(TAG, "MQTT disconnected; status message lost."); + delete status; + return; + } + char msg[STATUS_BUFSIZE]; + if ((*status)["online"].isNull()) { + (*status)["online"] = true; + } + if (serializeJson(*status, msg) == 0) { + ESP_LOGE(TAG, "Failed to serialise status payload; message lost."); + delete status; + return; + } + delete status; + char buf[256]; + sprintf(buf, "%s/%u/up/status", config.mqttTopic, config.deviceId); + mqtt_client->publish(buf, msg); + } + void reconnect() { if (millis() - lastReconnectAttempt < 60000) return; char buf[256]; sprintf(buf, "CO2Monitor-%u-%s", config.deviceId, WifiManager::getMac().c_str()); if (!WiFi.isConnected()) return; lastReconnectAttempt = millis(); + connectionAttempts++; if (!mqtt_client->connected()) { ESP_LOGD(TAG, "Attempting MQTT connection..."); if (mqtt_client->connect(buf, config.mqttUsername, config.mqttPassword)) { @@ -269,8 +320,9 @@ namespace mqtt { mqtt_client->subscribe(buf); sprintf(buf, "%s/down/#", config.mqttTopic); mqtt_client->subscribe(buf); - sprintf(buf, "%s/%u/up/status", config.mqttTopic, config.deviceId); - mqtt_client->publish(buf, "{\"online\":true}"); + DynamicJsonDocument *status = new DynamicJsonDocument(STATUS_BUFSIZE); + (*status)["connectionAttempts"] = connectionAttempts; + publishStatusInternal(status); } else { ESP_LOGW(TAG, "MQTT connection failed, rc=%i", mqtt_client->state()); vTaskDelay(pdMS_TO_TICKS(1000)); @@ -288,7 +340,7 @@ namespace mqtt { cleanSPS30Callback_t _cleanSPS30Callback, getSPS30StatusCallback_t _getSPS30StatusCallback ) { - mqttQueue = xQueueCreate(2, sizeof(struct MqttMessage*)); + mqttQueue = xQueueCreate(2, sizeof(struct MqttMessage)); if (mqttQueue == NULL) { ESP_LOGE(TAG, "Queue creation failed!"); } @@ -347,6 +399,8 @@ namespace mqtt { publishConfigurationInternal(); } else if (msg.cmd == X_CMD_PUBLISH_SENSORS) { publishSensorsInternal(msg.mask); + } else if (msg.cmd == X_CMD_PUBLISH_STATUS) { + publishStatusInternal(msg.status); } } if (!mqtt_client->connected()) { diff --git a/src/ota.cpp b/src/ota.cpp index c7c1857..68c0ff5 100644 --- a/src/ota.cpp +++ b/src/ota.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -36,6 +37,7 @@ namespace OTA { if (shouldExecuteFirmwareUpdate) { ESP_LOGD(TAG, "Firmware update available"); if (preUpdateCallback) preUpdateCallback(); + mqtt::sendStatusMsg("Starting OTA update"); esp32FOTA.execOTA(); } else { ESP_LOGD(TAG, "No firmware update available"); @@ -52,6 +54,7 @@ namespace OTA { ESP_LOGD(TAG, "Beginning forced OTA"); if (preUpdateCallback) preUpdateCallback(); esp32FOTA esp32FOTA(OTA_APP, APP_VERSION, LittleFS, false, false); + mqtt::sendStatusMsg("Starting forced OTA update"); esp32FOTA.forceUpdate(forceUpdateURL, false); forceUpdateURL = ""; ESP_LOGD(TAG, "Forced OTA done"); forceUpdateURL = "";