Skip to content

Commit ac232a5

Browse files
authored
Convert background job queue time / latency to nano seconds (#784)
1 parent b02a953 commit ac232a5

File tree

9 files changed

+207
-148
lines changed

9 files changed

+207
-148
lines changed

src/scout_apm/bottle.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,9 @@
55

66
import scout_apm.core
77
from scout_apm.core.config import scout_config
8+
from scout_apm.core.queue_time import track_request_queue_time
89
from scout_apm.core.tracked_request import TrackedRequest
9-
from scout_apm.core.web_requests import (
10-
create_filtered_path,
11-
ignore_path,
12-
track_request_queue_time,
13-
)
10+
from scout_apm.core.web_requests import create_filtered_path, ignore_path
1411

1512

1613
class ScoutPlugin(object):

src/scout_apm/celery.py

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
from celery.signals import before_task_publish, task_failure, task_postrun, task_prerun
77

8+
from scout_apm.core.queue_time import track_job_queue_time
9+
810
try:
911
import django
1012
from django.views.debug import SafeExceptionReporterFilter
@@ -34,15 +36,8 @@ def task_prerun_callback(task=None, **kwargs):
3436
tracked_request = TrackedRequest.instance()
3537
tracked_request.is_real_request = True
3638

37-
start = getattr(task.request, "scout_task_start", None)
38-
if start is not None:
39-
now = datetime_to_timestamp(dt.datetime.utcnow())
40-
try:
41-
queue_time = now - start
42-
except TypeError:
43-
pass
44-
else:
45-
tracked_request.tag("queue_time", queue_time)
39+
start_time_header = getattr(task.request, "scout_task_start", None)
40+
track_job_queue_time(start_time_header, tracked_request)
4641

4742
task_id = getattr(task.request, "id", None)
4843
if task_id:

src/scout_apm/core/queue_time.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
# coding=utf-8
2+
3+
import datetime as dt
4+
import logging
5+
import time
6+
import typing
7+
8+
from scout_apm.compat import datetime_to_timestamp
9+
from scout_apm.core.tracked_request import TrackedRequest
10+
11+
logger = logging.getLogger(__name__)
12+
13+
# Cutoff epoch is used for determining ambiguous timestamp boundaries
14+
CUTOFF_EPOCH_S = time.mktime((dt.date.today().year - 10, 1, 1, 0, 0, 0, 0, 0, 0))
15+
CUTOFF_EPOCH_MS = CUTOFF_EPOCH_S * 1000.0
16+
CUTOFF_EPOCH_US = CUTOFF_EPOCH_S * 1000000.0
17+
CUTOFF_EPOCH_NS = CUTOFF_EPOCH_S * 1000000000.0
18+
19+
20+
def _convert_ambiguous_timestamp_to_ns(timestamp: float) -> float:
21+
"""
22+
Convert an ambiguous float timestamp that could be in nanoseconds,
23+
microseconds, milliseconds, or seconds to nanoseconds. Return 0.0 for
24+
values in the more than 10 years ago.
25+
"""
26+
if timestamp > CUTOFF_EPOCH_NS:
27+
converted_timestamp = timestamp
28+
elif timestamp > CUTOFF_EPOCH_US:
29+
converted_timestamp = timestamp * 1000.0
30+
elif timestamp > CUTOFF_EPOCH_MS:
31+
converted_timestamp = timestamp * 1000000.0
32+
elif timestamp > CUTOFF_EPOCH_S:
33+
converted_timestamp = timestamp * 1000000000.0
34+
else:
35+
return 0.0
36+
return converted_timestamp
37+
38+
39+
def track_request_queue_time(
40+
header_value: typing.Any, tracked_request: TrackedRequest
41+
) -> bool:
42+
"""
43+
Attempt to parse a queue time header and store the result in the tracked request.
44+
45+
Returns:
46+
bool: Whether we succeeded in marking queue time. Used for testing.
47+
"""
48+
if header_value.startswith("t="):
49+
header_value = header_value[2:]
50+
51+
try:
52+
first_char = header_value[0]
53+
except IndexError:
54+
return False
55+
56+
if not first_char.isdigit(): # filter out negatives, nan, inf, etc.
57+
return False
58+
59+
try:
60+
ambiguous_start_timestamp = float(header_value)
61+
except ValueError:
62+
return False
63+
64+
start_timestamp_ns = _convert_ambiguous_timestamp_to_ns(ambiguous_start_timestamp)
65+
if start_timestamp_ns == 0.0:
66+
return False
67+
68+
tr_start_timestamp_ns = datetime_to_timestamp(tracked_request.start_time) * 1e9
69+
70+
# Ignore if in the future
71+
if start_timestamp_ns > tr_start_timestamp_ns:
72+
return False
73+
74+
queue_time_ns = int(tr_start_timestamp_ns - start_timestamp_ns)
75+
tracked_request.tag("scout.queue_time_ns", queue_time_ns)
76+
return True
77+
78+
79+
def track_job_queue_time(
80+
header_value: typing.Any, tracked_request: TrackedRequest
81+
) -> bool:
82+
"""
83+
Attempt to parse a queue/latency time header and store the result in the request.
84+
85+
Returns:
86+
bool: Whether we succeeded in marking queue time for the job. Used for testing.
87+
"""
88+
if header_value is not None:
89+
now = datetime_to_timestamp(dt.datetime.utcnow()) * 1e9
90+
try:
91+
ambiguous_float_start = typing.cast(float, header_value)
92+
start = _convert_ambiguous_timestamp_to_ns(ambiguous_float_start)
93+
queue_time_ns = int(now - start)
94+
except TypeError:
95+
logger.debug("Invalid job queue time header: %r", header_value)
96+
return False
97+
else:
98+
tracked_request.tag("scout.job_queue_time_ns", queue_time_ns)
99+
return True

src/scout_apm/core/web_requests.py

Lines changed: 2 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
# coding=utf-8
22

3-
import datetime as dt
4-
import time
5-
6-
from scout_apm.compat import datetime_to_timestamp, parse_qsl, urlencode
3+
from scout_apm.compat import parse_qsl, urlencode
74
from scout_apm.core.config import scout_config
5+
from scout_apm.core.queue_time import track_request_queue_time
86

97
# Originally derived from:
108
# 1. Rails:
@@ -103,64 +101,6 @@ def ignore_path(path):
103101
return False
104102

105103

106-
def track_request_queue_time(header_value, tracked_request):
107-
if header_value.startswith("t="):
108-
header_value = header_value[2:]
109-
110-
try:
111-
first_char = header_value[0]
112-
except IndexError:
113-
return False
114-
115-
if not first_char.isdigit(): # filter out negatives, nan, inf, etc.
116-
return False
117-
118-
try:
119-
ambiguous_start_timestamp = float(header_value)
120-
except ValueError:
121-
return False
122-
123-
start_timestamp_ns = convert_ambiguous_timestamp_to_ns(ambiguous_start_timestamp)
124-
if start_timestamp_ns == 0.0:
125-
return False
126-
127-
tr_start_timestamp_ns = datetime_to_timestamp(tracked_request.start_time) * 1e9
128-
129-
# Ignore if in the future
130-
if start_timestamp_ns > tr_start_timestamp_ns:
131-
return False
132-
133-
queue_time_ns = int(tr_start_timestamp_ns - start_timestamp_ns)
134-
tracked_request.tag("scout.queue_time_ns", queue_time_ns)
135-
return True
136-
137-
138-
# Cutoff epoch is used for determining ambiguous timestamp boundaries
139-
CUTOFF_EPOCH_S = time.mktime((dt.date.today().year - 10, 1, 1, 0, 0, 0, 0, 0, 0))
140-
CUTOFF_EPOCH_MS = CUTOFF_EPOCH_S * 1000.0
141-
CUTOFF_EPOCH_US = CUTOFF_EPOCH_S * 1000000.0
142-
CUTOFF_EPOCH_NS = CUTOFF_EPOCH_S * 1000000000.0
143-
144-
145-
def convert_ambiguous_timestamp_to_ns(timestamp):
146-
"""
147-
Convert an ambiguous float timestamp that could be in nanoseconds,
148-
microseconds, milliseconds, or seconds to nanoseconds. Return 0.0 for
149-
values in the more than 10 years ago.
150-
"""
151-
if timestamp > CUTOFF_EPOCH_NS:
152-
converted_timestamp = timestamp
153-
elif timestamp > CUTOFF_EPOCH_US:
154-
converted_timestamp = timestamp * 1000.0
155-
elif timestamp > CUTOFF_EPOCH_MS:
156-
converted_timestamp = timestamp * 1000000.0
157-
elif timestamp > CUTOFF_EPOCH_S:
158-
converted_timestamp = timestamp * 1000000000.0
159-
else:
160-
return 0.0
161-
return converted_timestamp
162-
163-
164104
def asgi_track_request_data(scope, tracked_request):
165105
"""
166106
Track request data from an ASGI HTTP or Websocket scope.

src/scout_apm/django/middleware.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,9 @@
44
from django.urls import get_urlconf
55

66
from scout_apm.core.config import scout_config
7+
from scout_apm.core.queue_time import track_request_queue_time
78
from scout_apm.core.tracked_request import TrackedRequest
8-
from scout_apm.core.web_requests import (
9-
create_filtered_path,
10-
ignore_path,
11-
track_request_queue_time,
12-
)
9+
from scout_apm.core.web_requests import create_filtered_path, ignore_path
1310
from scout_apm.django.request import get_controller_name
1411

1512

src/scout_apm/falcon.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,9 @@
77

88
from scout_apm.api import install
99
from scout_apm.core.config import scout_config
10+
from scout_apm.core.queue_time import track_request_queue_time
1011
from scout_apm.core.tracked_request import TrackedRequest
11-
from scout_apm.core.web_requests import (
12-
create_filtered_path,
13-
ignore_path,
14-
track_request_queue_time,
15-
)
12+
from scout_apm.core.web_requests import create_filtered_path, ignore_path
1613

1714
logger = logging.getLogger(__name__)
1815

tests/integration/test_celery.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,8 +247,9 @@ def test_hello_worker(celery_app, celery_worker, tracked_requests):
247247
assert tracked_request.tags["priority"] == 0
248248
assert tracked_request.tags["routing_key"] == "celery"
249249
assert tracked_request.tags["queue"] == "unknown"
250+
sixty_seconds = 60_000_000_000
250251
assert (
251-
0.0 <= tracked_request.tags["queue_time"] < 60.0
252+
0.0 <= tracked_request.tags["scout.job_queue_time_ns"] < sixty_seconds
252253
) # Assume test took <60 seconds
253254
assert tracked_request.active_spans == []
254255
assert len(tracked_request.complete_spans) == 1
@@ -272,7 +273,7 @@ def test_hello_worker_header_preset(celery_app, celery_worker, tracked_requests)
272273
assert len(tracked_request.complete_spans) == 1
273274
span = tracked_request.complete_spans[0]
274275
assert span.operation == "Job/tests.integration.test_celery.hello"
275-
assert "queue_time" not in span.tags
276+
assert "scout.job_queue_time_ns" not in span.tags
276277

277278

278279
@skip_unless_celery_4_plus

tests/unit/core/test_queue_time.py

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
# coding=utf-8
2+
3+
import datetime as dt
4+
import time
5+
6+
import pytest
7+
8+
from scout_apm.compat import datetime_to_timestamp
9+
from scout_apm.core.queue_time import (
10+
CUTOFF_EPOCH_S,
11+
_convert_ambiguous_timestamp_to_ns,
12+
track_job_queue_time,
13+
track_request_queue_time,
14+
)
15+
16+
17+
@pytest.mark.parametrize("with_t", [True, False])
18+
def test_track_request_queue_time_valid(with_t, tracked_request):
19+
queue_start = int(datetime_to_timestamp(dt.datetime.utcnow())) - 2
20+
if with_t:
21+
header_value = str("t=") + str(queue_start)
22+
else:
23+
header_value = str(queue_start)
24+
25+
result = track_request_queue_time(header_value, tracked_request)
26+
27+
assert result is True
28+
queue_time_ns = tracked_request.tags["scout.queue_time_ns"]
29+
assert isinstance(queue_time_ns, int) and queue_time_ns > 0
30+
31+
32+
@pytest.mark.parametrize(
33+
"header_value",
34+
[
35+
str(""),
36+
str("t=X"), # first character not a digit
37+
str("t=0.3f"), # raises ValueError on float() conversion
38+
str(datetime_to_timestamp(dt.datetime.utcnow()) + 3600.0), # one hour in future
39+
str(datetime_to_timestamp(dt.datetime(2009, 1, 1))), # before ambig cutoff
40+
],
41+
)
42+
def test_track_request_queue_time_invalid(header_value, tracked_request):
43+
result = track_request_queue_time(header_value, tracked_request)
44+
45+
assert result is False
46+
assert "scout.queue_time_ns" not in tracked_request.tags
47+
48+
49+
@pytest.mark.parametrize("with_t", [True, False])
50+
def test_track_job_queue_time_valid(with_t, tracked_request):
51+
queue_start = datetime_to_timestamp(dt.datetime.utcnow()) - 2.0
52+
result = track_job_queue_time(queue_start, tracked_request)
53+
54+
assert result is True
55+
queue_time_ns = tracked_request.tags["scout.job_queue_time_ns"]
56+
assert isinstance(queue_time_ns, int) and queue_time_ns > 0
57+
58+
59+
@pytest.mark.parametrize(
60+
"header_value",
61+
[
62+
str(""),
63+
str("123"),
64+
str(datetime_to_timestamp(dt.datetime.utcnow()) + 3600.0), # one hour in future
65+
str(datetime_to_timestamp(dt.datetime(2009, 1, 1))), # before ambig cutoff
66+
],
67+
)
68+
def test_track_job_queue_time_invalid(header_value, tracked_request):
69+
result = track_job_queue_time(header_value, tracked_request)
70+
71+
assert result is False
72+
assert "scout.job_queue_time_ns" not in tracked_request.tags
73+
74+
75+
ref_time_s = time.mktime((2019, 6, 1, 0, 0, 0, 0, 0, 0))
76+
77+
78+
@pytest.mark.parametrize(
79+
"given,expected",
80+
[
81+
(ref_time_s, ref_time_s * 1e9),
82+
(ref_time_s * 1e3, ref_time_s * 1e9),
83+
(ref_time_s * 1e6, ref_time_s * 1e9),
84+
(CUTOFF_EPOCH_S + 10, (CUTOFF_EPOCH_S + 10) * 1e9),
85+
(0.0, 0.0),
86+
(1000.0, 0.0),
87+
(float("inf"), float("inf")),
88+
(float("-inf"), 0.0),
89+
(float("nan"), 0.0),
90+
],
91+
)
92+
def test_convert_ambiguous_timestamp_to_ns(given, expected):
93+
assert _convert_ambiguous_timestamp_to_ns(given) == expected

0 commit comments

Comments
 (0)