Skip to content

Commit d2d496d

Browse files
feat(profiling): track asyncio.wait
1 parent 84b4a8c commit d2d496d

File tree

5 files changed

+382
-1
lines changed

5 files changed

+382
-1
lines changed

ddtrace/internal/datadog/profiling/stack_v2/__init__.pyi

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def unregister_thread(name: str) -> None: ...
2424

2525
# Asyncio support
2626
def track_asyncio_loop(thread_id: int, loop: Optional[asyncio.AbstractEventLoop]) -> None: ...
27-
def link_tasks(parent: asyncio.AbstractEventLoop, child: asyncio.Task) -> None: ...
27+
def link_tasks(parent: asyncio.Task, child: asyncio.Future) -> None: ...
2828
def init_asyncio(
2929
current_tasks: Sequence[asyncio.Task],
3030
scheduled_tasks: Sequence[asyncio.Task],

ddtrace/profiling/_asyncio.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,19 @@ def _(f, args, kwargs):
132132
for child in children:
133133
stack_v2.link_tasks(parent, child)
134134

135+
@partial(wrap, sys.modules["asyncio"].tasks._wait)
136+
def _(f, args, kwargs):
137+
try:
138+
return f(*args, **kwargs)
139+
finally:
140+
futures = typing.cast(typing.Iterable["asyncio.Future"], get_argument_value(args, kwargs, 0, "fs"))
141+
loop = typing.cast("asyncio.AbstractEventLoop", get_argument_value(args, kwargs, 3, "loop"))
142+
143+
# Link the parent task that called asyncio.wait to the futures it is waiting on
144+
parent: "asyncio.Task" = globals()["current_task"](loop)
145+
for future in futures:
146+
stack_v2.link_tasks(parent, future)
147+
135148
_call_init_asyncio(asyncio)
136149

137150

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
features:
2+
- |
3+
profiling: This introduces tracking for ``asyncio.wait`` in the Profiler.
4+
This makes it possible to track dependencies between Tasks/Coroutines that await/are awaited through ``asyncio.wait``.
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
import pytest
2+
3+
4+
@pytest.mark.subprocess(
5+
env=dict(
6+
DD_PROFILING_OUTPUT_PPROF="/tmp/test_asyncio_utils_gather",
7+
),
8+
err=None,
9+
)
10+
# For macOS: err=None ignores expected stderr from tracer failing to connect to agent (not relevant to this test)
11+
def test_asyncio_gather() -> None:
12+
import asyncio
13+
import os
14+
import time
15+
import uuid
16+
17+
from ddtrace import ext
18+
from ddtrace.internal.datadog.profiling import stack_v2
19+
from ddtrace.profiling import profiler
20+
from ddtrace.trace import tracer
21+
from tests.profiling.collector import pprof_utils
22+
23+
assert stack_v2.is_available, stack_v2.failure_msg
24+
25+
sleep_time = 0.2
26+
loop_run_time = 3
27+
28+
async def inner1() -> None:
29+
start_time = time.time()
30+
while time.time() < start_time + loop_run_time:
31+
await asyncio.sleep(sleep_time)
32+
33+
async def inner2() -> None:
34+
start_time = time.time()
35+
while time.time() < start_time + loop_run_time:
36+
await asyncio.sleep(sleep_time)
37+
38+
async def outer() -> None:
39+
t1 = asyncio.create_task(inner1(), name="inner 1")
40+
t2 = asyncio.create_task(inner2(), name="inner 2")
41+
await asyncio.gather(t1, t2)
42+
43+
resource = str(uuid.uuid4())
44+
span_type = ext.SpanTypes.WEB
45+
46+
p = profiler.Profiler(tracer=tracer)
47+
p.start()
48+
with tracer.trace("test_asyncio", resource=resource, span_type=span_type) as span:
49+
span_id = span.span_id
50+
local_root_span_id = span._local_root.span_id
51+
52+
loop = asyncio.new_event_loop()
53+
asyncio.set_event_loop(loop)
54+
main_task = loop.create_task(outer(), name="outer")
55+
loop.run_until_complete(main_task)
56+
57+
p.stop()
58+
59+
output_filename = os.environ["DD_PROFILING_OUTPUT_PPROF"] + "." + str(os.getpid())
60+
61+
profile = pprof_utils.parse_newest_profile(output_filename)
62+
63+
samples_with_span_id = pprof_utils.get_samples_with_label_key(profile, "span id")
64+
assert len(samples_with_span_id) > 0
65+
66+
# get samples with task_name
67+
samples = pprof_utils.get_samples_with_label_key(profile, "task name")
68+
# The next fails if stack_v2 is not properly configured with asyncio task
69+
# tracking via ddtrace.profiling._asyncio
70+
assert len(samples) > 0
71+
72+
pprof_utils.assert_profile_has_sample(
73+
profile,
74+
samples,
75+
expected_sample=pprof_utils.StackEvent(
76+
thread_name="MainThread",
77+
task_name="outer",
78+
span_id=span_id,
79+
local_root_span_id=local_root_span_id,
80+
locations=[
81+
pprof_utils.StackLocation(
82+
function_name="outer", filename="test_asyncio_gather.py", line_no=outer.__code__.co_firstlineno + 3
83+
),
84+
# TODO: We should add the locations of the gathered Tasks here as they should be in the same Stack
85+
],
86+
),
87+
)
88+
89+
try:
90+
pprof_utils.assert_profile_has_sample(
91+
profile,
92+
samples,
93+
expected_sample=pprof_utils.StackEvent(
94+
thread_name="MainThread",
95+
task_name="outer", # TODO: This is a bug and we need to fix it, it should be "inner 2"
96+
span_id=span_id,
97+
local_root_span_id=local_root_span_id,
98+
locations=[
99+
pprof_utils.StackLocation(
100+
function_name="inner2",
101+
filename="test_asyncio_gather.py",
102+
line_no=inner2.__code__.co_firstlineno + 3,
103+
),
104+
pprof_utils.StackLocation(
105+
function_name="outer",
106+
filename="test_asyncio_gather.py",
107+
line_no=outer.__code__.co_firstlineno + 3,
108+
),
109+
],
110+
),
111+
)
112+
113+
pprof_utils.assert_profile_has_sample(
114+
profile,
115+
samples,
116+
expected_sample=pprof_utils.StackEvent(
117+
thread_name="MainThread",
118+
task_name="inner 1",
119+
span_id=span_id,
120+
local_root_span_id=local_root_span_id,
121+
locations=[
122+
pprof_utils.StackLocation(
123+
function_name="inner1",
124+
filename="test_asyncio_gather.py",
125+
line_no=inner1.__code__.co_firstlineno + 3,
126+
),
127+
pprof_utils.StackLocation(
128+
function_name="outer",
129+
filename="test_asyncio_gather.py",
130+
line_no=outer.__code__.co_firstlineno + 3,
131+
),
132+
],
133+
),
134+
)
135+
except AssertionError:
136+
pprof_utils.assert_profile_has_sample(
137+
profile,
138+
samples,
139+
expected_sample=pprof_utils.StackEvent(
140+
thread_name="MainThread",
141+
task_name="inner 2", # Correct attribution: the inner2 frame is reported under its own Task, "inner 2"
142+
span_id=span_id,
143+
local_root_span_id=local_root_span_id,
144+
locations=[
145+
pprof_utils.StackLocation(
146+
function_name="inner2",
147+
filename="test_asyncio_gather.py",
148+
line_no=inner2.__code__.co_firstlineno + 3,
149+
),
150+
pprof_utils.StackLocation(
151+
function_name="outer",
152+
filename="test_asyncio_gather.py",
153+
line_no=outer.__code__.co_firstlineno + 3,
154+
),
155+
],
156+
),
157+
)
158+
159+
pprof_utils.assert_profile_has_sample(
160+
profile,
161+
samples,
162+
expected_sample=pprof_utils.StackEvent(
163+
thread_name="MainThread",
164+
task_name="outer", # TODO: This is a bug and we need to fix it, it should be "inner 1"
165+
span_id=span_id,
166+
local_root_span_id=local_root_span_id,
167+
locations=[
168+
pprof_utils.StackLocation(
169+
function_name="inner1",
170+
filename="test_asyncio_gather.py",
171+
line_no=inner1.__code__.co_firstlineno + 3,
172+
),
173+
pprof_utils.StackLocation(
174+
function_name="outer",
175+
filename="test_asyncio_gather.py",
176+
line_no=outer.__code__.co_firstlineno + 3,
177+
),
178+
],
179+
),
180+
)
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
import pytest
2+
3+
4+
@pytest.mark.subprocess(
5+
env=dict(
6+
DD_PROFILING_OUTPUT_PPROF="/tmp/test_asyncio_wait",
7+
),
8+
err=None,
9+
)
10+
# For macOS: err=None ignores expected stderr from tracer failing to connect to agent (not relevant to this test)
11+
def test_asyncio_wait() -> None:
12+
import asyncio
13+
import os
14+
import time
15+
import uuid
16+
17+
from ddtrace import ext
18+
from ddtrace.internal.datadog.profiling import stack_v2
19+
from ddtrace.profiling import profiler
20+
from ddtrace.trace import tracer
21+
from tests.profiling.collector import pprof_utils
22+
23+
assert stack_v2.is_available, stack_v2.failure_msg
24+
25+
sleep_time = 0.2
26+
loop_run_time = 3
27+
28+
async def inner1() -> None:
29+
start_time = time.time()
30+
while time.time() < start_time + loop_run_time:
31+
await asyncio.sleep(sleep_time)
32+
33+
async def inner2() -> None:
34+
start_time = time.time()
35+
while time.time() < start_time + loop_run_time:
36+
await asyncio.sleep(sleep_time)
37+
38+
async def outer() -> None:
39+
t1 = asyncio.create_task(inner1(), name="inner 1")
40+
t2 = asyncio.create_task(inner2(), name="inner 2")
41+
await asyncio.wait(fs=(t1, t2), return_when=asyncio.ALL_COMPLETED)
42+
43+
resource = str(uuid.uuid4())
44+
span_type = ext.SpanTypes.WEB
45+
46+
p = profiler.Profiler(tracer=tracer)
47+
p.start()
48+
with tracer.trace("test_asyncio", resource=resource, span_type=span_type) as span:
49+
span_id = span.span_id
50+
local_root_span_id = span._local_root.span_id
51+
52+
loop = asyncio.new_event_loop()
53+
asyncio.set_event_loop(loop)
54+
main_task = loop.create_task(outer(), name="outer")
55+
loop.run_until_complete(main_task)
56+
57+
p.stop()
58+
59+
output_filename = os.environ["DD_PROFILING_OUTPUT_PPROF"] + "." + str(os.getpid())
60+
61+
profile = pprof_utils.parse_newest_profile(output_filename)
62+
63+
samples_with_span_id = pprof_utils.get_samples_with_label_key(profile, "span id")
64+
assert len(samples_with_span_id) > 0
65+
66+
# get samples with task_name
67+
samples = pprof_utils.get_samples_with_label_key(profile, "task name")
68+
# The next fails if stack_v2 is not properly configured with asyncio task
69+
# tracking via ddtrace.profiling._asyncio
70+
assert len(samples) > 0
71+
72+
pprof_utils.assert_profile_has_sample(
73+
profile,
74+
samples,
75+
expected_sample=pprof_utils.StackEvent(
76+
thread_name="MainThread",
77+
task_name="outer",
78+
span_id=span_id,
79+
local_root_span_id=local_root_span_id,
80+
locations=[
81+
pprof_utils.StackLocation(
82+
function_name="outer", filename="test_asyncio_wait.py", line_no=outer.__code__.co_firstlineno + 3
83+
),
84+
# TODO: We should add the locations of the Tasks passed to asyncio.wait here as they should be in the same Stack
85+
],
86+
),
87+
)
88+
89+
# Note: there is currently a bug that makes one of the Tasks show up under the parent Task while the
90+
# other Task shows up under its own Task name. We need to fix this.
91+
# For the time being, though, which Task is "independent" is non-deterministic, which means we must
92+
# test both possibilities ("inner 2" is part of "outer" or "inner 1" is part of "outer").
93+
try:
94+
pprof_utils.assert_profile_has_sample(
95+
profile,
96+
samples,
97+
expected_sample=pprof_utils.StackEvent(
98+
thread_name="MainThread",
99+
task_name="outer", # TODO: This is a bug and we need to fix it, it should be "inner 1"
100+
span_id=span_id,
101+
local_root_span_id=local_root_span_id,
102+
locations=[
103+
pprof_utils.StackLocation(
104+
function_name="inner1",
105+
filename="test_asyncio_wait.py",
106+
line_no=inner1.__code__.co_firstlineno + 3,
107+
),
108+
pprof_utils.StackLocation(
109+
function_name="outer",
110+
filename="test_asyncio_wait.py",
111+
line_no=outer.__code__.co_firstlineno + 3,
112+
),
113+
],
114+
),
115+
)
116+
117+
pprof_utils.assert_profile_has_sample(
118+
profile,
119+
samples,
120+
expected_sample=pprof_utils.StackEvent(
121+
thread_name="MainThread",
122+
task_name="inner 2",
123+
span_id=span_id,
124+
local_root_span_id=local_root_span_id,
125+
locations=[
126+
pprof_utils.StackLocation(
127+
function_name="inner2",
128+
filename="test_asyncio_wait.py",
129+
line_no=inner2.__code__.co_firstlineno + 3,
130+
),
131+
pprof_utils.StackLocation(
132+
function_name="outer",
133+
filename="test_asyncio_wait.py",
134+
line_no=outer.__code__.co_firstlineno + 3,
135+
),
136+
],
137+
),
138+
)
139+
except AssertionError:
140+
pprof_utils.assert_profile_has_sample(
141+
profile,
142+
samples,
143+
expected_sample=pprof_utils.StackEvent(
144+
thread_name="MainThread",
145+
task_name="outer", # TODO: This is a bug and we need to fix it, it should be "inner 2"
146+
span_id=span_id,
147+
local_root_span_id=local_root_span_id,
148+
locations=[
149+
pprof_utils.StackLocation(
150+
function_name="inner2",
151+
filename="test_asyncio_wait.py",
152+
line_no=inner2.__code__.co_firstlineno + 3,
153+
),
154+
pprof_utils.StackLocation(
155+
function_name="outer",
156+
filename="test_asyncio_wait.py",
157+
line_no=outer.__code__.co_firstlineno + 3,
158+
),
159+
],
160+
),
161+
)
162+
163+
pprof_utils.assert_profile_has_sample(
164+
profile,
165+
samples,
166+
expected_sample=pprof_utils.StackEvent(
167+
thread_name="MainThread",
168+
task_name="inner 1",
169+
span_id=span_id,
170+
local_root_span_id=local_root_span_id,
171+
locations=[
172+
pprof_utils.StackLocation(
173+
function_name="inner1",
174+
filename="test_asyncio_wait.py",
175+
line_no=inner1.__code__.co_firstlineno + 3,
176+
),
177+
pprof_utils.StackLocation(
178+
function_name="outer",
179+
filename="test_asyncio_wait.py",
180+
line_no=outer.__code__.co_firstlineno + 3,
181+
),
182+
],
183+
),
184+
)

0 commit comments

Comments
 (0)