-
-
Notifications
You must be signed in to change notification settings - Fork 186
/
Copy pathSignal.py
621 lines (475 loc) · 23.9 KB
/
Signal.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
# Copyright (c) 2017 Ultimaker B.V.
# Copyright (c) Thiago Marcos P. Santos
# Copyright (c) Christopher S. Case
# Copyright (c) David H. Bronke
# Uranium is released under the terms of the LGPLv3 or higher.
import enum #For the compress parameter of postponeSignals.
import inspect
import threading
import os
import weakref
from weakref import ReferenceType
from typing import Any, Union, Callable, TypeVar, Generic, List, Tuple, Iterable, cast, Optional
import contextlib
import traceback
import functools
from UM.Event import CallFunctionEvent
from UM.Decorators import call_if_enabled
from UM.Logger import Logger
from UM.Platform import Platform
from UM import FlameProfiler
MYPY = False
if MYPY:
from UM.Application import Application
@contextlib.contextmanager
def acquire_timeout(lock, timeout):
result = lock.acquire(timeout=timeout)
yield result
if result:
lock.release()
# Helper functions for tracing signal emission.
def _traceEmit(signal: Any, *args: Any, **kwargs: Any) -> None:
Logger.log("d", "Emitting %s with arguments %s", str(signal.getName()), str(args) + str(kwargs))
if signal._Signal__type == Signal.Queued:
Logger.log("d", "> Queued signal, postponing emit until next event loop run")
if signal._Signal__type == Signal.Auto:
if Signal._signalQueue is not None and threading.current_thread() is not Signal._signalQueue.getMainThread():
Logger.log("d", "> Auto signal and not on main thread, postponing emit until next event loop run")
for func in signal._Signal__functions:
Logger.log("d", "> Calling %s", str(func))
for dest, func in signal._Signal__methods:
Logger.log("d", "> Calling %s on %s", str(func), str(dest))
for signal in signal._Signal__signals:
Logger.log("d", "> Emitting %s", str(signal._Signal__name))
def _traceConnect(signal: Any, *args: Any, **kwargs: Any) -> None:
Logger.log("d", "Connecting signal %s to %s", str(signal._Signal__name), str(args[0]))
def _traceDisconnect(signal: Any, *args: Any, **kwargs: Any) -> None:
Logger.log("d", "Connecting signal %s from %s", str(signal._Signal__name), str(args[0]))
def _isTraceEnabled() -> bool:
return "URANIUM_TRACE_SIGNALS" in os.environ
class SignalQueue:
def functionEvent(self, event):
pass
def getMainThread(self):
pass
# Integration with the Flame Profiler.
def _recordSignalNames() -> bool:
return FlameProfiler.enabled()
def profileEmit(func):
if FlameProfiler.enabled():
@functools.wraps(func)
def wrapped(self, *args, **kwargs):
FlameProfiler.updateProfileConfig()
if FlameProfiler.isRecordingProfile():
with FlameProfiler.profileCall("[SIG] " + self.getName()):
func(self, *args, **kwargs)
else:
func(self, *args, **kwargs)
return wrapped
else:
return func
class Signal:
"""Simple implementation of signals and slots.
Signals and slots can be used as a light weight event system. A class can
define signals that other classes can connect functions or methods to, called slots.
Whenever the signal is called, it will proceed to call the connected slots.
To create a signal, create an instance variable of type Signal. Other objects can then
use that variable's `connect()` method to connect methods, callable objects or signals
to the signal. To emit the signal, call `emit()` on the signal. Arguments can be passed
along to the signal, but slots will be required to handle them. When connecting signals
to other signals, the connected signal will be emitted whenever the signal is emitted.
Signal-slot connections are weak references and as such will not prevent objects
from being destroyed. In addition, all slots will be implicitly disconnected when
the signal is destroyed.
**WARNING** It is imperative that the signals are created as instance variables, otherwise
emitting signals will get confused. To help with this, see the SignalEmitter class.
Loosely based on http://code.activestate.com/recipes/577980-improved-signalsslots-implementation-in-python/ pylint: disable=wrong-spelling-in-comment
:sa SignalEmitter
"""
Direct = 1
"""Signal types.
These indicate the type of a signal, that is, how the signal handles calling the connected
slots.
- Direct connections immediately call the connected slots from the thread that called emit().
- Auto connections will push the call onto the event loop if the current thread is
not the main thread, but make a direct call if it is.
- Queued connections will always push
the call on to the event loop.
"""
Auto = 2
Queued = 3
def __init__(self, type: int = Auto) -> None:
"""Initialize the instance.
:param type: The signal type. Defaults to Auto.
"""
# These collections must be treated as immutable otherwise we lose thread safety.
self.__functions = WeakImmutableList() # type: WeakImmutableList[Callable[[], None]]
self.__methods = WeakImmutablePairList() # type: WeakImmutablePairList[Any, Callable[[], None]]
self.__signals = WeakImmutableList() # type: WeakImmutableList[Signal]
self.__lock = threading.Lock() # Guards access to the fields above.
self.__type = type
self._postpone_emit = False
self._postpone_thread = None # type: Optional[threading.Thread]
self._compress_postpone = False # type: bool
self._postponed_emits = None # type: Any
if _recordSignalNames():
try:
if Platform.isWindows():
self.__name = inspect.stack()[1][0].f_locals["key"]
else:
self.__name = inspect.stack()[1].frame.f_locals["key"]
except KeyError:
self.__name = "Signal"
else:
self.__name = "Anon"
def getName(self):
return self.__name
def __call__(self) -> None:
""":exception NotImplementedError:"""
raise NotImplementedError("Call emit() to emit a signal")
def getType(self) -> int:
"""Get type of the signal
:return: Direct(1), Auto(2) or Queued(3)
"""
return self.__type
@call_if_enabled(_traceEmit, _isTraceEnabled())
@profileEmit
def emit(self, *args: Any, **kwargs: Any) -> None:
"""Emit the signal which indirectly calls all of the connected slots.
:param args: The positional arguments to pass along.
:param kwargs: The keyword arguments to pass along.
:note If the Signal type is Queued and this is not called from the application thread
the call will be posted as an event to the application main thread, which means the
function will be called on the next application event loop tick.
"""
# Check to see if we need to postpone emits
if self._postpone_emit:
if threading.current_thread() != self._postpone_thread:
Logger.log("w", "Tried to emit signal from thread %s while emits are being postponed by %s. Traceback:", threading.current_thread(), self._postpone_thread)
tb = traceback.format_stack()
for line in tb:
Logger.log("w", line)
if self._compress_postpone == CompressTechnique.CompressSingle:
# If emits should be compressed, we only emit the last emit that was called
self._postponed_emits = (args, kwargs)
else:
# If emits should not be compressed or compressed per parameter value, we catch all calls to emit and put them in a list to be called later.
if not self._postponed_emits:
self._postponed_emits = []
self._postponed_emits.append((args, kwargs))
return
if self.__type != Signal.Direct:
self.__handleEmitIndirect(*args, **kwargs)
else:
self.__performEmit(*args, **kwargs)
@call_if_enabled(_traceConnect, _isTraceEnabled())
def connect(self, connector: Union["Signal", Callable[[], None]]) -> None:
"""Connect to this signal.
:param connector: The signal or slot (function) to connect.
"""
if self._postpone_emit:
Logger.log("w", "Tried to connect to signal %s that is currently being postponed, this is not possible", self.__name)
return
with self.__lock:
if isinstance(connector, Signal):
if connector == self:
return
self.__signals = self.__signals.append(connector)
elif inspect.ismethod(connector):
# if SIGNAL_PROFILE:
# Logger.log('d', "Connector method qual name: " + connector.__func__.__qualname__)
self.__methods = self.__methods.append(cast(Any, connector).__self__, cast(Any, connector).__func__)
else:
# Once again, update the list of functions using a whole new list.
# if SIGNAL_PROFILE:
# Logger.log('d', "Connector function qual name: " + connector.__qualname__)
self.__functions = self.__functions.append(connector)
@call_if_enabled(_traceDisconnect, _isTraceEnabled())
def disconnect(self, connector):
"""Disconnect from this signal.
:param connector: The signal or slot (function) to disconnect.
"""
if self._postpone_emit:
Logger.log("w", "Tried to disconnect from signal %s that is currently being postponed, this is not possible", self.__name)
return
with self.__lock:
if isinstance(connector, Signal):
self.__signals = self.__signals.remove(connector)
elif inspect.ismethod(connector):
self.__methods = self.__methods.remove(connector.__self__, connector.__func__)
else:
self.__functions = self.__functions.remove(connector)
def disconnectAll(self):
"""Disconnect all connected slots."""
if self._postpone_emit:
Logger.log("w", "Tried to disconnect from signal %s that is currently being postponed, this is not possible", self.__name)
return
with self.__lock:
self.__functions = WeakImmutableList() # type: "WeakImmutableList"
self.__methods = WeakImmutablePairList() # type: "WeakImmutablePairList"
self.__signals = WeakImmutableList() # type: "WeakImmutableList"
def __getstate__(self):
"""To support Pickle
Since Weak containers cannot be serialized by Pickle we just return an empty dict as state.
"""
return {}
def __deepcopy__(self, memo):
"""To properly handle deepcopy in combination with __getstate__
Apparently deepcopy uses __getstate__ internally, which is not documented. The reimplementation
of __getstate__ then breaks deepcopy. On the other hand, if we do not reimplement it like that,
we break pickle. So instead make sure to also reimplement __deepcopy__.
"""
# Snapshot these fields
with self.__lock:
functions = self.__functions
methods = self.__methods
signals = self.__signals
signal = Signal(type = self.__type)
signal.__functions = functions
signal.__methods = methods
signal.__signals = signals
return signal
_app = None # type: Application
"""To avoid circular references when importing Application, this should be
set by the Application instance.
"""
_signalQueue = None # type: Application
def __handleEmitIndirect(self, *args, **kwargs) -> None:
# Handle any indirect emits of signals (eg; type is "Auto" or "Queued"
try:
if self.__type == Signal.Queued:
Signal._app.functionEvent(CallFunctionEvent(self.__performEmitIndirect, args, kwargs))
if self.__type == Signal.Auto:
if threading.current_thread() is not Signal._app.getMainThread():
Signal._app.functionEvent(CallFunctionEvent(self.__performEmitIndirect, args, kwargs))
else:
# Signal is emitted from the main thread, so call it directly!
self.__performEmit(*args, **kwargs)
except AttributeError: # If Signal._app is not set
pass
def __performEmitIndirect(self, *args, **kwargs):
self.__performEmit(*args, **kwargs)
# Private implementation of the actual emit.
# This is done to make it possible to freely push function events without needing to maintain state.
def __performEmit(self, *args, **kwargs) -> None:
# Quickly make some private references to the collections we need to process.
# Although the these fields are always safe to use read and use with regards to threading,
# we want to operate on a consistent snapshot of the whole set of fields.
# The acquire_timeout is here for debugging / profiling purposes, which might help us figure out why certain
# people experience slowdowns. At a certain point this can be removed.
copy_successful = False
while not copy_successful:
with acquire_timeout(self.__lock, 0.5) as acquired:
if acquired:
functions = self.__functions
methods = self.__methods
signals = self.__signals
copy_successful = True
else:
Logger.log("w", "Getting lock for signal [%s] took more than 0.5 seconds, this should not happen!", self)
if not FlameProfiler.isRecordingProfile():
# Call handler functions
for func in functions:
func(*args, **kwargs)
# Call handler methods
for dest, func in methods:
func(dest, *args, **kwargs)
# Emit connected signals
for signal in signals:
signal.emit(*args, **kwargs)
else:
# Call handler functions
for func in functions:
with FlameProfiler.profileCall(func.__qualname__):
func(*args, **kwargs)
# Call handler methods
for dest, func in methods:
with FlameProfiler.profileCall(func.__qualname__):
func(dest, *args, **kwargs)
# Emit connected signals
for signal in signals:
with FlameProfiler.profileCall("[SIG]" + signal.getName()):
signal.emit(*args, **kwargs)
# This __str__() is useful for debugging.
# def __str__(self):
# function_str = ", ".join([repr(f) for f in self.__functions])
# method_str = ", ".join([ "{dest: " + str(dest) + ", funcs: " + strMethodSet(funcs) + "}" for dest, funcs in self.__methods])
# signal_str = ", ".join([str(signal) for signal in self.__signals])
# return "Signal<{}> {{ __functions={{ {} }}, __methods={{ {} }}, __signals={{ {} }} }}".format(id(self), function_str, method_str, signal_str)
#def strMethodSet(method_set):
# return "{" + ", ".join([str(m) for m in method_set]) + "}"
class CompressTechnique(enum.Enum):
NoCompression = 0
CompressSingle = 1
CompressPerParameterValue = 2
@contextlib.contextmanager
def postponeSignals(*signals, compress: CompressTechnique = CompressTechnique.NoCompression):
"""A context manager that allows postponing of signal emissions
This context manager will collect any calls to emit() made for the provided signals
and only emit them after exiting. This ensures more batched processing of signals.
The optional "compress" argument will limit the emit calls to 1. This means that
when a bunch of calls are made to the signal's emit() method, only the last call
will be emitted on exit.
**WARNING** When compress is True, only the **last** call will be emitted. This means
that any other calls will be ignored, _including their arguments_.
:param signals: The signals to postpone emits for.
:param compress: Whether to enable compression of emits or not.
"""
# To allow for nested postpones on the same signals, we should check if signals are not already
# postponed and only change those that are not yet postponed.
restore_emit = []
for signal in signals:
if not signal._postpone_emit: # Do nothing if the signal has already been changed
signal._postpone_emit = True
signal._postpone_thread = threading.current_thread()
signal._compress_postpone = compress
# Since we made changes, make sure to restore the signal after exiting the context manager
restore_emit.append(signal)
# Execute the code block in the "with" statement
yield
for signal in restore_emit:
# We are done with the code, restore all changed signals to their "normal" state
signal._postpone_emit = False
if signal._postponed_emits:
# Send any signal emits that were collected while emits were being postponed
if signal._compress_postpone == CompressTechnique.CompressSingle:
signal.emit(*signal._postponed_emits[0], **signal._postponed_emits[1])
elif signal._compress_postpone == CompressTechnique.CompressPerParameterValue:
uniques = {(tuple(args), tuple(kwargs.items())) for args, kwargs in signal._postponed_emits} #Have to make them tuples in order to make them hashable.
for args, kwargs in uniques:
signal.emit(*args, **dict(kwargs))
else:
for args, kwargs in signal._postponed_emits:
signal.emit(*args, **kwargs)
signal._postponed_emits = None
signal._postpone_thread = None
signal._compress_postpone = False
def signalemitter(cls):
"""Class decorator that ensures a class has unique instances of signals.
Since signals need to be instance variables, normally you would need to create all
signals in the class" `__init__` method. However, this makes them rather awkward to
document. This decorator instead makes it possible to declare them as class variables,
which makes documenting them near the function they are used possible. This decorator
adjusts the class' __new__ method to create new signal instances for all class signals.
"""
# First, check if the base class has any signals defined
signals = inspect.getmembers(cls, lambda i: isinstance(i, Signal))
if not signals:
raise TypeError("Class {0} is marked as signal emitter but no signal were found".format(cls))
# Then, replace the class' new method with one that modifies the created instance to have
# unique signals.
old_new = cls.__new__
def new_new(subclass, *args, **kwargs):
if old_new == object.__new__:
sub = object.__new__(subclass)
else:
sub = old_new(subclass, *args, **kwargs)
for key, value in inspect.getmembers(cls, lambda i: isinstance(i, Signal)):
setattr(sub, key, Signal(type = value.getType()))
return sub
cls.__new__ = new_new
return cls
T = TypeVar('T')
class WeakImmutableList(Generic[T], Iterable):
"""Minimal implementation of a weak reference list with immutable tendencies.
Strictly speaking this isn't immutable because the garbage collector can modify
it, but no application code can. Also, this class doesn't implement the Python
list API, only the handful of methods we actually need in the code above.
"""
def __init__(self) -> None:
self.__list = [] # type: List[ReferenceType[Optional[T]]]
def append(self, item: T) -> "WeakImmutableList[T]":
"""Append an item and return a new list
:param item: the item to append
:return: a new list
"""
new_instance = WeakImmutableList() # type: WeakImmutableList[T]
new_instance.__list = self.__list.copy()
new_instance.__list.append(ReferenceType(item))
return new_instance
def remove(self, item: T) -> "WeakImmutableList[T]":
"""Remove an item and return a list
Note that unlike the normal Python list.remove() method, this ones
doesn't throw a ValueError if the item isn't in the list.
:param item: item to remove
:return: a list which does not have the item.
"""
for item_ref in self.__list:
if item_ref() is item:
new_instance = WeakImmutableList() # type: WeakImmutableList[T]
new_instance.__list = self.__cleanList()
new_instance.__list.remove(item_ref)
return new_instance
else:
return self # No changes needed
# Create a new list with the missing values removed.
def __cleanList(self) -> "List[ReferenceType[Optional[T]]]":
return [item_ref for item_ref in self.__list if item_ref() is not None]
def __iter__(self):
return WeakImmutableListIterator(self.__list)
class WeakImmutableListIterator(Generic[T], Iterable):
"""Iterator wrapper which filters out missing values.
It dereferences each weak reference object and filters out the objects
which have already disappeared via GC.
"""
def __init__(self, list_):
self.__it = list_.__iter__()
def __iter__(self):
return self
def __next__(self):
next_item = self.__it.__next__()()
while next_item is None: # Skip missing values
next_item = self.__it.__next__()()
return next_item
U = TypeVar('U')
class WeakImmutablePairList(Generic[T, U], Iterable):
"""A variation of WeakImmutableList which holds a pair of values using weak refernces."""
def __init__(self) -> None:
self.__list = [] # type: List[Tuple[ReferenceType[T],ReferenceType[U]]]
def append(self, left_item: T, right_item: U) -> "WeakImmutablePairList[T,U]":
"""Append an item and return a new list
:param item: the item to append
:return: a new list
"""
new_instance = WeakImmutablePairList() # type: WeakImmutablePairList[T,U]
new_instance.__list = self.__list.copy()
new_instance.__list.append( (weakref.ref(left_item), weakref.ref(right_item)) )
return new_instance
def remove(self, left_item: T, right_item: U) -> "WeakImmutablePairList[T,U]":
"""Remove an item and return a list
Note that unlike the normal Python list.remove() method, this ones
doesn't throw a ValueError if the item isn't in the list.
:param item: item to remove
:return: a list which does not have the item.
"""
for pair in self.__list:
left = pair[0]()
right = pair[1]()
if left is left_item and right is right_item:
new_instance = WeakImmutablePairList() # type: WeakImmutablePairList[T,U]
new_instance.__list = self.__cleanList()
new_instance.__list.remove(pair)
return new_instance
else:
return self # No changes needed
# Create a new list with the missing values removed.
def __cleanList(self) -> List[Tuple[ReferenceType, ReferenceType]]:
return [pair for pair in self.__list if pair[0]() is not None and pair[1]() is not None]
def __iter__(self):
return WeakImmutablePairListIterator(self.__list)
# A small iterator wrapper which dereferences the weak ref objects and filters
# out the objects which have already disappeared via GC.
class WeakImmutablePairListIterator:
def __init__(self, list_) -> None:
self.__it = list_.__iter__()
def __iter__(self):
return self
def __next__(self):
pair = self.__it.__next__()
left = pair[0]()
right = pair[1]()
while left is None or right is None: # Skip missing values
pair = self.__it.__next__()
left = pair[0]()
right = pair[1]()
return left, right