-
-
Notifications
You must be signed in to change notification settings - Fork 186
/
Copy pathFlameProfiler.py
240 lines (185 loc) · 7.23 KB
/
FlameProfiler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# Copyright (c) 2022 Ultimaker B.V.
# Uranium is released under the terms of the LGPLv3 or higher.
import time
import math
import os
import threading
from contextlib import contextmanager
import functools
from typing import List, Callable, Any
from PyQt6.QtCore import pyqtSlot as PyQt6PyqtSlot
from UM.Logger import Logger
# A simple profiler which produces data suitable for viewing as a flame graph
# when using the Big Flame Graph plugin.
#
# An example of code which uses this profiling data is this Cura plugin:
# https://github.com/sedwards2009/cura-big-flame-graph
#
# Set the environment variable URANIUM_FLAME_PROFILER to something before
# starting the application to make the profiling code available.
def enabled() -> bool:
    """Report whether the flame profiler has been switched on.

    The profiler counts as enabled when the URANIUM_FLAME_PROFILER environment
    variable exists, whatever its value (an empty string counts too).

    :return: :type{bool} True if the environment variable is present.
    """
    return os.environ.get("URANIUM_FLAME_PROFILER") is not None
# Flag to keep track of whether we are recording data. It is switched on and
# off by updateProfileConfig() and read by isRecordingProfile().
record_profile = False
# Profiling data is built up of a tree of these kinds of nodes. Each node
# has a name, start time, end time, and a list of children nodes which are
# other functions/methods which were called by this function.
class _ProfileCallNode:
    """One timed call in the profile tree.

    A node stores a name, a line number, a start time, an end time and the
    list of child nodes for the calls that were made while it was running.
    It can serialise itself (and its children) to JSON text.
    """

    def __init__(self, name, line_number, start_time, end_time, children):
        """Create a node.

        :param name: :type{str} The label used for this call in the report.
        :param line_number: The line number reported for this call.
        :param start_time: The time at which the call started.
        :param end_time: The time at which the call ended.
        :param children: A list of child nodes, or None for "no children".
        """
        self.__name = name
        self.__line_number = line_number
        self.__start_time = start_time
        self.__end_time = end_time
        self.__children = children if children is not None else []  # type: List[_ProfileCallNode]

    def getStartTime(self):
        """Return the start time this node was created with."""
        return self.__start_time

    def getEndTime(self):
        """Return the end time this node was created with."""
        return self.__end_time

    def getDuration(self):
        """Return the end time minus the start time."""
        return self.__end_time - self.__start_time

    def toJSON(self, root=False):
        """Serialise this node and all of its children to JSON text.

        :param root: :type{bool} When True the node is wrapped in the complete
            top-level document; otherwise only the node itself is emitted.
        :return: :type{str} The JSON text.
        """
        if root:
            # NOTE(review): "sampleIterval" looks like a misspelling of
            # "sampleInterval". It is kept unchanged because consumers of this
            # output may rely on the exact key - confirm before renaming.
            return """
{
"c": {
"callStats": """ + self._plainToJSON() + """,
"sampleIterval": 1,
"objectName": "Cura",
"runTime": """ + str(self.getDuration()) + """,
"totalSamples": """ + str(self.getDuration()) + """
},
"version": "0.34"
}
"""
        else:
            return self._plainToJSON()

    def _plainToJSON(self):
        """Serialise just this node (recursively including its children).

        :return: :type{str} A JSON object with "stack", "sampleCount" and
            "children" entries.
        """
        import json  # Local import; only needed while serialising.

        duration = str(self.getDuration())
        # The name is caller supplied, so it must be encoded as a JSON string
        # rather than pasted between quotes: a quote, backslash or control
        # character in it would otherwise make the whole document invalid.
        # ensure_ascii=False keeps the output for ordinary names unchanged.
        name_json = json.dumps(self.__name, ensure_ascii=False)
        code_json = json.dumps("Code: " + self.__name, ensure_ascii=False)
        return '''{
"stack": [
''' + name_json + ''',
''' + code_json + ''',
''' + str(self.__line_number) + ''',
''' + duration + '''
],
"sampleCount": ''' + duration + ''',
"children": [
''' + ",\n".join([kid.toJSON() for kid in self.__children]) + '''
]
}
'''
# Stack of lists that accumulate finished profile nodes. Index 0 collects the
# top-level calls; profileCall() pushes a fresh list while a profiled block is
# running and pops it again afterwards.
child_accu_stack = [ [] ] # type: List[List[_ProfileCallNode]]
# Pending requests, set by clearProfileData(), startRecordingProfileData() and
# stopRecordingProfileData(). They are applied later by updateProfileConfig().
clear_profile_requested = False
record_profile_requested = False
stop_record_profile_requested = False
def getProfileData():
    """Build a single tree out of everything recorded so far.

    The recorded top-level calls are placed under an unnamed root node that
    spans from the start of the first call to the end of the last one. Gaps
    between the calls are padded with unnamed filler nodes.

    :return: :type{ProfileCallNode} or None if there is no data.
    """
    top_level_calls = child_accu_stack[0]
    if not top_level_calls:
        return None

    first_start = top_level_calls[0].getStartTime()
    last_end = top_level_calls[-1].getEndTime()
    padded_calls = _fillInProfileSpaces(first_start, last_end, top_level_calls)
    return _ProfileCallNode("", 0, first_start, last_end, padded_calls)
def clearProfileData() -> None:
    """Request that all recorded profile data is discarded.

    This only raises the clear_profile_requested flag; the data itself is
    dropped the next time updateProfileConfig() applies pending requests.
    """
    global clear_profile_requested

    clear_profile_requested = True
def startRecordingProfileData() -> None:
    """Request that recording of profile data starts.

    Any pending stop request is cancelled first, then the start request flag
    is raised. The flags are acted upon by updateProfileConfig().
    """
    global record_profile_requested, stop_record_profile_requested

    stop_record_profile_requested = False
    record_profile_requested = True
def stopRecordingProfileData() -> None:
    """Request that recording of profile data stops.

    This only raises the stop_record_profile_requested flag; recording is
    switched off when updateProfileConfig() applies pending requests.
    """
    global stop_record_profile_requested

    stop_record_profile_requested = True
def _fillInProfileSpaces(start_time, end_time, profile_call_list):
    """Pad a sequence of calls so that it covers a time span without gaps.

    Wherever the next call does not start in the same millisecond as the
    previous one ended (or as start_time, for the first call), an unnamed
    filler node is inserted. A final filler is added when the last call does
    not end in the same millisecond as end_time.

    :param start_time: The time at which the covered span starts.
    :param end_time: The time at which the covered span ends.
    :param profile_call_list: The calls to pad, in the order they were made.
    :return: A new list holding the given calls interleaved with fillers.
    """
    padded = []
    cursor = start_time  # End of the span that has been covered so far.
    for call in profile_call_list:
        call_start = call.getStartTime()
        if secondsToMS(call_start) != secondsToMS(cursor):
            padded.append(_ProfileCallNode("", 0, cursor, call_start, []))
        padded.append(call)
        cursor = call.getEndTime()

    if secondsToMS(cursor) != secondsToMS(end_time):
        padded.append(_ProfileCallNode("", 0, cursor, end_time, []))
    return padded
def secondsToMS(value):
    """Convert a time in seconds to whole milliseconds, rounding down.

    :param value: A time or duration in seconds.
    :return: The number of milliseconds, rounded towards negative infinity.
    """
    milliseconds = value * 1000
    return math.floor(milliseconds)
@contextmanager
def profileCall(name):
    """Profile a block of code.

    Use this context manager to wrap and profile a block of code. When the
    profiler is not enabled the block simply runs without any bookkeeping.

    The timing is recorded even when the block raises an exception; the
    exception itself is propagated unchanged.

    :param name: :type{str} The name to use to identify this code in the profile report.
    """
    if not enabled():
        yield
        return

    start_time = time.perf_counter()
    child_accu_stack.append([])  # Collects the calls made inside this block.
    try:
        yield
    finally:
        # An exception raised by the profiled block is re-raised at the yield
        # above. The accumulator pushed for this block must be popped in that
        # case too, otherwise the stack stays one level too deep for good:
        # later results would end up in the wrong list and
        # updateProfileConfig() would never apply a request again.
        end_time = time.perf_counter()
        child_values = child_accu_stack.pop()
        if (end_time - start_time) > 0.001:  # Filter out small durations (< 1ms)
            call_stat = _ProfileCallNode(name, 0, start_time, end_time,
                                         _fillInProfileSpaces(start_time, end_time, child_values))
            child_accu_stack[-1].append(call_stat)
def isRecordingProfile() -> bool:
    """Tell whether profiling information should be recorded right now.

    Recording only ever happens on the main thread, so this is False on any
    other thread even while recording is switched on.

    :return: :type{bool} True if we are recording.
    """
    on_main_thread = threading.current_thread() is threading.main_thread()
    return record_profile and on_main_thread
def updateProfileConfig():
    """Apply any pending clear / start / stop requests.

    Nothing happens while profiled calls are still in progress, i.e. while
    child_accu_stack holds more than its bottom list. Otherwise each raised
    request flag is reset and acted upon, in the order clear, start, stop.
    """
    global child_accu_stack, record_profile
    global clear_profile_requested, record_profile_requested, stop_record_profile_requested

    # We can only update the active profiling config when we are not deeply nested inside profiled calls.
    if len(child_accu_stack) > 1:
        return

    if clear_profile_requested:
        clear_profile_requested = False
        child_accu_stack = [[]]

    if record_profile_requested:
        record_profile_requested = False
        record_profile = True
        Logger.log('d', 'Starting record record_profile_requested')

    if stop_record_profile_requested:
        stop_record_profile_requested = False
        record_profile = False
        Logger.log('d', 'Stopping record stop_record_profile_requested')
def profile(function):
    """Decorator which can be manually applied to methods to record profiling information.

    Whether the profiler is enabled is checked once, at decoration time. When
    it is not, the function is returned untouched. Otherwise a wrapper is
    returned which times each call under the function's qualified name while
    recording is active, and calls straight through when it is not.
    """
    if not enabled():
        return function

    @functools.wraps(function)
    def runIt(*args, **kwargs):
        if not isRecordingProfile():
            return function(*args, **kwargs)
        with profileCall(function.__qualname__):
            return function(*args, **kwargs)

    return runIt
def pyqtSlot(*args, **kwargs) -> Callable[..., Any]:
    """Drop in replacement for PyQt6's pyqtSlot decorator which records profiling information.

    The arguments are handed on unchanged to PyQt6's own pyqtSlot. When the
    profiler is enabled the decorated function is first wrapped so that calls
    are timed under the name "[SLOT] <qualified name>" while recording is
    active.

    See the PyQt6 documentation for information about pyqtSlot.
    """
    if not enabled():
        def dontWrapIt(function):
            return PyQt6PyqtSlot(*args, **kwargs)(function)

        return dontWrapIt

    def wrapIt(function):
        @functools.wraps(function)
        def wrapped(*args2, **kwargs2):
            if not isRecordingProfile():
                return function(*args2, **kwargs2)
            with profileCall("[SLOT] " + function.__qualname__):
                return function(*args2, **kwargs2)

        slot_decorator = PyQt6PyqtSlot(*args, **kwargs)
        return slot_decorator(wrapped)

    return wrapIt