-
Notifications
You must be signed in to change notification settings - Fork 536
/
Copy pathtest_threading.py
275 lines (216 loc) · 8.24 KB
/
test_threading.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
import gc
from concurrent import futures
from textwrap import dedent
from threading import Thread
import pytest
import sentry_sdk
from sentry_sdk import capture_message
from sentry_sdk.integrations.threading import ThreadingIntegration
# References to Thread.start / Thread.run as they exist when this module is
# imported. test_wrapper_attributes compares the __qualname__ of the methods
# it sees after sentry_init() against these.
# NOTE(review): presumably these are the unpatched methods, i.e. captured
# before ThreadingIntegration is set up -- confirm nothing initializes the
# integration before this module is imported.
original_start = Thread.start
original_run = Thread.run
@pytest.mark.parametrize("integrations", [[ThreadingIntegration()], []])
def test_handles_exceptions(sentry_init, capture_events, integrations):
    """Check reporting of an uncaught exception raised in a thread.

    Parametrized over two setups: ``ThreadingIntegration`` enabled, and no
    integrations at all. A thread that divides by zero is started and
    joined. With the integration exactly one event is expected, carrying a
    single unhandled ``ZeroDivisionError`` with mechanism type
    ``"threading"``; without it no event is expected.
    """
    sentry_init(default_integrations=False, integrations=integrations)
    captured = capture_events()

    def divide_by_zero():
        1 / 0

    worker = Thread(target=divide_by_zero)
    worker.start()
    worker.join()

    if not integrations:
        # No integration configured: the crash must not produce an event.
        assert not captured
        return

    (event,) = captured
    (exc_info,) = event["exception"]["values"]
    assert exc_info["type"] == "ZeroDivisionError"

    mechanism = exc_info["mechanism"]
    assert mechanism["type"] == "threading"
    assert not mechanism["handled"]
@pytest.mark.parametrize("propagate_hub", (True, False))
def test_propagates_hub(sentry_init, capture_events, propagate_hub):
    """Check whether a tag set in one thread shows up on a nested thread's event.

    An outer thread sets the ``stage1`` tag on its isolation scope and then
    starts an inner thread that divides by zero. Exactly one event is
    expected, holding a single unhandled ``ZeroDivisionError`` with
    mechanism type ``"threading"``. The ``stage1`` tag must be on that event
    when ``propagate_hub`` is true and absent when it is false.
    """
    sentry_init(
        default_integrations=False,
        integrations=[ThreadingIntegration(propagate_hub=propagate_hub)],
    )
    captured = capture_events()

    def crashing_stage():
        1 / 0

    def tagging_stage():
        sentry_sdk.get_isolation_scope().set_tag("stage1", "true")

        inner = Thread(target=crashing_stage)
        inner.start()
        inner.join()

    outer = Thread(target=tagging_stage)
    outer.start()
    outer.join()

    (event,) = captured
    (exc_info,) = event["exception"]["values"]

    assert exc_info["type"] == "ZeroDivisionError"
    assert exc_info["mechanism"]["type"] == "threading"
    assert not exc_info["mechanism"]["handled"]

    if not propagate_hub:
        assert "stage1" not in event.get("tags", {})
    else:
        assert event["tags"]["stage1"] == "true"
@pytest.mark.parametrize("propagate_hub", (True, False))
def test_propagates_threadpool_hub(sentry_init, capture_events, propagate_hub):
    """Check span propagation into tasks run by a ``ThreadPoolExecutor``.

    Four tasks are submitted to a single-worker pool while a transaction is
    active; each task opens a span. With ``propagate_hub`` true, a single
    event is expected whose first four spans share one trace id. With it
    false, the single event is expected to have no spans.
    """
    sentry_init(
        traces_sample_rate=1.0,
        integrations=[ThreadingIntegration(propagate_hub=propagate_hub)],
    )
    captured = capture_events()

    def traced_double(number):
        with sentry_sdk.start_span(op="task", name=str(number)):
            return number * 2

    with sentry_sdk.start_transaction(name="test_handles_threadpool"):
        with futures.ThreadPoolExecutor(max_workers=1) as pool:
            pending = [pool.submit(traced_double, number) for number in [1, 2, 3, 4]]
            for done in futures.as_completed(pending):
                print("Getting future value!", done.result())

    sentry_sdk.flush()

    if propagate_hub:
        assert len(captured) == 1
        (event,) = captured
        spans = event["spans"]
        # Compare neighbouring spans cyclically: 0-1, 1-2, 2-3 and 3-0.
        for left, right in ((0, 1), (1, 2), (2, 3), (3, 0)):
            assert spans[left]["trace_id"] == spans[right]["trace_id"]
    else:
        (event,) = captured
        assert len(event["spans"]) == 0
@pytest.mark.skip(reason="Temporarily disable to release SDK 2.0a1.")
def test_circular_references(sentry_init, request):
    """Check that running a thread leaves nothing for the garbage collector.

    Currently skipped (see the ``skip`` reason). With the integration
    enabled, a thread with a no-op ``run`` is started, joined and dropped;
    an explicit collection afterwards must report zero unreachable objects.
    """
    sentry_init(default_integrations=False, integrations=[ThreadingIntegration()])

    # Collect once up front, then switch automatic collection off so that
    # only the explicit gc.collect() below counts unreachable objects.
    # Collection is re-enabled when the test is torn down.
    gc.collect()
    gc.disable()
    request.addfinalizer(gc.enable)

    class NoopThread(Thread):
        def run(self):
            pass

    thread = NoopThread()
    thread.start()
    thread.join()
    del thread

    # gc.collect() returns the number of unreachable objects it found.
    assert gc.collect() == 0
def test_double_patching(sentry_init, capture_events):
    """Check that each crashing thread yields exactly one exception event.

    Ten instances of a ``Thread`` subclass whose ``run`` divides by zero are
    started and joined. Exactly ten events are expected, each holding a
    single ``ZeroDivisionError``.
    """
    sentry_init(default_integrations=False, integrations=[ThreadingIntegration()])
    events = capture_events()

    # XXX: Workaround for race condition in the py library's magic import
    # system (py is a dependency of pytest)
    capture_message("hi")
    del events[:]

    class CrashingThread(Thread):
        def run(self):
            1 / 0

    thread_count = 10
    threads = []
    for _ in range(thread_count):
        thread = CrashingThread()
        thread.start()
        threads.append(thread)

    for thread in threads:
        thread.join()

    assert len(events) == thread_count

    for event in events:
        (exc_info,) = event["exception"]["values"]
        assert exc_info["type"] == "ZeroDivisionError"
def test_wrapper_attributes(sentry_init):
    """Check that ``Thread.start`` / ``Thread.run`` keep their name attributes.

    After ``sentry_init`` with ``ThreadingIntegration``, ``__name__`` and
    ``__qualname__`` of ``start`` and ``run`` are compared against the
    module-level ``original_start`` / ``original_run`` references. The
    comparison is done on the class, on an instance, and for ``run`` also
    from inside the running thread.
    """
    sentry_init(default_integrations=False, integrations=[ThreadingIntegration()])

    # Filled in by the worker thread and verified from the main thread.
    seen_in_thread = {}

    def target():
        # Do not assert in here: an AssertionError raised inside a thread is
        # not propagated to the test, so a failing check would pass silently.
        # Record the values and let the main thread do the asserting.
        seen_in_thread["name"] = t.run.__name__
        seen_in_thread["qualname"] = t.run.__qualname__

    t = Thread(target=target)
    t.start()
    t.join()

    # Also fails if the thread died before recording anything.
    assert seen_in_thread == {
        "name": "run",
        "qualname": original_run.__qualname__,
    }

    assert Thread.start.__name__ == "start"
    assert Thread.start.__qualname__ == original_start.__qualname__
    assert t.start.__name__ == "start"
    assert t.start.__qualname__ == original_start.__qualname__

    assert Thread.run.__name__ == "run"
    assert Thread.run.__qualname__ == original_run.__qualname__
    assert t.run.__name__ == "run"
    assert t.run.__qualname__ == original_run.__qualname__
@pytest.mark.parametrize(
    "propagate_scope",
    (True, False),
    ids=["propagate_scope=True", "propagate_scope=False"],
)
def test_scope_data_not_leaked_in_threads(sentry_init, propagate_scope):
    """Check that a thread's scope changes do not leak into the main thread.

    The main thread sets ``initial_tag`` and then starts a thread that sets
    ``thread_tag``. Two things are verified from the main thread:

    * the tags the thread saw on its isolation scope before changing
      anything: the initial tag when ``propagate_scope`` is true, nothing
      when it is false;
    * the main thread's isolation scope still holds only the initial tag.
    """
    sentry_init(
        integrations=[ThreadingIntegration(propagate_scope=propagate_scope)],
    )

    sentry_sdk.set_tag("initial_tag", "initial_value")
    initial_iso_scope = sentry_sdk.get_isolation_scope()

    # Snapshots taken by the worker thread. They are asserted on from the
    # main thread, because an AssertionError raised inside a thread is not
    # propagated to the test and would let it pass silently.
    tags_seen_in_thread = []

    def do_some_work():
        # record (a copy of) the scope data visible when the thread starts
        tags_seen_in_thread.append(dict(sentry_sdk.get_isolation_scope()._tags))

        # change data in isolation scope in thread
        sentry_sdk.set_tag("thread_tag", "thread_value")

    t = Thread(target=do_some_work)
    t.start()
    t.join()

    # check if we have the initial scope data propagated into the thread
    expected_in_thread = {"initial_tag": "initial_value"} if propagate_scope else {}
    assert tags_seen_in_thread == [expected_in_thread]

    # check if the initial scope data is not modified by the started thread
    assert initial_iso_scope._tags == {
        "initial_tag": "initial_value"
    }, "The isolation scope in the main thread should not be modified by the started thread."
@pytest.mark.parametrize(
    "propagate_scope",
    (True, False),
    ids=["propagate_scope=True", "propagate_scope=False"],
)
def test_spans_from_multiple_threads(
    sentry_init, capture_events, render_span_tree, propagate_scope
):
    """Check which spans end up in a transaction that fans out to threads.

    Inside one transaction, five ``outer-submit-N`` spans are opened in the
    main thread; each starts a thread that opens an ``inner-run-N`` span.
    The rendered span tree of the single resulting event must list the
    ``inner-run-N`` spans only when ``propagate_scope`` is true.
    """
    sentry_init(
        traces_sample_rate=1.0,
        integrations=[ThreadingIntegration(propagate_scope=propagate_scope)],
    )
    captured = capture_events()

    def run_child_span(number):
        with sentry_sdk.start_span(
            op=f"inner-run-{number}", name=f"Thread: child-{number}"
        ):
            pass

    workers = []

    with sentry_sdk.start_transaction(op="outer-trx"):
        for number in range(5):
            with sentry_sdk.start_span(
                op=f"outer-submit-{number}", name="Thread: main"
            ):
                worker = Thread(target=run_child_span, args=(number,))
                worker.start()
                workers.append(worker)

        for worker in workers:
            worker.join()

    (event,) = captured

    # NOTE(review): the expected trees below are reproduced exactly as found,
    # with every line at the same indentation level. If render_span_tree
    # indents child spans, the relative indentation of these lines was lost
    # and must be restored -- confirm against the fixture's output.
    # The closing quotes stay at column 0 so that the line continuation does
    # not append trailing whitespace to the last expected line.
    if propagate_scope:
        expected_tree = dedent(
            """\
            - op="outer-trx": description=null
            - op="outer-submit-0": description="Thread: main"
            - op="inner-run-0": description="Thread: child-0"
            - op="outer-submit-1": description="Thread: main"
            - op="inner-run-1": description="Thread: child-1"
            - op="outer-submit-2": description="Thread: main"
            - op="inner-run-2": description="Thread: child-2"
            - op="outer-submit-3": description="Thread: main"
            - op="inner-run-3": description="Thread: child-3"
            - op="outer-submit-4": description="Thread: main"
            - op="inner-run-4": description="Thread: child-4"\
"""
        )
    else:
        expected_tree = dedent(
            """\
            - op="outer-trx": description=null
            - op="outer-submit-0": description="Thread: main"
            - op="outer-submit-1": description="Thread: main"
            - op="outer-submit-2": description="Thread: main"
            - op="outer-submit-3": description="Thread: main"
            - op="outer-submit-4": description="Thread: main"\
"""
        )

    assert render_span_tree(event) == expected_tree