Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18804.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Instrument `_ByteProducer` with tracing to measure potential dead time while writing bytes to the request.
14 changes: 13 additions & 1 deletion synapse/http/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,10 @@ def __init__(
self._request: Optional[Request] = request
self._iterator = iterator
self._paused = False
self.tracing_scope = start_active_span(
"write_bytes_to_request",
)
self.tracing_scope.__enter__()

try:
self._request.registerProducer(self, True)
Expand All @@ -712,8 +716,8 @@ def __init__(
logger.info("Connection disconnected before response was written: %r", e)

# We drop our references to data we'll not use.
self._request = None
self._iterator = iter(())
self.tracing_scope.__exit__(type(e), None, e.__traceback__)
else:
# Start producing if `registerProducer` was successful
self.resumeProducing()
Expand All @@ -727,6 +731,9 @@ def _send_data(self, data: List[bytes]) -> None:
self._request.write(b"".join(data))

def pauseProducing(self) -> None:
opentracing_span = active_span()
if opentracing_span is not None:
opentracing_span.log_kv({"event": "producer_paused"})
self._paused = True

def resumeProducing(self) -> None:
Expand All @@ -737,6 +744,10 @@ def resumeProducing(self) -> None:

self._paused = False

opentracing_span = active_span()
if opentracing_span is not None:
opentracing_span.log_kv({"event": "producer_resumed"})

# Write until there's backpressure telling us to stop.
while not self._paused:
# Get the next chunk and write it to the request.
Expand Down Expand Up @@ -771,6 +782,7 @@ def resumeProducing(self) -> None:
def stopProducing(self) -> None:
# Clear a circular reference.
self._request = None
self.tracing_scope.__exit__(None, None, None)


def _encode_json_bytes(json_object: object) -> bytes:
Expand Down
Loading