-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathshuttle_lms_controller.py
executable file
·182 lines (156 loc) · 5.83 KB
/
shuttle_lms_controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#!/usr/bin/env python3
import argparse
import evdev
import os
import sys
import logging
from pylms.player import Player
from pylms.server import Server
"""
TODO:
- test
- systemd unit
"""
# Fallback values for the --host, --port and --name command-line options.
HOST = "192.168.0.100"
PORT = 9090
NAME = "cube"
class DeviceNotFound(Exception):
    """Raised when no input device has a name containing the search string."""
class LMSException(Exception):
    """Raised when the LMS server lists no player with the requested name."""
class ButtonMethods(object):
    """
    event handlers that turn input events into commands for one LMS player

    Every handler takes a single event object and reads its `value` attribute
    (`echo` also reads `type` and `code`).
    """

    # step passed to volume_up()/volume_down() for each dial movement
    VOLUME_INCREMENT = 5

    def __init__(self, host, port, name):
        """
        host, port: used to connect to the LMS server
        name: name of the player to control

        Raises LMSException if the server answers but lists no player called
        `name`. A failed connection is only logged; the lookup is retried the
        next time the player is needed.
        """
        self._last_volume = 0
        self._player = None
        self._host = host
        self._port = port
        self._name = name
        self._get_player()

    def _get_player(self):
        """
        lookup the LMS player object we'll use for all the control methods

        FIXME: this probably won't work if the listener starts at the same time as LMS

        If the server cannot be contacted the error is logged and the cached
        player is left as it was. If the server answers but has no player with
        the configured name, the cached player is cleared and LMSException is
        raised, so a stale object is never kept around.
        """
        host = self._host
        port = self._port
        name = self._name
        server = Server(hostname=host, port=port)
        try:
            logging.info("connecting")
            server.connect()
        except Exception as e:
            logging.error(f"could not contact server {host} {port} {name} ({str(e)})")
            return
        for candidate in server.players:
            if candidate.name == name:
                logging.info(f"setting player with name {name}")
                self._player = candidate
                return
        self._player = None
        raise LMSException(f"could not find player with name {name}")

    @property
    def player(self):
        """
        the player to send commands to, refreshed on every access

        Returns None (after logging why) when no player can be obtained, so
        callers can skip the command instead of crashing.
        """
        try:
            # check if player is initialised yet
            if self._player is None:
                logging.warning("player is None - getting")
                self._get_player()
                if self._player is None:
                    # the server could not be contacted; already logged
                    return None
            # this tries to detect if the player has gone stale and needs to be refreshed
            try:
                if self._player.name != self._name:
                    logging.warning("player has wrong name - reacquiring")
                    self._get_player()
                self._player.update(self._player.index)
            except LMSException:
                # not a stale-player symptom: the player really is gone
                raise
            except Exception as e:
                logging.warning(f"Exception getting player {str(e)} - getting again")
                self._get_player()
        except LMSException as e:
            logging.error(str(e))
            return None
        return self._player

    def _check_player(func):
        """
        decorator: run the handler only when a player is available

        The check goes through the `player` property, so the wrapped handler
        can rely on self._player being set and freshly refreshed.
        """
        from functools import wraps

        @wraps(func)
        def check(self, *args, **kwargs):
            if self.player is None:
                logging.error("no player available; cannot run command")
                return None
            return func(self, *args, **kwargs)
        return check

    @_check_player
    def volume(self, event):
        """
        step the volume up or down depending on how event.value moved
        relative to the previously seen value
        """
        # validated and refreshed by _check_player; avoids refreshing again per access
        player = self._player
        val = event.value
        if val > self._last_volume:
            logging.info(f"volume up in player {player.name}")
            player.volume_up(self.VOLUME_INCREMENT)
        elif val < self._last_volume:
            logging.info(f"volume down {player.name}")
            player.volume_down(self.VOLUME_INCREMENT)
        # an unchanged value could be a recurring event - nothing to do,
        # and the current volume is only reported at debug level
        level = logging.DEBUG if val == self._last_volume else logging.INFO
        logging.log(level, f"current volume: {player.get_volume()}")
        self._last_volume = val

    @_check_player
    def play_pause(self, event):
        """
        toggle the player when event.value is 1; other values are ignored
        """
        if event.value == 1:
            logging.info("toggle")
            self._player.toggle()

    @_check_player
    def skip_forward(self, event):
        """
        call next() on the player when event.value is 1; other values are ignored
        """
        if event.value == 1:
            logging.info("forward")
            self._player.next()

    @_check_player
    def skip_backward(self, event):
        """
        seek the player to position 0 when event.value is 1; other values are ignored
        """
        if event.value == 1:
            logging.info("back-to-beginning")
            self._player.seek_to(0)

    def echo(self, event):
        """
        used as a blank response for events we want to record but not attach a method for
        """
        logging.info(f"type: {event.type} -- code: {event.code} -- value: {event.value}")
class ShuttleManager(object):
    """
    reads events from one input device and hands them to mapped handlers
    """

    def __init__(self, event_map, search_string="Shuttle"):
        """
        event_map: dict mapping an event code to a callable taking the event
        search_string: substring to look for in the input device names

        Raises DeviceNotFound if no device name contains `search_string`.
        """
        self._event_map = event_map
        self._device = self._find_device(search_string)

    @staticmethod
    def _find_device(search_string):
        """
        return the first input device whose name contains `search_string`

        Devices that do not match are closed again straight away so their
        file descriptors are not kept open for the lifetime of the process.
        Raises DeviceNotFound (after logging the names that were seen) if
        nothing matches.
        """
        seen_names = []
        for path in evdev.list_devices():
            device = evdev.InputDevice(path)
            if search_string in device.name:
                return device
            seen_names.append(device.name)
            device.close()
        logging.warning(f"{','.join(seen_names)}")
        raise DeviceNotFound(f"Could not find device with name {search_string}")

    def main_loop(self):
        """
        read events from the device and call the handler mapped to each
        event's code; events with an unmapped code are only logged at debug level
        """
        for event in self._device.read_loop():
            logging.debug(f"saw event type: {event.type} code: {event.code} value: {event.value}")
            if event.code in self._event_map:
                handler = self._event_map[event.code]
                handler(event)
# Script entry point: parse the options, set up logging, connect to the
# player and dispatch input events until the device read loop ends.
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--host', default=HOST, help="hostname/IP")
    # type=int so a port given on the command line has the same type as the default
    parser.add_argument('--port', default=PORT, type=int, help="port number")
    parser.add_argument("--name", default=NAME, help="name of player to control")
    parser.add_argument("-L", "--loglevel", default="INFO", help="log level")
    args = parser.parse_args()

    # Resolve the level name case-insensitively and make sure it really names
    # a numeric level rather than some other attribute of the logging module.
    loglevel = getattr(logging, args.loglevel.upper(), None)
    if not isinstance(loglevel, int):
        print(f"unrecognised log level {args.loglevel}")
        sys.exit(1)
    logging.basicConfig(level=loglevel,
                        format='%(asctime)s %(levelname)-8s %(message)s')

    bm = ButtonMethods(args.host, args.port, args.name)
    logging.debug(bm._player)

    # event code -> handler; codes that are not listed are ignored by the main loop
    EVENT_MAP = {
        evdev.ecodes.ecodes["BTN_4"]: bm.echo,
        evdev.ecodes.ecodes["BTN_5"]: bm.skip_backward,
        evdev.ecodes.ecodes["BTN_6"]: bm.play_pause,
        evdev.ecodes.ecodes["BTN_7"]: bm.skip_forward,
        evdev.ecodes.ecodes["BTN_8"]: bm.echo,
        evdev.ecodes.ecodes["REL_DIAL"]: bm.volume,
    }
    sm = ShuttleManager(event_map=EVENT_MAP)
    logging.debug(sm._device)
    sm.main_loop()