Skip to content
Merged
14 changes: 13 additions & 1 deletion synapse/http/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,10 @@ def __init__(
self._request: Optional[Request] = request
self._iterator = iterator
self._paused = False
self.tracing_scope = start_active_span(
"write_bytes_to_request",
)
self.tracing_scope.__enter__()

try:
self._request.registerProducer(self, True)
Expand All @@ -712,8 +716,8 @@ def __init__(
logger.info("Connection disconnected before response was written: %r", e)

# We drop our references to data we'll not use.
self._request = None
self._iterator = iter(())
self.tracing_scope.__exit__(type(e), None, e.__traceback__)
else:
# Start producing if `registerProducer` was successful
self.resumeProducing()
Expand All @@ -727,6 +731,9 @@ def _send_data(self, data: List[bytes]) -> None:
self._request.write(b"".join(data))

def pauseProducing(self) -> None:
opentracing_span = active_span()
if opentracing_span is not None:
opentracing_span.log_kv({"event": "producer_paused"})
self._paused = True

def resumeProducing(self) -> None:
Expand All @@ -737,6 +744,10 @@ def resumeProducing(self) -> None:

self._paused = False

opentracing_span = active_span()
if opentracing_span is not None:
opentracing_span.log_kv({"event": "producer_resumed"})

# Write until there's backpressure telling us to stop.
while not self._paused:
# Get the next chunk and write it to the request.
Expand Down Expand Up @@ -771,6 +782,7 @@ def resumeProducing(self) -> None:
def stopProducing(self) -> None:
# Clear a circular reference.
self._request = None
self.tracing_scope.__exit__(None, None, None)


def _encode_json_bytes(json_object: object) -> bytes:
Expand Down
Loading