Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions include/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@

#include "globals.h"
#include <model.h>
#include <ArduinoJson.h>

// Size of buffer to serialize status messages into, can also be used
// as size of the preceeding JsonDocument
#define STATUS_BUFSIZE 512

namespace mqtt {
typedef void (*calibrateCo2SensorCallback_t)(uint16_t);
Expand All @@ -26,6 +31,8 @@ namespace mqtt {

void publishSensors(uint16_t mask);
void publishConfiguration();
void sendStatus(DynamicJsonDocument *status);
void sendStatusMsg(const char *statusmsg);

void mqttLoop(void* pvParameters);

Expand Down
57 changes: 54 additions & 3 deletions src/mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ namespace mqtt {
struct MqttMessage {
uint8_t cmd;
uint16_t mask;
// Only for X_CMD_PUBLISH_STATUS, pointer to status doc
// owned by MqttMessage, will be freed after use.
DynamicJsonDocument *status;
};

const uint8_t X_CMD_PUBLISH_SENSORS = bit(0);
const uint8_t X_CMD_PUBLISH_CONFIGURATION = bit(1);
const uint8_t X_CMD_PUBLISH_STATUS = bit(2);

TaskHandle_t mqttTask;
QueueHandle_t mqttQueue;
Expand All @@ -45,6 +49,7 @@ namespace mqtt {
getSPS30StatusCallback_t getSPS30StatusCallback;

uint32_t lastReconnectAttempt = 0;
uint32_t connectionAttempts = 0;

void publishSensors(uint16_t mask) {
if (!WiFi.isConnected() || !mqtt_client->connected()) return;
Expand Down Expand Up @@ -255,12 +260,55 @@ namespace mqtt {
}
}

// Queue a status message with provided document.
// This takes ownership (and will free) the provided document after serialization.
// If the document does not contain a key name 'online', it will be added and set
// to true.
// The document is serialized into a buffer of STATUS_BUFSIZE bytes, so you should
// make sure to keep your document below that.
void sendStatus(DynamicJsonDocument *status) {
MqttMessage msg;
msg.cmd = X_CMD_PUBLISH_STATUS;
msg.status = status;
if (mqttQueue) xQueueSendToBack(mqttQueue, (void*)&msg, pdMS_TO_TICKS(100));
}

// Helper for sending a simple status message. Must be shorter than
// STATUS_BUFSZIZE - 48 (e.g. 464 chars by default) to allow for JSON overhead.
void sendStatusMsg(const char *msg) {
DynamicJsonDocument *status = new DynamicJsonDocument(STATUS_BUFSIZE);
(*status)["status"] = msg;
if (status->overflowed()) {
ESP_LOGW(TAG, "Status overflowed buffer, may not be sent: %s", msg);
}
sendStatus(status);
}

void publishStatusInternal(DynamicJsonDocument *status) {
if (!mqtt_client->connected()) {
return;
}
char msg[STATUS_BUFSIZE];
if ((*status)["online"].isNull()) {
(*status)["online"] = true;
}
if (serializeJson(*status, msg) == 0) {
ESP_LOGE(TAG, "Failed to serialise status payload");
return;
}
delete status;
char buf[256];
sprintf(buf, "%s/%u/up/status", config.mqttTopic, config.deviceId);
mqtt_client->publish(buf, msg);
}

void reconnect() {
if (millis() - lastReconnectAttempt < 60000) return;
char buf[256];
sprintf(buf, "CO2Monitor-%u-%s", config.deviceId, WifiManager::getMac().c_str());
if (!WiFi.isConnected()) return;
lastReconnectAttempt = millis();
connectionAttempts++;
if (!mqtt_client->connected()) {
ESP_LOGD(TAG, "Attempting MQTT connection...");
if (mqtt_client->connect(buf, config.mqttUsername, config.mqttPassword)) {
Expand All @@ -269,8 +317,9 @@ namespace mqtt {
mqtt_client->subscribe(buf);
sprintf(buf, "%s/down/#", config.mqttTopic);
mqtt_client->subscribe(buf);
sprintf(buf, "%s/%u/up/status", config.mqttTopic, config.deviceId);
mqtt_client->publish(buf, "{\"online\":true}");
DynamicJsonDocument *status = new DynamicJsonDocument(STATUS_BUFSIZE);
(*status)["connectionAttempts"] = connectionAttempts;
publishStatusInternal(status);
} else {
ESP_LOGW(TAG, "MQTT connection failed, rc=%i", mqtt_client->state());
vTaskDelay(pdMS_TO_TICKS(1000));
Expand All @@ -288,7 +337,7 @@ namespace mqtt {
cleanSPS30Callback_t _cleanSPS30Callback,
getSPS30StatusCallback_t _getSPS30StatusCallback
) {
mqttQueue = xQueueCreate(2, sizeof(struct MqttMessage*));
mqttQueue = xQueueCreate(2, sizeof(struct MqttMessage));
if (mqttQueue == NULL) {
ESP_LOGE(TAG, "Queue creation failed!");
}
Expand Down Expand Up @@ -347,6 +396,8 @@ namespace mqtt {
publishConfigurationInternal();
} else if (msg.cmd == X_CMD_PUBLISH_SENSORS) {
publishSensorsInternal(msg.mask);
} else if (msg.cmd == X_CMD_PUBLISH_STATUS) {
publishStatusInternal(msg.status);
}
}
if (!mqtt_client->connected()) {
Expand Down
3 changes: 3 additions & 0 deletions src/ota.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <Arduino.h>
#include <config.h>
#include <mqtt.h>
#include <ota.h>
#include <esp32fota.h>
#include <Ticker.h>
Expand Down Expand Up @@ -36,6 +37,7 @@ namespace OTA {
if (shouldExecuteFirmwareUpdate) {
ESP_LOGD(TAG, "Firmware update available");
if (preUpdateCallback) preUpdateCallback();
mqtt::sendStatusMsg("Starting OTA update");
esp32FOTA.execOTA();
} else {
ESP_LOGD(TAG, "No firmware update available");
Expand All @@ -52,6 +54,7 @@ namespace OTA {
ESP_LOGD(TAG, "Beginning forced OTA");
if (preUpdateCallback) preUpdateCallback();
esp32FOTA esp32FOTA(OTA_APP, APP_VERSION, LittleFS, false, false);
mqtt::sendStatusMsg("Starting forced OTA update");
esp32FOTA.forceUpdate(forceUpdateURL, false);
forceUpdateURL = "";
ESP_LOGD(TAG, "Forced OTA done"); forceUpdateURL = "";
Expand Down