Skip to content

Commit 2b623db

Browse files
Deomid Ryabkovcesantabot
Deomid Ryabkov
authored andcommitted
Add backup server support
CL: mqtt: Add backup server support. See https://github.com/mongoose-os-libs/mqtt#reconnect-behavior-and-backup-server PUBLISHED_FROM=1bd14adaf1902b473c8235c63cb0aefeb7033609
1 parent e492faa commit 2b623db

File tree

3 files changed

+110
-27
lines changed

3 files changed

+110
-27
lines changed

README.md

+20-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
# Generic MQTT server
1+
# Generic MQTT client
22

3-
This library provides [MQTT protocol](https://en.wikipedia.org/wiki/MQTT)
3+
This library provides [MQTT protocol](https://en.wikipedia.org/wiki/MQTT) client
44
API that allows devices to talk to MQTT servers.
55

66
Mongoose OS implements MQTT 3.1.1 client functionality, and works with
@@ -32,9 +32,10 @@ The MQTT library adds `mqtt` section to the device configuration:
3232
"enable": false, // Enable MQTT functionality
3333
"keep_alive": 60, // How often to send PING messages in seconds
3434
"pass": "", // User password
35-
"reconnect_timeout_max": 60, // Maximum reconnection timeout in seconds
3635
"reconnect_timeout_min": 2, // Minimum reconnection timeout in seconds
37-
"server": "iot.eclipse.org:1883", // SERVER:PORT to connect to
36+
"reconnect_timeout_max": 60, // Maximum reconnection timeout in seconds
37+
"server": "iot.eclipse.org", // Server to connect to. if `:PORT` is not specified,
38+
// 1883 or 8883 is used depending on whether SSL is enabled.
3839
"ssl_ca_cert": "", // Set this to file name with CA certs to enable TLS
3940
"ssl_cert": "", // Client certificate for mutual TLS
4041
"ssl_cipher_suites": "", // TLS cipher suites
@@ -46,3 +47,18 @@ The MQTT library adds `mqtt` section to the device configuration:
4647
"will_topic": "" // MQTT last will topic
4748
}
4849
```
50+
51+
## Reconnect behavior and backup server
52+
53+
It is possible to have a "backup" server that device will connect to if it fails to connect to the primary server.
54+
55+
Backup server is configured under the `mqtt1` section which contains exactly the same parameters as `mqtt` described above.
56+
57+
Device will first try to connect to the main server configured under `mqtt`.
58+
It will keep connecting to it, increasing the reconnection interval from `reconnect_timeout_min` to `reconnect_timeout_max`.
59+
Reconnection interval is doubled after each attempt so for values above there will be
60+
connection attempts after 2, 4, 8, 16, 32 and 60 seconds.
61+
After reaching the maximum reconnect interval and if `mqtt1.enable` is set, it will switch to the `mqtt1`
62+
configuration and reset the reconnect interval, so it will try to connect to `mqtt1` the same way.
63+
If that works, it will stay connected to `mqtt1`. If connection drops, it will try to reconnect to `mqtt1`
64+
in the same way. If connection to backup server fails, it will go back to the main server and so on.

mos.yml

+23
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ sources:
88
includes:
99
- include
1010
config_schema:
11+
# NB: mqtt and mqtt1 must be identical.
1112
- ["mqtt", "o", {title: "MQTT settings"}]
1213
- ["mqtt.enable", "b", false, {title: "Enable MQTT"}]
1314
- ["mqtt.server", "s", "iot.eclipse.org:1883", {title: "MQTT server"}]
@@ -28,6 +29,28 @@ config_schema:
2829
- ["mqtt.will_message", "s", "", {title: "Will message"}]
2930
- ["mqtt.max_qos", "i", 2, {title: "Limit QoS of outgoing messages to at most this"}]
3031
- ["mqtt.recv_mbuf_limit", "i", 3072, {title: "Limit recv buffer size"}]
32+
# Alternative MQTT configuration. If enabled, client will alternate between mqtt and mqtt1
33+
# when unable to connect.
34+
- ["mqtt1", "o", {title: "MQTT settings"}]
35+
- ["mqtt1.enable", "b", false, {title: "Enable MQTT"}]
36+
- ["mqtt1.server", "s", "iot.eclipse.org:1883", {title: "MQTT server"}]
37+
- ["mqtt1.client_id", "s", "", {title: "ClientID t send to the broker. Defaults to device.id."}]
38+
- ["mqtt1.user", "s", "", {title: "User name"}]
39+
- ["mqtt1.pass", "s", "", {title: "Password"}]
40+
- ["mqtt1.reconnect_timeout_min", "d", 2.0, {title: "Starting reconnect timeout"}]
41+
- ["mqtt1.reconnect_timeout_max", "d", 60.0, {title: "Maximum reconnect timeout"}]
42+
- ["mqtt1.ssl_cert", "s", "", {title: "Client certificate to present to the server"}]
43+
- ["mqtt1.ssl_key", "s", "", {title: "Private key corresponding to the certificate"}]
44+
- ["mqtt1.ssl_ca_cert", "s", "", {title: "Verify server certificate using this CA bundle"}]
45+
- ["mqtt1.ssl_cipher_suites", "s", "", {title: "Cipher suites to offer to the server"}]
46+
- ["mqtt1.ssl_psk_identity", "s", "", {title: "PSK identity (must specify PSK cipher suites)"}]
47+
- ["mqtt1.ssl_psk_key", "s", "", {title: "PSK key"}]
48+
- ["mqtt1.clean_session", "b", true, {title: "Clean Session"}]
49+
- ["mqtt1.keep_alive", "i", 60, {title: "Keep alive interval"}]
50+
- ["mqtt1.will_topic", "s", "", {title: "Will topic"}]
51+
- ["mqtt1.will_message", "s", "", {title: "Will message"}]
52+
- ["mqtt1.max_qos", "i", 2, {title: "Limit QoS of outgoing messages to at most this"}]
53+
- ["mqtt1.recv_mbuf_limit", "i", 3072, {title: "Limit recv buffer size"}]
3154
- ["debug.stdout_topic", "s", "", {title: "MQTT topic to publish STDOUT to"}]
3255
- ["debug.stderr_topic", "s", "", {title: "MQTT topic to publish STDERR to"}]
3356
cdefs:

src/mgos_mqtt.c

+67-23
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ static struct mg_connection *s_conn = NULL;
6262
static bool s_connected = false;
6363
static mgos_mqtt_connect_fn_t s_connect_fn = NULL;
6464
static void *s_connect_fn_arg = NULL;
65-
static struct mgos_config_mqtt *s_cfg = NULL;
65+
static const struct mgos_config_mqtt *s_cfg = NULL;
66+
static int s_max_qos = -1;
6667

6768
SLIST_HEAD(topic_handlers, topic_handler) s_topic_handlers;
6869
SLIST_HEAD(global_handlers, global_handler) s_global_handlers;
@@ -72,11 +73,14 @@ static void mqtt_global_reconnect(void);
7273
void mgos_mqtt_set_max_qos(int qos) {
7374
if (s_cfg == NULL || s_cfg->max_qos == qos) return;
7475
LOG(LL_INFO, ("Setting max MQTT QOS to %d", qos));
75-
s_cfg->max_qos = qos;
76+
s_max_qos = qos;
7677
}
7778

7879
static int adjust_qos(int qos) {
79-
return s_cfg != NULL && s_cfg->max_qos < qos ? s_cfg->max_qos : qos;
80+
int max_qos = s_max_qos;
81+
if (max_qos < 0 && s_cfg != NULL) max_qos = s_cfg->max_qos;
82+
if (max_qos < 0) return qos;
83+
return MIN(qos, max_qos);
8084
}
8185

8286
uint16_t mgos_mqtt_get_packet_id(void) {
@@ -123,10 +127,13 @@ static void do_subscribe(struct topic_handler *th) {
123127

124128
static void mgos_mqtt_ev(struct mg_connection *nc, int ev, void *ev_data,
125129
void *user_data) {
130+
if (nc != s_conn) {
131+
nc->flags |= MG_F_CLOSE_IMMEDIATELY;
132+
return;
133+
}
126134
if (ev > MG_MQTT_EVENT_BASE) {
127135
LOG(LL_DEBUG, ("MQTT event: %d", ev));
128136
}
129-
130137
switch (ev) {
131138
case MG_EV_CONNECT: {
132139
int status = *((int *) ev_data);
@@ -279,6 +286,7 @@ static void mgos_mqtt_free_config(struct mgos_config_mqtt *cfg) {
279286
free(cfg->will_topic);
280287
free(cfg->will_message);
281288
memset(cfg, 0, sizeof(*cfg));
289+
free(cfg);
282290
}
283291

284292
bool mgos_mqtt_set_config(const struct mgos_config_mqtt *cfg) {
@@ -295,13 +303,7 @@ bool mgos_mqtt_set_config(const struct mgos_config_mqtt *cfg) {
295303
new_cfg = (struct mgos_config_mqtt *) calloc(1, sizeof(*new_cfg));
296304
if (new_cfg == NULL) goto out;
297305
new_cfg->enable = cfg->enable;
298-
if (strchr(cfg->server, ':') == NULL) {
299-
int port = (cfg->ssl_ca_cert != NULL ? 8883 : 1883);
300-
mg_asprintf(&new_cfg->server, 0, "%s:%d", cfg->server, port);
301-
if (new_cfg->server == NULL) goto out;
302-
} else {
303-
new_cfg->server = strdup(cfg->server);
304-
}
306+
new_cfg->server = strdup(cfg->server);
305307
if (cfg->client_id) new_cfg->client_id = strdup(cfg->client_id);
306308
if (cfg->user) new_cfg->user = strdup(cfg->user);
307309
if (cfg->pass) new_cfg->pass = strdup(cfg->pass);
@@ -310,10 +312,12 @@ bool mgos_mqtt_set_config(const struct mgos_config_mqtt *cfg) {
310312
if (cfg->ssl_cert) new_cfg->ssl_cert = strdup(cfg->ssl_cert);
311313
if (cfg->ssl_key) new_cfg->ssl_key = strdup(cfg->ssl_key);
312314
if (cfg->ssl_ca_cert) new_cfg->ssl_ca_cert = strdup(cfg->ssl_ca_cert);
313-
if (cfg->ssl_cipher_suites)
315+
if (cfg->ssl_cipher_suites) {
314316
new_cfg->ssl_cipher_suites = strdup(cfg->ssl_cipher_suites);
315-
if (cfg->ssl_psk_identity)
317+
}
318+
if (cfg->ssl_psk_identity) {
316319
new_cfg->ssl_psk_identity = strdup(cfg->ssl_psk_identity);
320+
}
317321
if (cfg->ssl_psk_key) new_cfg->ssl_psk_key = strdup(cfg->ssl_psk_key);
318322
new_cfg->clean_session = cfg->clean_session;
319323
new_cfg->keep_alive = cfg->keep_alive;
@@ -326,14 +330,20 @@ bool mgos_mqtt_set_config(const struct mgos_config_mqtt *cfg) {
326330

327331
out:
328332
if (ret) {
333+
const struct mgos_config_mqtt *old_cfg = s_cfg;
329334
s_cfg = new_cfg;
330335
if (s_conn != NULL) {
331336
s_conn->flags |= MG_F_CLOSE_IMMEDIATELY;
332337
s_conn = NULL;
333338
}
339+
const struct mgos_config_mqtt *cfg0 = mgos_sys_config_get_mqtt();
340+
const struct mgos_config_mqtt *cfg1 =
341+
(const struct mgos_config_mqtt *) mgos_sys_config_get_mqtt1();
342+
if (old_cfg != NULL && old_cfg != cfg0 && old_cfg != cfg1) {
343+
mgos_mqtt_free_config((struct mgos_config_mqtt *) old_cfg);
344+
}
334345
} else {
335346
mgos_mqtt_free_config(new_cfg);
336-
free(new_cfg);
337347
}
338348
return ret;
339349
}
@@ -355,15 +365,19 @@ bool mgos_mqtt_init(void) {
355365
mgos_event_add_group_handler(MGOS_EVENT_GRP_NET, mgos_mqtt_net_ev, NULL);
356366
mgos_event_add_handler(MGOS_EVENT_LOG, s_debug_write_cb, NULL);
357367

358-
return mgos_mqtt_set_config(mgos_sys_config_get_mqtt());
368+
s_cfg = mgos_sys_config_get_mqtt();
369+
return true;
359370
}
360371

361372
bool mgos_mqtt_global_connect(void) {
362373
bool ret = true;
374+
char *server = NULL;
363375
struct mg_mgr *mgr = mgos_get_mgr();
364376
struct mg_connect_opts opts;
365377

366-
if (s_cfg == NULL || !s_cfg->enable) return false;
378+
if (s_cfg == NULL || !s_cfg->enable || s_cfg->server == NULL) {
379+
return false;
380+
}
367381

368382
/* If we're already connected, do nothing */
369383
if (s_conn != NULL) return true;
@@ -377,17 +391,25 @@ bool mgos_mqtt_global_connect(void) {
377391
opts.ssl_psk_identity = s_cfg->ssl_psk_identity;
378392
opts.ssl_psk_key = s_cfg->ssl_psk_key;
379393
#endif
380-
LOG(LL_INFO, ("MQTT connecting to %s", s_cfg->server));
394+
if (strchr(s_cfg->server, ':') == NULL) {
395+
int port = (s_cfg->ssl_ca_cert != NULL ? 8883 : 1883);
396+
mg_asprintf(&server, 0, "%s:%d", s_cfg->server, port);
397+
if (server == NULL) return false;
398+
} else {
399+
server = strdup(s_cfg->server);
400+
}
401+
LOG(LL_INFO, ("MQTT connecting to %s", server));
381402

382403
s_connected = false;
383-
s_conn = mg_connect_opt(mgr, s_cfg->server, mgos_mqtt_ev, NULL, opts);
404+
s_conn = mg_connect_opt(mgr, server, mgos_mqtt_ev, NULL, opts);
384405
if (s_conn != NULL) {
385406
mg_set_protocol_mqtt(s_conn);
386407
s_conn->recv_mbuf_limit = s_cfg->recv_mbuf_limit;
387408
} else {
388409
mqtt_global_reconnect();
389410
ret = false;
390411
}
412+
free(server);
391413
return ret;
392414
}
393415

@@ -397,11 +419,35 @@ static void reconnect_timer_cb(void *user_data) {
397419
(void) user_data;
398420
}
399421

422+
static void mqtt_switch_config(void) {
423+
const struct mgos_config_mqtt *cfg0 = mgos_sys_config_get_mqtt();
424+
const struct mgos_config_mqtt *cfg1 =
425+
(const struct mgos_config_mqtt *) mgos_sys_config_get_mqtt1();
426+
const struct mgos_config_mqtt *cfg;
427+
if (s_cfg == cfg0) {
428+
cfg = cfg1;
429+
} else if (s_cfg == cfg1) {
430+
cfg = cfg0;
431+
} else {
432+
/* User set a custom config - don't mess with it. */
433+
return;
434+
}
435+
if (cfg->enable) {
436+
s_cfg = cfg;
437+
s_reconnect_timeout_ms = s_cfg->reconnect_timeout_min * 1000;
438+
}
439+
}
440+
400441
static void mqtt_global_reconnect(void) {
401442
int rt_ms;
402443
if (s_cfg == NULL || s_cfg->server == NULL) return;
403444

404-
if (s_reconnect_timeout_ms <= 0) s_reconnect_timeout_ms = 1;
445+
if (s_reconnect_timeout_ms >= s_cfg->reconnect_timeout_max * 1000) {
446+
mqtt_switch_config();
447+
}
448+
449+
if (s_reconnect_timeout_ms <= 0) s_reconnect_timeout_ms = 1000;
450+
405451
rt_ms = s_reconnect_timeout_ms * 2;
406452

407453
if (rt_ms < s_cfg->reconnect_timeout_min * 1000) {
@@ -410,13 +456,11 @@ static void mqtt_global_reconnect(void) {
410456
if (rt_ms > s_cfg->reconnect_timeout_max * 1000) {
411457
rt_ms = s_cfg->reconnect_timeout_max * 1000;
412458
}
459+
s_reconnect_timeout_ms = rt_ms;
413460
/* Fuzz the time a little. */
414461
rt_ms = (int) mgos_rand_range(rt_ms * 0.9, rt_ms * 1.1);
415462
LOG(LL_INFO, ("MQTT connecting after %d ms", rt_ms));
416-
s_reconnect_timeout_ms = rt_ms;
417-
if (s_reconnect_timer_id != MGOS_INVALID_TIMER_ID) {
418-
mgos_clear_timer(s_reconnect_timer_id);
419-
}
463+
mgos_clear_timer(s_reconnect_timer_id);
420464
s_reconnect_timer_id = mgos_set_timer(rt_ms, 0, reconnect_timer_cb, NULL);
421465
}
422466

0 commit comments

Comments
 (0)