1
+ '''
2
+ to_mqtt
3
+ from_mqtt
4
+ '''
5
+
6
+ import asyncio
7
+ import os
8
+ import time
9
+ import json
10
+ # from hbmqtt.client import MQTTClient, ClientException
11
+ # from hbmqtt.mqtt.constants import QOS_1, QOS_2
12
+
13
+ from codelab_adapter .settings import FROM_MQTT_TOPIC , TO_MQTT_TOPIC
14
+ from codelab_adapter .core_extension import Extension
15
+ from codelab_adapter .utils import threaded
16
+ import paho .mqtt .client as mqtt
17
+
18
+
19
+ class MqttGatewayExtension (Extension ):
20
+ '''
21
+ gateway的职责: 转发进出的消息: from_mqtt/to_mqtt
22
+
23
+ hbmqtt_pub --url mqtt://guest:[email protected] -t 'eim/mqtt_gateway' -m "hello from hbmqtt"
24
+ '''
25
+
26
+ def __init__ (self ):
27
+ self .EXTENSION_ID = "eim/mqtt_gateway"
28
+ super ().__init__ ()
29
+ self .set_subscriber_topic (TO_MQTT_TOPIC ) # sub zmq message
30
+ self .mqtt_sub_topics = [FROM_MQTT_TOPIC ]
31
+
32
+ # mqtt settings
33
+ self .mqtt_addr = "iot.codelab.club"
34
+ self .username = "guest"
35
+ self .password = "test"
36
+ self .mqtt_port = 1883
37
+
38
+ # mqtt client
39
+ self .client = mqtt .Client ()
40
+ self .client .on_connect = self .mqtt_on_connect
41
+ self .client .on_message = self .mqtt_on_message
42
+ self .client .username_pw_set (self .username , self .password )
43
+ self .client .connect (self .mqtt_addr , self .mqtt_port , 60 )
44
+ self .client .loop_start () # as thread
45
+
46
+ def exit_message_handle (self , topic , payload ):
47
+ # stop mqtt client
48
+ self .client .loop_stop ()
49
+ self .terminate ()
50
+
51
+ def extension_message_handle (self , topic , payload ):
52
+ '''
53
+ zmq -> mqtt
54
+ 注意 进出消息格式不同
55
+
56
+ Q: 无法进入这里吧, 这里要求extension_id一致
57
+ '''
58
+ if topic == TO_MQTT_TOPIC :
59
+ self .logger .info (topic , payload )
60
+ payload = json .dumps (payload ).encode ()
61
+ self .client .publish (topic , payload )
62
+
63
+ def mqtt_on_connect (self , client , userdata , flags , rc ):
64
+ self .logger .info (
65
+ "MQTT Gateway Connected to MQTT {}:{} with result code {}." .format (
66
+ self .mqtt_addr , self .mqtt_port , str (rc )))
67
+ # when mqtt is connected to subscribe to mqtt topics
68
+ if self .mqtt_sub_topics :
69
+ for sub in self .mqtt_sub_topics :
70
+ self .client .subscribe (sub )
71
+
72
+ def mqtt_on_message (self , client , userdata , msg ):
73
+ '''
74
+ mqtt -> zmq
75
+ 传递json 而不是content
76
+ '''
77
+ topic = msg .topic # str
78
+ if topic == FROM_MQTT_TOPIC :
79
+ m = msg .payload .decode () # 在通道的两端做好decode和encode
80
+ payload = json .loads (m ) # json
81
+ zmq_topic = payload ["zmq_topic" ]
82
+ zmq_payload = payload ["zmq_payload" ]
83
+ self .logger .info (f'mqtt topic:{ topic } payload: { payload } ' )
84
+ self .publish_payload (zmq_payload , zmq_topic )
85
+
86
+ def run (self ):
87
+
88
+ while self ._running :
89
+ # to publish mqtt message
90
+ time .sleep (0.1 )
91
+
92
+
93
+ export = MqttGatewayExtension
0 commit comments