fix(openai): Adds responses api streaming support #3396
base: main
Conversation
- Add ResponseStream class to wrap streaming responses
- Create OpenTelemetry spans for streaming responses
- Handle both sync and async streaming
- Add comprehensive test coverage including test for issue traceloop#3395

The responses API was returning raw Stream objects without any instrumentation, causing no spans to be generated. This fix wraps streams in a ResponseStream class that manages span lifecycle.

Fixes traceloop#3395
- Match ChatStream architecture: create spans before streaming starts
- Add comprehensive metrics support (tokens, duration, time-to-first-token)
- Implement proper garbage collection with __del__ method
- Add @dont_throw decorators for safe cleanup
- Pass metric instruments from v1/__init__.py to wrapper functions
- Clean up duplicate exception handling code
- Consolidate test_issue_3395.py into test_responses_streaming.py
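The core idea these commits describe — wrapping the raw stream in a proxy that owns the span and closes it when iteration finishes or fails — can be sketched roughly as follows. This is a simplified illustration, not the PR's actual ResponseStream; the class and helper names here are placeholders:

```python
import time

from opentelemetry import trace
from opentelemetry.trace import SpanKind, Status, StatusCode


class InstrumentedStream:
    """Simplified proxy: owns the span for the lifetime of the stream."""

    def __init__(self, span, wrapped, start_time):
        self._span = span
        self._wrapped = wrapped
        self._start_time = start_time
        self._first_token_time = None  # would feed a time-to-first-token metric

    def __iter__(self):
        return self

    def __next__(self):
        try:
            chunk = next(self._wrapped)
        except StopIteration:
            # Stream exhausted: mark success and close the span exactly once.
            if self._span.is_recording():
                self._span.set_status(Status(StatusCode.OK))
            self._span.end()
            raise
        except Exception as exc:
            self._span.record_exception(exc)
            self._span.set_status(Status(StatusCode.ERROR, str(exc)))
            self._span.end()
            raise
        if self._first_token_time is None:
            self._first_token_time = time.time() - self._start_time
        return chunk


def wrap_stream(raw_stream):
    tracer = trace.get_tracer(__name__)
    span = tracer.start_span("openai.response", kind=SpanKind.CLIENT)
    return InstrumentedStream(span, raw_stream, time.time())
```

The actual implementation additionally records token, choice, and duration metrics from the accumulated chunks before ending the span, as described in the review below.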
Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds a responses-specific telemetry wrapper factory and threads six telemetry metric objects through the v1 responses wrappers; implements ResponseStream proxy classes to manage streaming spans, metrics, and traced data for sync/async flows; updates wrapper signatures/decorators and adds tests for streaming traces and attributes.

Changes
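The wrapper factory mentioned in the walkthrough could look roughly like the sketch below: a decorator that accepts the tracer plus the metric instruments once and produces a wrapt-style wrapper. The parameter names are assumptions chosen to match the metrics discussed later in the review, not necessarily the exact signature of _with_responses_telemetry_wrapper:

```python
def _with_responses_telemetry_wrapper(func):
    """Sketch of the factory: close over tracer + metric instruments once and
    return a wrapt-compatible wrapper that forwards them on every call.
    Parameter names below are illustrative assumptions."""

    def _with_telemetry(
        tracer,
        token_counter,
        choice_counter,
        duration_histogram,
        exception_counter,
        streaming_time_to_first_token,
        streaming_time_to_generate,
    ):
        def wrapper(wrapped, instance, args, kwargs):
            return func(
                tracer,
                token_counter,
                choice_counter,
                duration_histogram,
                exception_counter,
                streaming_time_to_first_token,
                streaming_time_to_generate,
                wrapped,
                instance,
                args,
                kwargs,
            )

        return wrapper

    return _with_telemetry
```

Used as a decorator on the responses wrappers, this lets the instrumentor bind the instruments once at instrumentation time while keeping the wrapt (wrapped, instance, args, kwargs) calling convention.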
Sequence Diagram(s)

sequenceDiagram
autonumber
actor App
participant OpenAI as OpenAI Client
participant Wrapper as responses_*_wrapper
participant OTel as OTel Tracer/Metrics
participant Stream as ResponseStream
App->>OpenAI: Responses.create/retrieve(...)
OpenAI->>Wrapper: instrumentation wrapper invoked (tracer + metrics)
Wrapper->>OTel: start CLIENT span (openai.response)
Wrapper->>OpenAI: call underlying API
alt non-streaming
OpenAI-->>Wrapper: response object
Wrapper->>OTel: record metrics & attrs, end span
Wrapper-->>App: response
else streaming
OpenAI-->>Wrapper: streaming iterator/async iterator
Wrapper->>Stream: wrap iterator with ResponseStream (attach span/telemetry)
Wrapper-->>App: ResponseStream proxy
loop each chunk
App->>Stream: iterate/read chunk
Stream->>OTel: update TTFT/T2G, counters, traced data
end
Stream->>OTel: end span on close/exit/error
end
sequenceDiagram
autonumber
actor App
participant AsyncOpenAI as Async OpenAI Client
participant AWrapper as async_responses_*_wrapper
participant OTel as OTel Tracer/Metrics
participant AStream as ResponseStream (async)
App->>AsyncOpenAI: await Responses.create/retrieve(stream=True)
AsyncOpenAI->>AWrapper: instrumentation wrapper invoked (tracer + metrics)
AWrapper->>OTel: start CLIENT span (openai.response)
AWrapper->>AsyncOpenAI: await API call
AsyncOpenAI-->>AWrapper: async streaming object
AWrapper->>AStream: wrap and return ResponseStream proxy
App->>AStream: async iterate chunks
loop chunks
AStream->>OTel: update timing/metrics and traced data
end
AStream->>OTel: end span on aclose/exit/error
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs
📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
📒 Files selected for processing (2)
🧰 Additional context used

📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Files:
🧬 Code graph analysis (2)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/__init__.py (1)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py (4)
🪛 Ruff (0.13.3)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py
291-291: Do not catch blind exception: Exception (BLE001)
307-307: Do not catch blind exception: Exception (BLE001)
431-431: Do not catch blind exception: Exception (BLE001)
526-526: Do not catch blind exception: Exception (BLE001)
535-535: Do not catch blind exception: Exception (BLE001)
🔇 Additional comments (23)
Caution
Changes requested ❌
Reviewed everything up to 104d842 in 2 minutes and 33 seconds.
- Reviewed 921 lines of code in 4 files
- Skipped 0 files when reviewing
- Skipped posting 4 draft comments. View those below.
- Modify your settings and rules to customize what types of comments Ellipsis leaves. And don't forget to react with 👍 or 👎 to teach Ellipsis.
1. packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/utils.py:115
- Draft comment: Consider adding a brief docstring to _with_responses_telemetry_wrapper describing its parameters and intended usage, similar to the other telemetry wrappers.
- Reason this comment was not posted: Decided after close inspection that this draft comment was likely wrong and/or not actionable (usefulness confidence = 20% vs. threshold = 50%). The comment suggests adding documentation, which could be helpful, but none of the other similar wrapper functions have docstrings, so this would be inconsistent with the codebase's style. The function's purpose is fairly clear from its name and parameters, making this a nice-to-have rather than a necessary change. While future developers might appreciate a docstring, consistency with the codebase's existing patterns should be maintained; if docstrings are needed, they should be added in a separate documentation effort for all wrapper functions. The comment should be deleted, as it suggests deviating from the established pattern and isn't highlighting a critical issue that needs to be fixed.
2. packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py:842
- Draft comment: For non-stream responses, a new span is started when parsed_response.status == 'completed', but the initial span created earlier is never ended. This may lead to orphaned spans. Consider reusing and ending the original span instead of starting a new one.
- Reason this comment was not posted: Comment was not on a location in the diff, so it can't be submitted as a review comment.
3. packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py:888
- Draft comment: Similarly, in async_responses_get_or_create_wrapper, the initial span is not ended on non-stream responses, which could lead to orphaned spans. Verify if this is intended or if the original span should be ended.
- Reason this comment was not posted: Comment looked like it was already resolved.
4. packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py:745
- Draft comment: There is an inconsistency in time measurements: start_time is obtained with time.time() (seconds) but traced_data.start_time is set using time.time_ns() (nanoseconds). Please ensure consistency in the time units used for span timing.
- Reason this comment was not posted: Comment looked like it was already resolved.
Workflow ID: wflow_TsHL5ITqJWjyuIGW
You can customize by changing your verbosity settings, reacting with 👍 or 👎, replying to comments, or adding code review rules.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py (1)
712-790: End the request span on the non-streaming success path

We start a CLIENT span before invoking wrapped, but on the non-streaming path we never close it — we return after building TracedData, and even reassign span when status == "completed", leaving the original span open forever. That leaks spans, drops duration metrics, and makes exporters flush orphaned spans. The async wrapper has the same issue. Please make sure every non-streaming exit (happy path and instrumentation fallback) sets an OK status and ends the original span.

@@
-        if isinstance(response, Stream):
+        if isinstance(response, Stream):
             ...
             return ResponseStream(...)
@@
-    except Exception:
-        return response
+    except Exception:
+        if span.is_recording():
+            span.set_status(Status(StatusCode.OK))
+            span.end()
+        return response
@@
-    if parsed_response.status == "completed":
-        span = tracer.start_span(
+    if parsed_response.status == "completed":
+        completed_span = tracer.start_span(
             SPAN_NAME,
             kind=SpanKind.CLIENT,
             start_time=int(traced_data.start_time),
         )
-        set_data_attributes(traced_data, span)
-        span.end()
-
-    return response
+        set_data_attributes(traced_data, completed_span)
+        completed_span.end()
+
+    if span.is_recording():
+        span.set_status(Status(StatusCode.OK))
+        span.end()
+
+    return response
@@
-        if isinstance(response, (Stream, AsyncStream)):
+        if isinstance(response, (Stream, AsyncStream)):
             ...
             return ResponseStream(...)
@@
-    except Exception:
-        return response
+    except Exception:
+        if span.is_recording():
+            span.set_status(Status(StatusCode.OK))
+            span.end()
+        return response
@@
-    if parsed_response.status == "completed":
-        span = tracer.start_span(
+    if parsed_response.status == "completed":
+        completed_span = tracer.start_span(
             SPAN_NAME,
             kind=SpanKind.CLIENT,
             start_time=int(traced_data.start_time),
         )
-        set_data_attributes(traced_data, span)
-        span.end()
-
-    return response
+        set_data_attributes(traced_data, completed_span)
+        completed_span.end()
+
+    if span.is_recording():
+        span.set_status(Status(StatusCode.OK))
+        span.end()
+
+    return response

Also applies to: 842-850, 999-1007
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
- packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/utils.py (1 hunks)
- packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/__init__.py (2 hunks)
- packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py (7 hunks)
- packages/opentelemetry-instrumentation-openai/tests/traces/test_responses_streaming.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py: Store API keys only in environment variables/secure vaults; never hardcode secrets in code
Use Flake8 for code linting and adhere to its rules
Files:
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/utils.py
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/__init__.py
packages/opentelemetry-instrumentation-openai/tests/traces/test_responses_streaming.py
🧬 Code graph analysis (4)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/utils.py (3)
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py (1)
- wrapper (418-431)
packages/opentelemetry-instrumentation-llamaindex/opentelemetry/instrumentation/llamaindex/utils.py (2)
- wrapper (18-19)
- wrapper (48-58)
packages/opentelemetry-instrumentation-transformers/opentelemetry/instrumentation/transformers/utils.py (2)
- wrapper (21-22)
- wrapper (39-49)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py (3)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/utils.py (3)
- _with_responses_telemetry_wrapper (116-143)
- dont_throw (162-190)
- should_send_prompts (207-210)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py (2)
- _ensure_cleanup (783-818)
- _process_complete_response (737-780)
packages/opentelemetry-semantic-conventions-ai/opentelemetry/semconv_ai/__init__.py (1)
- SpanAttributes (64-261)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/__init__.py (1)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py (2)
- responses_get_or_create_wrapper (695-850)
- async_responses_get_or_create_wrapper (855-1007)
packages/opentelemetry-instrumentation-openai/tests/traces/test_responses_streaming.py (1)
packages/traceloop-sdk/traceloop/sdk/utils/in_memory_span_exporter.py (4)
- export (45-51)
- InMemorySpanExporter (22-61)
- clear (35-38)
- get_finished_spans (40-43)
🪛 Ruff (0.13.1)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py
260-260: Do not catch blind exception: Exception (BLE001)
276-276: Do not catch blind exception: Exception (BLE001)
370-371: try-except-pass detected, consider logging the exception (S110)
370-370: Do not catch blind exception: Exception (BLE001)
377-377: Do not catch blind exception: Exception (BLE001)
433-433: Do not catch blind exception: Exception (BLE001)
442-442: Do not catch blind exception: Exception (BLE001)
packages/opentelemetry-instrumentation-openai/tests/traces/test_responses_streaming.py
10-10: Unused function argument: instrument_legacy (ARG001)
71-71: Unused function argument: instrument_legacy (ARG001)
114-114: Unused function argument: instrument_legacy (ARG001)
114-114: Unused function argument: openai_client (ARG001)
161-161: Unused function argument: instrument_legacy (ARG001)
194-194: Unused function argument: instrument_legacy (ARG001)
231-231: Unused function argument: instrument_legacy (ARG001)
246-246: Loop control variable chunk not used within loop body; rename unused chunk to _chunk (B007)
249-250: try-except-pass detected, consider logging the exception (S110)
249-249: Do not catch blind exception: Exception (BLE001)
…t_responses_streaming.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
- packages/opentelemetry-instrumentation-openai/tests/traces/test_responses_streaming.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py: Store API keys only in environment variables/secure vaults; never hardcode secrets in code
Use Flake8 for code linting and adhere to its rules
Files:
packages/opentelemetry-instrumentation-openai/tests/traces/test_responses_streaming.py
🧬 Code graph analysis (1)
packages/opentelemetry-instrumentation-openai/tests/traces/test_responses_streaming.py (1)
packages/traceloop-sdk/traceloop/sdk/utils/in_memory_span_exporter.py (4)
- export (45-51)
- InMemorySpanExporter (22-61)
- clear (35-38)
- get_finished_spans (40-43)
🪛 Flake8 (7.2.0)
packages/opentelemetry-instrumentation-openai/tests/traces/test_responses_streaming.py
[error] 254-254: IndentationError: unindent does not match any outer indentation level (E999)
🪛 Ruff (0.13.1)
packages/opentelemetry-instrumentation-openai/tests/traces/test_responses_streaming.py
254-254: unindent does not match any outer indentation level (invalid-syntax)
255-255: Unexpected indentation (invalid-syntax)
- Record exceptions before cleanup to prevent loss of error details
- Add _error_recorded flag to track error state
- Update cleanup methods to preserve ERROR status when set
- Ensure both sync and async paths follow same exception handling order
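Roughly, the ordering this commit describes looks like the sketch below. It is a simplified stand-in, not the actual ResponseStream code; names such as _ensure_cleanup and _error_recorded follow the commit message, the rest is illustrative:

```python
import threading

from opentelemetry.trace import Status, StatusCode


class StreamWithErrorState:
    """Sketch of the exception-handling order: record the error and set the
    flag first, then run cleanup, so cleanup never overwrites ERROR with OK."""

    def __init__(self, span, wrapped):
        self._span = span
        self._wrapped = wrapped
        self._error_recorded = False
        self._cleanup_completed = False
        self._cleanup_lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self._wrapped)
        except StopIteration:
            self._ensure_cleanup()
            raise
        except Exception as exc:
            # 1) record the exception, 2) mark the error, 3) only then clean up
            if self._span.is_recording():
                self._span.record_exception(exc)
                self._span.set_status(Status(StatusCode.ERROR, str(exc)))
            self._error_recorded = True
            self._ensure_cleanup()
            raise

    def _ensure_cleanup(self):
        with self._cleanup_lock:
            if self._cleanup_completed:
                return
            if self._span.is_recording() and not self._error_recorded:
                self._span.set_status(Status(StatusCode.OK))
            self._span.end()
            self._cleanup_completed = True
```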
Actionable comments posted: 0
🧹 Nitpick comments (1)
packages/opentelemetry-instrumentation-openai/tests/traces/test_responses_streaming.py (1)
114-119: Consider using an async client fixture for consistency.

The test creates a new AsyncOpenAI() client directly, making the openai_client parameter unused. For consistency with other tests and to leverage fixture configuration (API keys, base URLs, etc.), consider using an async_openai_client fixture if available.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
- packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py (7 hunks)
- packages/opentelemetry-instrumentation-openai/tests/traces/test_responses_streaming.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py: Store API keys only in environment variables/secure vaults; never hardcode secrets in code
Use Flake8 for code linting and adhere to its rules
Files:
packages/opentelemetry-instrumentation-openai/tests/traces/test_responses_streaming.py
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py
🧬 Code graph analysis (2)
packages/opentelemetry-instrumentation-openai/tests/traces/test_responses_streaming.py (1)
packages/traceloop-sdk/traceloop/sdk/utils/in_memory_span_exporter.py (4)
- export (45-51)
- InMemorySpanExporter (22-61)
- clear (35-38)
- get_finished_spans (40-43)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py (4)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/__init__.py (2)
- metric_shared_attributes (364-377)
- _get_openai_base_url (275-281)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/utils.py (3)
- _with_responses_telemetry_wrapper (116-143)
- dont_throw (162-190)
- should_send_prompts (207-210)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py (3)
- _ensure_cleanup (783-818)
- _process_complete_response (737-780)
- _shared_attributes (726-734)
packages/opentelemetry-semantic-conventions-ai/opentelemetry/semconv_ai/__init__.py (1)
- SpanAttributes (64-261)
🪛 Ruff (0.13.1)
packages/opentelemetry-instrumentation-openai/tests/traces/test_responses_streaming.py
10-10: Unused function argument: instrument_legacy (ARG001)
71-71: Unused function argument: instrument_legacy (ARG001)
114-114: Unused function argument: instrument_legacy (ARG001)
114-114: Unused function argument: openai_client (ARG001)
161-161: Unused function argument: instrument_legacy (ARG001)
194-194: Unused function argument: instrument_legacy (ARG001)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py
263-263: Do not catch blind exception: Exception (BLE001)
279-279: Do not catch blind exception: Exception (BLE001)
371-372: try-except-pass detected, consider logging the exception (S110)
371-371: Do not catch blind exception: Exception (BLE001)
378-378: Do not catch blind exception: Exception (BLE001)
458-458: Do not catch blind exception: Exception (BLE001)
467-467: Do not catch blind exception: Exception (BLE001)
🔇 Additional comments (15)
packages/opentelemetry-instrumentation-openai/tests/traces/test_responses_streaming.py (4)
9-68: LGTM! Comprehensive test for issue #3395. The test correctly reproduces the reported issue and verifies that streaming responses now emit spans with proper attributes. The instrument_legacy fixture is required for test setup despite the Ruff warning.

70-110: LGTM! Solid streaming test with proper attribute verification. The test correctly verifies span creation, attributes, and content accumulation for streaming responses. The nested content extraction logic appropriately handles the response structure.

159-190: LGTM! Context manager test ensures proper span lifecycle. The test correctly verifies that spans are created and finalized after the context manager exits, which is critical for proper resource cleanup and telemetry.

192-226: LGTM! Excellent test for span nesting and context propagation. This test verifies that streaming instrumentation correctly integrates with parent spans created via start_as_current_span, which is essential for distributed tracing scenarios.

packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py (11)

2-2: LGTM! Import additions support streaming telemetry. The new imports (logging, threading, trace context management, ObjectProxy) are all necessary for the ResponseStream implementation and proper span lifecycle management. Also applies to: 5-5, 55-59, 64-65, 69-69, 77-77

196-248: LGTM! Well-structured streaming wrapper initialization. The ResponseStream class appropriately wraps the underlying stream using ObjectProxy and initializes all necessary telemetry components. The _error_recorded flag and cleanup lock ensure thread-safe error handling and cleanup.

249-284: LGTM! Context manager protocol correctly implemented. Both sync and async context manager methods properly delegate to the wrapped stream and ensure cleanup. The bare exception catches in lines 263 and 279 are appropriate here to prevent cleanup errors from masking original exceptions.

289-321: LGTM! Iterator error handling correctly ordered. The exception handling properly records the error and sets span status before calling _ensure_cleanup(), addressing the previous review concern. The _error_recorded flag ensures the error status is preserved during cleanup.

323-380: LGTM! Chunk processing handles streaming data appropriately. The method correctly updates TracedData from chunks and records time-to-first-token metrics. The exception handling at lines 371-372 (silent fallback for text extraction) and 378-379 (logged errors) are appropriate for best-effort streaming data collection.

381-390: LGTM! Shared attributes helper is straightforward. The method correctly builds metric attributes consistent with the OpenTelemetry conventions and aligns with the existing ChatStream pattern.

392-437: LGTM! Complete response processing correctly finalizes telemetry. The method properly:
- Records all streaming metrics (tokens, choices, duration, time-to-generate)
- Preserves error status when _error_recorded is true (line 431)
- Sets span attributes and ends the span
- Marks cleanup as completed
This addresses the previous review concern about error status preservation.

438-469: LGTM! Cleanup logic is thread-safe and preserves error states. The method correctly:
- Uses a lock to prevent race conditions
- Checks _error_recorded before setting OK status (line 450)
- Handles cleanup failures gracefully with nested exception handling
- Ensures _cleanup_completed is always set to prevent infinite loops
The bare exception catches (lines 458, 467) are appropriate for cleanup safety.

717-811: LGTM! Wrapper correctly handles streaming and error paths. The sync wrapper properly:
- Creates a CLIENT span with manual lifecycle control (end_on_exit=False)
- Records metrics and exceptions on error (lines 745-759)
- Returns a ResponseStream for streaming responses with all telemetry parameters (lines 799-811)
- Uses time.time() for metrics (not time.time_ns()), which is consistent

874-964: LGTM! Async wrapper mirrors sync implementation correctly. The async wrapper properly:
- Mirrors the sync wrapper's span creation and error handling
- Checks for both Stream and AsyncStream (line 918), which correctly handles async streaming responses
- Returns a ResponseStream that supports both sync and async iteration protocols
- Maintains consistency in telemetry recording

812-871: LGTM! Non-streaming response handling is complete. The non-streaming path correctly:
- Parses the response and merges with existing TracedData
- Creates a span with the correct start time when status is "completed"
- Sets all attributes and ends the span
- Handles exceptions gracefully by returning the raw response
…on support

The ResponseStream class was missing the __aiter__ method, which is required for async iteration with 'async for' loops. This caused a TypeError when trying to iterate over streaming responses asynchronously. Added an __aiter__ method that returns self, following the standard async iterator protocol pattern.
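For reference, the protocol this commit relies on: `async for` calls `__aiter__` on the object and then repeatedly awaits `__anext__` until StopAsyncIteration. A minimal, self-contained illustration (not the PR's code; the proxy and generator names here are made up):

```python
import asyncio


class AsyncChunkProxy:
    """Minimal async-iterator proxy: __aiter__ must return self so that
    `async for` accepts the object; __anext__ forwards to the wrapped stream."""

    def __init__(self, wrapped):
        self._wrapped = wrapped

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Raises StopAsyncIteration when the wrapped stream is exhausted.
        return await self._wrapped.__anext__()


async def _fake_stream():
    for i in range(3):
        yield i


async def main():
    async for chunk in AsyncChunkProxy(_fake_stream()):
        print(chunk)


asyncio.run(main())
```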
Actionable comments posted: 0
🧹 Nitpick comments (1)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py (1)
342-402: Consider logging exceptions in chunk processing

The _process_chunk method accumulates streaming data and metrics correctly. However, the try-except-pass block at lines 393-394 and the generic exception handler at line 400 silently suppress errors during chunk processing. While the current approach prevents streaming interruption, consider whether certain chunk processing errors should be logged for debugging:

    except Exception:
-       pass
+       logger.debug("Error extracting output text from chunk: %s", traceback.format_exc())

And for the outer exception handler:

-   except Exception as e:
-       logger.debug("Error processing response chunk: %s", e)
+   except Exception as e:
+       logger.debug("Error processing response chunk: %s", traceback.format_exc())

This provides more diagnostic context while still preventing streaming failures.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py
(12 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py
: Store API keys only in environment variables/secure vaults; never hardcode secrets in code
Use Flake8 for code linting and adhere to its rules
Files:
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py
🧬 Code graph analysis (1)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py (4)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/__init__.py (2)
metric_shared_attributes
(364-377)_get_openai_base_url
(275-281)packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/utils.py (4)
_with_responses_telemetry_wrapper
(116-143)_with_tracer_wrapper
(146-153)dont_throw
(162-190)should_send_prompts
(207-210)packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py (3)
_ensure_cleanup
(783-818)_process_complete_response
(737-780)_shared_attributes
(726-734)packages/opentelemetry-semantic-conventions-ai/opentelemetry/semconv_ai/__init__.py (1)
SpanAttributes
(64-261)
🪛 Ruff (0.13.2)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py
279-279: Do not catch blind exception: Exception
(BLE001)
295-295: Do not catch blind exception: Exception
(BLE001)
393-394: try
-except
-pass
detected, consider logging the exception
(S110)
393-393: Do not catch blind exception: Exception
(BLE001)
400-400: Do not catch blind exception: Exception
(BLE001)
486-486: Do not catch blind exception: Exception
(BLE001)
495-495: Do not catch blind exception: Exception
(BLE001)
🔇 Additional comments (22)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py (22)
1-77
: LGTM: Imports and module setupThe new imports properly support streaming telemetry functionality. The addition of
logging
,threading
, OpenTelemetry tracing components,ObjectProxy
, and the telemetry wrapper are all appropriately used throughout the file.
99-106
: LGTM: Input processing helperThe
process_input
function properly normalizes input parameters and ensures proper type annotations for list items.
157-164
: LGTM: Response parsing helperThe
parse_response
function correctly handles both legacy and modern OpenAI response formats.
167-182
: LGTM: Tool extraction helperThe
get_tools_from_kwargs
function properly extracts and converts function tool definitions from request kwargs.
185-209
: LGTM: Content block normalizationThe
process_content_block
function appropriately normalizes different content types (text, image, file) to a standard format.
212-264
: LGTM: ResponseStream initializationThe class is properly structured as an
ObjectProxy
wrapper with appropriate initialization of telemetry metrics, span, and cleanup state. The cleanup lock and error tracking flags are correctly initialized for thread-safe operation.
265-267
: LGTM: Garbage collection cleanupThe
__del__
method ensures span cleanup when the object is garbage collected, preventing resource leaks.
269-300
: LGTM: Context manager implementationBoth sync and async context manager methods are properly implemented. They delegate to the wrapped stream's context management and ensure cleanup occurs on exit, with appropriate exception logging.
302-306
: LGTM: Iterator protocolThe iterator protocol methods correctly delegate to
self
for proper iteration behavior.
308-340
: LGTM: Streaming iteration with proper error handlingBoth
__next__
and__anext__
correctly handle streaming completion and errors. Based on past review comments, the error recording now properly occurs before cleanup (lines 316-318 and 333-335), ensuring error spans are correctly captured with exception details and ERROR status.
403-412
: LGTM: Shared attributes constructionThe
_shared_attributes
method correctly builds metric attributes using the common pattern with proper model fallback and streaming flag.
414-461
: LGTM: Complete response processingThe
_process_complete_response
method correctly:
- Sets span attributes from traced data
- Records token metrics with proper token type attributes
- Records choice/block count metrics
- Records duration and time-to-generate metrics
- Respects the
_error_recorded
flag to avoid overwriting error status (line 456)- Properly ends the span and marks cleanup complete
463-497
: LGTM: Thread-safe cleanup implementationThe
_ensure_cleanup
method is well-designed:
- Thread-safe via
_cleanup_lock
- Prevents double cleanup with
_cleanup_completed
flag- Respects
_error_recorded
flag to avoid overwriting error status- Nested exception handling ensures cleanup always completes even if span operations fail
- Appropriate debug logging throughout
The broad exception catching at lines 486 and 495 is intentional and correct for cleanup fallback logic.
499-747
: LGTM: Comprehensive span attribute settingThe
set_data_attributes
function properly sets all relevant OpenTelemetry span attributes from the traced response data, including:
- System, model, and response ID
- Usage statistics (tokens, cache usage, reasoning tokens)
- Reasoning attributes (summary and effort)
- Tool/function definitions
- Prompts and completions with proper role handling
- Various content types (messages, computer calls, tool calls)
The logic correctly handles both dict-style and object-style usage data and processes content blocks appropriately.
751-795
: LGTM: Sync wrapper with proper span lifecycle and error handlingThe
responses_get_or_create_wrapper
correctly:
- Uses the new
@_with_responses_telemetry_wrapper
decorator to accept telemetry metrics- Creates a CLIENT span and uses
trace.use_span
withend_on_exit=False
for manual lifecycle management- Records duration and exception metrics on errors
- Sets error type attribute and records exception on the span
- Properly ends the span after recording error details
797-847
: LGTM: Streaming response handlingThe streaming path correctly:
- Detects
Stream
responses- Retrieves existing data if
response_id
is provided- Constructs a comprehensive
TracedData
object with all request parameters and reasoning attributes- Returns a
ResponseStream
with all telemetry metrics and state needed for streaming span management
848-907
: LGTM: Non-streaming response handlingThe non-streaming path correctly:
- Parses the response and merges with existing data
- Constructs complete
TracedData
with output blocks, usage, and reasoning attributes- Only creates and closes a span if the response status is "completed"
- Uses the original start_time from TracedData for accurate span timing
912-956
: LGTM: Async wrapper with proper span lifecycle and error handlingThe
async_responses_get_or_create_wrapper
mirrors the sync version correctly, with proper async/await usage and identical span lifecycle management and error handling.
958-1004
: LGTM: Async streaming response handlingThe async streaming path correctly handles both
Stream
andAsyncStream
types and constructs theResponseStream
wrapper with all necessary telemetry components.
1005-1065
: LGTM: Async non-streaming response handlingThe async non-streaming path correctly mirrors the sync version with proper async operations and span management.
1068-1093
: LGTM: Sync cancel wrapperThe
responses_cancel_wrapper
correctly:
- Uses
@_with_tracer_wrapper
(appropriate since cancel doesn't need full telemetry metrics)- Pops the response from the global
responses
dict- Creates a span with the original start time and records a cancellation exception
- Sets span attributes and ends the span
1098-1123
: LGTM: Async cancel wrapperThe
async_responses_cancel_wrapper
correctly mirrors the sync version with proper async operations.
The issue was that Python's async iteration protocol checker looks for __aiter__ at the type level, and wrapt's ObjectProxy metaclass magic was interfering with this check. Fixed by:
- Creating a ResponseStreamBase class with __aiter__ and __anext__ methods
- Using multiple inheritance: ResponseStream(ResponseStreamBase, ObjectProxy)
- This ensures the async iteration protocol methods are visible to Python's type system while still maintaining ObjectProxy functionality

This resolves the 'async for requires an object with __aiter__ method' error when using ResponseStream with async iteration.
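A stripped-down sketch of that inheritance arrangement (illustrative only; apart from ResponseStreamBase, ObjectProxy, and the __wrapped__ attribute, the names are placeholders):

```python
from wrapt import ObjectProxy


class ResponseStreamBase:
    """Declares the async-iteration slots on a plain class so that
    `async for`, which looks up __aiter__ on the type, can find them."""

    def __aiter__(self):
        return self

    async def __anext__(self):
        # __wrapped__ is the attribute ObjectProxy uses for the proxied object.
        return await self.__wrapped__.__anext__()


class StreamProxy(ResponseStreamBase, ObjectProxy):
    """ObjectProxy still transparently forwards other attribute access to the
    wrapped stream; the base class only supplies the async protocol methods."""

    def __init__(self, wrapped):
        super().__init__(wrapped)
```

With this layout, ordinary attribute lookups on the proxy keep going through ObjectProxy, while `async for chunk in proxy` resolves __aiter__/__anext__ on ResponseStreamBase.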
Actionable comments posted: 0
🧹 Nitpick comments (1)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py (1)
9-9
: Remove unusedinspect
import.Static analysis indicates the
inspect
module is imported but never used in this file.-import inspect
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py
(12 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py
: Store API keys only in environment variables/secure vaults; never hardcode secrets in code
Use Flake8 for code linting and adhere to its rules
Files:
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py
🧬 Code graph analysis (1)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py (4)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/__init__.py (2)
metric_shared_attributes
(364-377)_get_openai_base_url
(275-281)packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/utils.py (3)
_with_responses_telemetry_wrapper
(116-143)dont_throw
(162-190)should_send_prompts
(207-210)packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py (3)
_ensure_cleanup
(783-818)_process_complete_response
(737-780)_shared_attributes
(726-734)packages/opentelemetry-semantic-conventions-ai/opentelemetry/semconv_ai/__init__.py (1)
SpanAttributes
(64-261)
🪛 Flake8 (7.3.0)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py
[error] 9-9: 'inspect' imported but unused
(F401)
🪛 Ruff (0.13.2)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py
291-291: Do not catch blind exception: Exception
(BLE001)
307-307: Do not catch blind exception: Exception
(BLE001)
402-403: try
-except
-pass
detected, consider logging the exception
(S110)
402-402: Do not catch blind exception: Exception
(BLE001)
409-409: Do not catch blind exception: Exception
(BLE001)
495-495: Do not catch blind exception: Exception
(BLE001)
504-504: Do not catch blind exception: Exception
(BLE001)
🔇 Additional comments (9)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py (9)
100-210
: LGTM! Helper functions are well-documented.The updated docstrings for
process_input
,parse_response
,get_tools_from_kwargs
, andprocess_content_block
clearly explain their purposes and behavior.
213-276
: LGTM! ResponseStream class is well-structured.The class properly inherits from both
ResponseStreamBase
andObjectProxy
, initializes all necessary state including synchronization primitives, and correctly tracks error state with the_error_recorded
flag.
277-313
: LGTM! Context manager lifecycle is properly implemented.Both sync and async context manager protocols are correctly implemented. The broad exception handling in cleanup (lines 291, 307) is appropriate here to prevent cleanup errors from masking the original exception.
314-349
: LGTM! Iteration error handling is correctly implemented.The iteration methods now properly record exceptions and set ERROR status before calling cleanup (lines 324-328, 341-345), ensuring streaming errors are captured with the
_error_recorded
flag. This addresses the previous review feedback.
351-421
: LGTM! Chunk processing logic is sound.The
_process_chunk
method correctly accumulates streaming data, records metrics, and updates TracedData. The try-except-pass at lines 402-403 is acceptable here as output text extraction is best-effort and shouldn't break streaming.
423-506
: LGTM! Cleanup logic correctly preserves error state.Both
_process_complete_response
and_ensure_cleanup
properly check_error_recorded
before setting OK status (lines 465, 487), ensuring error details are preserved during cleanup. This correctly implements the previous review feedback.
758-916
: LGTM! Wrapper correctly handles both streaming and non-streaming flows.The
responses_get_or_create_wrapper
properly:
- Uses the new
@_with_responses_telemetry_wrapper
decorator with six telemetry parameters- Creates CLIENT spans with proper lifecycle management
- Records exceptions and metrics on errors (lines 790-804)
- Returns
ResponseStream
for streaming responses with all necessary context- Handles non-streaming responses with span creation only when status is "completed" (lines 907-914)
919-1074
: LGTM! Async wrapper mirrors sync implementation correctly.The
async_responses_get_or_create_wrapper
correctly implements the async version with the same telemetry, error handling, and span lifecycle patterns as the sync wrapper. Line 967 properly checks for bothStream
andAsyncStream
types.
1077-1133
: LGTM! Cancel wrappers are well-documented.Both sync and async cancel wrappers now have clear docstrings explaining their purpose and behavior.
The output text was missing from spans because we were replacing it with each chunk instead of accumulating it. Fixed by:
- Changed _process_chunk to accumulate output_text with += instead of =
- Iterate through all content items in output blocks to capture all text
- Initialize output_text as None and only create empty string when needed
- Fixed indentation issues in set_data_attributes for output_blocks iteration
- Added safety check for output_blocks being None before iteration

This ensures the complete response text is captured in the span attributes for streaming responses.
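The accumulation rule from this commit, in isolation (a tiny illustrative helper; the _Traced holder and its field are assumptions standing in for the real TracedData):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class _Traced:
    output_text: Optional[str] = None


def accumulate_text(traced: _Traced, delta: Optional[str]) -> None:
    """Append each streamed text delta instead of overwriting the buffer."""
    if not delta:
        return
    if traced.output_text is None:
        # Only materialize the empty string once there is actual text.
        traced.output_text = ""
    traced.output_text += delta


t = _Traced()
for piece in ["Hel", "lo", " world"]:
    accumulate_text(t, piece)
assert t.output_text == "Hello world"
```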
Actionable comments posted: 1
🧹 Nitpick comments (2)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py (2)
9-9
: Remove unused import.The
inspect
module is imported but never used in this file.Apply this diff:
-import inspect
405-406
: Log exceptions in try-except blocks for debugging.This try-except-pass silently swallows exceptions when extracting text from output blocks. Consider adding a debug log statement to help troubleshoot issues during streaming.
Apply this diff:
except Exception: - pass + logger.debug("Error extracting text from output blocks: %s", traceback.format_exc())Note: You'll need to add
import traceback
at the top of the file if not already present.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py
(14 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py
: Store API keys only in environment variables/secure vaults; never hardcode secrets in code
Use Flake8 for code linting and adhere to its rules
Files:
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py
🧬 Code graph analysis (1)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py (4)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/__init__.py (2)
metric_shared_attributes
(364-377)_get_openai_base_url
(275-281)packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/utils.py (3)
_with_responses_telemetry_wrapper
(116-143)dont_throw
(162-190)should_send_prompts
(207-210)packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py (3)
_ensure_cleanup
(783-818)_process_complete_response
(737-780)_shared_attributes
(726-734)packages/opentelemetry-semantic-conventions-ai/opentelemetry/semconv_ai/__init__.py (1)
SpanAttributes
(64-261)
🪛 Flake8 (7.3.0)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py
[error] 9-9: 'inspect' imported but unused
(F401)
🪛 Ruff (0.13.2)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py
291-291: Do not catch blind exception: Exception
(BLE001)
307-307: Do not catch blind exception: Exception
(BLE001)
405-406: try
-except
-pass
detected, consider logging the exception
(S110)
405-405: Do not catch blind exception: Exception
(BLE001)
412-412: Do not catch blind exception: Exception
(BLE001)
498-498: Do not catch blind exception: Exception
(BLE001)
507-507: Do not catch blind exception: Exception
(BLE001)
🔇 Additional comments (9)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py (9)
100-107
: LGTM!The helper functions have clear docstrings and handle input normalization correctly. The content block processing properly normalizes different content types to a standard format.
Also applies to: 158-165, 168-183, 186-210
224-276
: LGTM!The
ResponseStream
class is well-structured with proper initialization of all telemetry metrics and state tracking. The use ofObjectProxy
fromwrapt
ensures transparent proxying of the underlying stream object, and initialization properly stores the TracedData in the global responses dict.
281-312
: LGTM!The context manager implementations correctly delegate to the wrapped stream's context manager methods and ensure cleanup runs on exit. The bare exception handlers in cleanup are appropriate here since they log errors without masking the original exception being propagated.
317-349
: LGTM!The iteration methods correctly handle both normal completion (StopIteration/StopAsyncIteration) and error cases. The error handling flow properly records exceptions and sets ERROR status before calling cleanup, ensuring error information is preserved. This addresses the concerns from previous reviews.
426-473
: LGTM!The
_process_complete_response
method correctly records all telemetry metrics (tokens, choices, duration, time-to-generate) and sets span attributes. It properly respects the_error_recorded
flag to avoid overwriting error status with OK.
512-760
: LGTM!The
set_data_attributes
function comprehensively sets all relevant OpenTelemetry span attributes from the traced response data. It properly handles different content types, tool calls, reasoning blocks, and usage metrics. The logic for processing output_blocks correctly handles all supported block types (function_call, file_search_call, web_search_call, computer_call, reasoning).
763-858
: LGTM!The
responses_get_or_create_wrapper
correctly handles both streaming and non-streaming responses:
- Exception handling properly records metrics, sets error status, and ends the span before re-raising
- Streaming responses are wrapped in
ResponseStream
with all telemetry metrics- Non-streaming responses are processed with proper data merging and span creation for completed responses
The use of
trace.use_span(span, end_on_exit=False)
ensures proper context propagation while maintaining manual span lifecycle control.
922-1015
: LGTM!The async wrapper
async_responses_get_or_create_wrapper
correctly mirrors the sync wrapper's behavior with proper async/await handling. It properly checks for bothStream
andAsyncStream
types when detecting streaming responses at line 969.
1080-1134
: LGTM!The cancel wrappers correctly handle response cancellation by:
- Popping the existing tracked data from the responses dict
- Creating a span with the original start time
- Recording a cancellation exception
- Setting all relevant attributes before ending the span
The async variant properly mirrors the sync implementation.
Added extensive debug logging throughout the response streaming pipeline to diagnose issues with missing input, output, and token usage data. Debug logging added to:
- process_input: Log input type and value
- _process_chunk: Log chunk details, text accumulation, and usage capture
- _process_complete_response: Log final state before setting span attributes
- set_data_attributes: Log what's being set as prompts and completions
- Stream initialization: Log kwargs and processed input data

This will help identify where data is being lost in the streaming pipeline.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py (2)
878-937
: Fix time unit mismatch in TracedData initialization.The
TracedData.start_time
field stores nanoseconds (fromtime.time_ns()
at lines 838, 1001), but the non-streaming path falls back tostart_time
in seconds when no existing data is found:traced_data = TracedData( start_time=existing_data.get("start_time", start_time), # Mixes nanoseconds with seconds! ... )If
existing_data
is None, this usesstart_time
(line 806:time.time()
in seconds) instead of nanoseconds, breaking the unit consistency.Apply this diff:
traced_data = TracedData( - start_time=existing_data.get("start_time", start_time), + start_time=existing_data.get("start_time", time.time_ns()), response_id=parsed_response.id,
1039-1099
: Same time unit mismatch as sync wrapper.This has the same issue as the sync wrapper at line 900. When
existing_data
is None,start_time
(in seconds from line 969) is used instead of nanoseconds forTracedData.start_time
.Apply this diff:
traced_data = TracedData( - start_time=existing_data.get("start_time", start_time), + start_time=existing_data.get("start_time", time.time_ns()), response_id=parsed_response.id,
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py
(13 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py
: Store API keys only in environment variables/secure vaults; never hardcode secrets in code
Use Flake8 for code linting and adhere to its rules
Files:
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py
🧬 Code graph analysis (1)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py (4)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/__init__.py (2)
metric_shared_attributes
(364-377)_get_openai_base_url
(275-281)packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/utils.py (4)
_with_responses_telemetry_wrapper
(116-143)_with_tracer_wrapper
(146-153)dont_throw
(162-190)should_send_prompts
(207-210)packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py (3)
_ensure_cleanup
(783-818)_process_complete_response
(737-780)_shared_attributes
(726-734)packages/opentelemetry-semantic-conventions-ai/opentelemetry/semconv_ai/__init__.py (1)
SpanAttributes
(64-261)
🪛 Flake8 (7.3.0)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py
[error] 9-9: 'inspect' imported but unused
(F401)
🪛 Ruff (0.13.2)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py
292-292: Do not catch blind exception: Exception
(BLE001)
308-308: Do not catch blind exception: Exception
(BLE001)
410-410: Do not catch blind exception: Exception
(BLE001)
417-417: Do not catch blind exception: Exception
(BLE001)
504-504: Do not catch blind exception: Exception
(BLE001)
513-513: Do not catch blind exception: Exception
(BLE001)
🔇 Additional comments (11)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py (11)
100-108
: LGTM!The helper functions are well-documented with clear docstrings and appropriate logging for debugging. The logic is clean and straightforward.
Also applies to: 159-166, 169-184, 187-211
225-280
: LGTM!The
ResponseStream
initialization properly sets up all telemetry objects, threading primitives, and integrates with the globalresponses
dictionary. The use ofObjectProxy
for wrapping is appropriate.
282-313
: LGTM!Context manager implementations properly handle cleanup for both sync and async flows. The exception handling in cleanup (lines 292, 308) uses broad catches intentionally to prevent cleanup failures from masking the original exception, which is acceptable practice.
318-350
: Verify thread-safety of_error_recorded
flag.The error handling correctly records exceptions and sets status before cleanup (addressing the previous review). However, there's still a potential race condition:
- Error handler sets
_error_recorded = True
at lines 328/345 (outside lock)- Concurrently,
_ensure_cleanup()
checksif not self._error_recorded:
at lines 496/509 (inside lock)- If cleanup reads the flag before the error handler writes it, the span status could be set to OK instead of ERROR
This race window is small in practice since cleanup is called sequentially after setting the flag. However, for complete correctness, consider:
Solution: Acquire lock when setting the flag
if self._span and self._span.is_recording(): self._span.record_exception(e) self._span.set_status(Status(StatusCode.ERROR, str(e))) - self._error_recorded = True + with self._cleanup_lock: + self._error_recorded = True self._ensure_cleanup()Apply similarly at line 345 for async.
Based on learnings (past review comment flagged this race condition at lines 324-328, 341-345, 468-469, 490-491, 503-504)
352-419
: LGTM!The
_process_chunk
method properly:
- Records streaming events on the span
- Tracks time to first token
- Accumulates output data from multiple chunk formats
- Updates the global
responses
dictionary- Handles errors gracefully with logging
The broad exception catches at lines 410 and 417 are acceptable here since they're logging errors during non-critical data extraction without disrupting the stream.
431-479
: LGTM!The
_process_complete_response
method correctly:
- Sets span attributes from accumulated data
- Records all telemetry metrics (tokens, choices, duration, time to generate)
- Checks
_error_recorded
before setting OK status (line 474)- Ends the span and marks cleanup complete
The metrics recording logic is well-structured and handles optional counters/histograms appropriately.
481-515
: LGTM!The
_ensure_cleanup
method is thread-safe with proper locking and idempotency checks. It correctly:
- Guards against duplicate cleanup with
_cleanup_completed
flag- Checks
_error_recorded
before setting OK status (lines 496, 509)- Handles cleanup failures gracefully
The broad exception catches at lines 504 and 513 are appropriate for cleanup paths to ensure the span is always closed even if intermediate operations fail.
518-775
: LGTM!The
set_data_attributes
function comprehensively handles:
- Model and usage attributes with reasoning token support
- Request reasoning attributes (summary, effort)
- Tool/function definitions from traced data
- Multiple prompt formats (string, list of messages, computer calls)
- Various completion block types (messages, function calls, file/web search, computer calls, reasoning)
The logging statements will be helpful for debugging. The logic correctly normalizes different content formats and handles optional fields safely.
778-877: LGTM! The synchronous wrapper properly:

- Creates a CLIENT span for the API call
- Uses `trace.use_span` with `end_on_exit=False` for manual span management
- Records exception metrics (duration, counter) and span attributes on errors
- Returns a `ResponseStream` for streaming responses with all telemetry parameters
- Handles the streaming case correctly by creating TracedData with proper initialization

The exception handling is comprehensive and ensures spans are properly closed on failures.
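The overall shape of such a wrapper, heavily simplified and with illustrative names (`WrappedStreamSketch` stands in for the real `ResponseStream`):

```python
from opentelemetry import trace
from opentelemetry.trace import SpanKind


class WrappedStreamSketch:
    """Minimal stand-in: ends the span once the stream is fully consumed."""

    def __init__(self, stream, span):
        self._stream = stream
        self._span = span

    def __iter__(self):
        try:
            yield from self._stream
        finally:
            self._span.end()


def responses_wrapper_sketch(tracer, wrapped, args, kwargs):
    span = tracer.start_span("openai.response", kind=SpanKind.CLIENT)
    try:
        # end_on_exit=False keeps the span open past the API call so that a
        # streaming response can close it later.
        with trace.use_span(span, end_on_exit=False):
            result = wrapped(*args, **kwargs)
    except Exception as exc:
        span.record_exception(exc)
        span.end()
        raise

    if kwargs.get("stream"):
        return WrappedStreamSketch(result, span)

    span.end()
    return result
```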
941-1038: LGTM! The async wrapper correctly mirrors the sync implementation with proper async/await handling. It properly:

- Handles both `Stream` and `AsyncStream` types (line 988)
- Creates spans and records telemetry consistently with the sync version
- Returns `ResponseStream` for streaming responses (works for both sync and async iteration)

The use of the same `ResponseStream` class for both sync and async is correct since it inherits from `ResponseStreamBase` with both `__next__` and `__anext__` methods.
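A tiny sketch of why one class can serve both callers: it only needs to implement both iteration protocols (illustrative, not the actual `ResponseStreamBase`):

```python
class DualIterationSketch:
    """Usable with `for chunk in s` and with `async for chunk in s`."""

    def __init__(self, wrapped):
        self._wrapped = wrapped

    def __iter__(self):
        return self

    def __next__(self):
        return self._wrapped.__next__()          # sync streams

    def __aiter__(self):
        return self

    async def __anext__(self):
        return await self._wrapped.__anext__()   # async streams
```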
1103-1157: LGTM! Both cancel wrappers (sync and async) properly:

- Pop the existing data from the global `responses` dict
- Create a span with the original start time if data exists
- Record a cancellation exception on the span
- Set all traced attributes before ending the span

The logic correctly handles cleanup for cancelled responses.
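A simplified sketch of that cancellation flow; the traced data is shown as a plain dict and the span name and fields are illustrative:

```python
from opentelemetry.trace import SpanKind, Status, StatusCode


def cancel_wrapper_sketch(tracer, responses, response_id):
    """Emit a span for a cancelled response, reusing its original start time."""
    traced = responses.pop(response_id, None)  # also removes the global entry
    if traced is None:
        return

    span = tracer.start_span(
        "openai.response",
        kind=SpanKind.CLIENT,
        start_time=traced["start_time_ns"],  # nanoseconds since the Unix epoch
    )
    span.record_exception(RuntimeError("Response cancelled"))
    span.set_status(Status(StatusCode.ERROR, "Response cancelled"))
    span.end()
```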
The issue causing 488706-hour durations was a time scale mismatch:

- Wrappers capture start_time using time.time() (seconds)
- TracedData now stores it as nanoseconds (int(start_time * 1e9))
- Spans receive the nanosecond value directly (no double conversion)

This aligns with OpenTelemetry's expectation of nanoseconds since Unix epoch for span start times, while keeping the same pattern as existing code.
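In other words, a short illustration of the conversion being described:

```python
import time

start_time_s = time.time()               # wrappers capture seconds as a float
start_time_ns = int(start_time_s * 1e9)  # span start_time expects ns since the epoch

# Passing the float seconds value straight through as a span start time makes
# OpenTelemetry treat it as nanoseconds, which is what produced the absurd durations.
```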
Updated chunk processing to handle OpenAI streaming events directly instead of attempting to parse them as Response objects. The streaming API returns event objects (ResponseCreatedEvent, ResponseTextDeltaEvent, ResponseInProgressEvent, ResponseCompletedEvent, etc.) rather than Response objects.

Changes:
- Handle ResponseCreatedEvent to extract response_id
- Handle ResponseTextDeltaEvent to accumulate output text
- Handle ResponseOutputItemAddedEvent to track output items
- Handle ResponseInProgressEvent and ResponseCompletedEvent for usage data
- Changed log level from debug to info for easier debugging

This fixes the missing input, output, and token data in spans for streaming responses (issue traceloop#3395).
Fixed ResponseTextDeltaEvent to use 'delta' attribute instead of 'text_delta'. Added handlers for ResponseTextDoneEvent, ResponseContentPartDoneEvent, and ResponseOutputItemDoneEvent to properly extract the complete output text from streaming responses. This fixes the issue where output_text was None in completed streaming spans.
Changed the logging level from debug to info for the set_data_attributes function to make it easier to diagnose input/output attribute setting issues.
Added info-level logs to show input_data when initializing streaming responses (both sync and async) to help diagnose input attribute issues.
Added handling for function_call_output type inputs in set_data_attributes to properly capture tool/function outputs in span attributes. These are set with role 'tool' and include both call_id and output content.
Updated ResponseOutputItemDoneEvent handler to store the complete item in output_blocks, not just extract text. This ensures function_call, file_search_call, web_search_call, computer_call, reasoning, and other output item types are properly captured for span attributes. This makes streaming behavior consistent with non-streaming, where output_blocks contains all output items which are then processed into tool_calls attributes by set_data_attributes.
Removed all info-level logging statements that were added for debugging during development. The implementation is now working correctly and the verbose logging is no longer needed.
- Fix memory leak by deleting responses dict entries after span completion
- Optimize string concatenation using list append + join pattern
- Fix type annotation to use empty string instead of None for response_model
- Remove try/except/pass blocks in favor of defensive checks
- Simplify nested conditionals and use ternary operators where appropriate
- Use getattr for simple single-attribute access patterns
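The string-concatenation change follows the standard append-then-join pattern, roughly:

```python
# Instead of repeatedly doing `output_text += delta` (which copies the whole
# string on every chunk), collect the pieces and join them once at the end.
parts = []
for delta in ("Hello", ", ", "world"):
    parts.append(delta)
output_text = "".join(parts)
```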
Important
Looks good to me! 👍
Reviewed everything up to af3a2a4 in 2 minutes and 24 seconds.

- Reviewed 1376 lines of code in 4 files
- Skipped 0 files when reviewing
- Skipped posting 5 draft comments (shown below)
1. packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py:155
- Draft comment: Consider using a weak-reference dictionary (e.g. WeakValueDictionary) for the global 'responses' container to mitigate potential memory leaks if cleanup fails.
- Reason this comment was not posted: Comment was not on a location in the diff, so it can't be submitted as a review comment.
2. packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py:367
- Draft comment: Consider adding a final 'else' or logging for unexpected event types in _process_chunk to aid debugging of unhandled cases.
- Reason this comment was not posted: Decided after close inspection that this draft comment was likely wrong and/or not actionable (usefulness confidence = 20% vs. threshold = 50%). While the suggestion could help with debugging, the code already has error logging in the try-except block. The function is focused on telemetry/tracing, so missing an event type isn't necessarily an error; it may just be an event type we don't need to track. Adding logging for every unhandled event type could create noise in the logs without providing much value. The comment has a point: without logging unhandled event types, we might miss important events and lose telemetry data. However, the existing error logging will catch any actual errors, and the code intentionally collects only targeted telemetry, so not every event needs to be handled or logged. The comment should be deleted as it suggests potentially noisy logging that isn't necessary for the core functionality.
3. packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py:928
- Draft comment: Using the dictionary merge operator ('|') requires Python 3.9+; ensure this version requirement is documented in your project specifications.
- Reason this comment was not posted: Comment was not on a location in the diff, so it can't be submitted as a review comment.
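For reference, the two spellings the draft comment is contrasting (values here are made up):

```python
defaults = {"model": "gpt-4o", "stream": False}
overrides = {"stream": True}

merged = defaults | overrides               # dict union operator, Python 3.9+
merged_compat = {**defaults, **overrides}   # equivalent spelling that also works on 3.8
```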
4. packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py:852
- Draft comment: The sync and async wrappers (responses_get_or_create_wrapper and async_responses_get_or_create_wrapper) contain similar logic; consider abstracting common portions to reduce duplication and improve maintainability.
- Reason this comment was not posted: Comment was on unchanged code.
5. packages/opentelemetry-instrumentation-openai/tests/traces/test_responses_streaming.py:190
- Draft comment: The test cases comprehensively cover streaming scenarios (sync, async, context manager, and nested tracer contexts). Consider also adding tests covering cancellation paths if not already covered.
- Reason this comment was not posted: Decided after close inspection that this draft comment was likely wrong and/or not actionable (usefulness confidence = 20% vs. threshold = 50%). While testing cancellation paths could be valuable, this is more of a "nice to have" suggestion rather than pointing out a clear issue. The comment is speculative ("if not already covered") and doesn't identify a specific problem, and the existing tests already cover the core functionality thoroughly. Cancellation handling is an important edge case in streaming scenarios, so the suggestion has some merit from a completeness perspective, but it does not identify any actual problem with the current code and is therefore treated as a speculative suggestion rather than a clear issue requiring changes.
Workflow ID: wflow_z7pUD5d9ZapvYuc7
Fixed span leak where non-streaming completed responses were starting a new span instead of reusing the existing one, leaving the original span unclosed. Also added docstring to _with_responses_telemetry_wrapper.
feat(instrumentation): ... or fix(instrumentation): ...

This addresses #3395. The underlying issue was simply that the responses_wrapper code didn't support streaming. I tried to follow the ChatStream approach taken for adding streaming support as closely as I was able to, given the API differences with Responses.
Screenshot of this working w/ Langfuse. It's hard to demonstrate it specifically being the streaming response. If y'all have suggestions on how best to demonstrate this, let me know.
Important
Adds streaming support to OpenAI responses API with telemetry and tests for sync and async operations.
- Adds `ResponseStream` class in `responses_wrappers.py`.
- Updates `responses_get_or_create_wrapper` and `async_responses_get_or_create_wrapper` to handle telemetry for streaming responses.
- Adds tests in `test_responses_streaming.py` for streaming responses, async responses, and context manager usage.
- Adds `_with_responses_telemetry_wrapper` in `utils.py` for telemetry handling.
Summary by CodeRabbit