diff --git a/.gitignore b/.gitignore index 5af82ed8f..80abbcbf5 100644 --- a/.gitignore +++ b/.gitignore @@ -109,3 +109,6 @@ Thumbs.db # Scratch files .scratch/ + +# Workspace Definitions +*.code-workspace \ No newline at end of file diff --git a/python/instrumentation/openinference-instrumentation-pipecat/CHANGELOG.md b/python/instrumentation/openinference-instrumentation-pipecat/CHANGELOG.md new file mode 100644 index 000000000..6ab6bc279 --- /dev/null +++ b/python/instrumentation/openinference-instrumentation-pipecat/CHANGELOG.md @@ -0,0 +1,11 @@ +# Changelog + +## [0.1.0] - TBD + +### Features + +* Initial release of openinference-instrumentation-pipecat +* Support for converting Pipecat traces to OpenInference format +* Compatible with Phoenix and Arize observability platforms + +## Changelog diff --git a/python/instrumentation/openinference-instrumentation-pipecat/LICENSE b/python/instrumentation/openinference-instrumentation-pipecat/LICENSE new file mode 100644 index 000000000..6c74c4010 --- /dev/null +++ b/python/instrumentation/openinference-instrumentation-pipecat/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright The OpenInference Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. \ No newline at end of file diff --git a/python/instrumentation/openinference-instrumentation-pipecat/README.md b/python/instrumentation/openinference-instrumentation-pipecat/README.md new file mode 100644 index 000000000..5e0fbc573 --- /dev/null +++ b/python/instrumentation/openinference-instrumentation-pipecat/README.md @@ -0,0 +1,85 @@ +# OpenInference Pipecat Instrumentation + +Python auto-instrumentation library for Pipecat. This library allows you to convert Pipecat traces to OpenInference, which is OpenTelemetry compatible, and view those traces in [Arize Phoenix](https://github.com/Arize-ai/phoenix). + +## Installation + +```shell +pip install openinference-instrumentation-pipecat +``` + +## Quickstart + +This quickstart shows you how to view your Pipecat traces in Phoenix. + +Install required packages. + +```shell +pip install arize-phoenix opentelemetry-sdk opentelemetry-exporter-otlp pipecat-ai +``` + +Start Phoenix in the background as a collector. By default, it listens on `http://localhost:6006`. 
You can visit the app in your browser at the same address. (Phoenix does not send data over the internet; it runs entirely on your machine.)
+
+```shell
+phoenix serve
+```
+
+Here's a simple example that demonstrates how to convert Pipecat traces into OpenInference and view those traces in Phoenix:
+
+```python
+import os
+import grpc
+from opentelemetry.sdk.trace.export import BatchSpanProcessor
+from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
+from phoenix.otel import register
+from openinference.instrumentation.pipecat import OpenInferenceSpanProcessor
+from pipecat.utils.tracing import setup_tracing
+
+# Set your API keys
+os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"
+
+# Set up the tracer provider
+tracer_provider = register(
+    project_name="default"  # Phoenix project name
+)
+
+tracer_provider.add_span_processor(OpenInferenceSpanProcessor())
+
+tracer_provider.add_span_processor(
+    BatchSpanProcessor(
+        OTLPSpanExporter(
+            endpoint="http://localhost:4317",  # if using Phoenix Cloud, change to your Phoenix Cloud endpoint (Phoenix Cloud space -> Settings -> Endpoint/Hostname)
+            headers={},
+            compression=grpc.Compression.Gzip,  # use the enum instead of a string
+        )
+    )
+)
+
+# Initialize Pipecat tracing
+setup_tracing(
+    service_name="pipecat-phoenix-demo",
+    exporter=OTLPSpanExporter(
+        endpoint="http://localhost:4317",
+        headers={},
+        compression=grpc.Compression.Gzip,
+    ),
+)
+
+# Build your Pipecat pipeline
+# ... (add your Pipecat pipeline code here)
+
+# Now view your converted Pipecat traces in Phoenix!
+```
+
+This example:
+
+1. Uses Pipecat's built-in tracing utilities to instrument the application.
+2. Defines a Pipecat pipeline for voice/conversational AI.
+3. Exports traces to Phoenix through the OpenInference span processor.
+
+The traces will be visible in the Phoenix UI at `http://localhost:6006`.
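+
+If you prefer auto-instrumentation over wiring span processors yourself, the package also ships a `PipecatInstrumentor` (used in `examples/trace/001-trace.py`). A minimal sketch, assuming a local Phoenix instance and the same project name as above:
+
+```python
+from phoenix.otel import register
+from openinference.instrumentation.pipecat import PipecatInstrumentor
+
+# Register a Phoenix tracer provider (project name is just an example value)
+tracer_provider = register(project_name="default")
+
+# Any PipelineTask created after this call automatically gets an
+# OpenInference observer attached to it.
+PipecatInstrumentor().instrument(tracer_provider=tracer_provider)
+
+# Build and run your Pipecat pipeline as usual
+# ...
+```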
+ +## More Info + +- [More info on OpenInference and Phoenix](https://docs.arize.com/phoenix) +- [How to customize spans to track sessions, metadata, etc.](https://github.com/Arize-ai/openinference/tree/main/python/openinference-instrumentation#customizing-spans) +- [How to account for private information and span payload customization](https://github.com/Arize-ai/openinference/tree/main/python/openinference-instrumentation#tracing-configuration) diff --git a/python/instrumentation/openinference-instrumentation-pipecat/examples/trace/001-trace.py b/python/instrumentation/openinference-instrumentation-pipecat/examples/trace/001-trace.py new file mode 100644 index 000000000..2a2ef9a37 --- /dev/null +++ b/python/instrumentation/openinference-instrumentation-pipecat/examples/trace/001-trace.py @@ -0,0 +1,169 @@ +import os +from datetime import datetime + +from arize.otel import register as register_arize +from dotenv import load_dotenv +from loguru import logger +from phoenix.otel import register as register_phoenix +from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams +from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.audio.vad.vad_analyzer import VADParams +from pipecat.frames.frames import LLMRunFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_response_universal import ( + LLMContextAggregatorPair, +) +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport +from pipecat.services.openai.llm import OpenAILLMService +from pipecat.services.openai.stt import OpenAISTTService +from pipecat.services.openai.tts import OpenAITTSService +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.daily.transport import DailyParams +from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams + +from openinference.instrumentation.pipecat import PipecatInstrumentor + +load_dotenv(override=True) + +conversation_id = f"test-conversation-001_{datetime.now().strftime('%Y%m%d_%H%M%S')}" +debug_log_filename = os.path.join(os.getcwd(), f"pipecat_frames_{conversation_id}.log") + + +def setup_tracer_provider(): + """ + Setup the tracer provider. 
+ """ + project_name = os.getenv("PROJECT_NAME", "pipecat-voice-agent") + if os.getenv("ARIZE_SPACE_ID") and os.getenv("ARIZE_API_KEY"): + return register_arize( + space_id=os.getenv("ARIZE_SPACE_ID"), + api_key=os.getenv("ARIZE_API_KEY"), + project_name=project_name, + ) + else: + return register_phoenix(project_name=project_name) + + +tracer_provider = setup_tracer_provider() +PipecatInstrumentor().instrument( + tracer_provider=tracer_provider, + debug_log_filename=debug_log_filename, +) + +transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)), + turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()), + ), + "twilio": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)), + turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()), + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)), + turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()), + ), +} + + +async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): + logger.info("Starting bot") + + ### STT ### + stt = OpenAISTTService( + api_key=os.getenv("OPENAI_API_KEY"), + model="gpt-4o-transcribe", + prompt="Expect normal helpful conversation.", + ) + ### alternative stt - cartesia ### + # stt = CartesiaSTTService(api_key=os.getenv("CARTESIA_API_KEY")) + + ### LLM ### + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) + + ### TTS ### + tts = OpenAITTSService( + api_key=os.getenv("OPENAI_API_KEY"), + voice="ballad", + params=OpenAITTSService.InputParams( + instructions="Please speak clearly and at a moderate pace." + ), + ) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. " + + "Your goal is to demonstrate your capabilities in a succinct way. " + + "Your output will be converted to audio so don't " + + "include special characters in your answers. " + + "Respond to what the user said in a creative and helpful way.", + } + ] + + context = LLMContext(messages) + context_aggregator = LLMContextAggregatorPair(context) + + ### PIPELINE ### + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, + context_aggregator.user(), # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + context_aggregator.assistant(), # Assistant spoken responses + ] + ) + + ### TASK ### + + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + conversation_id=conversation_id, # Use dynamic conversation ID for session tracking + idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info("Client connected") + # Kick off the conversation. 
+ messages.append({"role": "system", "content": "Please introduce yourself to the user."}) + await task.queue_frames([LLMRunFrame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info("Client disconnected") + await task.cancel() + + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + + await runner.run(task) + + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + transport = await create_transport(runner_args, transport_params) + await run_bot(transport, runner_args) + + +if __name__ == "__main__": + from pipecat.runner.run import main + + main() diff --git a/python/instrumentation/openinference-instrumentation-pipecat/examples/trace/example.env b/python/instrumentation/openinference-instrumentation-pipecat/examples/trace/example.env new file mode 100644 index 000000000..6823b26e1 --- /dev/null +++ b/python/instrumentation/openinference-instrumentation-pipecat/examples/trace/example.env @@ -0,0 +1,6 @@ +OPENAI_API_KEY=... +PROJECT_NAME="pipecat-voice-agent" + +# if using Arize +ARIZE_API_KEY=... +ARIZE_SPACE_ID=... diff --git a/python/instrumentation/openinference-instrumentation-pipecat/pyproject.toml b/python/instrumentation/openinference-instrumentation-pipecat/pyproject.toml new file mode 100644 index 000000000..6bf85ead5 --- /dev/null +++ b/python/instrumentation/openinference-instrumentation-pipecat/pyproject.toml @@ -0,0 +1,121 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "openinference-instrumentation-pipecat" +dynamic = ["version"] +description = "OpenInference Pipecat Instrumentation" +readme = "README.md" +license = "Apache-2.0" +requires-python = ">=3.9, <3.15" +authors = [ + { name = "OpenInference Authors", email = "oss@arize.com" }, +] +classifiers = [ + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", +] +dependencies = [ + "opentelemetry-api", + "opentelemetry-instrumentation", + "opentelemetry-semantic-conventions", + "openinference-instrumentation>=0.1.34", + "openinference-semantic-conventions>=0.1.21", + "websockets>=13.1,<16.0", + "mypy>=1.18.2", +] + +[project.optional-dependencies] +instruments = [ + "pipecat-ai", +] +test = [ + "pipecat-ai", + "opentelemetry-sdk>=1.20.0", + "opentelemetry-exporter-otlp-proto-http", + "pytest-recording", + "pytest>=7.0.0", + "pytest-asyncio>=0.21.0", +] +examples = [ + "arize-otel>=0.0.1", + "arize-phoenix>=0.0.1", + "daily-python~=0.20.0", + "transformers", + "onnxruntime>=1.20.1,<2", + "websockets>=13.1,<16.0", + "python-dotenv>=1.0.0,<2.0.0", + "uvicorn>=0.32.0,<1.0.0", + "fastapi>=0.115.6,<0.117.0", + "pipecat-ai-small-webrtc-prebuilt>=1.0.0", + "pipecat-ai[tracing]", + "aiortc>=1.13.0,<2", + "opencv-python>=4.11.0.86,<5", +] + +[project.entry-points.opentelemetry_instrumentor] +pipecat = "openinference.instrumentation.pipecat:PipecatInstrumentor" + +[project.entry-points.openinference_instrumentor] +pipecat = "openinference.instrumentation.pipecat:PipecatInstrumentor" + +[project.urls] +Homepage = 
"https://github.com/Arize-ai/openinference/tree/main/python/instrumentation/openinference-instrumentation-pipecat" + +[tool.hatch.version] +path = "src/openinference/instrumentation/pipecat/version.py" + +[tool.hatch.build.targets.sdist] +include = [ + "/src", + "/tests", +] + +[tool.hatch.build.targets.wheel] +packages = ["src/openinference"] + +[tool.pytest.ini_options] +asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" +testpaths = [ + "tests", +] + +[tool.mypy] +strict = true +explicit_package_bases = true +exclude = [ + "examples", + "dist", + "sdist", + "tests", +] + +[[tool.mypy.overrides]] +ignore_missing_imports = true +module = [ + "wrapt", +] + +[tool.ruff] +line-length = 100 +target-version = "py38" + +[tool.ruff.lint.per-file-ignores] +"*.ipynb" = ["E402", "E501"] + +[tool.ruff.lint] +select = ["E", "F", "W", "I"] + +[tool.ruff.lint.isort] +force-single-line = false diff --git a/python/instrumentation/openinference-instrumentation-pipecat/src/openinference/instrumentation/pipecat/__init__.py b/python/instrumentation/openinference-instrumentation-pipecat/src/openinference/instrumentation/pipecat/__init__.py new file mode 100644 index 000000000..bbbe18c41 --- /dev/null +++ b/python/instrumentation/openinference-instrumentation-pipecat/src/openinference/instrumentation/pipecat/__init__.py @@ -0,0 +1,165 @@ +"""OpenInference instrumentation for Pipecat.""" + +import logging +from typing import Any, Callable, Collection, Dict, Optional, Tuple + +from opentelemetry import trace as trace_api +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor # type: ignore +from wrapt import wrap_function_wrapper + +from openinference.instrumentation import OITracer, TraceConfig +from openinference.instrumentation.pipecat._observer import OpenInferenceObserver +from openinference.instrumentation.pipecat.package import _instruments +from openinference.instrumentation.pipecat.version import __version__ +from pipecat.pipeline.task import PipelineTask + +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) +logger.setLevel(logging.INFO) + +__all__ = ["PipecatInstrumentor"] + + +class PipecatInstrumentor(BaseInstrumentor): # type: ignore + """ + An instrumentor for Pipecat pipelines. + + Automatically instruments PipelineTask to observe frame flow and create + OpenInference-compliant spans for LLM, TTS, and STT services. + """ + + def instrumentation_dependencies(self) -> Collection[str]: + return _instruments if isinstance(_instruments, tuple) else () + + def create_observer(self) -> OpenInferenceObserver: + """ + Create an OpenInferenceObserver manually. + + Returns: + OpenInferenceObserver instance + + Raises: + RuntimeError: If instrumentor is not instrumented yet + """ + if not self.is_instrumented_by_opentelemetry: + raise RuntimeError( + "Instrumentor must be instrumented before creating observers. " + "Call .instrument() first." + ) + + return OpenInferenceObserver(tracer=self._tracer, config=self._config) + + def _instrument(self, **kwargs: Any) -> None: + """ + Instrument Pipecat by wrapping PipelineTask.__init__ to inject observer. 
+ + Args: + tracer_provider: OpenTelemetry TracerProvider + config: OpenInference TraceConfig + debug_log_filename: Optional debug log filename to use for all observers + """ + if not (tracer_provider := kwargs.get("tracer_provider")): + tracer_provider = trace_api.get_tracer_provider() + + if not (config := kwargs.get("config")): + config = TraceConfig() + else: + assert isinstance(config, TraceConfig) + + # Create OITracer + tracer = OITracer( + trace_api.get_tracer(__name__, __version__, tracer_provider), + config=config, + ) + + # Store for creating observers + self._tracer = tracer + self._config = config + self._debug_log_filename = kwargs.get("debug_log_filename") + + try: + # Store original __init__ + self._original_task_init = PipelineTask.__init__ + + # Wrap PipelineTask.__init__ to inject our observer + wrap_function_wrapper( + module="pipecat.pipeline.task", + name="PipelineTask.__init__", + wrapper=_TaskInitWrapper( + tracer=tracer, + config=config, + default_debug_log_filename=self._debug_log_filename, + ), + ) + + logger.info("Pipecat instrumentation enabled") + + except ImportError as e: + logger.warning(f"Failed to instrument Pipecat: {e}") + + def _uninstrument(self, **kwargs: Any) -> None: + """ + Uninstrument Pipecat by restoring original PipelineTask.__init__. + """ + try: + if hasattr(self, "_original_task_init"): + PipelineTask.__init__ = self._original_task_init # type: ignore + logger.info("Pipecat instrumentation disabled") + except (ImportError, AttributeError): + pass + + +class _TaskInitWrapper: + """Wrapper for PipelineTask.__init__ to inject OpenInferenceObserver.""" + + def __init__( + self, + tracer: OITracer, + config: TraceConfig, + default_debug_log_filename: Optional[str] = None, + ): + self._tracer = tracer + self._config = config + self._default_debug_log_filename = default_debug_log_filename + + def __call__( + self, + wrapped: Callable[[Any, Any], Any], + instance: PipelineTask, + args: Tuple[Any, ...], + kwargs: Dict[str, Any], + ) -> None: + """ + Call original __init__, then inject our observer. + + This creates a new observer instance for each task (thread-safe). 
+ """ + # Call original __init__ + wrapped(*args, **kwargs) + + # Extract conversation_id from PipelineTask if available + # PipelineTask stores it as _conversation_id (private attribute) + conversation_id = getattr(instance, "_conversation_id", None) + additional_span_attributes = getattr(instance, "_additional_span_attributes", None) + + # Use task-specific debug log filename if set, otherwise use default from instrument() + debug_log_filename = ( + getattr(instance, "_debug_log_filename", None) or self._default_debug_log_filename + ) + + observer = OpenInferenceObserver( + tracer=self._tracer, + config=self._config, + conversation_id=conversation_id, + debug_log_filename=debug_log_filename, + additional_span_attributes=additional_span_attributes, + ) + + # Inject observer into task + instance.add_observer(observer) + + logger.info(f"Injected OpenInferenceObserver into PipelineTask {id(instance)} ") + if additional_span_attributes: + logger.info(f"Additional span attributes: {str(additional_span_attributes)}") + if conversation_id: + logger.info(f"Conversation ID: {conversation_id}") diff --git a/python/instrumentation/openinference-instrumentation-pipecat/src/openinference/instrumentation/pipecat/_attributes.py b/python/instrumentation/openinference-instrumentation-pipecat/src/openinference/instrumentation/pipecat/_attributes.py new file mode 100644 index 000000000..4ed5d876c --- /dev/null +++ b/python/instrumentation/openinference-instrumentation-pipecat/src/openinference/instrumentation/pipecat/_attributes.py @@ -0,0 +1,884 @@ +"""Attribute extraction from Pipecat frames.""" + +import logging +from typing import Any, Callable, Dict, List, Type + +from openinference.instrumentation.helpers import safe_json_dumps +from openinference.semconv.trace import ( + AudioAttributes, + MessageAttributes, + OpenInferenceSpanKindValues, + SpanAttributes, + ToolCallAttributes, +) +from pipecat.frames.frames import ( + AudioRawFrame, + Frame, + FunctionCallFromLLM, + FunctionCallInProgressFrame, + FunctionCallResultFrame, + InterimTranscriptionFrame, + LLMContextFrame, + LLMFullResponseEndFrame, + LLMFullResponseStartFrame, + LLMMessagesAppendFrame, + LLMMessagesFrame, + LLMTextFrame, + MetricsFrame, + TextFrame, + TranscriptionFrame, + TTSTextFrame, +) +from pipecat.metrics.metrics import ( + LLMTokenUsage, + LLMUsageMetricsData, + MetricsData, + ProcessingMetricsData, + TTFBMetricsData, + TTSUsageMetricsData, +) +from pipecat.processors.aggregators.llm_context import ( + LLMContext, + LLMSpecificMessage, +) +from pipecat.processors.frame_processor import FrameProcessor +from pipecat.services.ai_service import AIService +from pipecat.services.image_service import ImageGenService +from pipecat.services.llm_service import LLMService +from pipecat.services.stt_service import STTService +from pipecat.services.tts_service import TTSService +from pipecat.services.vision_service import VisionService +from pipecat.services.websocket_service import WebsocketService + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +__all__ = [ + "extract_attributes_from_frame", + "extract_service_attributes", + "detect_service_type", + "detect_provider_from_service", +] + +FRAME_TYPE_MAP = { + TranscriptionFrame.__name__: "transcription", + TTSTextFrame.__name__: "tts_text", + TextFrame.__name__: "text", + AudioRawFrame.__name__: "audio", + FunctionCallFromLLM.__name__: "function_call_from_llm", + FunctionCallInProgressFrame.__name__: "function_call_in_progress", + FunctionCallResultFrame.__name__: 
"function_call_result", + LLMContextFrame.__name__: "llm_context", + LLMMessagesFrame.__name__: "llm_messages", +} + +SERVICE_TYPE_MAP = { + STTService.__name__: "stt", + LLMService.__name__: "llm", + TTSService.__name__: "tts", + ImageGenService.__name__: "image_gen", + VisionService.__name__: "vision", + WebsocketService.__name__: "websocket", + AIService.__name__: "ai", +} + + +def safe_extract(extractor: Callable[[], Any], default: Any = None) -> Any: + """ + Safely execute an extractor function, returning default value on error. + + Args: + extractor: Function to execute + default: Default value to return on error + + Returns: + Result of extractor or default value on error + """ + try: + return extractor() + except Exception as e: + logger.debug(f"Failed to extract attribute: {e}") + return default + + +def detect_frame_type(frame: Frame) -> str: + """Detect the type of frame using MRO for inheritance support.""" + # Walk through the Method Resolution Order to find first matching frame type + for base_class in frame.__class__.__mro__: + frame_type = FRAME_TYPE_MAP.get(base_class.__name__) + if frame_type: + return frame_type + return "unknown" + + +def detect_service_type(service: FrameProcessor) -> str: + """Detect the type of service using MRO for inheritance support.""" + # Walk through the Method Resolution Order to find first matching service type + for base_class in service.__class__.__mro__: + service_type = SERVICE_TYPE_MAP.get(base_class.__name__) + if service_type: + return service_type + return "unknown" + + +def detect_provider_from_service(service: FrameProcessor) -> str: + """Detect the provider from a service.""" + try: + module = service.__class__.__module__ + parts = module.split(".") + + # Module format: pipecat.services.{provider}.{service_type} + if len(parts) >= 3 and parts[0] == "pipecat" and parts[1] == "services": + return parts[2] + else: + return "unknown" + except Exception as e: + logger.warning(f"Failed to detect provider from service: {e}") + return "unknown" + + +class FrameAttributeExtractor: + """Extract attributes from Pipecat frames.""" + + _base_attributes: Dict[str, Any] = { + "frame.id": lambda frame: frame.id, + SpanAttributes.USER_ID: lambda frame: getattr(frame, "user_id", None), + "frame.name": lambda frame: getattr(frame, "name", None), + "frame.pts": lambda frame: getattr(frame, "pts", None), + "frame.timestamp": lambda frame: getattr(frame, "timestamp", None), + "frame.metadata": lambda frame: safe_json_dumps(getattr(frame, "metadata", {})), + "frame.transport_source": lambda frame: getattr(frame, "transport_source", None), + "frame.transport_destination": lambda frame: getattr(frame, "transport_destination", None), + "frame.error.message": lambda frame: getattr(frame, "error", None), + } + attributes: Dict[str, Any] = {} + + def extract_from_frame(self, frame: Frame) -> Dict[str, Any]: + result: Dict[str, Any] = {} + attributes = {**self._base_attributes, **self.attributes} + for attribute, operation in attributes.items(): + # Use safe_extract to prevent individual attribute failures from breaking extraction + value = safe_extract(lambda: operation(frame)) + if value is not None: + result[attribute] = value + return result + + +class TextFrameExtractor(FrameAttributeExtractor): + """Extract attributes from text frames (TextFrame, LLMTextFrame, TranscriptionFrame, etc.).""" + + def extract_from_frame(self, frame: Frame) -> Dict[str, Any]: + results: Dict[str, Any] = super().extract_from_frame(frame) + if hasattr(frame, "text") and 
frame.text: + text = frame.text + + # Handle different text frame types + if isinstance(frame, (TranscriptionFrame, InterimTranscriptionFrame)): + # Transcription frames are INPUT (user speech) + results[SpanAttributes.INPUT_VALUE] = text + results[AudioAttributes.AUDIO_TRANSCRIPT] = text + + results["llm.input_messages.0.message.role"] = "user" + results["llm.input_messages.0.message.content"] = text + results["llm.input_messages.0.message.name"] = "stt_text" + + # Add is_final flag for transcriptions + if isinstance(frame, TranscriptionFrame): + results["transcription.is_final"] = True + results[SpanAttributes.INPUT_VALUE] = text + elif isinstance(frame, InterimTranscriptionFrame): + results["transcription.is_final"] = False + + elif isinstance(frame, TTSTextFrame): + # TTSTextFrame represents input TO the TTS service (text to be synthesized) + # Note: Don't set INPUT_VALUE here - observer will accumulate streaming chunks + results["text"] = text # Match Pipecat native tracing attribute name + results["text.chunk"] = text # Raw chunk for accumulation + + elif isinstance(frame, LLMTextFrame): + # LLMTextFrame represents output FROM the LLM service + # Note: Don't set OUTPUT_VALUE here - observer will accumulate streaming chunks + results["text.chunk"] = text # Raw chunk for accumulation + + elif isinstance(frame, TextFrame): + # Generic text frame (output) + results[SpanAttributes.OUTPUT_VALUE] = text + results["llm.output_messages.0.message.role"] = "user" + results["llm.output_messages.0.message.content"] = text + results["llm.output_messages.0.message.name"] = "text" + + return results + + +# Singleton text frame extractor +_text_frame_extractor = TextFrameExtractor() + + +class LLMContextFrameExtractor(FrameAttributeExtractor): + """Extract attributes from an LLM context frame.""" + + def extract_from_frame(self, frame: Frame) -> Dict[str, Any]: + results: Dict[str, Any] = {} + if hasattr(frame, "context") and frame.context: + context = frame.context + # Extract messages from context + # Note: context.messages is the public API, context._messages is the internal list + # Try _messages first (more reliable), then fall back to messages + messages_list = None + if hasattr(context, "_messages") and context._messages: + messages_list = context._messages + results["llm.messages_count"] = len(context._messages) + elif hasattr(context, "messages") and context.messages: + messages_list = context.messages + results["llm.messages_count"] = len(context.messages) + + if messages_list: + # Convert messages to serializable format + serializable_messages = [] + for msg in messages_list: + if isinstance(msg, dict): + serializable_messages.append(msg) + elif hasattr(msg, "role") and hasattr(msg, "content"): + # LLMMessage object - convert to dict + msg_dict = { + "role": (str(msg.role) if hasattr(msg.role, "__str__") else msg.role), + "content": ( + str(msg.content) + if not isinstance(msg.content, str) + else msg.content + ), + } + if hasattr(msg, "name") and msg.name: + msg_dict["name"] = msg.name + serializable_messages.append(msg_dict) + else: + # Fallback: try to extract from object attributes + try: + msg_dict = { + "role": getattr(msg, "role", "unknown"), + "content": str(msg), + } + serializable_messages.append(msg_dict) + except Exception as e: + logger.debug(f"Could not serialize LLMContext message: {e}") + pass + + # Store full message history in flattened format that Arize expects + if serializable_messages: + for index, message in enumerate(serializable_messages): + if isinstance(message, 
dict): + results[f"llm.input_messages.{index}.message.role"] = message.get( + "role" + ) + results[f"llm.input_messages.{index}.message.content"] = message.get( + "content" + ) + if message.get("name"): + results[f"llm.input_messages.{index}.message.name"] = message.get( + "name" + ) + + # For input.value, only capture the LAST user message (current turn's input) + last_user_message = None + for msg in reversed(serializable_messages): + if isinstance(msg, dict) and msg.get("role") == "user": + last_user_message = msg + break + + if last_user_message: + # Set input.value to just the content of the current turn's user message + content = last_user_message.get("content", "") + results[SpanAttributes.INPUT_VALUE] = content + + # Set message attributes with proper role attribution + results[MessageAttributes.MESSAGE_ROLE] = last_user_message.get( + "role", "user" + ) + results[MessageAttributes.MESSAGE_CONTENT] = content + if last_user_message.get("name"): + results[MessageAttributes.MESSAGE_NAME] = last_user_message.get("name") + # Extract tools if present + if hasattr(context, "_tools") and context._tools: + try: + tools = context._tools + if isinstance(tools, list): + results["llm.tools_count"] = len(tools) + + # Extract tool names + tool_names = [] + for tool in tools: + if isinstance(tool, dict) and "name" in tool: + tool_names.append(tool["name"]) + elif hasattr(tool, "name"): + tool_names.append(tool.name) + elif ( + isinstance(tool, dict) + and "function" in tool + and "name" in tool["function"] + ): + tool_names.append(tool["function"]["name"]) + + if tool_names: + results["tools.names"] = ",".join(tool_names) + + # Serialize full tool definitions (with size limit) + try: + tools_json = safe_json_dumps(tools) + if tools_json and len(tools_json) < 10000: # 10KB limit + results["tools.definitions"] = tools_json + except (TypeError, ValueError) as e: + logger.debug(f"Could not serialize tool definitions: {e}") + + except (TypeError, AttributeError) as e: + logger.debug(f"Could not extract tool information: {e}") + + return results + + +# Singleton LLM context frame extractor +_llm_context_frame_extractor = LLMContextFrameExtractor() + + +class LLMMessagesFrameExtractor(FrameAttributeExtractor): + """Extract attributes from an LLM messages frame.""" + + def extract_from_frame(self, frame: Frame) -> Dict[str, Any]: + results: Dict[str, Any] = {} + if hasattr(frame, "context") and frame.context and isinstance(frame.context, LLMContext): + context = frame.context + # Extract messages from context (context._messages is a list) + if hasattr(context, "_messages") and context._messages: + results["llm.messages_count"] = len(context._messages) + + # Convert messages to serializable format + try: + # Messages can be LLMStandardMessage or LLMSpecificMessage + # They should be dict-like for serialization + messages_list: List[Any] = [] + for msg in context._messages: + if isinstance(msg, dict): + raw_content = msg.get("content") + if isinstance(raw_content, str): + content = msg.get("content") + elif isinstance(raw_content, dict): + content = safe_json_dumps(raw_content) + else: + content = str(raw_content) + messages = { + "role": msg.get("role"), + "content": content, + "name": msg.get("name", ""), + } + messages_list.append(messages) + elif isinstance(msg, LLMSpecificMessage): + # Fallback: try to serialize the object + messages_list.append(msg.message) + + # Store full message history for reference + for index, message in enumerate(messages_list): + if isinstance(message, dict): + 
results[f"llm.input_messages.{index}.message.role"] = message.get( + "role" + ) + results[f"llm.input_messages.{index}.message.content"] = message.get( + "content" + ) + results[f"llm.input_messages.{index}.message.name"] = message.get( + "name" + ) + else: + results[f"llm.input_messages.{index}.message.role"] = "unknown" + results[f"llm.input_messages.{index}.message.content"] = str(message) + results[f"llm.input_messages.{index}.message.name"] = "unknown" + except (TypeError, ValueError, AttributeError) as e: + logger.debug(f"Could not serialize LLMContext messages: {e}") + + # Extract tools if present + if hasattr(context, "_tools") and context._tools: + try: + tools = context._tools + + # Get tool count + if isinstance(tools, list): + results["llm.tools_count"] = len(tools) + + # Extract tool names as comma-separated list + tool_names = [] + for tool in tools: + if isinstance(tool, dict) and "name" in tool: + tool_names.append(tool["name"]) + elif hasattr(tool, "name"): + tool_names.append(tool.name) + elif ( + isinstance(tool, dict) + and "function" in tool + and "name" in tool["function"] + ): + tool_names.append(tool["function"]["name"]) + + if tool_names: + results["tools.names"] = ",".join(tool_names) + + # Serialize full tool definitions (with size limit) + try: + tools_json = safe_json_dumps(tools) + if tools_json and len(tools_json) < 10000: # 10KB limit + results["tools.definitions"] = tools_json + except (TypeError, ValueError) as e: + logger.debug(f"Could not serialize tool definitions: {e}") + + except (TypeError, AttributeError) as e: + logger.debug(f"Could not extract tool information: {e}") + + return results + + +# Singleton LLM messages frame extractor +_llm_messages_frame_extractor = LLMMessagesFrameExtractor() + + +class LLMMessagesSequenceFrameExtractor(FrameAttributeExtractor): + """Extract attributes from an LLM messages append frame.""" + + phase: str = "append" + + def extract_from_frame(self, frame: Frame) -> Dict[str, Any]: + results: Dict[str, Any] = { + "llm.response_phase": self.phase, + } + if hasattr(frame, "_messages") and frame._messages: + messages = frame._messages + results[SpanAttributes.LLM_INPUT_MESSAGES] = [] + # Extract text content for input.value + for message in messages: + if isinstance(message, dict): + results[SpanAttributes.LLM_INPUT_MESSAGES].append( + { + MessageAttributes.MESSAGE_ROLE: message.get("role"), + MessageAttributes.MESSAGE_CONTENT: message.get("content"), + MessageAttributes.MESSAGE_NAME: message.get("name"), + } + ) + else: + results[SpanAttributes.LLM_INPUT_MESSAGES].append( + { + MessageAttributes.MESSAGE_ROLE: "unknown", + MessageAttributes.MESSAGE_CONTENT: str(message), + MessageAttributes.MESSAGE_NAME: "unknown", + } + ) + return results + + +# Singleton LLM messages sequence frame extractor +_llm_messages_sequence_frame_extractor = LLMMessagesSequenceFrameExtractor() + + +class LLMMessagesAppendFrameExtractor(LLMMessagesSequenceFrameExtractor): + """Extract attributes from an LLM messages append frame.""" + + phase: str = "append" + + +# Singleton LLM messages append frame extractor +_llm_messages_append_frame_extractor = LLMMessagesAppendFrameExtractor() + + +class LLMFullResponseStartFrameExtractor(LLMMessagesSequenceFrameExtractor): + """Extract attributes from an LLM full response start frame.""" + + phase: str = "start" + + +# Singleton LLM full response start frame extractor +_llm_full_response_start_frame_extractor = LLMFullResponseStartFrameExtractor() + + +class 
LLMFullResponseEndFrameExtractor(LLMMessagesSequenceFrameExtractor): + """Extract attributes from an LLM full response end frame.""" + + phase: str = "end" + + +# Singleton LLM full response end frame extractor +_llm_full_response_end_frame_extractor = LLMFullResponseEndFrameExtractor() + + +class FunctionCallResultFrameExtractor(FrameAttributeExtractor): + """Extract attributes from function call result frames.""" + + attributes: Dict[str, Any] = { + SpanAttributes.TOOL_NAME: lambda frame: getattr(frame, "function_name", None), + ToolCallAttributes.TOOL_CALL_ID: lambda frame: getattr(frame, "tool_call_id", None), + ToolCallAttributes.TOOL_CALL_FUNCTION_NAME: lambda frame: getattr( + frame, "function_name", None + ), + ToolCallAttributes.TOOL_CALL_FUNCTION_ARGUMENTS_JSON: lambda frame: ( + safe_json_dumps(getattr(frame, "arguments", {})) + ), + "tool.result": lambda frame: (safe_json_dumps(getattr(frame, "result", {}))), + } + + +# Singleton function call result frame extractor +_function_call_result_frame_extractor = FunctionCallResultFrameExtractor() + + +class MetricsDataExtractor: + """Extract attributes from metrics frames.""" + + attributes: Dict[str, Any] = {} + _base_attributes: Dict[str, Any] = { + "metrics.processor": lambda metrics_data: getattr(metrics_data, "processor", None), + "metrics.model": lambda metrics_data: getattr(metrics_data, "model", None), + } + + def extract_from_metrics_data(self, metrics_data: MetricsData) -> Dict[str, Any]: + results: Dict[str, Any] = {} + attributes = {**self._base_attributes, **self.attributes} + for attribute, operation in attributes.items(): + value = safe_extract(lambda: operation(metrics_data)) + if value is not None: + results[attribute] = value + return results + + +class LLMTokenMetricsDataExtractor: + """Extract attributes from LLM token metrics data.""" + + attributes: Dict[str, Any] = { + SpanAttributes.LLM_TOKEN_COUNT_PROMPT: lambda metrics_data: getattr( + metrics_data, "prompt_tokens", None + ), + SpanAttributes.LLM_TOKEN_COUNT_COMPLETION: lambda metrics_data: getattr( + metrics_data, "completion_tokens", None + ), + SpanAttributes.LLM_TOKEN_COUNT_TOTAL: lambda metrics_data: getattr( + metrics_data, "total_tokens", None + ), + SpanAttributes.LLM_TOKEN_COUNT_PROMPT_DETAILS_CACHE_READ: lambda metrics_data: getattr( + metrics_data, "cache_read_input_tokens", None + ), + SpanAttributes.LLM_TOKEN_COUNT_PROMPT_DETAILS_AUDIO: lambda metrics_data: getattr( + metrics_data, "audio_tokens", None + ), + SpanAttributes.LLM_TOKEN_COUNT_COMPLETION_DETAILS_REASONING: lambda metrics_data: getattr( + metrics_data, "reasoning_tokens", None + ), + SpanAttributes.LLM_TOKEN_COUNT_COMPLETION_DETAILS_AUDIO: lambda metrics_data: getattr( + metrics_data, "audio_tokens", None + ), + } + + def extract_from_metrics_data(self, metrics_data: LLMTokenUsage) -> Dict[str, Any]: + results: Dict[str, Any] = {} + for attribute, operation in self.attributes.items(): + value = safe_extract(lambda: operation(metrics_data)) + if value is not None: + results[attribute] = value + return results + + +# Singleton LLM token metrics data extractor +_llm_token_metrics_data_extractor = LLMTokenMetricsDataExtractor() + + +class LLMUsageMetricsDataExtractor(MetricsDataExtractor): + """Extract attributes from LLM usage metrics data.""" + + def extract_from_metrics_data(self, metrics_data: MetricsData) -> Dict[str, Any]: + results: Dict[str, Any] = super().extract_from_metrics_data(metrics_data) + if isinstance(metrics_data, LLMUsageMetricsData): + llm_usage_metrics_data: 
LLMTokenUsage = metrics_data.value + results.update( + _llm_token_metrics_data_extractor.extract_from_metrics_data(llm_usage_metrics_data) + ) + return results + + +# Singleton LLM usage metrics data extractor +_llm_usage_metrics_data_extractor = LLMUsageMetricsDataExtractor() + + +class TTSUsageMetricsDataExtractor(MetricsDataExtractor): + """Extract attributes from TTS usage metrics data.""" + + attributes: Dict[str, Any] = { + "tts.character_count": lambda metrics_data: getattr(metrics_data, "value", None), + } + + +# Singleton TTS usage metrics data extractor +_tts_usage_metrics_data_extractor = TTSUsageMetricsDataExtractor() + + +class TTFBMetricsDataExtractor(MetricsDataExtractor): + """Extract attributes from TTFB metrics data.""" + + attributes: Dict[str, Any] = { + "service.ttfb_seconds": lambda metrics_data: getattr(metrics_data, "value", None), + } + + +# Singleton TTFB metrics data extractor +_ttfb_metrics_data_extractor = TTFBMetricsDataExtractor() + + +class ProcessingMetricsDataExtractor(MetricsDataExtractor): + """Extract attributes from processing metrics data.""" + + attributes: Dict[str, Any] = { + "service.processing_time_seconds": lambda metrics_data: getattr( + metrics_data, "value", None + ), + } + + +# Singleton processing metrics data extractor +_processing_metrics_data_extractor = ProcessingMetricsDataExtractor() + + +class MetricsFrameExtractor(FrameAttributeExtractor): + """Extract attributes from metrics frames.""" + + metrics_extractor_map: Dict[Type[MetricsData], MetricsDataExtractor] = { + LLMUsageMetricsData: _llm_usage_metrics_data_extractor, + TTSUsageMetricsData: _tts_usage_metrics_data_extractor, + TTFBMetricsData: _ttfb_metrics_data_extractor, + ProcessingMetricsData: _processing_metrics_data_extractor, + } + + def extract_from_frame(self, frame: Frame) -> Dict[str, Any]: + results: Dict[str, Any] = {} + + if not hasattr(frame, "data") or not frame.data: + return results + + metrics: List[MetricsData] = frame.data + for metrics_data in metrics: + for base_class in metrics_data.__class__.__mro__: + extractor = self.metrics_extractor_map.get(base_class) + if extractor: + results.update(extractor.extract_from_metrics_data(metrics_data)) + break # Only extract attributes from the first matching extractor + return results + + +# Singleton metrics frame extractor +_metrics_frame_extractor = MetricsFrameExtractor() + + +class GenericFrameExtractor(FrameAttributeExtractor): + """Extract attributes from a generic frame.""" + + frame_extractor_map: Dict[Type[Frame], FrameAttributeExtractor] = { + TextFrame: _text_frame_extractor, + LLMContextFrame: _llm_context_frame_extractor, + LLMMessagesFrame: _llm_messages_frame_extractor, + LLMMessagesAppendFrame: _llm_messages_append_frame_extractor, + LLMFullResponseStartFrame: _llm_full_response_start_frame_extractor, + LLMFullResponseEndFrame: _llm_full_response_end_frame_extractor, + FunctionCallResultFrame: _function_call_result_frame_extractor, + MetricsFrame: _metrics_frame_extractor, + } + + def extract_from_frame(self, frame: Frame) -> Dict[str, Any]: + results: Dict[str, Any] = {} + for base_class in frame.__class__.__mro__: + extractor = self.frame_extractor_map.get(base_class) + if extractor: + results.update(extractor.extract_from_frame(frame)) + return results + + +# Singleton generic frame extractor +_generic_frame_extractor = GenericFrameExtractor() + + +def extract_attributes_from_frame(frame: Frame) -> Dict[str, Any]: + """ + Extract attributes from a frame using the singleton extractor. 
+ + This is the main entry point for attribute extraction. + """ + return _generic_frame_extractor.extract_from_frame(frame) + + +# ============================================================================ +# Service Attribute Extraction (for span creation) +# ============================================================================ + + +class ServiceAttributeExtractor: + """Base class for extracting attributes from services for span creation.""" + + attributes: Dict[str, Any] = {} + _base_attributes: Dict[str, Any] = {} + + def extract_from_service(self, service: FrameProcessor) -> Dict[str, Any]: + """Extract attributes from a service.""" + result: Dict[str, Any] = {} + attributes = {**self._base_attributes, **self.attributes} + for attribute, operation in attributes.items(): + # Use safe_extract to prevent individual attribute failures from breaking extraction + value = safe_extract(lambda: operation(service)) if operation else None + if value is not None: + result[attribute] = value + return result + + +class BaseServiceAttributeExtractor(ServiceAttributeExtractor): + """Extract base attributes common to all services.""" + + _base_attributes: Dict[str, Any] = { + "service.type": lambda service: detect_service_type(service), + "service.provider": lambda service: detect_provider_from_service(service), + } + + +# Singleton base service attribute extractor +_base_service_attribute_extractor = BaseServiceAttributeExtractor() + + +class LLMServiceAttributeExtractor(ServiceAttributeExtractor): + """Extract attributes from an LLM service for span creation.""" + + attributes: Dict[str, Any] = { + SpanAttributes.OPENINFERENCE_SPAN_KIND: lambda service: ( + OpenInferenceSpanKindValues.LLM.value + ), + SpanAttributes.LLM_MODEL_NAME: lambda service: getattr(service, "model_name", None) + or getattr(service, "model", None), + SpanAttributes.LLM_PROVIDER: lambda service: detect_provider_from_service(service), + # GenAI semantic conventions (dual attributes) + "gen_ai.system": lambda service: detect_provider_from_service(service), + "gen_ai.request.model": lambda service: getattr(service, "model_name", None) + or getattr(service, "model", None), + "gen_ai.operation.name": lambda service: "chat", + "gen_ai.output.type": lambda service: "text", + # Streaming flag + "stream": lambda service: getattr(service, "_stream", True), + } + + def extract_from_service(self, service: FrameProcessor) -> Dict[str, Any]: + """Extract LLM service attributes including settings.""" + results = super().extract_from_service(service) + + # Extract LLM settings/configuration as metadata + if hasattr(service, "_settings"): + if isinstance(service._settings, dict): + results[SpanAttributes.METADATA] = safe_json_dumps(service._settings) + else: + results[SpanAttributes.METADATA] = str(service._settings) + + return results + + +# Singleton LLM service attribute extractor +_llm_service_attribute_extractor = LLMServiceAttributeExtractor() + + +class STTServiceAttributeExtractor(ServiceAttributeExtractor): + """Extract attributes from an STT service for span creation.""" + + attributes: Dict[str, Any] = { + SpanAttributes.OPENINFERENCE_SPAN_KIND: lambda service: ( + OpenInferenceSpanKindValues.LLM.value + ), + SpanAttributes.LLM_MODEL_NAME: lambda service: getattr(service, "model_name", None) + or getattr(service, "model", None), + SpanAttributes.LLM_PROVIDER: lambda service: detect_provider_from_service(service), + "service.model": lambda service: getattr(service, "model_name", None) + or getattr(service, "model", None), + 
"audio.sample_rate": lambda service: getattr(service, "sample_rate", None), + "audio.is_muted": lambda service: getattr(service, "is_muted", None), + "audio.user_id": lambda service: getattr(service, "_user_id", None), + } + + +# Singleton STT service attribute extractor +_stt_service_attribute_extractor = STTServiceAttributeExtractor() + + +class TTSServiceAttributeExtractor(ServiceAttributeExtractor): + """Extract attributes from a TTS service for span creation.""" + + attributes: Dict[str, Any] = { + SpanAttributes.OPENINFERENCE_SPAN_KIND: lambda service: ( + OpenInferenceSpanKindValues.LLM.value + ), + SpanAttributes.LLM_MODEL_NAME: lambda service: getattr(service, "model_name", None) + or getattr(service, "model", None), + SpanAttributes.LLM_PROVIDER: lambda service: detect_provider_from_service(service), + "service.model": lambda service: getattr(service, "model_name", None) + or getattr(service, "model", None), + "audio.voice_id": lambda service: getattr(service, "_voice_id", None), + "audio.voice": lambda service: getattr(service, "_voice_id", None), + "audio.sample_rate": lambda service: getattr(service, "sample_rate", None), + } + + +# Singleton TTS service attribute extractor +_tts_service_attribute_extractor = TTSServiceAttributeExtractor() + + +class ImageGenServiceAttributeExtractor(ServiceAttributeExtractor): + """Extract attributes from an image generation service for span creation.""" + + attributes: Dict[str, Any] = { + SpanAttributes.OPENINFERENCE_SPAN_KIND: lambda service: ( + OpenInferenceSpanKindValues.CHAIN.value + ), + "service.model": lambda service: getattr(service, "model_name", None) + or getattr(service, "model", None), + } + + +# Singleton image gen service attribute extractor +_image_gen_service_attribute_extractor = ImageGenServiceAttributeExtractor() + + +class VisionServiceAttributeExtractor(ServiceAttributeExtractor): + """Extract attributes from a vision service for span creation.""" + + attributes: Dict[str, Any] = { + SpanAttributes.OPENINFERENCE_SPAN_KIND: lambda service: ( + OpenInferenceSpanKindValues.CHAIN.value + ), + "service.model": lambda service: getattr(service, "model_name", None) + or getattr(service, "model", None), + } + + +# Singleton vision service attribute extractor +_vision_service_attribute_extractor = VisionServiceAttributeExtractor() + + +class GenericServiceAttributeExtractor(ServiceAttributeExtractor): + """Extract attributes from a generic service for span creation.""" + + service_attribute_extractor_map: Dict[Type[FrameProcessor], ServiceAttributeExtractor] = { + LLMService: _llm_service_attribute_extractor, + STTService: _stt_service_attribute_extractor, + TTSService: _tts_service_attribute_extractor, + ImageGenService: _image_gen_service_attribute_extractor, + VisionService: _vision_service_attribute_extractor, + } + + def extract_from_service(self, service: FrameProcessor) -> Dict[str, Any]: + """Extract attributes from a generic service.""" + results: Dict[str, Any] = {} + for base_class in service.__class__.__mro__: + extractor = self.service_attribute_extractor_map.get(base_class) + if extractor: + results.update(extractor.extract_from_service(service)) + return results + + +# Singleton generic service attribute extractor +_generic_service_attribute_extractor = GenericServiceAttributeExtractor() + + +def extract_service_attributes(service: FrameProcessor) -> Dict[str, Any]: + """Extract attributes from a service using the singleton extractor.""" + return _generic_service_attribute_extractor.extract_from_service(service) 
diff --git a/python/instrumentation/openinference-instrumentation-pipecat/src/openinference/instrumentation/pipecat/_observer.py b/python/instrumentation/openinference-instrumentation-pipecat/src/openinference/instrumentation/pipecat/_observer.py new file mode 100644 index 000000000..89e9fd331 --- /dev/null +++ b/python/instrumentation/openinference-instrumentation-pipecat/src/openinference/instrumentation/pipecat/_observer.py @@ -0,0 +1,686 @@ +"""OpenInference observer for Pipecat pipelines.""" + +import asyncio +import logging +import time +from collections import deque +from contextvars import Token +from datetime import datetime +from typing import Any, Deque, Dict, List, Optional, Set + +from opentelemetry import trace as trace_api +from opentelemetry.context import Context +from opentelemetry.trace import Span + +from openinference.instrumentation import OITracer, TraceConfig +from openinference.instrumentation.pipecat._attributes import ( + detect_service_type, + extract_attributes_from_frame, + extract_service_attributes, +) +from openinference.semconv.trace import ( + OpenInferenceSpanKindValues, + SpanAttributes, +) +from pipecat.frames.frames import ( + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, + CancelFrame, + EndFrame, + Frame, + LLMContextFrame, + StartFrame, + TranscriptionFrame, + TTSTextFrame, + UserStartedSpeakingFrame, +) +from pipecat.observers.base_observer import BaseObserver, FramePushed +from pipecat.processors.frame_processor import FrameProcessor +from pipecat.transports.base_output import BaseOutputTransport + +# Suppress OpenTelemetry context detach errors - these are expected in async code +# where contexts may be created and detached in different async contexts +logging.getLogger("opentelemetry.context").setLevel(logging.CRITICAL) + +logger = logging.getLogger(__name__) + + +class OpenInferenceObserver(BaseObserver): + """ + Observer that creates OpenInference spans for Pipecat frame processing. + + Observes frame flow through pipeline and creates spans for LLM, TTS, and STT services. + Implements proper span hierarchy with session ID propagation. + """ + + def __init__( + self, + tracer: OITracer, + config: TraceConfig, + additional_span_attributes: Optional[Dict[str, Any]] = None, + conversation_id: Optional[str] = None, + debug_log_filename: Optional[str] = None, + max_frames: int = 100, + turn_end_timeout_secs: float = 2.5, + verbose: bool = False, + ): + """ + Initialize the observer. + + Args: + tracer: OpenInference tracer + config: Trace configuration + conversation_id: Optional conversation/session ID to link all spans + debug_log_filename: Optional filename for debug logging + max_frames: Maximum number of frame IDs to keep in history for + duplicate detection. Defaults to 100. + turn_end_timeout_secs: Timeout in seconds after bot stops speaking + before automatically ending the turn. Defaults to 2.5. 
+ """ + super().__init__() + self._tracer = tracer + self._config = config + self._additional_span_attributes: Dict[str, str] = {} + if additional_span_attributes and isinstance(additional_span_attributes, dict): + for k, v in additional_span_attributes.items(): + self._additional_span_attributes[str(k)] = str(v) + # Session management + self._conversation_id = conversation_id + + # Debug logging to file + self._debug_log_file = None + self._verbose = verbose + if debug_log_filename: + # Write log to current working directory (where the script is running) + try: + self._debug_log_file = open(debug_log_filename, "w") + self._log_debug(f"=== Observer initialized for conversation {conversation_id} ===") + self._log_debug(f"=== Log file: {debug_log_filename} ===") + except Exception as e: + logger.error(f"Could not open debug log file: {e}") + + # Track processed frames to avoid duplicates + self._processed_frames: Set[int] = set() + self._frame_history: Deque[int] = deque(maxlen=max_frames) + self._active_spans: Dict[int, Dict[str, Any]] = {} + + # Track the last frame seen from each service to detect completion + self._last_frames: Dict[int, Frame] = {} + + # Turn tracking state (based on TurnTrackingObserver pattern) + self._turn_active = False + self._turn_span: Optional[Span] = None + self._turn_context_token: Optional[Token[Context]] = None + self._turn_number: int = 0 + self._turn_start_time: int = 0 + self._turn_user_text: List[str] = [] + self._turn_bot_text: List[str] = [] + self._bot_speaking: bool = False + self._has_bot_spoken: bool = False + self._turn_end_timeout_secs: float = turn_end_timeout_secs + self._end_turn_timer: Optional[asyncio.TimerHandle] = None + + def _log_debug(self, message: str) -> None: + """Log debug message to file and logger.""" + if self._debug_log_file: + timestamp = datetime.now().isoformat() + log_line = f"[{timestamp}] {message}\n" + self._debug_log_file.write(log_line) + self._debug_log_file.flush() + if self._verbose: + logger.debug(message) + + def __del__(self) -> None: + """Clean up debug log file.""" + if self._debug_log_file: + try: + self._log_debug("=== Observer destroyed ===") + self._debug_log_file.close() + except Exception as e: + logger.error(f"Error closing debug log file: {e}") + pass + + def _schedule_turn_end(self, data: FramePushed) -> None: + """Schedule turn end with a timeout.""" + # Cancel any existing timer + self._cancel_turn_end_timer() + + # Create a new timer + loop = asyncio.get_running_loop() + self._end_turn_timer = loop.call_later( + self._turn_end_timeout_secs, + lambda: asyncio.create_task(self._end_turn_after_timeout(data)), + ) + self._log_debug(f" Scheduled turn end timer ({self._turn_end_timeout_secs}s)") + + def _cancel_turn_end_timer(self) -> None: + """Cancel the turn end timer if it exists.""" + if self._end_turn_timer: + self._end_turn_timer.cancel() + self._end_turn_timer = None + self._log_debug(" Cancelled turn end timer") + + async def _end_turn_after_timeout(self, data: FramePushed) -> None: + """End turn after timeout has expired.""" + if self._turn_active and not self._bot_speaking: + self._log_debug(f" Turn {self._turn_number} ending due to timeout") + await self._finish_turn(interrupted=False) + self._end_turn_timer = None + + async def on_push_frame(self, data: FramePushed) -> None: + """ + Called when a frame is pushed between processors. 
+ + Args: + data: FramePushed event data with source, destination, frame, direction + """ + try: + frame = data.frame + frame_type = frame.__class__.__name__ + source_name = data.source.__class__.__name__ if data.source else "Unknown" + + # Skip already processed frames to avoid duplicates from propagation + if frame.id in self._processed_frames: + self._log_debug(f"FRAME (DUPLICATE SKIPPED): {frame_type} from {source_name}") + return + + # Mark frame as processed + self._processed_frames.add(int(frame.id)) + self._frame_history.append(frame.id) + + # If we've exceeded our history size, rebuild the set from deque + if len(self._processed_frames) > len(self._frame_history): + self._processed_frames = set(self._frame_history) + + # Log every frame + self._log_debug(f"FRAME: {frame_type} from {source_name}") + + # Turn tracking based on TurnTrackingObserver pattern + # Use generic speaking frames for turn boundaries + if isinstance(frame, StartFrame): + # Start the first turn immediately when pipeline starts + if self._turn_number == 0: + self._log_debug(" Starting first turn via StartFrame") + await self._start_turn(data) + + elif isinstance(frame, UserStartedSpeakingFrame): + await self._handle_user_started_speaking(data) + + elif isinstance(frame, BotStartedSpeakingFrame): + await self._handle_bot_started_speaking(data) + + elif isinstance(frame, BotStoppedSpeakingFrame) and self._bot_speaking: + await self._handle_bot_stopped_speaking(data) + + elif isinstance(frame, (EndFrame, CancelFrame)): + await self._handle_pipeline_end(data) + + # Collect conversation text (separate concern from turn boundaries) + # Only collect from final/complete frames to avoid duplication + if isinstance(frame, TranscriptionFrame): + # Collect user text from STT output + if self._turn_active and frame.text: + self._turn_user_text.append(frame.text) + self._log_debug(f" Collected user text: {frame.text[:50]}...") + + elif isinstance(frame, TTSTextFrame): + # Collect bot text from TTS input (final complete sentences) + # Only collect if the frame comes from an actual TTS service, not transport + # This prevents duplication when frames propagate through the pipeline + service_type = detect_service_type(data.source) + if self._turn_active and frame.text and service_type == "tts": + self._turn_bot_text.append(frame.text) + self._log_debug(f" Collected bot text from TTS: {frame.text[:50]}...") + + # Handle service frames for creating service spans + # Check both source (frames emitted BY service) + # and destination (frames received BY service) + source_service_type = detect_service_type(data.source) + dest_service_type = detect_service_type(data.destination) + + # Handle frames emitted by a service (outputs) + if source_service_type: + await self._handle_service_frame(data, is_input=False) + + # Handle frames received by a service (inputs) + # Only process if destination is different from source to avoid double-counting + if dest_service_type and data.destination != data.source: + await self._handle_service_frame(data, is_input=True) + + except Exception as e: + logger.debug(f"Error in observer: {e}") + + async def _handle_user_started_speaking(self, data: FramePushed) -> None: + """Handle user speaking events, including interruptions.""" + if self._bot_speaking: + # Handle interruption - end current turn and start a new one + self._log_debug(" User interruption detected - ending current turn") + self._cancel_turn_end_timer() + await self._finish_turn(interrupted=True) + self._bot_speaking = False # Bot is 
considered interrupted + self._log_debug(" Starting new turn after interruption") + await self._start_turn(data) + elif self._turn_active and self._has_bot_spoken: + # User started speaking during the turn_end_timeout_secs period after bot speech + self._log_debug(" User speaking after bot - ending turn and starting new one") + self._cancel_turn_end_timer() + await self._finish_turn(interrupted=False) + await self._start_turn(data) + elif not self._turn_active: + # Start a new turn after previous one ended + self._log_debug(" Starting new turn (user speaking)") + await self._start_turn(data) + else: + # User is speaking within the same turn (before bot has responded) + self._log_debug(f" User is already speaking in Turn {self._turn_number}") + + async def _handle_bot_started_speaking(self, data: FramePushed) -> None: + """Handle bot speaking events.""" + self._bot_speaking = True + self._has_bot_spoken = True + # Cancel any pending turn end timer when bot starts speaking again + self._cancel_turn_end_timer() + self._log_debug(" Bot started speaking") + + async def _handle_bot_stopped_speaking(self, data: FramePushed) -> None: + """Handle bot stopped speaking events.""" + self._bot_speaking = False + self._log_debug(" Bot stopped speaking") + # Schedule turn end with timeout + # This is needed to handle cases where the bot's speech ends and then resumes + # This can happen with HTTP TTS services or function calls + self._schedule_turn_end(data) + + async def _handle_pipeline_end(self, data: FramePushed) -> None: + """Handle pipeline end or cancellation by flushing any active turn.""" + if self._turn_active: + self._log_debug(" Pipeline ending - finishing active turn") + # Cancel any pending turn end timer + self._cancel_turn_end_timer() + # End the current turn + await self._finish_turn(interrupted=True) + + async def _handle_service_frame(self, data: FramePushed, is_input: bool = False) -> None: + """ + Handle frame from an LLM, TTS, or STT service. + Detects nested LLM calls within TTS/STT services. 
+ + Args: + data: FramePushed event data + is_input: True if this frame is being received by the service (input), + False if being emitted by the service (output) + """ + from pipecat.frames.frames import ( + EndFrame, + ErrorFrame, + ) + + # Use destination for input frames, source for output frames + service = data.destination if is_input else data.source + service_id = id(service) + frame = data.frame + service_type = detect_service_type(service) + + if service_type != "unknown": + # Check if we need to create a new span + # For LLM services, LLMContextFrame signals a new invocation + # finish previous span if exists + if isinstance(frame, LLMContextFrame) and service_id in self._active_spans: + self._log_debug( + f" New LLM invocation detected" + f" Finishing previous span for service {service_id}" + ) + self._finish_span(service_id) + + # Check if we already have a span for this service + if service_id not in self._active_spans: + # If no turn is active yet, start one automatically + # This ensures we capture initialization frames with proper context + if not self._turn_active or self._turn_span is None: + self._log_debug( + f" No active turn - auto-starting turn for {service_id} initialization" + ) + await self._start_turn(data) + + # Create new span directly under turn (no nesting logic) + # All service spans are siblings under the turn span + span = self._create_service_span(service, service_type) + self._active_spans[service_id] = { + "span": span, + "service_type": service_type, # Track service type for later use + "frame_count": 0, + "accumulated_input": "", # Deduplicated accumulated input text + "accumulated_output": "", # Deduplicated accumulated output text + "start_time_ns": time.time_ns(), # Store start time in nanoseconds (Unix epoch) + "processing_time_seconds": None, # Will be set from metrics + } + + # Check if span still exists (it might have been ended by a previous call) + if service_id not in self._active_spans: + self._log_debug(f" Span for service {service_id} already ended, skipping frame") + return + + # Increment frame count for this service + span_info = self._active_spans[service_id] + span_info["frame_count"] += 1 + + # Extract and add attributes from this frame to the span + span = span_info["span"] + frame_attrs = extract_attributes_from_frame(frame) + + # Log frame direction for debugging + direction = "INPUT" if is_input else "OUTPUT" + self._log_debug( + f" Processing {direction} frame: {frame.__class__.__name__} for {service_type}" + ) + if frame_attrs: + self._log_debug( + f" Extracted {len(frame_attrs)} attributes: {list(frame_attrs.keys())}" + ) + else: + self._log_debug(" No attributes extracted from this frame") + + # Handle text chunk accumulation with deduplication + # IMPORTANT: Only collect INPUT chunks when frame is received by service (is_input=True) + # and only collect OUTPUT chunks when frame is emitted by service (is_input=False) + + # Check for streaming text chunks + text_chunk: str = frame_attrs.get("text.chunk", "") + accumulated: str = "" + if text_chunk: + # For TTS input frames, only accumulate if going to output transport + # This ensures we only capture complete sentences being sent to the user + if is_input and service_type == "tts": + # Check if destination is the final output transport + if not isinstance(data.destination, BaseOutputTransport): + self._log_debug(" Skipping TTS chunk (not going to output transport)") + text_chunk = "" # Skip this chunk + + if text_chunk and is_input: + # Input chunk - check if this extends 
our accumulated text + accumulated = span_info["accumulated_input"] + if not accumulated: + # First chunk + span_info["accumulated_input"] = text_chunk + self._log_debug(f" Accumulated INPUT chunk (first): {text_chunk}...") + elif text_chunk.startswith(accumulated): + # New chunk contains all previous text plus more (redundant pattern) + # Extract only the new part + new_part = text_chunk[len(accumulated) :] + if new_part: + span_info["accumulated_input"] = text_chunk + self._log_debug(f" Accumulated INPUT (new part): {new_part}...") + else: + self._log_debug(" Skipped fully redundant INPUT chunk") + elif accumulated and accumulated in text_chunk: + # Current accumulated text is contained in new chunk + # This means we're getting the full text again with more added + span_info["accumulated_input"] = text_chunk if text_chunk else "" + new_part = text_chunk.replace(accumulated, "", 1) if text_chunk else "" + self._log_debug(f" Accumulated INPUT (replaced): {new_part}...") + else: + # Non-overlapping chunk - just append + span_info["accumulated_input"] = accumulated + text_chunk + self._log_debug(f" Accumulated INPUT chunk (append): {text_chunk}...") + else: + # Output chunk - same logic + accumulated = span_info["accumulated_output"] + if not accumulated: + span_info["accumulated_output"] = text_chunk + self._log_debug(f" Accumulated OUTPUT chunk (first): {text_chunk}...") + elif text_chunk.startswith(accumulated): + new_part = text_chunk[len(accumulated) :] + if new_part: + span_info["accumulated_output"] = text_chunk + self._log_debug(f" Accumulated OUTPUT (new part): {new_part}...") + else: + self._log_debug(" Skipped fully redundant OUTPUT chunk") + elif accumulated in text_chunk: + span_info["accumulated_output"] = text_chunk + new_part = text_chunk.replace(accumulated, "", 1) + self._log_debug(f" Accumulated OUTPUT (replaced): {new_part}...") + elif accumulated and text_chunk: + span_info["accumulated_output"] = accumulated + text_chunk + self._log_debug(f" Accumulated OUTPUT chunk (append): {text_chunk}...") + else: + self._log_debug(" Skipped OUTPUT chunk (no accumulated text)") + + # Process all other attributes + for key, value in frame_attrs.items(): + # Skip text.chunk since we handled it above + if key == "text.chunk": + continue + + # Skip input-related attributes if this is an output frame + if not is_input and ( + key in (SpanAttributes.INPUT_VALUE, SpanAttributes.LLM_INPUT_MESSAGES) + or key.startswith("llm.input_messages.") + ): + self._log_debug( + f" Skipping INPUT attribute {key} (frame is OUTPUT from service)" + ) + continue + + # Skip output-related attributes if this is an input frame + if is_input and ( + key in (SpanAttributes.OUTPUT_VALUE, SpanAttributes.LLM_OUTPUT_MESSAGES) + or key.startswith("llm.output_messages.") + ): + self._log_debug( + f" Skipping OUTPUT attribute {key} (frame is INPUT to service)" + ) + continue + + # Handle complete (non-streaming) INPUT_VALUE (e.g., from TranscriptionFrame) + # Special case for STT: TranscriptionFrame is OUTPUT from STT but represents the + # transcribed text which should be recorded as INPUT to the span for observability + if key == SpanAttributes.INPUT_VALUE and value: + if is_input or service_type == "stt": + # This is a complete input, not streaming - set immediately + # For STT, we capture output transcriptions as input values + span.set_attribute(SpanAttributes.INPUT_VALUE, value) + self._log_debug(f" Set complete INPUT_VALUE: {str(value)[:100]}...") + + # Handle complete (non-streaming) OUTPUT_VALUE + elif key == 
SpanAttributes.OUTPUT_VALUE and value and not is_input: + # This is a complete output, not streaming - set immediately + span.set_attribute(SpanAttributes.OUTPUT_VALUE, value) + self._log_debug(f" Set complete OUTPUT_VALUE: {str(value)}...") + + elif key == "service.processing_time_seconds": + # Store processing time for use in _finish_span to calculate proper end_time + span_info["processing_time_seconds"] = value + span.set_attribute("service.processing_time_seconds", value) + else: + # For all other attributes, just set them (may overwrite) + span.set_attribute(key, value) + + # Store this as the last frame from this service + self._last_frames[service_id] = frame + + # Finish span only on completion frames (EndFrame or ErrorFrame) + if isinstance(frame, (EndFrame, ErrorFrame)): + self._finish_span(service_id) + + def _create_service_span( + self, + service: FrameProcessor, + service_type: str, + ) -> Span: + """ + Create a span for a service with type-specific attributes. + All service spans are created as children of the turn span. + + Args: + service: The service instance (FrameProcessor) + service_type: Service type (llm, tts, stt, image_gen, vision, mcp, websocket) + + Returns: + The created span + """ + span_name = f"pipecat.{service_type}" + self._log_debug(f">>> Creating {service_type} span") + + # Create span under the turn context + # Explicitly set the turn span as parent to avoid context issues in async code + if self._turn_span and self._turn_active: + turn_context = trace_api.set_span_in_context(self._turn_span) + span = self._tracer.start_span( + name=span_name, + context=turn_context, + ) + self._log_debug(f" Created service span under turn #{self._turn_number}") + else: + # No active turn, create as root span (will be in new trace) + self._log_debug(f" WARNING: No active turn! Creating root span for {service_type}") + span = self._tracer.start_span( + name=span_name, + ) + + # Set service.name to the actual service class name for uniqueness + span.set_attribute("service.name", service.__class__.__name__) + + # Extract and apply service-specific attributes + service_attrs = extract_service_attributes(service) + span.set_attributes(service_attrs) + self._log_debug(f" Set attributes: {service_attrs}") + + return span + + def _finish_span(self, service_id: int) -> None: + """ + Finish a span for a service. 
+ + Args: + service_id: The id() of the service instance + """ + if service_id not in self._active_spans: + return + + span_info = self._active_spans.pop(service_id) + span: Span = span_info["span"] + start_time_ns = span_info["start_time_ns"] + + # Calculate end time (use processing time if available, otherwise use current time) + processing_time_seconds = span_info.get("processing_time_seconds") + if processing_time_seconds is not None: + end_time_ns = start_time_ns + int(processing_time_seconds * 1_000_000_000) + else: + end_time_ns = time.time_ns() + + # Set accumulated input/output text values from streaming chunks + # These were deduplicated during accumulation + accumulated_input = span_info.get("accumulated_input", "") + accumulated_output = span_info.get("accumulated_output", "") + + if accumulated_input: + span.set_attribute(SpanAttributes.INPUT_VALUE, accumulated_input) + self._log_debug( + f" Set input.value from accumulated chunks: {len(accumulated_input)} chars" + ) + + if accumulated_output: + span.set_attribute(SpanAttributes.OUTPUT_VALUE, accumulated_output) + self._log_debug( + f" Set output.value from accumulated chunks: {len(accumulated_output)} chars" + ) + + # For LLM spans, also set flattened output messages format + service_type = span_info.get("service_type") + if service_type == "llm": + span.set_attribute("llm.output_messages.0.message.role", "assistant") + span.set_attribute("llm.output_messages.0.message.content", accumulated_output) + + span.set_status(trace_api.Status(trace_api.StatusCode.OK)) # + span.end(end_time=int(end_time_ns)) + return + + async def _start_turn(self, data: FramePushed) -> None: + """Start a new conversation turn and set it as parent context.""" + self._turn_active = True + self._has_bot_spoken = False + self._turn_number += 1 + self._turn_start_time = time.time_ns() # Use our own clock for consistency + + self._log_debug(f"\n{'=' * 60}") + self._log_debug(f">>> STARTING TURN #{self._turn_number}") + self._log_debug(f" Conversation ID: {self._conversation_id}") + + # Create turn span as root (no parent) + # Each turn will be a separate trace automatically + # Use an empty context to ensure no ambient parent span is picked up + span_attributes = { + SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.CHAIN.value, + "conversation.turn_number": self._turn_number, + } + if self._additional_span_attributes: + span_attributes.update(self._additional_span_attributes) + self._turn_span = self._tracer.start_span( + name="pipecat.conversation.turn", + context=Context(), # Empty context ensures this is a true root span + attributes=span_attributes, # type: ignore + ) + + if self._conversation_id: + self._turn_span.set_attribute( # + SpanAttributes.SESSION_ID, self._conversation_id + ) + self._log_debug(f" Set session.id attribute: {self._conversation_id}") + + self._turn_user_text = [] + self._turn_bot_text = [] + return + + async def _finish_turn(self, interrupted: bool = False) -> None: + """ + Finish the current conversation turn and detach context. 
+ + Args: + interrupted: Whether the turn was interrupted + """ + if not self._turn_active or not self._turn_span: + self._log_debug(" Skipping finish_turn - no active turn") + return + + # Calculate turn duration + duration = 0.0 + current_time_ns = time.time_ns() + duration = (current_time_ns - self._turn_start_time) / 1_000_000_000 # Convert to seconds + + self._log_debug(f"\n{'=' * 60}") + self._log_debug( + f">>> FINISHING TURN #{self._turn_number}" + + f" (interrupted={interrupted}, duration={duration:.2f}s)" + ) + self._log_debug(f" Active service spans: {len(self._active_spans)}") + + # Set input/output attributes + if self._turn_user_text: + user_input = " ".join(self._turn_user_text) + self._turn_span.set_attribute(SpanAttributes.INPUT_VALUE, user_input) # + + if self._turn_bot_text: + bot_output = " ".join(self._turn_bot_text) + self._turn_span.set_attribute(SpanAttributes.OUTPUT_VALUE, bot_output) # + + # Finish all active service spans BEFORE ending the turn span + # This ensures child spans are ended before the parent + service_ids_to_finish = list(self._active_spans.keys()) + for service_id in service_ids_to_finish: + self._finish_span(service_id) + + # Set turn metadata + end_reason = "interrupted" if interrupted else "completed" + self._turn_span.set_attribute("conversation.end_reason", end_reason) # + self._turn_span.set_attribute("conversation.turn_duration_seconds", duration) + self._turn_span.set_attribute("conversation.was_interrupted", interrupted) + + # Finish turn span (parent) last + self._turn_span.set_status(trace_api.Status(trace_api.StatusCode.OK)) # + self._turn_span.end(end_time=int(current_time_ns)) # + + # Clear turn state + self._log_debug(" Clearing turn state") + self._turn_active = False + self._turn_span = None + self._turn_context_token = None diff --git a/python/instrumentation/openinference-instrumentation-pipecat/src/openinference/instrumentation/pipecat/package.py b/python/instrumentation/openinference-instrumentation-pipecat/src/openinference/instrumentation/pipecat/package.py new file mode 100644 index 000000000..f4ad47ca8 --- /dev/null +++ b/python/instrumentation/openinference-instrumentation-pipecat/src/openinference/instrumentation/pipecat/package.py @@ -0,0 +1,3 @@ +"""Package metadata for Pipecat instrumentation.""" + +_instruments = ("pipecat-ai",) diff --git a/python/instrumentation/openinference-instrumentation-pipecat/src/openinference/instrumentation/pipecat/version.py b/python/instrumentation/openinference-instrumentation-pipecat/src/openinference/instrumentation/pipecat/version.py new file mode 100644 index 000000000..3dc1f76bc --- /dev/null +++ b/python/instrumentation/openinference-instrumentation-pipecat/src/openinference/instrumentation/pipecat/version.py @@ -0,0 +1 @@ +__version__ = "0.1.0" diff --git a/python/instrumentation/openinference-instrumentation-pipecat/tests/openinference/instrumentation/pipecat/conftest.py b/python/instrumentation/openinference-instrumentation-pipecat/tests/openinference/instrumentation/pipecat/conftest.py new file mode 100644 index 000000000..c889007a3 --- /dev/null +++ b/python/instrumentation/openinference-instrumentation-pipecat/tests/openinference/instrumentation/pipecat/conftest.py @@ -0,0 +1,387 @@ +""" +Shared test fixtures for Pipecat instrumentation tests. 
+""" + +import asyncio +from typing import AsyncGenerator, List + +import pytest +from opentelemetry import trace as trace_api +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +from pipecat.frames.frames import ( + AudioRawFrame, + EndFrame, + Frame, + LLMMessagesUpdateFrame, + StartFrame, + TextFrame, + TranscriptionFrame, +) +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.task import PipelineTask +from pipecat.services.llm_service import LLMService +from pipecat.services.stt_service import STTService +from pipecat.services.tts_service import TTSService + +# Mock Services for Testing + + +class MockLLMService(LLMService): + """Mock LLM service for testing""" + + def __init__(self, *, model: str = "mock-model", provider: str = "mock", **kwargs): + super().__init__(**kwargs) + self._model = model + self._model_name = model # Set the private attribute directly + self._provider = provider + self.processed_frames = [] + # Set module to simulate provider + self.__class__.__module__ = f"pipecat.services.{provider}.llm" + + async def process_frame(self, frame: Frame, direction): + self.processed_frames.append(frame) + if isinstance(frame, LLMMessagesUpdateFrame): + # Simulate LLM response + response = TextFrame(text="Mock LLM response") + await self.push_frame(response, direction) + return await super().process_frame(frame, direction) + + +class MockTTSService(TTSService): + """Mock TTS service for testing""" + + def __init__( + self, + *, + model: str = "mock-tts", + voice: str = "mock-voice", + provider: str = "mock", + **kwargs, + ): + super().__init__(**kwargs) + self._model = model + self._model_name = model # Set the private attribute directly + self._voice = voice + self._voice_id = voice # Real Pipecat services use _voice_id + self._sample_rate = 16000 # Use private attribute for sample_rate + self._provider = provider + self.processed_texts = [] + + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: + """Convert text to mock audio frames""" + self.processed_texts.append(text) + # Simulate audio frame generation + audio_data = b"\x00" * 1024 # Mock audio data + yield AudioRawFrame(audio=audio_data, sample_rate=16000, num_channels=1) + + +class MockSTTService(STTService): + """Mock STT service for testing""" + + def __init__(self, *, model: str = "mock-stt", provider: str = "mock", **kwargs): + super().__init__(**kwargs) + self._model = model + self._model_name = model # Set the private attribute directly + self._provider = provider + self._user_id = "test-user" # Add user_id for STT metadata extraction + self._sample_rate = 16000 # Use private attribute for sample_rate + self._muted = False # Use private attribute for is_muted + self.processed_audio = [] + + async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: + """Convert audio to mock transcription""" + self.processed_audio.append(audio) + # Simulate transcription + yield TranscriptionFrame(text="Mock transcription", user_id="test-user", timestamp=0) + + +# Service Factory Functions - Better approach than multiple mock classes + + +def create_mock_service(service_class, provider: str, service_type: str, **kwargs): + """ + Factory function to create mock services with proper provider attribution. 
+ + Args: + service_class: Base service class (MockLLMService, MockTTSService, MockSTTService) + provider: Provider name (openai, anthropic, elevenlabs, deepgram) + service_type: Service type (llm, tts, stt) + **kwargs: Additional arguments passed to service constructor + """ + # Create instance + service = service_class(provider=provider, **kwargs) + + # Set module path to simulate real provider service + service.__class__.__module__ = f"pipecat.services.{provider}.{service_type}" + + return service + + +# Convenience factory functions for common providers +def create_openai_llm(model: str = "gpt-4", **kwargs): + """Create mock OpenAI LLM service""" + return create_mock_service(MockLLMService, "openai", "llm", model=model, **kwargs) + + +def create_openai_tts(model: str = "tts-1", voice: str = "alloy", **kwargs): + """Create mock OpenAI TTS service""" + return create_mock_service(MockTTSService, "openai", "tts", model=model, voice=voice, **kwargs) + + +def create_openai_stt(model: str = "whisper-1", **kwargs): + """Create mock OpenAI STT service""" + return create_mock_service(MockSTTService, "openai", "stt", model=model, **kwargs) + + +def create_anthropic_llm(model: str = "claude-3-5-sonnet-20241022", **kwargs): + """Create mock Anthropic LLM service""" + return create_mock_service(MockLLMService, "anthropic", "llm", model=model, **kwargs) + + +def create_elevenlabs_tts( + voice_id: str = "mock-voice-id", model: str = "eleven_turbo_v2", **kwargs +): + """Create mock ElevenLabs TTS service""" + service = create_mock_service( + MockTTSService, "elevenlabs", "tts", model=model, voice=voice_id, **kwargs + ) + service._voice_id = voice_id + return service + + +def create_deepgram_stt(model: str = "nova-2", **kwargs): + """Create mock Deepgram STT service""" + return create_mock_service(MockSTTService, "deepgram", "stt", model=model, **kwargs) + + +def create_cartesia_tts(model: str = "sonic-english", voice_id: str = "mock-voice", **kwargs): + """Create mock Cartesia TTS service""" + return create_mock_service( + MockTTSService, "cartesia", "tts", model=model, voice=voice_id, **kwargs + ) + + +# Fixtures + + +@pytest.fixture +def in_memory_span_exporter(): + """Create an in-memory span exporter for testing""" + exporter = InMemorySpanExporter() + yield exporter + # Clear spans after each test + exporter.clear() + + +@pytest.fixture +def tracer_provider(in_memory_span_exporter): + """Create a tracer provider with in-memory exporter""" + provider = TracerProvider() + provider.add_span_processor(SimpleSpanProcessor(in_memory_span_exporter)) + trace_api.set_tracer_provider(provider) + return provider + + +@pytest.fixture +def tracer(tracer_provider): + """Create a tracer for testing""" + return tracer_provider.get_tracer(__name__) + + +@pytest.fixture +def mock_llm_service(): + """Create a mock LLM service""" + return MockLLMService() + + +@pytest.fixture +def mock_tts_service(): + """Create a mock TTS service""" + return MockTTSService() + + +@pytest.fixture +def mock_stt_service(): + """Create a mock STT service""" + return MockSTTService() + + +@pytest.fixture +def mock_openai_llm(): + """Create a mock OpenAI LLM service""" + return create_openai_llm() + + +@pytest.fixture +def mock_openai_tts(): + """Create a mock OpenAI TTS service""" + return create_openai_tts() + + +@pytest.fixture +def mock_openai_stt(): + """Create a mock OpenAI STT service""" + return create_openai_stt() + + +@pytest.fixture +def mock_anthropic_llm(): + """Create a mock Anthropic LLM service""" + return 
create_anthropic_llm() + + +@pytest.fixture +def mock_elevenlabs_tts(): + """Create a mock ElevenLabs TTS service""" + return create_elevenlabs_tts() + + +@pytest.fixture +def mock_deepgram_stt(): + """Create a mock Deepgram STT service""" + return create_deepgram_stt() + + +@pytest.fixture +def simple_pipeline(mock_stt_service, mock_llm_service, mock_tts_service): + """Create a simple pipeline with STT -> LLM -> TTS""" + return Pipeline([mock_stt_service, mock_llm_service, mock_tts_service]) + + +@pytest.fixture +def openai_pipeline(mock_openai_stt, mock_openai_llm, mock_openai_tts): + """Create a pipeline with OpenAI services""" + return Pipeline([mock_openai_stt, mock_openai_llm, mock_openai_tts]) + + +@pytest.fixture +def mixed_provider_pipeline(mock_deepgram_stt, mock_anthropic_llm, mock_elevenlabs_tts): + """Create a pipeline with mixed service providers""" + return Pipeline([mock_deepgram_stt, mock_anthropic_llm, mock_elevenlabs_tts]) + + +@pytest.fixture +def event_loop(): + """Create an event loop for async tests""" + loop = asyncio.new_event_loop() + yield loop + loop.close() + + +@pytest.fixture +def pipeline_task(simple_pipeline): + """Create a pipeline task""" + return PipelineTask(simple_pipeline) + + +def get_spans_by_name(exporter: InMemorySpanExporter, name: str) -> List: + """Helper to get spans by name from exporter""" + return [span for span in exporter.get_finished_spans() if span.name.startswith(name)] + + +def get_span_attributes(span) -> dict: + """Helper to get span attributes as dict""" + return dict(span.attributes) if span.attributes else {} + + +def assert_span_has_attributes(span, expected_attributes: dict): + """Assert that span has expected attributes""" + actual = get_span_attributes(span) + for key, value in expected_attributes.items(): + assert key in actual, f"Attribute {key} not found in span" + assert actual[key] == value, f"Expected {key}={value}, got {actual[key]}" + + +def assert_span_hierarchy(spans: List, expected_hierarchy: List[str]): + """ + Assert that spans form the expected parent-child hierarchy. + expected_hierarchy is a list of span names from root to leaf. + """ + span_by_name = {span.name: span for span in spans} + + for i in range(len(expected_hierarchy) - 1): + parent_name = expected_hierarchy[i] + child_name = expected_hierarchy[i + 1] + + assert parent_name in span_by_name, f"Parent span {parent_name} not found" + assert child_name in span_by_name, f"Child span {child_name} not found" + + parent_span = span_by_name[parent_name] + child_span = span_by_name[child_name] + + assert child_span.parent.span_id == parent_span.context.span_id, ( + f"{child_name} is not a child of {parent_name}" + ) + + +async def run_pipeline_task(task: PipelineTask, *frames: Frame, send_start_frame: bool = True): + """ + Helper to run a pipeline task with given frames. + + This simulates pipeline execution by manually triggering frame processing + through the observers, which is sufficient for testing instrumentation. 
+ + Args: + task: The PipelineTask to run + *frames: Frames to queue before running the task + send_start_frame: Whether to send StartFrame first (default: True) + """ + from pipecat.processors.frame_processor import FrameDirection + + # Mock data class for frame push events + class MockFramePushData: + def __init__(self, source, frame): + import time + + self.source = source + self.frame = frame + self.destination = None + self.direction = FrameDirection.DOWNSTREAM + self.timestamp = time.time_ns() # Nanoseconds for TurnTrackingObserver + # Ensure frame has an id attribute for TurnTrackingObserver compatibility + if not hasattr(frame, "id"): + frame.id = id(frame) + + # Get the pipeline processors (services) + # The structure is: task._pipeline._processors contains [Source, Pipeline, Sink] + # The actual services are in the nested Pipeline._processors + processors = [] + if hasattr(task, "_pipeline"): + pipeline = task._pipeline + if hasattr(pipeline, "_processors") and len(pipeline._processors) > 1: + # The middle item is the actual Pipeline containing the services + nested_pipeline = pipeline._processors[1] + if hasattr(nested_pipeline, "_processors"): + processors = nested_pipeline._processors + + # Get all observers from the task + # The task has a TaskObserver wrapper which contains the actual observers + observers = [] + if hasattr(task, "_observer") and task._observer: + task_observer = task._observer + # TaskObserver has _observers list containing the real observers + if hasattr(task_observer, "_observers") and task_observer._observers: + observers.extend(task_observer._observers) + + # Send StartFrame first to initialize first turn + if send_start_frame: + for processor in processors: + for observer in observers: + if hasattr(observer, "on_push_frame"): + await observer.on_push_frame(MockFramePushData(processor, StartFrame())) + + # Trigger observer callbacks for each frame through each processor + for frame in frames: + for processor in processors: + # Notify all observers about this frame push + for observer in observers: + if hasattr(observer, "on_push_frame"): + await observer.on_push_frame(MockFramePushData(processor, frame)) + + # Always send EndFrame to finish spans + for processor in processors: + for observer in observers: + if hasattr(observer, "on_push_frame"): + await observer.on_push_frame(MockFramePushData(processor, EndFrame())) diff --git a/python/instrumentation/openinference-instrumentation-pipecat/tests/openinference/instrumentation/pipecat/test_instrumentor.py b/python/instrumentation/openinference-instrumentation-pipecat/tests/openinference/instrumentation/pipecat/test_instrumentor.py new file mode 100644 index 000000000..1a2d938fb --- /dev/null +++ b/python/instrumentation/openinference-instrumentation-pipecat/tests/openinference/instrumentation/pipecat/test_instrumentor.py @@ -0,0 +1,187 @@ +""" +Test the PipecatInstrumentor class for automatic observer injection. 
+""" + +from pipecat.pipeline.task import PipelineTask + +from openinference.instrumentation.pipecat import PipecatInstrumentor + + +class TestInstrumentorBasics: + """Test basic instrumentor functionality""" + + def test_instrumentor_can_be_imported(self): + """Test that instrumentor can be imported""" + assert PipecatInstrumentor is not None + + def test_instrumentor_initialization(self): + """Test instrumentor can be initialized""" + instrumentor = PipecatInstrumentor() + assert instrumentor is not None + + def test_instrumentor_instrument(self, tracer_provider): + """Test instrumentor can be instrumented""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + assert instrumentor.is_instrumented_by_opentelemetry + + def test_instrumentor_uninstrument(self, tracer_provider): + """Test instrumentor can be uninstrumented""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + instrumentor.uninstrument() + assert not instrumentor.is_instrumented_by_opentelemetry + + def test_double_instrument_is_safe(self, tracer_provider): + """Test that double instrumentation is safe""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + instrumentor.instrument(tracer_provider=tracer_provider) # Should not raise + assert instrumentor.is_instrumented_by_opentelemetry + + def test_uninstrument_without_instrument_is_safe(self): + """Test that uninstrument without instrument is safe""" + instrumentor = PipecatInstrumentor() + instrumentor.uninstrument() # Should not raise + + +class TestObserverInjection: + """Test automatic observer injection into PipelineTask""" + + def test_observer_injected_automatically(self, tracer_provider, simple_pipeline): + """Test that observer is automatically injected into PipelineTask""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + # Create a task - observer should be auto-injected + task = PipelineTask(simple_pipeline) + + # Check that task has observers + # Note: Implementation will need to expose observers for verification + # or we verify via generated spans + assert task is not None + + instrumentor.uninstrument() + + def test_multiple_tasks_get_separate_observers(self, tracer_provider, simple_pipeline): + """Test that each task gets its own observer instance (thread safety)""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + # Create multiple tasks + task1 = PipelineTask(simple_pipeline) + task2 = PipelineTask(simple_pipeline) + + # Each should have independent observer state + # Verify via task execution producing independent spans + assert task1 is not None + assert task2 is not None + assert task1 is not task2 + + instrumentor.uninstrument() + + def test_existing_observers_preserved(self, tracer_provider, simple_pipeline): + """Test that existing observers are preserved when auto-injecting""" + from pipecat.observers.base_observer import BaseObserver + + class CustomObserver(BaseObserver): + def __init__(self): + super().__init__() + self.events = [] + + async def on_push_frame(self, data): + self.events.append(data) + + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + custom_observer = CustomObserver() + task = PipelineTask(simple_pipeline, observers=[custom_observer]) + + # Custom observer should still be present + # Implementation should add OpenInferenceObserver 
without removing custom ones + assert task is not None + + instrumentor.uninstrument() + + def test_manual_observer_creation(self, tracer_provider): + """Test manual observer creation for advanced use cases""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + # Create observer manually + observer = instrumentor.create_observer() + assert observer is not None + + instrumentor.uninstrument() + + +class TestInstrumentationWithConfig: + """Test instrumentation with various configurations""" + + def test_instrument_with_trace_config(self, tracer_provider): + """Test instrumentation with custom TraceConfig""" + from openinference.instrumentation import TraceConfig + + config = TraceConfig() + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider, config=config) + + assert instrumentor.is_instrumented_by_opentelemetry + instrumentor.uninstrument() + + +class TestInstrumentorLifecycle: + """Test instrumentor lifecycle and cleanup""" + + def test_instrumentor_singleton_behavior(self, tracer_provider): + """Test that multiple instrumentor instances behave correctly""" + instrumentor1 = PipecatInstrumentor() + instrumentor2 = PipecatInstrumentor() + + instrumentor1.instrument(tracer_provider=tracer_provider) + + # Second instrumentor should detect first is already instrumented + assert instrumentor1.is_instrumented_by_opentelemetry + assert instrumentor2.is_instrumented_by_opentelemetry # Singleton pattern + + instrumentor1.uninstrument() + + def test_cleanup_on_uninstrument(self, tracer_provider, simple_pipeline): + """Test that uninstrument properly cleans up""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + # Create task while instrumented + task1 = PipelineTask(simple_pipeline) + + instrumentor.uninstrument() + + # New tasks should not get observer after uninstrument + task2 = PipelineTask(simple_pipeline) + + assert task1 is not None + assert task2 is not None + + def test_reinstrumentation(self, tracer_provider): + """Test that instrumentation can be re-applied after uninstrument""" + instrumentor = PipecatInstrumentor() + + instrumentor.instrument(tracer_provider=tracer_provider) + instrumentor.uninstrument() + instrumentor.instrument(tracer_provider=tracer_provider) + + assert instrumentor.is_instrumented_by_opentelemetry + instrumentor.uninstrument() + + +class TestInstrumentationDependencies: + """Test that instrumentation properly declares dependencies""" + + def test_instrumentation_dependencies(self): + """Test that instrumentor declares correct dependencies""" + instrumentor = PipecatInstrumentor() + dependencies = instrumentor.instrumentation_dependencies() + + # Should declare pipecat as dependency + assert "pipecat" in dependencies or "pipecat-ai" in dependencies diff --git a/python/instrumentation/openinference-instrumentation-pipecat/tests/openinference/instrumentation/pipecat/test_provider_spans.py b/python/instrumentation/openinference-instrumentation-pipecat/tests/openinference/instrumentation/pipecat/test_provider_spans.py new file mode 100644 index 000000000..2cd7132d3 --- /dev/null +++ b/python/instrumentation/openinference-instrumentation-pipecat/tests/openinference/instrumentation/pipecat/test_provider_spans.py @@ -0,0 +1,525 @@ +""" +Test span creation for different service providers (OpenAI, Anthropic, ElevenLabs, Deepgram). +Ensures that base class instrumentation works across all provider implementations. 
+""" + +import json + +import pytest +from conftest import assert_span_has_attributes, get_spans_by_name, run_pipeline_task +from pipecat.frames.frames import ( + AudioRawFrame, + LLMContextFrame, + LLMMessagesUpdateFrame, + TextFrame, +) +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.task import PipelineTask + +from openinference.instrumentation.pipecat import PipecatInstrumentor +from openinference.semconv.trace import OpenInferenceSpanKindValues, SpanAttributes + + +class TestOpenAISpans: + """Test span creation for OpenAI services""" + + @pytest.mark.asyncio + async def test_openai_llm_span(self, tracer_provider, in_memory_span_exporter, mock_openai_llm): + """Test that OpenAI LLM service creates proper spans""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + try: + pipeline = Pipeline([mock_openai_llm]) + task = PipelineTask(pipeline) # Use default settings so pipeline can complete + + # Send LLM request and run pipeline + messages = [{"role": "user", "content": "Hello"}] + await run_pipeline_task(task, LLMMessagesUpdateFrame(messages=messages, run_llm=True)) + + llm_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.llm") + + assert len(llm_spans) > 0 + llm_span = llm_spans[0] + + expected_attrs = { + SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.LLM.value, + "service.name": "MockLLMService", # Class name of the service + SpanAttributes.LLM_MODEL_NAME: "gpt-4", + SpanAttributes.LLM_PROVIDER: "openai", # Provider from metadata + } + assert_span_has_attributes(llm_span, expected_attrs) + finally: + instrumentor.uninstrument() + + @pytest.mark.asyncio + async def test_openai_tts_span(self, tracer_provider, in_memory_span_exporter, mock_openai_tts): + """Test that OpenAI TTS service creates proper spans""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + pipeline = Pipeline([mock_openai_tts]) + task = PipelineTask(pipeline) + + # Send text to convert to speech + await run_pipeline_task(task, TextFrame(text="Hello world")) + + tts_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.tts") + + assert len(tts_spans) > 0 + tts_span = tts_spans[0] + + expected_attrs = { + SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.LLM.value, + "service.name": "MockTTSService", # Class name + "audio.voice": "alloy", + } + assert_span_has_attributes(tts_span, expected_attrs) + + instrumentor.uninstrument() + + @pytest.mark.asyncio + async def test_openai_stt_span(self, tracer_provider, in_memory_span_exporter, mock_openai_stt): + """Test that OpenAI STT service creates proper spans""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + pipeline = Pipeline([mock_openai_stt]) + task = PipelineTask(pipeline) + + # Send audio to transcribe + audio_data = b"\x00" * 1024 + await run_pipeline_task( + task, AudioRawFrame(audio=audio_data, sample_rate=16000, num_channels=1) + ) + + stt_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.stt") + + assert len(stt_spans) > 0 + stt_span = stt_spans[0] + + expected_attrs = { + SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.LLM.value, + "service.name": "MockSTTService", # Class name + } + assert_span_has_attributes(stt_span, expected_attrs) + + instrumentor.uninstrument() + + @pytest.mark.asyncio + async def test_openai_full_pipeline( + self, tracer_provider, in_memory_span_exporter, openai_pipeline + ): + 
"""Test full OpenAI pipeline (STT -> LLM -> TTS)""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + task = PipelineTask(openai_pipeline) + + # Simulate full conversation flow + audio_data = b"\x00" * 1024 + await run_pipeline_task( + task, AudioRawFrame(audio=audio_data, sample_rate=16000, num_channels=1) + ) + + # Should have spans for all three phases + stt_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.stt") + llm_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.llm") + tts_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.tts") + + assert len(stt_spans) > 0 + # LLM and TTS may not be triggered in mock, but structure is tested + + # All should be Mock services with OpenAI provider + for span in stt_spans + llm_spans + tts_spans: + attrs = dict(span.attributes) + service_name = attrs.get("service.name") + assert service_name in [ + "MockSTTService", + "MockLLMService", + "MockTTSService", + ] + + instrumentor.uninstrument() + + +class TestAnthropicSpans: + """Test span creation for Anthropic services""" + + @pytest.mark.asyncio + async def test_anthropic_llm_span( + self, tracer_provider, in_memory_span_exporter, mock_anthropic_llm + ): + """Test that Anthropic LLM service creates proper spans""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + pipeline = Pipeline([mock_anthropic_llm]) + task = PipelineTask(pipeline) + + messages = [{"role": "user", "content": "Hello Claude"}] + await run_pipeline_task(task, LLMMessagesUpdateFrame(messages=messages, run_llm=True)) + + llm_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.llm") + + assert len(llm_spans) > 0 + llm_span = llm_spans[0] + + expected_attrs = { + SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.LLM.value, + "service.name": "MockLLMService", # Class name + SpanAttributes.LLM_MODEL_NAME: "claude-3-5-sonnet-20241022", + SpanAttributes.LLM_PROVIDER: "anthropic", + } + assert_span_has_attributes(llm_span, expected_attrs) + + instrumentor.uninstrument() + + +class TestElevenLabsSpans: + """Test span creation for ElevenLabs TTS service""" + + @pytest.mark.asyncio + async def test_elevenlabs_tts_span( + self, tracer_provider, in_memory_span_exporter, mock_elevenlabs_tts + ): + """Test that ElevenLabs TTS service creates proper spans""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + pipeline = Pipeline([mock_elevenlabs_tts]) + task = PipelineTask(pipeline) + + await run_pipeline_task(task, TextFrame(text="Test speech")) + + tts_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.tts") + + assert len(tts_spans) > 0 + tts_span = tts_spans[0] + + expected_attrs = { + SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.LLM.value, + "service.name": "MockTTSService", # Class name + } + assert_span_has_attributes(tts_span, expected_attrs) + + # Should have audio.voice or audio.voice_id attribute + attrs = dict(tts_span.attributes) + assert "audio.voice" in attrs or "audio.voice_id" in attrs + + instrumentor.uninstrument() + + +class TestDeepgramSpans: + """Test span creation for Deepgram STT service""" + + @pytest.mark.asyncio + async def test_deepgram_stt_span( + self, tracer_provider, in_memory_span_exporter, mock_deepgram_stt + ): + """Test that Deepgram STT service creates proper spans""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + 
pipeline = Pipeline([mock_deepgram_stt])
+        task = PipelineTask(pipeline)
+
+        audio_data = b"\x00" * 1024
+        await run_pipeline_task(
+            task, AudioRawFrame(audio=audio_data, sample_rate=16000, num_channels=1)
+        )
+
+        stt_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.stt")
+
+        assert len(stt_spans) > 0
+        stt_span = stt_spans[0]
+
+        expected_attrs = {
+            SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.LLM.value,
+            "service.name": "MockSTTService",  # Class name
+        }
+        assert_span_has_attributes(stt_span, expected_attrs)
+
+        instrumentor.uninstrument()
+
+
+class TestMixedProviderPipeline:
+    """Test pipelines with multiple different providers"""
+
+    @pytest.mark.asyncio
+    async def test_mixed_provider_span_creation(
+        self, tracer_provider, in_memory_span_exporter, mixed_provider_pipeline
+    ):
+        """Test that mixed provider pipeline creates spans for all services"""
+        instrumentor = PipecatInstrumentor()
+        instrumentor.instrument(tracer_provider=tracer_provider)
+
+        task = PipelineTask(mixed_provider_pipeline)
+
+        # Simulate flow through pipeline
+        audio_data = b"\x00" * 1024
+        await run_pipeline_task(
+            task, AudioRawFrame(audio=audio_data, sample_rate=16000, num_channels=1)
+        )
+
+        spans = in_memory_span_exporter.get_finished_spans()
+
+        # Collect the service names recorded on the spans
+        service_names = set()
+        for span in spans:
+            attrs = dict(span.attributes)
+            if "service.name" in attrs:
+                service_names.add(attrs["service.name"])
+
+        # At least one of the mock Deepgram/Anthropic/ElevenLabs-backed services should emit a span
+        assert len(service_names) > 0
+
+        instrumentor.uninstrument()
+
+    @pytest.mark.asyncio
+    async def test_mixed_providers_maintain_correct_attribution(
+        self, tracer_provider, in_memory_span_exporter, mixed_provider_pipeline
+    ):
+        """Test that each span is attributed to correct provider"""
+        instrumentor = PipecatInstrumentor()
+        instrumentor.instrument(tracer_provider=tracer_provider)
+
+        task = PipelineTask(mixed_provider_pipeline)
+
+        audio_data = b"\x00" * 1024
+        await run_pipeline_task(
+            task, AudioRawFrame(audio=audio_data, sample_rate=16000, num_channels=1)
+        )
+
+        # STT span should be MockSTTService with deepgram provider
+        stt_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.stt")
+        if stt_spans:
+            attrs = dict(stt_spans[0].attributes)
+            assert attrs.get("service.name") == "MockSTTService"
+
+        # LLM span should be MockLLMService with anthropic provider
+        llm_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.llm")
+        if llm_spans:
+            attrs = dict(llm_spans[0].attributes)
+            assert attrs.get("service.name") == "MockLLMService"
+            assert attrs.get(SpanAttributes.LLM_PROVIDER) == "anthropic"
+
+        # TTS span should be MockTTSService with elevenlabs provider
+        tts_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.tts")
+        if tts_spans:
+            attrs = dict(tts_spans[0].attributes)
+            assert attrs.get("service.name") == "MockTTSService"
+
+        instrumentor.uninstrument()
+
+
+class TestSpanInputOutput:
+    """Test that spans capture input and output correctly for different providers"""
+
+    @pytest.mark.asyncio
+    async def test_stt_output_captured(
+        self, tracer_provider, in_memory_span_exporter, mock_openai_stt
+    ):
+        """Test that STT span captures output transcription"""
+        instrumentor = PipecatInstrumentor()
+        instrumentor.instrument(tracer_provider=tracer_provider)
+
+        pipeline = Pipeline([mock_openai_stt])
+        task = PipelineTask(pipeline)
+
+        audio_data = b"\x00" * 1024
+        await run_pipeline_task(
+            task, AudioRawFrame(audio=audio_data, 
sample_rate=16000, num_channels=1) + ) + + stt_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.stt") + + if stt_spans: + attrs = dict(stt_spans[0].attributes) + output_value = attrs.get(SpanAttributes.OUTPUT_VALUE) + + # Mock STT returns "Mock transcription" + if output_value: + assert "Mock transcription" in str(output_value) + + instrumentor.uninstrument() + + +class TestProviderSpecificAttributes: + """Test provider-specific attributes are captured""" + + @pytest.mark.asyncio + async def test_openai_model_attribute( + self, tracer_provider, in_memory_span_exporter, mock_openai_llm + ): + """Test that OpenAI spans include model information""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + pipeline = Pipeline([mock_openai_llm]) + task = PipelineTask(pipeline) + + messages = [{"role": "user", "content": "Test"}] + await run_pipeline_task(task, LLMMessagesUpdateFrame(messages=messages, run_llm=True)) + + llm_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.llm") + + if llm_spans: + attrs = dict(llm_spans[0].attributes) + assert SpanAttributes.LLM_MODEL_NAME in attrs + assert attrs[SpanAttributes.LLM_MODEL_NAME] == "gpt-4" + + instrumentor.uninstrument() + + @pytest.mark.asyncio + async def test_anthropic_model_attribute( + self, tracer_provider, in_memory_span_exporter, mock_anthropic_llm + ): + """Test that Anthropic spans include correct model""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + pipeline = Pipeline([mock_anthropic_llm]) + task = PipelineTask(pipeline) + + messages = [{"role": "user", "content": "Test"}] + await run_pipeline_task(task, LLMMessagesUpdateFrame(messages=messages, run_llm=True)) + + llm_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.llm") + + if llm_spans: + attrs = dict(llm_spans[0].attributes) + assert SpanAttributes.LLM_MODEL_NAME in attrs + assert "claude" in attrs[SpanAttributes.LLM_MODEL_NAME].lower() + + instrumentor.uninstrument() + + @pytest.mark.asyncio + async def test_elevenlabs_voice_attribute( + self, tracer_provider, in_memory_span_exporter, mock_elevenlabs_tts + ): + """Test that ElevenLabs TTS includes voice_id""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + pipeline = Pipeline([mock_elevenlabs_tts]) + task = PipelineTask(pipeline) + + await run_pipeline_task(task, TextFrame(text="Test")) + + tts_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.tts") + + if tts_spans: + attrs = dict(tts_spans[0].attributes) + # Should have audio.voice or audio.voice_id attribute + has_voice = "audio.voice" in attrs or "audio.voice_id" in attrs + assert has_voice + + instrumentor.uninstrument() + + +class TestLLMContextFrame: + """Test that LLMContextFrame attributes are properly captured""" + + @pytest.mark.asyncio + async def test_llm_context_frame_captures_messages( + self, tracer_provider, in_memory_span_exporter, mock_openai_llm + ): + """Test that LLMContextFrame messages are extracted and added to span attributes""" + + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + pipeline = Pipeline([mock_openai_llm]) + task = PipelineTask(pipeline) + + # Create a mock LLMContext with messages + class MockLLMContext: + def __init__(self): + self._messages = [ + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi there!"}, + ] + self._tools = None + self._tool_choice = None + + 
mock_context = MockLLMContext() + context_frame = LLMContextFrame(context=mock_context) + + # Send the context frame through the pipeline + await run_pipeline_task(task, context_frame) + + # Get all spans - LLMContextFrame should be captured on service spans + spans = in_memory_span_exporter.get_finished_spans() + + # Look for spans with LLM context attributes + found_context_attrs = False + for span in spans: + attrs = dict(span.attributes) if span.attributes else {} + if "llm.messages_count" in attrs: + found_context_attrs = True + assert attrs["llm.messages_count"] == 2 + + # Verify messages were serialized + if SpanAttributes.LLM_INPUT_MESSAGES in attrs: + messages_json = attrs[SpanAttributes.LLM_INPUT_MESSAGES] + messages = json.loads(messages_json) + assert len(messages) == 2 + assert messages[0]["role"] == "user" + assert messages[1]["role"] == "assistant" + + # Should also be in INPUT_VALUE + if SpanAttributes.INPUT_VALUE in attrs: + input_value = attrs[SpanAttributes.INPUT_VALUE] + assert "Hello" in input_value + + # LLMContextFrame tracking may be optional depending on implementation + # but if present, it should have correct structure + if found_context_attrs: + assert True # Attributes were found and validated + + instrumentor.uninstrument() + + @pytest.mark.asyncio + async def test_llm_context_frame_with_tools( + self, tracer_provider, in_memory_span_exporter, mock_openai_llm + ): + """Test that LLMContextFrame with tools captures tool count""" + + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + pipeline = Pipeline([mock_openai_llm]) + task = PipelineTask(pipeline) + + # Create a mock LLMContext with messages and tools + class MockLLMContext: + def __init__(self): + self._messages = [{"role": "user", "content": "What's the weather?"}] + self._tools = [ + {"name": "get_weather", "description": "Get weather info"}, + {"name": "get_time", "description": "Get current time"}, + ] + self._tool_choice = None + + mock_context = MockLLMContext() + context_frame = LLMContextFrame(context=mock_context) + + # Send the context frame through the pipeline + await run_pipeline_task(task, context_frame) + + # Get all spans + spans = in_memory_span_exporter.get_finished_spans() + + # Look for spans with tool count + found_tools_attrs = False + for span in spans: + attrs = dict(span.attributes) if span.attributes else {} + if "llm.tools_count" in attrs: + found_tools_attrs = True + assert attrs["llm.tools_count"] == 2 + + # Tool tracking may be optional, but if present should be correct + if found_tools_attrs: + assert True # Tool count was found and validated + + instrumentor.uninstrument() diff --git a/python/instrumentation/openinference-instrumentation-pipecat/tests/openinference/instrumentation/pipecat/test_service_detection.py b/python/instrumentation/openinference-instrumentation-pipecat/tests/openinference/instrumentation/pipecat/test_service_detection.py new file mode 100644 index 000000000..d596d01b5 --- /dev/null +++ b/python/instrumentation/openinference-instrumentation-pipecat/tests/openinference/instrumentation/pipecat/test_service_detection.py @@ -0,0 +1,317 @@ +""" +Test service type detection and provider identification across different implementations. +This ensures our base class instrumentation affects all inheriting classes. 
+""" + + +class TestServiceTypeDetection: + """Test detection of service types (LLM, TTS, STT) from base classes""" + + def test_detect_llm_service_base(self, mock_llm_service): + """Test detection of generic LLM service""" + from openinference.instrumentation.pipecat._attributes import detect_service_type + + service_type = detect_service_type(mock_llm_service) + + assert service_type == "llm" + + def test_detect_tts_service_base(self, mock_tts_service): + """Test detection of generic TTS service""" + from openinference.instrumentation.pipecat._attributes import detect_service_type + + service_type = detect_service_type(mock_tts_service) + + assert service_type == "tts" + + def test_detect_stt_service_base(self, mock_stt_service): + """Test detection of generic STT service""" + from openinference.instrumentation.pipecat._attributes import detect_service_type + + service_type = detect_service_type(mock_stt_service) + + assert service_type == "stt" + + def test_detect_openai_llm(self, mock_openai_llm): + """Test detection of OpenAI LLM service""" + from openinference.instrumentation.pipecat._attributes import detect_service_type + + service_type = detect_service_type(mock_openai_llm) + + assert service_type == "llm" + + def test_detect_anthropic_llm(self, mock_anthropic_llm): + """Test detection of Anthropic LLM service""" + from openinference.instrumentation.pipecat._attributes import detect_service_type + + service_type = detect_service_type(mock_anthropic_llm) + + assert service_type == "llm" + + def test_detect_elevenlabs_tts(self, mock_elevenlabs_tts): + """Test detection of ElevenLabs TTS service""" + from openinference.instrumentation.pipecat._attributes import detect_service_type + + service_type = detect_service_type(mock_elevenlabs_tts) + + assert service_type == "tts" + + def test_detect_deepgram_stt(self, mock_deepgram_stt): + """Test detection of Deepgram STT service""" + from openinference.instrumentation.pipecat._attributes import detect_service_type + + service_type = detect_service_type(mock_deepgram_stt) + + assert service_type == "stt" + + def test_detect_non_service_processor(self): + """Test that non-service processors return None""" + from pipecat.processors.frame_processor import FrameProcessor + + from openinference.instrumentation.pipecat._attributes import detect_service_type + + generic_processor = FrameProcessor() + service_type = detect_service_type(generic_processor) + + assert service_type == "unknown" + + +class TestProviderDetection: + """Test provider detection from service module paths""" + + def test_openai_provider_detection(self, mock_openai_llm): + """Test OpenAI provider detection from module path""" + from openinference.instrumentation.pipecat._attributes import ( + detect_provider_from_service, + ) + + provider = detect_provider_from_service(mock_openai_llm) + + assert provider == "openai" + + def test_anthropic_provider_detection(self, mock_anthropic_llm): + """Test Anthropic provider detection""" + from openinference.instrumentation.pipecat._attributes import ( + detect_provider_from_service, + ) + + provider = detect_provider_from_service(mock_anthropic_llm) + + assert provider == "anthropic" + + def test_elevenlabs_provider_detection(self, mock_elevenlabs_tts): + """Test ElevenLabs provider detection""" + from openinference.instrumentation.pipecat._attributes import ( + detect_provider_from_service, + ) + + provider = detect_provider_from_service(mock_elevenlabs_tts) + + assert provider == "elevenlabs" + + def 
test_deepgram_provider_detection(self, mock_deepgram_stt): + """Test Deepgram provider detection""" + from openinference.instrumentation.pipecat._attributes import ( + detect_provider_from_service, + ) + + provider = detect_provider_from_service(mock_deepgram_stt) + + assert provider == "deepgram" + + def test_unknown_provider_fallback(self, mock_llm_service): + """Test fallback for services without clear provider""" + from openinference.instrumentation.pipecat._attributes import ( + detect_provider_from_service, + ) + + provider = detect_provider_from_service(mock_llm_service) + + # Mock service has provider="mock" set explicitly + assert provider in ["mock", "unknown"] + + +class TestServiceMetadataExtraction: + """Test extraction of service metadata (model, voice, etc.)""" + + def test_extract_llm_model(self, mock_openai_llm): + """Test extraction of LLM model name""" + from openinference.instrumentation.pipecat._attributes import ( + extract_service_attributes, + ) + + metadata = extract_service_attributes(mock_openai_llm) + + # LLM services use GenAI semantic conventions + assert "gen_ai.request.model" in metadata + assert metadata["gen_ai.request.model"] == "gpt-4" + + def test_extract_tts_model_and_voice(self, mock_openai_tts): + """Test extraction of TTS model and voice""" + from openinference.instrumentation.pipecat._attributes import ( + extract_service_attributes, + ) + + metadata = extract_service_attributes(mock_openai_tts) + + assert "service.model" in metadata + assert metadata["service.model"] == "tts-1" + assert "audio.voice" in metadata + assert metadata["audio.voice"] == "alloy" + + def test_extract_stt_model(self, mock_openai_stt): + """Test extraction of STT model""" + from openinference.instrumentation.pipecat._attributes import ( + extract_service_attributes, + ) + + metadata = extract_service_attributes(mock_openai_stt) + + assert "service.model" in metadata + assert metadata["service.model"] == "whisper-1" + + def test_extract_elevenlabs_voice_id(self, mock_elevenlabs_tts): + """Test extraction of ElevenLabs voice_id""" + from openinference.instrumentation.pipecat._attributes import ( + extract_service_attributes, + ) + + metadata = extract_service_attributes(mock_elevenlabs_tts) + + assert "audio.voice_id" in metadata or "audio.voice" in metadata + + def test_extract_anthropic_model(self, mock_anthropic_llm): + """Test extraction of Anthropic model""" + from openinference.instrumentation.pipecat._attributes import ( + extract_service_attributes, + ) + + metadata = extract_service_attributes(mock_anthropic_llm) + + # LLM services use GenAI semantic conventions + assert "gen_ai.request.model" in metadata + assert "claude" in metadata["gen_ai.request.model"].lower() + + def test_extract_provider_from_metadata(self, mock_openai_llm): + """Test that provider is included in metadata""" + from openinference.instrumentation.pipecat._attributes import ( + extract_service_attributes, + ) + + metadata = extract_service_attributes(mock_openai_llm) + + # LLM services use GenAI semantic conventions + assert "gen_ai.system" in metadata + assert metadata["gen_ai.system"] == "openai" + + +class TestMultiProviderPipeline: + """Test service detection in pipelines with multiple providers""" + + def test_detect_all_services_in_mixed_pipeline(self, mixed_provider_pipeline): + """Test detection of all services in a pipeline with mixed providers""" + from openinference.instrumentation.pipecat._attributes import detect_service_type + + processors = mixed_provider_pipeline._processors + + 
service_types = [detect_service_type(p) for p in processors] + + # Should detect STT, LLM, TTS in order + assert "stt" in service_types + assert "llm" in service_types + assert "tts" in service_types + + def test_extract_providers_from_mixed_pipeline(self, mixed_provider_pipeline): + """Test provider extraction from mixed provider pipeline""" + from openinference.instrumentation.pipecat._attributes import ( + detect_provider_from_service, + ) + + processors = mixed_provider_pipeline._processors + + providers = [detect_provider_from_service(p) for p in processors] + + # Should have deepgram, anthropic, elevenlabs + assert "deepgram" in providers + assert "anthropic" in providers + assert "elevenlabs" in providers + + def test_extract_all_metadata_from_pipeline(self, mixed_provider_pipeline): + """Test metadata extraction from all services in pipeline""" + from openinference.instrumentation.pipecat._attributes import ( + detect_service_type, + extract_service_attributes, + ) + + processors = mixed_provider_pipeline._processors + + # Filter for only actual services (not generic processors) + service_processors = [p for p in processors if detect_service_type(p) != "unknown"] + metadata_list = [extract_service_attributes(p) for p in service_processors] + + # Each service should have provider information (via gen_ai.system or llm.provider) + for metadata in metadata_list: + # Check for provider in either gen_ai.system or llm.provider + has_provider = "gen_ai.system" in metadata or "llm.provider" in metadata + assert has_provider, f"No provider found in metadata: {metadata.keys()}" + # At least one should have a model (gen_ai.request.model or service.model) + if "gen_ai.request.model" in metadata: + assert isinstance(metadata["gen_ai.request.model"], str) + elif "service.model" in metadata: + assert isinstance(metadata["service.model"], str) + + +class TestServiceInheritanceDetection: + """Test that service detection works correctly with inheritance hierarchies""" + + def test_custom_llm_service_detected(self): + """Test that custom LLM service inheriting from base is detected""" + from pipecat.services.llm_service import LLMService + + from openinference.instrumentation.pipecat._attributes import detect_service_type + + class CustomLLMService(LLMService): + def __init__(self): + super().__init__() + self._model = "custom-model" + + custom_service = CustomLLMService() + service_type = detect_service_type(custom_service) + + assert service_type == "llm" + + def test_deeply_nested_service_detected(self): + """Test that services with deep inheritance are detected""" + from pipecat.services.tts_service import TTSService + + from openinference.instrumentation.pipecat._attributes import detect_service_type + + class BaseTTSWrapper(TTSService): + async def run_tts(self, text: str): + yield + + class SpecificTTSService(BaseTTSWrapper): + pass + + nested_service = SpecificTTSService() + service_type = detect_service_type(nested_service) + + assert service_type == "tts" + + def test_multiple_inheritance_service(self): + """Test service detection with multiple inheritance (edge case)""" + from pipecat.services.stt_service import STTService + + from openinference.instrumentation.pipecat._attributes import detect_service_type + + class MixinClass: + pass + + class MultiInheritSTT(MixinClass, STTService): + async def run_stt(self, audio: bytes): + yield + + multi_service = MultiInheritSTT() + service_type = detect_service_type(multi_service) + + # Should still detect as STT since it inherits from STTService + 
assert service_type == "stt" diff --git a/python/instrumentation/openinference-instrumentation-pipecat/tests/openinference/instrumentation/pipecat/test_simple_check.py b/python/instrumentation/openinference-instrumentation-pipecat/tests/openinference/instrumentation/pipecat/test_simple_check.py new file mode 100644 index 000000000..b2e9a3a11 --- /dev/null +++ b/python/instrumentation/openinference-instrumentation-pipecat/tests/openinference/instrumentation/pipecat/test_simple_check.py @@ -0,0 +1,17 @@ +"""Simple test to verify basic functionality""" + +import pytest + + +def test_basic(): + """Just check that tests run""" + assert True + + +@pytest.mark.asyncio +async def test_async_basic(): + """Check async tests work""" + import asyncio + + await asyncio.sleep(0.001) + assert True diff --git a/python/instrumentation/openinference-instrumentation-pipecat/tests/openinference/instrumentation/pipecat/test_turn_tracking.py b/python/instrumentation/openinference-instrumentation-pipecat/tests/openinference/instrumentation/pipecat/test_turn_tracking.py new file mode 100644 index 000000000..779d58600 --- /dev/null +++ b/python/instrumentation/openinference-instrumentation-pipecat/tests/openinference/instrumentation/pipecat/test_turn_tracking.py @@ -0,0 +1,418 @@ +""" +Test turn-based tracing functionality. +Ensures proper conversation turn detection and span creation. +""" + +import asyncio + +import pytest +from conftest import ( + assert_span_has_attributes, + get_spans_by_name, + run_pipeline_task, +) +from pipecat.frames.frames import ( + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, + TextFrame, + TranscriptionFrame, + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame, +) +from pipecat.pipeline.task import PipelineTask + +from openinference.instrumentation.pipecat import PipecatInstrumentor +from openinference.semconv.trace import OpenInferenceSpanKindValues, SpanAttributes + + +class TestTurnDetection: + """Test basic turn detection and span creation""" + + @pytest.mark.asyncio + async def test_user_turn_creates_span( + self, tracer_provider, in_memory_span_exporter, simple_pipeline + ): + """Test that user starting to speak creates a turn span""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + task = PipelineTask(simple_pipeline, enable_turn_tracking=True) + + # Simulate user starting to speak + await task.queue_frame(UserStartedSpeakingFrame()) + await asyncio.sleep(0.1) # Let async processing happen + + # Should have a turn span (may not be finished yet) + # This tests that turn tracking is working + instrumentor.uninstrument() + + @pytest.mark.asyncio + async def test_complete_turn_cycle( + self, tracer_provider, in_memory_span_exporter, simple_pipeline + ): + """Test complete turn cycle: user speaks -> bot responds""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + task = PipelineTask(simple_pipeline, enable_turn_tracking=True) + + # Turn 1: User speaks and bot responds + await run_pipeline_task( + task, + UserStartedSpeakingFrame(), + TranscriptionFrame(text="Hello", user_id="test", timestamp=0), + UserStoppedSpeakingFrame(), + BotStartedSpeakingFrame(), + TextFrame(text="Hi there!"), + BotStoppedSpeakingFrame(), + # Start Turn 2 to end Turn 1 (cancels timeout timer) + UserStartedSpeakingFrame(), + ) + + turn_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.conversation.turn") + + # Should have at least one complete turn + assert len(turn_spans) >= 1 + + 
instrumentor.uninstrument() + + @pytest.mark.asyncio + async def test_turn_span_attributes( + self, tracer_provider, in_memory_span_exporter, simple_pipeline + ): + """Test that turn spans have correct attributes""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + task = PipelineTask(simple_pipeline, enable_turn_tracking=True) + + # Complete turn + await task.queue_frame(UserStartedSpeakingFrame()) + await task.queue_frame(TranscriptionFrame(text="Test input", user_id="user1", timestamp=0)) + await task.queue_frame(UserStoppedSpeakingFrame()) + await task.queue_frame(BotStartedSpeakingFrame()) + await task.queue_frame(TextFrame(text="Test output")) + await task.queue_frame(BotStoppedSpeakingFrame()) + + await asyncio.sleep(0.1) + + turn_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.conversation.turn") + + if turn_spans: + turn_span = turn_spans[0] + expected_attributes = { + SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.CHAIN.value, + } + assert_span_has_attributes(turn_span, expected_attributes) + + # Should have input and output + attrs = dict(turn_span.attributes) + assert SpanAttributes.INPUT_VALUE in attrs or "conversation.input" in attrs + assert SpanAttributes.OUTPUT_VALUE in attrs or "conversation.output" in attrs + + instrumentor.uninstrument() + + +class TestMultipleTurns: + """Test handling of multiple conversation turns""" + + @pytest.mark.asyncio + async def test_multiple_sequential_turns( + self, tracer_provider, in_memory_span_exporter, simple_pipeline + ): + """Test that multiple turns create separate spans""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + task = PipelineTask(simple_pipeline, enable_turn_tracking=True) + + # Three complete turns + await run_pipeline_task( + task, + # Turn 1 + UserStartedSpeakingFrame(), + TranscriptionFrame(text="First", user_id="user1", timestamp=0), + UserStoppedSpeakingFrame(), + BotStartedSpeakingFrame(), + BotStoppedSpeakingFrame(), + # Turn 2 + UserStartedSpeakingFrame(), + TranscriptionFrame(text="Second", user_id="user1", timestamp=1), + UserStoppedSpeakingFrame(), + BotStartedSpeakingFrame(), + BotStoppedSpeakingFrame(), + # Turn 3 + UserStartedSpeakingFrame(), + TranscriptionFrame(text="Third", user_id="user1", timestamp=2), + UserStoppedSpeakingFrame(), + BotStartedSpeakingFrame(), + BotStoppedSpeakingFrame(), + ) + + turn_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.conversation.turn") + + # Should have 3 separate turn spans + assert len(turn_spans) >= 3 + + # Each turn should have a turn number + turn_numbers = [] + for span in turn_spans: + attrs = dict(span.attributes) + if "conversation.turn_number" in attrs: + turn_numbers.append(attrs["conversation.turn_number"]) + + assert len(set(turn_numbers)) >= 3 # At least 3 unique turn numbers + + instrumentor.uninstrument() + + @pytest.mark.asyncio + async def test_turn_interruption( + self, tracer_provider, in_memory_span_exporter, simple_pipeline + ): + """Test handling of turn interruption (user interrupts bot)""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + task = PipelineTask(simple_pipeline, enable_turn_tracking=True) + + # Turn with interruption + await run_pipeline_task( + task, + UserStartedSpeakingFrame(), + TranscriptionFrame(text="Hello", user_id="user1", timestamp=0), + UserStoppedSpeakingFrame(), + BotStartedSpeakingFrame(), + # User interrupts before bot 
finishes + UserStartedSpeakingFrame(), + TranscriptionFrame(text="Wait, stop!", user_id="user1", timestamp=1), + UserStoppedSpeakingFrame(), + ) + + turn_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.conversation.turn") + + # Should handle interruption gracefully - first turn ends, second begins + assert len(turn_spans) >= 1 + + # Check for interruption event or attribute + for span in turn_spans: + attrs = dict(span.attributes) + # May have an end_reason attribute indicating interruption + if "conversation.end_reason" in attrs: + # Just verify the attribute exists + assert isinstance(attrs["conversation.end_reason"], str) + + instrumentor.uninstrument() + + +class TestTurnHierarchy: + """Test that turn spans properly parent phase spans (STT -> LLM -> TTS)""" + + @pytest.mark.asyncio + async def test_turn_parents_phase_spans( + self, tracer_provider, in_memory_span_exporter, simple_pipeline + ): + """Test that STT, LLM, TTS spans are children of turn span""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + task = PipelineTask(simple_pipeline, enable_turn_tracking=True) + + # Complete turn with all phases + await task.queue_frame(UserStartedSpeakingFrame()) + await task.queue_frame(TranscriptionFrame(text="Hello", user_id="user1", timestamp=0)) + await task.queue_frame(UserStoppedSpeakingFrame()) + # LLM processing happens here + await task.queue_frame(BotStartedSpeakingFrame()) + await task.queue_frame(TextFrame(text="Response")) + await task.queue_frame(BotStoppedSpeakingFrame()) + + await asyncio.sleep(0.1) + + # Verify hierarchy: Turn -> STT/LLM/TTS + turn_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.conversation.turn") + stt_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.stt") + llm_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.llm") + tts_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.tts") + + if turn_spans and (stt_spans or llm_spans or tts_spans): + turn_span = turn_spans[0] + + # Check that phase spans are children of turn span + for phase_span in stt_spans + llm_spans + tts_spans: + if phase_span.parent: + # Parent context should link to turn span + assert phase_span.parent.span_id == turn_span.context.span_id + + instrumentor.uninstrument() + + +class TestTurnConfiguration: + """Test turn tracking configuration options""" + + @pytest.mark.asyncio + async def test_turn_tracking_disabled( + self, tracer_provider, in_memory_span_exporter, simple_pipeline + ): + """Test that turn tracking can be disabled""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + task = PipelineTask(simple_pipeline, enable_turn_tracking=False) + + # Send frames that would normally trigger turn tracking + await task.queue_frame(UserStartedSpeakingFrame()) + await task.queue_frame(TranscriptionFrame(text="Hello", user_id="user1", timestamp=0)) + await task.queue_frame(UserStoppedSpeakingFrame()) + + await asyncio.sleep(0.1) + + turn_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.conversation.turn") + + # Should not create turn spans when disabled + assert len(turn_spans) == 0 + + instrumentor.uninstrument() + + @pytest.mark.asyncio + async def test_session_id_in_turn_spans( + self, tracer_provider, in_memory_span_exporter, simple_pipeline + ): + """Test that session ID is included in turn spans""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + task = 
PipelineTask(simple_pipeline, enable_turn_tracking=True, conversation_id="test-123") + + await task.queue_frame(UserStartedSpeakingFrame()) + await task.queue_frame(TranscriptionFrame(text="Hello", user_id="user1", timestamp=0)) + await task.queue_frame(UserStoppedSpeakingFrame()) + await task.queue_frame(BotStartedSpeakingFrame()) + await task.queue_frame(BotStoppedSpeakingFrame()) + + await asyncio.sleep(0.1) + + turn_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.conversation.turn") + + if turn_spans: + turn_span = turn_spans[0] + attrs = dict(turn_span.attributes) + + # Should have session/conversation ID + assert "session.id" in attrs or "conversation.id" in attrs + + instrumentor.uninstrument() + + +class TestTurnInputOutput: + """Test capture of turn-level input and output""" + + @pytest.mark.asyncio + async def test_turn_captures_user_input( + self, tracer_provider, in_memory_span_exporter, simple_pipeline + ): + """Test that turn span captures complete user input""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + task = PipelineTask(simple_pipeline, enable_turn_tracking=True) + + user_message = "This is the user's complete message" + + await task.queue_frame(UserStartedSpeakingFrame()) + await task.queue_frame(TranscriptionFrame(text=user_message, user_id="user1", timestamp=0)) + await task.queue_frame(UserStoppedSpeakingFrame()) + await task.queue_frame(BotStartedSpeakingFrame()) + await task.queue_frame(BotStoppedSpeakingFrame()) + + await asyncio.sleep(0.1) + + turn_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.conversation.turn") + + if turn_spans: + turn_span = turn_spans[0] + attrs = dict(turn_span.attributes) + + input_value = attrs.get(SpanAttributes.INPUT_VALUE) or attrs.get("conversation.input") + assert input_value is not None + assert user_message in str(input_value) + + instrumentor.uninstrument() + + @pytest.mark.asyncio + async def test_turn_captures_bot_output( + self, tracer_provider, in_memory_span_exporter, simple_pipeline + ): + """Test that turn span captures complete bot output""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + task = PipelineTask(simple_pipeline, enable_turn_tracking=True) + + bot_response = "This is the bot's complete response" + + await task.queue_frame(UserStartedSpeakingFrame()) + await task.queue_frame(TranscriptionFrame(text="Hello", user_id="user1", timestamp=0)) + await task.queue_frame(UserStoppedSpeakingFrame()) + await task.queue_frame(BotStartedSpeakingFrame()) + await task.queue_frame(TextFrame(text=bot_response)) + await task.queue_frame(BotStoppedSpeakingFrame()) + + await asyncio.sleep(0.1) + + turn_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.conversation.turn") + + if turn_spans: + turn_span = turn_spans[0] + attrs = dict(turn_span.attributes) + + output_value = attrs.get(SpanAttributes.OUTPUT_VALUE) or attrs.get( + "conversation.output" + ) + assert output_value is not None + assert bot_response in str(output_value) + + instrumentor.uninstrument() + + @pytest.mark.asyncio + async def test_turn_handles_multiple_text_chunks( + self, tracer_provider, in_memory_span_exporter, simple_pipeline + ): + """Test that turn span aggregates multiple text chunks""" + instrumentor = PipecatInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + task = PipelineTask(simple_pipeline, enable_turn_tracking=True) + + await task.queue_frame(UserStartedSpeakingFrame()) + await 
task.queue_frame(TranscriptionFrame(text="Part one", user_id="user1", timestamp=0)) + await task.queue_frame(TranscriptionFrame(text="Part two", user_id="user1", timestamp=1)) + await task.queue_frame(UserStoppedSpeakingFrame()) + await task.queue_frame(BotStartedSpeakingFrame()) + await task.queue_frame(TextFrame(text="Response part A")) + await task.queue_frame(TextFrame(text="Response part B")) + await task.queue_frame(BotStoppedSpeakingFrame()) + + await asyncio.sleep(0.1) + + turn_spans = get_spans_by_name(in_memory_span_exporter, "pipecat.conversation.turn") + + if turn_spans: + turn_span = turn_spans[0] + attrs = dict(turn_span.attributes) + + # Should capture aggregated input/output + input_value = attrs.get(SpanAttributes.INPUT_VALUE) or attrs.get("conversation.input") + output_value = attrs.get(SpanAttributes.OUTPUT_VALUE) or attrs.get( + "conversation.output" + ) + + # Both parts should be present (concatenated or in list) + if input_value: + assert "Part one" in str(input_value) or "Part two" in str(input_value) + + if output_value: + assert "Response part A" in str(output_value) or "Response part B" in str( + output_value + ) + + instrumentor.uninstrument() diff --git a/python/tox.ini b/python/tox.ini index a0980b900..5e60f7ea1 100644 --- a/python/tox.ini +++ b/python/tox.ini @@ -32,6 +32,7 @@ envlist = py3{9,14}-ci-{pydantic_ai,pydantic_ai-latest} py3{9,14}-ci-{openllmetry,openllmetry-latest} py3{10,13}-ci-{openlit,openlit-latest} + py3{10,13}-ci-{pipecat,pipecat-latest} [testenv] @@ -72,7 +73,7 @@ changedir = pydantic_ai: instrumentation/openinference-instrumentation-pydantic-ai/ openllmetry: instrumentation/openinference-instrumentation-openllmetry/ openlit: instrumentation/openinference-instrumentation-openlit/ - + pipecat: instrumentation/openinference-instrumentation-pipecat/ commands_pre = agno: uv pip install --reinstall {toxinidir}/instrumentation/openinference-instrumentation-agno[test] agno-latest: uv pip install -U agno @@ -162,6 +163,8 @@ commands_pre = openllmetry-latest: uv pip install -U opentelemetry-instrumentation-openai openlit: uv pip install --reinstall {toxinidir}/instrumentation/openinference-instrumentation-openlit[test] openlit-latest: uv pip install -U openlit + pipecat: uv pip install --reinstall {toxinidir}/instrumentation/openinference-instrumentation-pipecat[test] + pipecat-latest: uv pip install -U pipecat-ai uv pip list -v commands =