-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspotify_helper.py
196 lines (156 loc) · 7.53 KB
/
spotify_helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# Handles the keyboard listening
import ast
import configparser
import logging
import sys
import os
import threading
import traceback
from collections import deque
from time import sleep
import requests
from pynput import keyboard
from pynput.keyboard import Key, KeyCode
from spotify import Spotify
from notif_handler import send_notif
from exceptions import AlreadyNotifiedException
config = configparser.ConfigParser()
config.read(os.path.join(os.path.dirname(__file__), 'config.ini'))
bindings_file = os.path.join(os.path.dirname(__file__), 'bindings.txt') # TODO reload bindings after file change
class SpotifyHelper:
def __init__(self):
try:
self.spotify = Spotify()
except requests.exceptions.ConnectionError:
send_notif('Spotify Helper closed', 'Check you have a working internet connection.')
sys.exit(1)
self.currently_pressed_keys = list()
self.looking_for = {}
self.has_released_key = True
self.load_bindings_from_file(bindings_file)
self.atomic_method_groups = SpotifyHelper.get_atomic_method_groups()
self.method_group_thread_queues = self.get_method_group_thread_queues()
def load_bindings_from_file(self, file):
with open(file) as file:
for line in file:
method_and_keycodes = line.split('=')
method = method_and_keycodes[0] # The method to run
rest_of_line = method_and_keycodes[1] # Includes bindings we have to parse
# Allows inline comments in the bindings file
if '#' in rest_of_line:
rest_of_line = rest_of_line[:rest_of_line.index('#')]
bindings = rest_of_line.rstrip()
if bindings != '':
# Can have multiple bindings split by commas.
for binding in bindings.split(','):
keys = list()
for single_key in binding.split('+'):
keys.append(self.get_key_from_string(single_key))
keys_tuple = tuple(keys)
# looking_for is a dictionary where the keys are the bindings and the values
# are all the methods linked to those keys, as you can have multiple bindings
# per method and vice versa.
if keys_tuple not in self.looking_for.keys():
self.looking_for[keys_tuple] = []
self.looking_for[keys_tuple].append(method)
# Some methods can run at the same time, others cannot: we group
# them as 'independent', which can be run in any order, 'self_dependent',
# which have to be run sequentially from themselves, and any amount of
# other groups, whose methods have to run sequentially from each other.
def get_method_group_thread_queues(self):
method_group_thread_queues = dict()
for group in self.atomic_method_groups:
# If it's self dependent, make a new thread group for each method
if group == 'self_dependent':
for method in self.atomic_method_groups[group]:
method_group_thread_queues[method] = deque([])
self.start_queue_listening_thread(method_group_thread_queues[method])
# If it's a custom group, set a single queue for that entire group
elif group != 'independent':
method_group_thread_queues[group] = deque([])
self.start_queue_listening_thread(method_group_thread_queues[group])
return method_group_thread_queues
# Return a dict with each atomic_group (independent, self_dependent, etc.)
# connected to a list of the methods assigned to it.
@staticmethod
def get_atomic_method_groups():
thread_groups = dict()
for group in config['method_groups']:
thread_groups[group] = ast.literal_eval(config['method_groups'][group])
return thread_groups
def start_queue_listening_thread(self, queue):
threading.Thread(target=self.check_methods_to_run,
args=(queue,), # A singleton tuple
daemon=True).start()
def queue_method(self, method):
def get_method_group(method):
for group in self.atomic_method_groups:
if method in self.atomic_method_groups[group]:
return group
# Independent groups send just that method to a thread to be run
if method in self.get_atomic_method_groups()['independent']:
self.start_queue_listening_thread(deque([method]))
# Self-dependent & custom groups add their method to the appropriate queue
elif method in self.get_atomic_method_groups()['self_dependent']:
self.method_group_thread_queues[method].append(method)
else:
self.method_group_thread_queues[get_method_group(method)].append(method)
# Given a queue, keep checking it, running methods in the order
# they show up.
def check_methods_to_run(self, method_queue):
while True:
if len(method_queue) > 0:
self.run_method(method_queue.popleft())
else:
# If there are no operations, we don't want to be checking too often
sleep(0.1)
def run_method(self, method):
try:
getattr(self.spotify, method)()
except ConnectionError:
send_notif('Connection Error', 'Internet connection not available')
except AlreadyNotifiedException:
pass
except Exception as e:
send_notif('Error', 'Something went wrong')
logging.error('{}:{}'.format(e, traceback.format_exc()))
traceback.print_exc()
def on_press(self, key):
# Keys are unique in each binding, as it makes no sense to have ctrl+ctrl+f5, for example.
# Also prevents the same key being added more than once if held down too long, which happens
# on some systems.
if key not in self.currently_pressed_keys:
self.currently_pressed_keys.append(key)
for key_tuple, methods in self.looking_for.items():
# has_released_key avoids running the same methods for the same keyboard
# press - must release a key to run it again.
if self.currently_pressed_keys == list(key_tuple) and self.has_released_key:
for method in methods:
self.queue_method(method)
self.has_released_key = False
def on_release(self, key):
self.has_released_key = True
# We ignore the key argument as dead/modified keys (e.g. shift+letter) can
# pollute the currently_pressed_keys list.
try:
self.currently_pressed_keys.pop()
except IndexError: # Sometimes it's already empty so raises this exception, to be ignored.
pass
# Get pynput key from a string - modifier keys are captured in the try statement,
# while normal letter keys are obtained from the KeyCode.from_char() method.
@staticmethod
def get_key_from_string(key_str):
try:
return getattr(Key, key_str)
except AttributeError:
return KeyCode.from_char(key_str)
# Begins the keyboard listener.
def run(self):
self.listener = keyboard.Listener(
on_press=self.on_press,
on_release=self.on_release)
self.listener.start()
def stop(self):
self.listener.stop()
if __name__ == '__main__':
SpotifyHelper().run()