-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path__init__.py
More file actions
230 lines (201 loc) · 7.61 KB
/
Copy path__init__.py
File metadata and controls
230 lines (201 loc) · 7.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
from mycroft import MycroftSkill, intent_file_handler
from mycroft.util.parse import match_one
import bluetooth
import subprocess
import pexpect
import time
import sys
import logging
class BluetoothAudio(MycroftSkill):
    """Mycroft skill that lets the user choose a nearby Bluetooth device.

    The MAC address of the chosen device is stored in the skill settings
    under ``known.device``. On start-up the skill pairs with and connects
    to that device again.
    """

    def __init__(self):
        MycroftSkill.__init__(self)

    def initialize(self):
        """Create the bluetoothctl wrapper and reconnect the stored device."""
        self.bl = Bluetoothctl()
        # Make sure the key exists in the settings even before a device
        # has been chosen.
        known_device = self.settings.get('known.device', None)
        self.settings["known.device"] = known_device
        if known_device is not None:
            self.pair_device(known_device)
            self.connect_device(known_device)

    @intent_file_handler('audio.bluetooth.intent')
    def handle_audio_bluetooth(self, message):
        """Scan for devices, let the user pick one, store it and pair with it.

        With exactly one device found the user is asked for confirmation;
        with several the user is asked to name one and the answer is
        fuzzy-matched against the discovered names. Nothing is stored when
        no device is found, the user declines, or the user gives no answer.
        """
        devices = self.scann_bluetooth()
        if not devices:
            self.speak_dialog("no.device")
            return

        if len(devices) == 1:
            name, address = next(iter(devices.items()))
            answer = self.ask_yesno("found.one.device", data={"device": name})
            if answer != "yes":
                return
        else:
            names = list(devices)
            # NOTE(review): the names are handed to the dialog as a list;
            # confirm the dialog file renders that the way it should be
            # spoken.
            spoken = self.get_response('audio.bluetooth',
                                       data={"number": str(len(names)),
                                             "devices": names})
            if not spoken:
                # No usable answer from the user: there is nothing to match
                # against, so leave the stored device untouched.
                return
            match, _confidence = match_one(spoken, names)
            address = devices.get(match)

        self.settings["known.device"] = address
        self.log.info("set standard Bluetooth Device to "+str(self.settings["known.device"]))
        # NOTE(review): initialize() pairs *and* connects, this handler only
        # pairs - confirm whether a connect is intended here as well.
        self.pair_device(self.settings["known.device"])

    def scann_bluetooth(self):
        """Run a Bluetooth inquiry and return ``{device name: address}``.

        Devices that report the same name overwrite each other, so only the
        last one discovered is kept.
        """
        self.log.info("Performing inquiry...")
        nearby_devices = bluetooth.discover_devices(duration=8, lookup_names=True,
                                                    flush_cache=True, lookup_class=False)
        self.log.info("Found {} devices".format(len(nearby_devices)))
        devices = {}
        for addr, name in nearby_devices:
            try:
                self.log.info(" {} - {}".format(addr, name))
            except UnicodeEncodeError:
                # Only the logged text is sanitised. The key stored below
                # stays a str so it can be spoken and fuzzy-matched later.
                printable = name.encode("utf-8", "replace").decode("utf-8")
                self.log.info(" {} - {}".format(addr, printable))
            devices[name] = addr
        self.log.info("devices: "+str(devices))
        return devices

    def pair_device(self, mac):
        """Pair with ``mac`` and log the outcome; return True on success."""
        paired = self.bl.pair(mac)
        if paired:
            self.log.info("paird "+str(mac))
        else:
            self.log.warning("pairing failed for "+str(mac))
        return paired

    def connect_device(self, mac):
        """Connect to ``mac`` and log the outcome; return True on success."""
        connected = self.bl.connect(mac)
        if connected:
            self.log.info("connect "+str(mac))
        else:
            self.log.warning("connecting failed for "+str(mac))
        return connected
def create_skill():
    """Build and return a new BluetoothAudio skill instance."""
    skill = BluetoothAudio()
    return skill
# Module-level logger named "btctl"; the Bluetoothctl methods below report
# failed commands through it.
logger = logging.getLogger("btctl")
class Bluetoothctl:
    """A wrapper for bluetoothctl utility.

    Drives one interactive ``bluetoothctl`` child process through pexpect:
    commands are written to it and its output is matched against the
    strings the tool prints.
    """

    def __init__(self):
        """Unblock the Bluetooth radio and start the bluetoothctl session."""
        # Argument list instead of a shell string: no shell is needed here.
        subprocess.check_output(["rfkill", "unblock", "bluetooth"])
        # spawn(..., encoding=...) replaces the deprecated pexpect.spawnu().
        self.process = pexpect.spawn("bluetoothctl", echo=False, encoding="utf-8")

    def send(self, command, pause=0):
        """Send ``command``, wait ``pause`` seconds, then wait for the prompt.

        Raises:
            Exception: if the process ends (EOF) before "bluetooth" shows
                up in its output.
        """
        self.process.send(f"{command}\n")
        time.sleep(pause)
        # Index 0 is the "bluetooth" match; index 1 means EOF was hit.
        if self.process.expect(["bluetooth", pexpect.EOF]) != 0:
            raise Exception(f"failed after {command}")

    def get_output(self, *args, **kwargs):
        """Run a command in bluetoothctl prompt, return output as a list of lines."""
        self.send(*args, **kwargs)
        return self.process.before.split("\r\n")

    def _send_logged(self, command):
        """Send ``command``; log a failure instead of raising it."""
        try:
            self.send(command)
        except Exception as e:
            logger.error(e)

    def _list_devices(self, command):
        """Run a listing ``command`` and return the parsed device dicts."""
        try:
            lines = self.get_output(command)
        except Exception as e:
            logger.error(e)
            return []
        parsed = (self.parse_device_info(line) for line in lines)
        return [device for device in parsed if device]

    def _run_and_expect(self, command, pause, failure, success):
        """Send ``command`` and report whether ``success`` was printed.

        ``failure`` and ``success`` are pexpect patterns. EOF and TIMEOUT are
        part of the pattern list so that a dead or silent process gives
        ``False`` instead of an exception.
        """
        try:
            self.send(command, pause)
        except Exception as e:
            logger.error(e)
            return False
        outcome = self.process.expect(
            [failure, success, pexpect.EOF, pexpect.TIMEOUT]
        )
        return outcome == 1

    def start_scan(self):
        """Start bluetooth scanning process."""
        self._send_logged("scan on")

    def make_discoverable(self):
        """Make device discoverable."""
        self._send_logged("discoverable on")

    def parse_device_info(self, info_string):
        """Parse one output line into ``{"mac_address": ..., "name": ...}``.

        Returns an empty dict for lines that contain a colour escape
        sequence or the word "removed", that do not mention "Device", or
        that lack either the address or the name after "Device".
        """
        block_list = ("[\x1b[0;", "removed")
        if any(keyword in info_string for keyword in block_list):
            return {}
        device_position = info_string.find("Device")
        if device_position == -1:
            return {}
        # Expected shape: "Device <mac> <name, may contain spaces>".
        attribute_list = info_string[device_position:].split(" ", 2)
        if len(attribute_list) < 3:
            return {}
        _, mac_address, name = attribute_list
        return {"mac_address": mac_address, "name": name}

    def get_available_devices(self):
        """Return a list of dicts for the devices listed by "devices"."""
        return self._list_devices("devices")

    def get_paired_devices(self):
        """Return a list of dicts for the devices listed by "paired-devices"."""
        # NOTE(review): newer BlueZ releases may have replaced
        # "paired-devices" with "devices Paired" - confirm for the target.
        return self._list_devices("paired-devices")

    def get_discoverable_devices(self):
        """Filter paired devices out of available."""
        available = self.get_available_devices()
        paired = self.get_paired_devices()
        return [d for d in available if d not in paired]

    def get_device_info(self, mac_address):
        """Get device info by mac address.

        Returns the output lines of ``info <mac>``, or ``False`` when the
        command could not be sent.
        """
        try:
            return self.get_output(f"info {mac_address}")
        except Exception as e:
            logger.error(e)
            return False

    def pair(self, mac_address):
        """Try to pair with a device by mac address."""
        return self._run_and_expect(
            f"pair {mac_address}", 4,
            "Failed to pair|not available", "Pairing successful",
        )

    def trust(self, mac_address):
        """Try to mark a device as trusted by mac address."""
        # bluetoothctl answers a trust request with "Changing <mac> trust
        # succeeded" or "Failed to set <mac> trust: ...".
        return self._run_and_expect(
            f"trust {mac_address}", 4,
            "Failed to set|not available", "trust succeeded",
        )

    def remove(self, mac_address):
        """Remove paired device by mac address, return success of the operation."""
        return self._run_and_expect(
            f"remove {mac_address}", 3,
            "not available", "Device has been removed",
        )

    def connect(self, mac_address):
        """Try to connect to a device by mac address."""
        return self._run_and_expect(
            f"connect {mac_address}", 2,
            "Failed to connect|not available", "Connection successful",
        )

    def disconnect(self, mac_address):
        """Try to disconnect from a device by mac address."""
        return self._run_and_expect(
            f"disconnect {mac_address}", 2,
            "Failed to disconnect|not available", "Successful disconnected",
        )