Skip to content

Commit 9592cd4

Browse files
feat(profiling): port tracking asyncio.wait'ed Tasks
1 parent 9ac873c commit 9592cd4

File tree

4 files changed

+277
-2
lines changed

4 files changed

+277
-2
lines changed

ddtrace/profiling/_asyncio.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,22 @@ def _(f, args, kwargs):
132132
for child in children:
133133
stack_v2.link_tasks(parent, child)
134134

135+
@partial(wrap, sys.modules["asyncio"].tasks._wait)
def _(f, args, kwargs):
    """Link the current task to the futures passed to ``asyncio.tasks._wait``.

    The wrapped callable is always invoked first and its result is returned
    unchanged. The linking runs in the ``finally`` clause, so it is attempted
    whether or not the wrapped call raised.

    ``f`` is the original callable; ``args`` and ``kwargs`` are the positional
    and keyword arguments of the intercepted call.
    """
    try:
        return f(*args, **kwargs)
    finally:
        futures = get_argument_value(args, kwargs, 0, "fs")
        assert futures is not None  # nosec: assert is used for typing

        # Unlike the gather wrapper, 'loop' is looked up here with a real
        # positional index (3) as well as by name.
        # NOTE(review): this presumes the private signature is
        # _wait(fs, timeout, return_when, loop) -- confirm for every supported Python version.
        loop = get_argument_value(args, kwargs, 3, "loop")

        # Link the parent (waiting) task to each of the futures it waits on
        parent = globals()["current_task"](loop)
        for future in futures:
            stack_v2.link_tasks(parent, typing.cast("asyncio.Task", future))
150+
135151
_call_init_asyncio(asyncio)
136152

137153

tests/profiling/collector/pprof_utils.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,38 @@ def assert_profile_has_sample(
354354
samples: List[pprof_pb2.Sample],
355355
expected_sample: StackEvent,
356356
):
357+
# Print all samples with line number + function name + labels
358+
print(f"\n=== Printing all {len(samples)} samples ===")
359+
for i, sample in enumerate(samples):
360+
print(f"\nSample {i}:")
361+
362+
# Print locations (stack trace)
363+
print(" Stack trace:")
364+
for j, location_id in enumerate(sample.location_id):
365+
location = get_location_with_id(profile, location_id)
366+
if location.line:
367+
line = location.line[0]
368+
function = get_function_with_id(profile, line.function_id)
369+
function_name = profile.string_table[function.name]
370+
filename = profile.string_table[function.filename]
371+
print(f" [{j}] {filename}:{line.line} in {function_name}()")
372+
373+
# Print labels
374+
print(" Labels:")
375+
for label in sample.label:
376+
key_str = profile.string_table[label.key]
377+
if label.str:
378+
value_str = profile.string_table[label.str]
379+
print(f" {key_str}: {value_str}")
380+
elif label.num:
381+
print(f" {key_str}: {label.num}")
382+
383+
# Print values
384+
if sample.value:
385+
print(f" Values: {sample.value}")
386+
387+
print("=== End of samples ===\n")
388+
357389
found = False
358390
for sample in samples:
359391
try:
Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
import pytest
2+
3+
4+
@pytest.mark.subprocess(
    env=dict(
        DD_PROFILING_OUTPUT_PPROF="/tmp/test_stack_asyncio_gather",
    ),
    err=None,
)
# For macOS: err=None ignores expected stderr from tracer failing to connect to agent (not relevant to this test)
def test_asyncio_gather() -> None:
    """Profile a task that gathers two child tasks and check the pprof has a sample for each of them."""
    import asyncio
    import os
    import time
    import uuid

    from ddtrace import ext
    from ddtrace.internal.datadog.profiling import stack_v2
    from ddtrace.profiling import profiler
    from ddtrace.trace import tracer
    from tests.profiling.collector import pprof_utils

    assert stack_v2.is_available, stack_v2.failure_msg

    poll_interval = 0.2
    run_duration = 3

    # NOTE: the expected line numbers below are computed as ``co_firstlineno + 3``, so in both
    # coroutines the ``await`` has to stay exactly three lines below the ``def``.
    async def inner() -> None:
        deadline = time.time() + run_duration
        while time.time() < deadline:
            await asyncio.sleep(poll_interval)

    async def outer():
        first = asyncio.create_task(inner(), name="inner 1")
        second = asyncio.create_task(inner(), name="inner 2")
        await asyncio.gather(first, second)

    trace_resource = str(uuid.uuid4())

    prof = profiler.Profiler(tracer=tracer)
    prof.start()
    with tracer.trace("test_asyncio", resource=trace_resource, span_type=ext.SpanTypes.WEB) as span:
        span_id = span.span_id
        local_root_span_id = span._local_root.span_id

        # Run the coroutines while the span is active
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)
        event_loop.run_until_complete(event_loop.create_task(outer(), name="outer"))

    prof.stop()

    pprof_path = os.environ["DD_PROFILING_OUTPUT_PPROF"] + "." + str(os.getpid())
    profile = pprof_utils.parse_newest_profile(pprof_path)

    assert len(pprof_utils.get_samples_with_label_key(profile, "span id")) > 0

    # Samples labelled with a task name; the assertion fails if stack_v2 is not properly
    # configured with asyncio task tracking via ddtrace.profiling._asyncio
    samples = pprof_utils.get_samples_with_label_key(profile, "task name")
    assert len(samples) > 0

    # (task name, coroutine function) pairs, asserted in this order.
    # TODO: The "outer" sample should also list the locations of the gathered Tasks, as they
    # should be in the same Stack
    expected_tasks = (
        ("outer", outer),
        ("inner 1", inner),
        ("inner 2", inner),
    )
    for task_name, coro_fn in expected_tasks:
        pprof_utils.assert_profile_has_sample(
            profile,
            samples,
            expected_sample=pprof_utils.StackEvent(
                thread_name="MainThread",
                task_name=task_name,
                span_id=span_id,
                local_root_span_id=local_root_span_id,
                locations=[
                    pprof_utils.StackLocation(
                        function_name=coro_fn.__name__,
                        filename="test_stack_asyncio.py",
                        line_no=coro_fn.__code__.co_firstlineno + 3,
                    ),
                ],
            ),
        )
115+
116+
117+
@pytest.mark.subprocess(
    env=dict(
        DD_PROFILING_OUTPUT_PPROF="/tmp/test_stack_asyncio_wait",
    ),
    err=None,
)
# For macOS: err=None ignores expected stderr from tracer failing to connect to agent (not relevant to this test)
def test_asyncio_wait() -> None:
    """Profile a task that ``asyncio.wait``s on two child tasks and check the pprof has a sample for each of them."""
    import asyncio
    import os
    import time
    import uuid

    from ddtrace import ext
    from ddtrace.internal.datadog.profiling import stack_v2
    from ddtrace.profiling import profiler
    from ddtrace.trace import tracer
    from tests.profiling.collector import pprof_utils

    assert stack_v2.is_available, stack_v2.failure_msg

    poll_interval = 0.2
    run_duration = 3

    # NOTE: the expected line numbers below are computed as ``co_firstlineno + 3``, so in both
    # coroutines the ``await`` has to stay exactly three lines below the ``def``.
    async def inner() -> None:
        deadline = time.time() + run_duration
        while time.time() < deadline:
            await asyncio.sleep(poll_interval)

    async def outer() -> None:
        first = asyncio.create_task(inner(), name="inner 1")
        second = asyncio.create_task(inner(), name="inner 2")
        await asyncio.wait(fs=(first, second), return_when=asyncio.ALL_COMPLETED)

    trace_resource = str(uuid.uuid4())

    prof = profiler.Profiler(tracer=tracer)
    prof.start()
    with tracer.trace("test_asyncio", resource=trace_resource, span_type=ext.SpanTypes.WEB) as span:
        span_id = span.span_id
        local_root_span_id = span._local_root.span_id

        # Run the coroutines while the span is active
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)
        event_loop.run_until_complete(event_loop.create_task(outer(), name="outer"))

    prof.stop()

    pprof_path = os.environ["DD_PROFILING_OUTPUT_PPROF"] + "." + str(os.getpid())
    profile = pprof_utils.parse_newest_profile(pprof_path)

    assert len(pprof_utils.get_samples_with_label_key(profile, "span id")) > 0

    # Samples labelled with a task name; the assertion fails if stack_v2 is not properly
    # configured with asyncio task tracking via ddtrace.profiling._asyncio
    samples = pprof_utils.get_samples_with_label_key(profile, "task name")
    assert len(samples) > 0

    # (task name, coroutine function) pairs, asserted in this order.
    # TODO: The "outer" sample should also list the locations of the awaited Tasks, as they
    # should be in the same Stack
    expected_tasks = (
        ("outer", outer),
        ("inner 1", inner),
        ("inner 2", inner),
    )
    for task_name, coro_fn in expected_tasks:
        pprof_utils.assert_profile_has_sample(
            profile,
            samples,
            expected_sample=pprof_utils.StackEvent(
                thread_name="MainThread",
                task_name=task_name,
                span_id=span_id,
                local_root_span_id=local_root_span_id,
                locations=[
                    pprof_utils.StackLocation(
                        function_name=coro_fn.__name__,
                        filename="test_stack_asyncio.py",
                        line_no=coro_fn.__code__.co_firstlineno + 3,
                    ),
                ],
            ),
        )

tests/profiling_v2/collector/test_stack_asyncio.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ async def hello():
4747

4848
loop = asyncio.new_event_loop()
4949
asyncio.set_event_loop(loop)
50-
maintask = loop.create_task(hello(), name="main")
50+
main_task = loop.create_task(hello(), name="main")
5151

52-
t1, t2 = loop.run_until_complete(maintask)
52+
t1, t2 = loop.run_until_complete(main_task)
5353
p.stop()
5454

5555
t1_name = t1.get_name()

0 commit comments

Comments
 (0)