-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmqtt_wrapper.py
More file actions
64 lines (52 loc) · 2.11 KB
/
mqtt_wrapper.py
File metadata and controls
64 lines (52 loc) · 2.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import os
from functools import wraps
from paho.mqtt import client as mqtt
# I dunno Bart, my dad's a really big wheel down at the WrapperFactory
class MqttWrapperFactory:
    """Build MqttWrapper objects that share one broker address, port and topic prefix."""

    def __init__(self, address='localhost', port=1883, topic_prefix=''):
        """Remember the settings handed to every wrapper created by ``new``.

        Args:
            address: Broker host given to each MqttWrapper.
            port: Broker port given to each MqttWrapper.
            topic_prefix: Text placed in front of the subtopic passed to ``new``.
        """
        self._address = address
        self._port = port
        self._topic_prefix = topic_prefix

    def new(self, subtopic=''):
        """Return a fresh MqttWrapper rooted at ``topic_prefix + subtopic``.

        The combined prefix is normalised with ``MqttWrapper.fixup_slash``
        before it is handed to the wrapper.
        """
        combined_prefix = MqttWrapper.fixup_slash(self._topic_prefix + subtopic)
        return MqttWrapper(self._address, self._port, combined_prefix)
class MqttWrapper:
    """Small convenience layer over a paho-mqtt client with a fixed topic prefix.

    Every topic given to ``sub`` and ``pub`` is appended to ``topic_prefix``
    and normalised by ``fixup_slash``.  ``connect`` must be called before
    subscribing, publishing or stopping.
    """

    def __init__(self, address, port, topic_prefix):
        """Store the connection settings; the client itself is made by ``connect``.

        Args:
            address: Broker host passed to the paho client's ``connect``.
            port: Broker port passed to the paho client's ``connect``.
            topic_prefix: Text prepended to every subtopic.
        """
        self.mqtt = None  # paho client; stays None until connect() succeeds
        self.address = address
        self.port = port
        self.topic_prefix = topic_prefix

    @staticmethod
    def fixup_slash(text):
        """Normalise the slashes in a topic string.

        Empty segments (leading, trailing and doubled slashes) are dropped,
        then one trailing ``/`` is appended unless the result ends in ``#``.

        Args:
            text: Raw topic text.

        Returns:
            The normalised topic, or ``''`` when ``text`` has no non-empty
            segment (for example ``''`` or ``'//'``).
        """
        newtext = '/'.join(p for p in text.split('/') if p != '')
        if not newtext:
            # Nothing survived the filtering; indexing the last character
            # would raise IndexError, so report "no topic" as ''.
            return newtext
        if not newtext.endswith('#'):
            newtext += '/'
        return newtext

    def full_topic(self, subtopic):
        """Return ``topic_prefix + subtopic`` normalised by ``fixup_slash``."""
        return MqttWrapper.fixup_slash(self.topic_prefix + subtopic)

    def on_connect(self, client, userdata, flags, rc):
        """Callback installed as the paho client's ``on_connect``; does nothing."""

    def check_connection(self):
        """Ensure a client exists.

        Raises:
            RuntimeError: If ``connect`` has not (successfully) been called.
        """
        if self.mqtt is None:
            raise RuntimeError("MqttWrapper.connect() not yet called")

    def connect(self):
        """Create the paho client, connect to the broker and start its loop thread."""
        # paho-mqtt 2.x added a callback API version argument to Client();
        # VERSION1 is the API whose on_connect takes (client, userdata,
        # flags, rc) as defined above.  paho-mqtt 1.x has no such enum.
        api_versions = getattr(mqtt, 'CallbackAPIVersion', None)
        if api_versions is None:
            client = mqtt.Client()
        else:
            client = mqtt.Client(api_versions.VERSION1)
        client.on_connect = self.on_connect
        # Third positional argument is paho's keepalive interval.
        client.connect(self.address, self.port, 60)
        # Publish the client only once the connection attempt has succeeded,
        # so check_connection() keeps failing after a failed connect().
        self.mqtt = client
        client.loop_start()

    def stop(self):
        """Stop the client's loop thread and disconnect.

        Raises:
            RuntimeError: If ``connect`` has not been called.
        """
        self.check_connection()
        self.mqtt.loop_stop()
        self.mqtt.disconnect()

    def sub(self, topic, callback):
        """Subscribe to ``topic`` under the prefix and route its messages to ``callback``.

        Args:
            topic: Subtopic, combined with the prefix via ``full_topic``.
            callback: Called as ``callback(topic_parts, payload)``; see
                ``wrap_message_handler`` for the argument shapes.

        Raises:
            RuntimeError: If ``connect`` has not been called.
        """
        self.check_connection()
        full_topic = self.full_topic(topic)
        wrapped_callback = self.wrap_message_handler(callback)
        # Register the handler before subscribing so a message that arrives
        # as soon as the subscription is active cannot slip past it.
        self.mqtt.message_callback_add(full_topic, wrapped_callback)
        self.mqtt.subscribe(full_topic)

    def wrap_message_handler(self, callback):
        """Adapt ``callback(topic_parts, payload)`` to paho's message-callback signature.

        The returned function takes ``(client, userdata, message)`` and calls
        ``callback`` with the message topic split on ``/`` (empty segments
        removed) and the payload decoded as UTF-8, returning its result.
        """
        @wraps(callback)
        def message_handler_wrapper(client, userdata, message):
            topic_parts = [s for s in message.topic.split('/') if s != '']
            return callback(topic_parts, message.payload.decode('utf-8'))
        return message_handler_wrapper

    def pub(self, topic, payload):
        """Publish ``payload`` to ``topic`` under the prefix.

        Raises:
            RuntimeError: If ``connect`` has not been called.
        """
        self.check_connection()
        self.mqtt.publish(self.full_topic(topic), payload)

    def dump(self, topic, payload):
        """Print ``topic: payload``.

        The ``(topic, payload)`` signature matches what
        ``wrap_message_handler`` passes to callbacks.
        """
        print("%s: %s" % (topic, payload))