-
Notifications
You must be signed in to change notification settings - Fork 72
/
Copy pathextension_leju_aelosedu.py
108 lines (85 loc) · 2.84 KB
/
extension_leju_aelosedu.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import time
import serial
from codelab_adapter.core_extension import Extension
def is_positive_valid(s):
try:
num = int(s)
if 0 < num < 255:
return True
else:
return False
except ValueError:
return False
def parse_cmd(content):
cmd = 0
if is_positive_valid(content):
cmd = int(content)
return cmd
class Dongle2401:
def __init__(self):
self.id = '1A86:7523'
self.port = self.auto_detect()
self.dongle = self.open_port(self.port)
def auto_detect(self):
ports = serial.tools.list_ports.comports()
for port, desc, hwid in sorted(ports):
# print("port = {}; desc = {} ;hwid = {}".format(port, desc, hwid))
if self.id in hwid:
try:
with serial.Serial(port, 9600) as ser:
ser.close()
print('found port {}'.format(port))
return port
except Exception as err:
pass
# print('open port failed', port, err)
assert False, 'Aelos usb dongle not found!'
def open_port(self, port):
return serial.Serial(port, 9600)
def send(self, data):
self.dongle.write(bytes(data))
def set_channel(self, channel):
self.send([0xcc, 0xcc, 0xcc, 0xcc, 0xcc])
time.sleep(0.2)
self.send([0x29, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, channel])
def parse_content(content):
try:
if type(content) == str:
arr = content.split(':')
if len(arr) != 2:
return None
return arr
except Exception as err:
return None
class LejuAelosEduExtention(Extension):
"""
Leju Robotics Aelos Edu Extension V2
"""
def __init__(self):
super().__init__()
self.EXTENSION_ID = "eim/leju/aelosedu"
self.usb_dongle = Dongle2401()
def extension_message_handle(self, topic, payload):
self.logger.info(f'topic = {topic}')
if type(payload) == str:
self.logger.info(f'scratch eim message:{payload}')
return
elif type(payload) == dict:
self.logger.info(f'eim message:{payload}')
python_code = payload.get('content')
try:
output = eval(python_code, {"__builtins__": None}, {
'usb_dongle': self.usb_dongle
})
except Exception as e:
output = e
self.logger.error(output)
payload["content"] = str(output)
message = {"payload": payload}
self.publish(message)
def run(self):
while self._running:
time.sleep(1)
export = LejuAelosEduExtention
if __name__ == "__main__":
LejuAelosEduExtention().run() # or start_as_thread()