
Conversation


@duncankmckinnon commented Nov 13, 2025

Add OpenInference Instrumentation for Pipecat

This PR implements comprehensive OpenTelemetry tracing for Pipecat voice agents using OpenInference semantic conventions, enabling production-ready observability for voice AI applications.

Overview

Adds automatic instrumentation for Pipecat pipelines that captures:

  • Turn-level spans: Complete conversation exchanges with user input/output
  • Service-level spans: Individual LLM, TTS, and STT operations with proper directionality
  • Flat span hierarchy: All service spans as siblings under turn spans for clear visualization
  • Rich attributes: Model names, providers, token counts, latency metrics, and full conversation history in Arize-compatible format

Key Features

1. Automatic Instrumentation via Observer Pattern

The instrumentor wraps PipelineTask.__init__ to automatically inject an observer into every task:

import os

from openinference.instrumentation.pipecat import PipecatInstrumentor
from arize.otel import register

tracer_provider = register(
    space_id=os.getenv("ARIZE_SPACE_ID"),
    api_key=os.getenv("ARIZE_API_KEY"),
    project_name=os.getenv("ARIZE_PROJECT_NAME"),
)

PipecatInstrumentor().instrument(
    tracer_provider=tracer_provider,
    debug_log_filename="debug.log"  # Optional
)

No code changes are needed in your pipeline: instrument once and every PipelineTask instance gets automatic tracing.
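
Under the hood, this amounts to a wrapt function wrapper. A minimal sketch of the injection, assuming PipelineTask accepts an observers keyword and simplifying the observer construction (both are assumptions about internals, not the package's exact code):

from wrapt import wrap_function_wrapper

def _wrapped_task_init(wrapped, instance, args, kwargs):
    # Append our observer to whatever observers the caller passed, then
    # run the original __init__. The `observers` kwarg is an assumed
    # detail of PipelineTask's signature.
    observer = OpenInferenceObserver(tracer=tracer)  # hypothetical constructor
    kwargs["observers"] = list(kwargs.get("observers") or []) + [observer]
    return wrapped(*args, **kwargs)

wrap_function_wrapper("pipecat.pipeline.task", "PipelineTask.__init__", _wrapped_task_init)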

2. Intelligent Turn Tracking

Implements conversation turn tracking using speaking events to define natural conversation boundaries:

  • Start turn: UserStartedSpeakingFrame or StartFrame (first pipeline frame)
  • End turn: Timeout after BotStoppedSpeakingFrame (configurable, default 2.5s)
  • Interruption handling: New turn starts immediately when user interrupts bot
  • Auto-start: First service activity auto-starts a turn if none active

This approach ensures one turn span per actual conversation exchange with proper handling of multi-part bot responses (e.g., function calls causing multiple TTS segments).
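
A simplified sketch of that state machine (illustrative structure only; the real observer reacts to Pipecat frames rather than plain callbacks):

import asyncio
from typing import Optional

class TurnTracker:
    def __init__(self, end_timeout: float = 2.5):
        self._end_timeout = end_timeout
        self._turn_active = False
        self._end_task: Optional[asyncio.Task] = None

    def on_user_started_speaking(self) -> None:
        if self._end_task is not None:
            # User spoke during the quiet period: cancel the pending end.
            self._end_task.cancel()
            self._end_task = None
        if self._turn_active:
            self._finish_turn(reason="interrupted")
        self._start_turn()

    def on_bot_stopped_speaking(self) -> None:
        # Don't end immediately: a function call may trigger another TTS
        # segment. Only finish if the timeout elapses uninterrupted.
        self._end_task = asyncio.create_task(self._end_after_timeout())

    async def _end_after_timeout(self) -> None:
        await asyncio.sleep(self._end_timeout)
        self._finish_turn(reason="completed")

    def _start_turn(self) -> None:
        self._turn_active = True   # the real observer opens the turn span here

    def _finish_turn(self, reason: str) -> None:
        self._turn_active = False  # ...and ends it here, recording `reason`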

3. Bidirectional Frame Processing with Deduplication

Captures frames both entering (input) and leaving (output) services with intelligent deduplication:

  • Directional filtering: INPUT_VALUE only captured when is_input=True, OUTPUT_VALUE only when is_input=False
  • Streaming accumulation: LLM and TTS streaming chunks accumulated with smart deduplication
  • Special handling for STT: TranscriptionFrame is OUTPUT from STT but recorded as INPUT for observability
  • TTS filtering: Only captures TTS text when going to BaseOutputTransport (final output)

Deduplication handles cumulative chunks (e.g., "Hello" → "Hello world" → "Hello world!") by detecting overlaps and extracting only new content.
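
A sketch of the overlap handling (assumed logic matching the behavior described, not the exact implementation):

def merge_chunk(accumulated: str, chunk: str) -> str:
    # Cumulative chunk: the new chunk restates everything so far.
    if chunk.startswith(accumulated):
        return chunk
    # Partial overlap: find the longest suffix of `accumulated` that is
    # a prefix of `chunk`, and append only the new tail.
    for i in range(min(len(accumulated), len(chunk)), 0, -1):
        if accumulated.endswith(chunk[:i]):
            return accumulated + chunk[i:]
    return accumulated + chunk  # no overlap: plain append

assert merge_chunk("Hello", "Hello world") == "Hello world"
assert merge_chunk("Hello world", "Hello world!") == "Hello world!"
assert merge_chunk("Hello ", "world") == "Hello world"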

4. Multiple LLM Invocations Per Turn

Properly handles multiple LLM calls within a single turn (e.g., function calling flows), as sketched after this list:

  • Detects new invocations via LLMContextFrame
  • Finishes previous LLM span before starting new one
  • Each LLM call gets its own span with full message context
  • Prevents output accumulation across different invocations
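
Roughly, the handler looks like this (illustrative; the attribute and method names are assumptions):

from opentelemetry import trace as trace_api

def _on_llm_context_frame(self, frame):
    # A new LLMContextFrame marks a fresh invocation (e.g., the follow-up
    # call after a tool result). Close the previous span so streamed
    # output doesn't accumulate across invocations.
    if self._llm_span is not None:
        self._llm_span.end()
    self._llm_output = ""
    self._llm_span = self._tracer.start_span(
        "pipecat.llm",
        context=trace_api.set_span_in_context(self._turn_span),
    )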

5. Flattened Message Format for Arize

Message history is exported in the flattened format that Arize expects:

# Instead of a single JSON string:
"llm.input_messages": "[{role: 'user', content: '...'}]"

# We set individual attributes:
"llm.input_messages.0.message.role": "system"
"llm.input_messages.0.message.content": "You are a helpful assistant"
"llm.input_messages.1.message.role": "user"
"llm.input_messages.1.message.content": "What is quantum computing?"
# ... and so on

This enables proper display in Arize's UI with message-level filtering and analysis.
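
The flattening step reduces to a small helper along these lines (the helper name is illustrative; SpanAttributes comes from openinference-semantic-conventions):

from openinference.semconv.trace import SpanAttributes

def set_flattened_messages(span, messages, base=SpanAttributes.LLM_INPUT_MESSAGES):
    # Emit llm.input_messages.{i}.message.{role,content} attributes
    # instead of one opaque JSON string.
    for i, message in enumerate(messages):
        span.set_attribute(f"{base}.{i}.message.role", message["role"])
        content = message.get("content")
        if content:
            span.set_attribute(f"{base}.{i}.message.content", content)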

6. Consistent Time Handling

Uses time.time_ns() (Unix epoch) consistently for all span timestamps:

  • Span start time: Recorded at service span creation
  • Span end time: Calculated from start_time_ns + processing_time_seconds when metrics available
  • Avoids mixing clocks: Does not use Pipecat's monotonic_ns() timestamps, which are relative to the pipeline start

This ensures the end_time >= start_time invariant required by OpenTelemetry.
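
In code terms, the idea is simply this (a sketch; processing_time_seconds stands in for the value reported by a MetricsFrame):

import time

start_time_ns = time.time_ns()   # Unix epoch ns, captured at span creation

# Later, a MetricsFrame reports how long the service actually ran:
processing_time_seconds = 1.57   # example value

# Derive the end timestamp from the same clock so end >= start holds;
# OpenTelemetry spans accept an explicit nanosecond end timestamp.
end_time_ns = start_time_ns + int(processing_time_seconds * 1e9)
# span.end(end_time=end_time_ns)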

7. Multi-Provider Service Detection

Automatically detects and attributes service types and providers:

  • LLM Services: OpenAI, Anthropic (sets llm.provider, llm.model_name)
  • TTS Services: OpenAI, ElevenLabs, Cartesia (sets audio.voice, audio.voice_id)
  • STT Services: OpenAI, Deepgram, Cartesia
  • Generic detection: Works with any service inheriting from Pipecat base classes

Sets service.name to the actual service class name for unique identification.
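
Detection plausibly reduces to isinstance checks against Pipecat's service base classes, roughly as follows (the import path varies across Pipecat versions and is an assumption here):

from pipecat.services.ai_services import LLMService, STTService, TTSService  # assumed path

def detect_service_type(service) -> str:
    # Any subclass of a Pipecat base class is picked up, so custom
    # providers need no special-casing.
    if isinstance(service, LLMService):
        return "llm"
    if isinstance(service, STTService):
        return "stt"
    if isinstance(service, TTSService):
        return "tts"
    return "unknown"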

8. Session Tracking

Automatically extracts conversation_id from PipelineTask and sets it as the session.id attribute on all spans, enabling conversation-level filtering in observability platforms.

Implementation Details

Core Components

PipecatInstrumentor (__init__.py)

  • Wraps PipelineTask.__init__ using wrapt
  • Injects OpenInferenceObserver into each task
  • Supports optional debug_log_filename parameter for detailed frame logging
  • Thread-safe: creates separate observer instance per task

OpenInferenceObserver (_observer.py)

  • Implements Pipecat's BaseObserver interface
  • Listens to on_push_frame events with bidirectional processing (input/output)
  • Creates turn spans and service spans with proper OpenTelemetry context propagation
  • Tracks turn state: active turn, user text, bot text, speaking status
  • Handles frame deduplication to avoid processing propagated frames twice
  • Auto-starts turns when first service activity detected
  • Finishes service spans before turn spans to maintain proper hierarchy

Frame Attribute Extractors (_attributes.py)

  • Extracts OpenInference-compliant attributes from Pipecat frames
  • Handles multiple frame types: TranscriptionFrame, LLMContextFrame, LLMTextFrame, TTSTextFrame, MetricsFrame, etc.
  • Captures: LLM messages (flattened format), audio metadata, token counts, processing times, errors
  • Service attribute extraction for span creation with provider-specific details

Span Hierarchy

pipecat.conversation.turn (trace_id: abc123)
├── pipecat.stt (parent_id: turn_span_id, trace_id: abc123)
├── pipecat.llm (parent_id: turn_span_id, trace_id: abc123)
├── pipecat.llm (parent_id: turn_span_id, trace_id: abc123)  # Second invocation
└── pipecat.tts (parent_id: turn_span_id, trace_id: abc123)

Flat hierarchy: All service spans are siblings under the turn span (no nesting) for clearer visualization in tracing UIs. All spans within a turn share the same trace_id and have session.id attribute set.

Context Propagation

Service spans are created with the turn span's context:

from opentelemetry import trace as trace_api

turn_context = trace_api.set_span_in_context(self._turn_span)
span = self._tracer.start_span(
    name=f"pipecat.{service_type}",
    context=turn_context,  # Links to turn span
)

This ensures proper parent-child relationships and enables distributed tracing.

Testing

Test Coverage

69 tests covering:

  1. Instrumentor Basics (test_instrumentor.py):

    • Initialization, instrumentation, uninstrumentation
    • Observer injection into tasks
    • Singleton behavior
    • Configuration handling
  2. Turn Tracking (test_turn_tracking.py):

    • Turn creation on user/bot speech
    • Multiple sequential turns
    • Turn interruption handling
    • Input/output text capture
    • Session ID attribution
    • Turn span hierarchy
  3. Service Detection (test_service_detection.py):

    • LLM/TTS/STT service type detection
    • Multi-provider detection (OpenAI, Anthropic, ElevenLabs, Deepgram)
    • Metadata extraction (models, voices, providers)
    • Custom service inheritance
  4. Provider Spans (test_provider_spans.py):

    • Span creation for different providers
    • Correct span attributes per service type
    • Input/output capture for each service
    • Mixed provider pipelines
    • Provider-specific attributes (model names, voice IDs)

Mock Infrastructure

Comprehensive mocks in conftest.py:

  • Mock LLM/TTS/STT services with configurable metadata
  • Helper functions for running pipeline tasks
  • Span extraction and assertion utilities
  • Support for multiple provider combinations

All tests use in-memory span exporters for fast, isolated testing.
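
For reference, the standard in-memory setup with the OpenTelemetry SDK looks roughly like this (the final assertion is illustrative):

from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

exporter = InMemorySpanExporter()
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(SimpleSpanProcessor(exporter))

# Instrument with PipecatInstrumentor().instrument(tracer_provider=tracer_provider),
# run a mock pipeline task, then inspect the captured spans:
spans = exporter.get_finished_spans()
assert any(span.name == "pipecat.conversation.turn" for span in spans)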

Example Usage

Complete Tracing Example

See examples/trace/001-trace.py for a full working example:

import os
from datetime import datetime

from arize.otel import register
from openinference.instrumentation.pipecat import PipecatInstrumentor
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineParams, PipelineTask

# Generate unique conversation ID
conversation_id = f"conversation-{datetime.now().strftime('%Y%m%d_%H%M%S')}"
debug_log_filename = f"pipecat_frames_{conversation_id}.log"

# Set up tracing
tracer_provider = register(
    space_id=os.getenv("ARIZE_SPACE_ID"),
    api_key=os.getenv("ARIZE_API_KEY"),
    project_name=os.getenv("ARIZE_PROJECT_NAME"),
)

PipecatInstrumentor().instrument(
    tracer_provider=tracer_provider,
    debug_log_filename=debug_log_filename,
)

# Create your pipeline (STT -> LLM -> TTS); stt, llm, tts, transport, and
# runner are set up earlier in the full example
pipeline = Pipeline([stt, llm, tts, transport.output()])

# Create task with conversation ID
task = PipelineTask(
    pipeline,
    conversation_id=conversation_id,
    params=PipelineParams(enable_metrics=True)
)

# Run - tracing happens automatically!
await runner.run(task)

What Gets Traced

For a single exchange in which the user asks a question and the bot responds:

Turn Span (pipecat.conversation.turn):

  • session.id: "conversation-20251113_152502"
  • input.value: "What is quantum computing?"
  • output.value: "Quantum computing is a type of computing that uses quantum mechanics..."
  • conversation.turn_number: 1
  • conversation.turn_duration_seconds: 3.5
  • conversation.end_reason: "completed"

STT Span (pipecat.stt):

  • service.name: "OpenAISTTService"
  • service.type: "stt"
  • llm.provider: "openai"
  • llm.model_name: "gpt-4o-transcribe"
  • input.value: "What is quantum computing?" (transcribed text)
  • audio.transcript: "What is quantum computing?"
  • Duration: 0.78 seconds

LLM Span (pipecat.llm):

  • service.name: "OpenAILLMService"
  • service.type: "llm"
  • llm.provider: "openai"
  • llm.model_name: "gpt-4"
  • input.value: "What is quantum computing?" (last user message)
  • output.value: "Quantum computing is..." (accumulated streaming response)
  • llm.input_messages.0.message.role: "system"
  • llm.input_messages.0.message.content: "You are a helpful assistant"
  • llm.input_messages.1.message.role: "user"
  • llm.input_messages.1.message.content: "What is quantum computing?"
  • llm.output_messages.0.message.role: "assistant"
  • llm.output_messages.0.message.content: "Quantum computing is..."
  • llm.token_count.total: 520
  • llm.token_count.prompt: 380
  • llm.token_count.completion: 140
  • Duration: 2.77 seconds

TTS Span (pipecat.tts):

  • service.name: "OpenAITTSService"
  • service.type: "tts"
  • llm.provider: "openai"
  • llm.model_name: "gpt-4o-mini-tts"
  • audio.voice: "ballad"
  • audio.voice_id: "ballad"
  • output.value: "Quantum computing is..." (synthesized text)
  • service.processing_time_seconds: 1.57
  • Duration: 1.57 seconds

Key Improvements in This PR

  1. Fixed STT input.value capture: Special case handling for TranscriptionFrame as OUTPUT from STT but recorded as INPUT for observability
  2. Fixed span timing: Consistent use of Unix epoch nanoseconds (time.time_ns()) instead of mixing with Pipecat's monotonic clock
  3. Fixed processing_time_seconds: Proper storage and use for calculating span end time
  4. Fixed LLM message format: Flattened attribute format (llm.input_messages.{index}.message.{field}) instead of single JSON string
  5. Added LLM output messages: Flattened format for output messages matching input format
  6. Improved deduplication: Smart handling of cumulative streaming chunks with overlap detection
  7. Multiple LLM invocations: Separate spans for each LLM call within a turn
  8. Bidirectional processing: Proper handling of frames as both inputs and outputs with directional filtering

Note

Adds a Pipecat auto-instrumentation package that converts Pipecat frames into OpenInference/OTel spans with turn tracking, service spans (LLM/TTS/STT), rich attributes, examples, tests, and CI integration.

  • New package python/instrumentation/openinference-instrumentation-pipecat:
    • Instrumentor: PipecatInstrumentor auto-injects an observer by wrapping PipelineTask.__init__.
    • Observer: OpenInferenceObserver creates pipecat.conversation.turn spans and sibling service spans (pipecat.llm, pipecat.tts, pipecat.stt), with session IDs, streaming chunk deduplication, timing, and interruption handling.
    • Attribute extraction: _attributes.py extracts frame/service attributes (flattened LLM messages, tool calls, token/latency metrics, provider/model detection) and sets OpenInference/GenAI semconv fields.
    • Packaging: pyproject.toml, entry points for OTel/OpenInference, version 0.1.0, LICENSE, CHANGELOG, README.
    • Examples: examples/trace/001-trace.py and example.env for Phoenix/OTLP setup.
    • Tests: Comprehensive suites for instrumentor behavior, provider/service detection, span attributes, and turn tracking with mocks.
  • CI/Tooling:
    • python/tox.ini: add pipecat and pipecat-latest envs and install steps.
  • Repo housekeeping:
    • .gitignore: ignore *.code-workspace.

Written by Cursor Bugbot for commit 9cb7459.

@duncankmckinnon requested a review from a team as a code owner November 13, 2025 18:26
@dosubot added the size:XXL label (This PR changes 1000+ lines, ignoring generated files.) Nov 13, 2025
@duncankmckinnon changed the title from "Incorporate pipecat" to "(feat): add auto instrumentor for pipecat" Nov 13, 2025
@duncankmckinnon changed the title from "(feat): add auto instrumentor for pipecat" to "feat(pipecat): add auto instrumentor for pipecat" Nov 13, 2025
"openinference-semantic-conventions>=0.1.21",
"websockets>=13.1,<16.0",
"mypy>=1.18.2",
]

Bug: Missing wrapt dependency in package dependencies

The code imports wrapt in __init__.py with from wrapt import wrap_function_wrapper, but wrapt is not listed in the dependencies section of pyproject.toml. While it's listed in the mypy overrides to ignore missing imports, it's not actually installed as a dependency. This will cause import errors when the package is installed and used.


_attributes.py (excerpt):

elif isinstance(frame, TextFrame):
    # Generic text frame (output)
    results[SpanAttributes.OUTPUT_VALUE] = text
    results["llm.output_messages.0.message.role"] = "user"

Bug: Incorrect message role for generic text output

Generic TextFrame output is assigned message role "user" but should be "assistant" since it represents output from the model. This violates OpenInference semantic conventions where output messages should have role "assistant". The code already correctly sets this as OUTPUT_VALUE on line 205, confirming this is output data that should be attributed to the assistant.

