Skip to content

Commit

Permalink
Unique topic names per device
Browse files Browse the repository at this point in the history
Disabled by default with MQTT_UNIQUE_TOPIC
  • Loading branch information
peterharperuk committed Jan 30, 2025
1 parent 103a496 commit 3593551
Showing 1 changed file with 59 additions and 20 deletions.
79 changes: 59 additions & 20 deletions pico_w/wifi/mqtt/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,15 @@
#include MQTT_CERT_INC
#endif

#ifndef MQTT_TOPIC_LEN
#define MQTT_TOPIC_LEN 100
#endif

typedef struct {
mqtt_client_t* mqtt_client_inst;
struct mqtt_connect_client_info_t mqtt_client_info;
char data[MQTT_OUTPUT_RINGBUF_SIZE];
char topic[100];
char topic[MQTT_TOPIC_LEN];
uint32_t len;
ip_addr_t mqtt_server_address;
bool connect_done;
Expand Down Expand Up @@ -79,6 +83,15 @@ typedef struct {
#define MQTT_WILL_MSG "0"
#define MQTT_WILL_QOS 1

#ifndef MQTT_DEVICE_NAME
#define MQTT_DEVICE_NAME "pico"
#endif

// Set to 1 to add the client name to topics, to support multiple devices using the same server
#ifndef MQTT_UNIQUE_TOPIC
#define MQTT_UNIQUE_TOPIC 0
#endif

/* References for this implementation:
* raspberry-pi-pico-c-sdk.pdf, Section '4.1.1. hardware_adc'
* pico-examples/adc/adc_console/adc_console.c */
Expand All @@ -105,6 +118,16 @@ static void pub_request_cb(__unused void *arg, err_t err) {
}
}

static const char *full_topic(MQTT_CLIENT_DATA_T *state, const char *name) {
#if MQTT_UNIQUE_TOPIC
static char full_topic[MQTT_TOPIC_LEN];
snprintf(full_topic, sizeof(full_topic), "/%s%s", state->mqtt_client_info.client_id, name);
return full_topic;
#else
return name;
#endif
}

static void control_led(MQTT_CLIENT_DATA_T *state, bool on) {
// Publish state on /state topic and on/off led board
const char* message = on ? "On" : "Off";
Expand All @@ -113,14 +136,12 @@ static void control_led(MQTT_CLIENT_DATA_T *state, bool on) {
else
cyw43_arch_gpio_put(CYW43_WL_GPIO_LED_PIN, 0);

char state_topic[128];
snprintf(state_topic, sizeof(state_topic), "%s/state", state->topic);
mqtt_publish(state->mqtt_client_inst, state_topic, message, strlen(message), MQTT_PUBLISH_QOS, MQTT_PUBLISH_RETAIN, pub_request_cb, state);
mqtt_publish(state->mqtt_client_inst, full_topic(state, "/led/state"), message, strlen(message), MQTT_PUBLISH_QOS, MQTT_PUBLISH_RETAIN, pub_request_cb, state);
}

static void publish_temperature(MQTT_CLIENT_DATA_T *state) {
static float old_temperature;
const char temperature_key[] = "/temperature";
const char *temperature_key = full_topic(state, "/temperature");
float temperature = read_onboard_temperature(TEMPERATURE_UNITS);
if (temperature != old_temperature) {
old_temperature = temperature;
Expand Down Expand Up @@ -156,32 +177,37 @@ static void unsub_request_cb(void *arg, err_t err) {

static void sub_unsub_topics(MQTT_CLIENT_DATA_T* state, bool sub) {
mqtt_request_cb_t cb = sub ? sub_request_cb : unsub_request_cb;
mqtt_sub_unsub(state->mqtt_client_inst, "/led", MQTT_SUBSCRIBE_QOS, cb, state, sub);
mqtt_sub_unsub(state->mqtt_client_inst, "/print", MQTT_SUBSCRIBE_QOS, cb, state, sub);
mqtt_sub_unsub(state->mqtt_client_inst, "/ping", MQTT_SUBSCRIBE_QOS, cb, state, sub);
mqtt_sub_unsub(state->mqtt_client_inst, "/exit", MQTT_SUBSCRIBE_QOS, cb, state, sub);
mqtt_sub_unsub(state->mqtt_client_inst, full_topic(state, "/led"), MQTT_SUBSCRIBE_QOS, cb, state, sub);
mqtt_sub_unsub(state->mqtt_client_inst, full_topic(state, "/print"), MQTT_SUBSCRIBE_QOS, cb, state, sub);
mqtt_sub_unsub(state->mqtt_client_inst, full_topic(state, "/ping"), MQTT_SUBSCRIBE_QOS, cb, state, sub);
mqtt_sub_unsub(state->mqtt_client_inst, full_topic(state, "/exit"), MQTT_SUBSCRIBE_QOS, cb, state, sub);
}

static void mqtt_incoming_data_cb(void *arg, const u8_t *data, u16_t len, u8_t flags) {
MQTT_CLIENT_DATA_T* state = (MQTT_CLIENT_DATA_T*)arg;
#if MQTT_UNIQUE_TOPIC
const char *basic_topic = state->topic + strlen(state->mqtt_client_info.client_id) + 1;
#else
const char *basic_topic = state->topic;
#endif
strncpy(state->data, (const char *)data, len);
state->len = len;
state->data[len] = '\0';

DEBUG_printf("Topic: %s, Message: %s\n", state->topic, state->data);
if (strcmp(state->topic, "/led") == 0)
if (strcmp(basic_topic, "/led") == 0)
{
if (lwip_stricmp((const char *)state->data, "On") == 0)
if (lwip_stricmp((const char *)state->data, "On") == 0 || strcmp((const char *)state->data, "1") == 0)
control_led(state, true);
else if (lwip_stricmp((const char *)state->data, "Off") == 0)
else if (lwip_stricmp((const char *)state->data, "Off") == 0 || strcmp((const char *)state->data, "0") == 0)
control_led(state, false);
} else if (strcmp(state->topic, "/print") == 0) {
} else if (strcmp(basic_topic, "/print") == 0) {
INFO_printf("%.*s\n", len, data);
} else if (strcmp(state->topic, "/ping") == 0) {
} else if (strcmp(basic_topic, "/ping") == 0) {
char buf[11];
snprintf(buf, sizeof(buf), "%u", to_ms_since_boot(get_absolute_time()) / 1000);
mqtt_publish(state->mqtt_client_inst, "/pong", buf, strlen(buf), MQTT_PUBLISH_QOS, MQTT_PUBLISH_RETAIN, pub_request_cb, state);
} else if (strcmp(state->topic, "/exit") == 0) {
mqtt_publish(state->mqtt_client_inst, full_topic(state, "/pong"), buf, strlen(buf), MQTT_PUBLISH_QOS, MQTT_PUBLISH_RETAIN, pub_request_cb, state);
} else if (strcmp(basic_topic, "/exit") == 0) {
state->stop_client = true; // stop the client when ALL subscriptions are stopped
sub_unsub_topics(state, false); // unsubscribe
}
Expand Down Expand Up @@ -276,9 +302,20 @@ int main(void) {
panic("Failed to inizialize CYW43");
}

// Use board unique id for the client id
static char client_id_buf[PICO_UNIQUE_BOARD_ID_SIZE_BYTES * 2 + 1];
pico_get_unique_board_id_string(client_id_buf, sizeof(client_id_buf));
// Use board unique id
char unique_id_buf[5];
pico_get_unique_board_id_string(unique_id_buf, sizeof(unique_id_buf));
for(int i=0; i < sizeof(unique_id_buf) - 1; i++) {
unique_id_buf[i] = tolower(unique_id_buf[i]);
}

// Generate a unique name, e.g. pico1234
char client_id_buf[sizeof(MQTT_DEVICE_NAME) + sizeof(unique_id_buf) - 1];
memcpy(&client_id_buf[0], MQTT_DEVICE_NAME, sizeof(MQTT_DEVICE_NAME) - 1);
memcpy(&client_id_buf[sizeof(MQTT_DEVICE_NAME) - 1], unique_id_buf, sizeof(unique_id_buf) - 1);
client_id_buf[sizeof(client_id_buf) - 1] = 0;
INFO_printf("Device name %s\n", client_id_buf);

state.mqtt_client_info.client_id = client_id_buf;
state.mqtt_client_info.keep_alive = MQTT_KEEP_ALIVE_S; // Keep alive in sec
#if defined(MQTT_USERNAME) && defined(MQTT_PASSWORD)
Expand All @@ -288,7 +325,9 @@ int main(void) {
state.mqtt_client_info.client_user = NULL;
state.mqtt_client_info.client_pass = NULL;
#endif
state.mqtt_client_info.will_topic = MQTT_WILL_TOPIC;
static char will_topic[MQTT_TOPIC_LEN];
strncpy(will_topic, full_topic(&state, MQTT_WILL_TOPIC), sizeof(will_topic));
state.mqtt_client_info.will_topic = will_topic;
state.mqtt_client_info.will_msg = MQTT_WILL_MSG;
state.mqtt_client_info.will_qos = MQTT_WILL_QOS;
state.mqtt_client_info.will_retain = true;
Expand Down

0 comments on commit 3593551

Please sign in to comment.