Skip to content

feat(llmobs): allow span processor to return None to omit spans #13739

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 45 additions & 10 deletions ddtrace/llmobs/_llmobs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from dataclasses import dataclass
from dataclasses import field
import copy
import json
import os
import time
Expand Down Expand Up @@ -127,7 +128,20 @@ class LLMObsSpan:
Passed to the `span_processor` function in the `enable` or `register_processor` methods.

Example::
def span_processor(span: LLMObsSpan) -> LLMObsSpan:
def span_processor(span: LLMObsSpan) -> Optional[LLMObsSpan]:
# Access full span context for decision making
ctx = span.get_span_context()
if ctx:
model_name = ctx._get_ctx_item("_ml_obs.meta.model_name")
metadata = ctx._get_ctx_item("_ml_obs.meta.metadata") or {}

# Omit spans based on full context
if model_name == "sensitive-model" and metadata.get("contains_pii"):
return None # This span will be omitted

# Modify input/output
if span.get_tag("omit_span") == "1":
return None
if span.get_tag("no_input") == "1":
span.input = []
return span
Expand All @@ -140,6 +154,7 @@ class Message(TypedDict):
input: List[Message] = field(default_factory=list)
output: List[Message] = field(default_factory=list)
_tags: Dict[str, str] = field(default_factory=dict)
_span_context: Optional["Span"] = field(default=None, init=False)

def get_tag(self, key: str) -> Optional[str]:
"""Get a tag from the span.
Expand All @@ -149,6 +164,17 @@ def get_tag(self, key: str) -> Optional[str]:
:rtype: Optional[str]
"""
return self._tags.get(key)

def get_span_context(self) -> Optional["Span"]:
    """Return the span attached to this object as decision-making context.

    The returned object is whatever is currently stored in ``_span_context``;
    it is meant to be inspected (metadata, metrics, model information and
    other span fields), not modified.

    NOTE(review): this accessor hands back the stored object directly and
    does not itself enforce read-only access.

    :return: The attached span, or None when no span context is available.
    :rtype: Optional[Span]
    """
    return self._span_context


class LLMObs(Service):
Expand All @@ -158,7 +184,7 @@ class LLMObs(Service):
def __init__(
self,
tracer: Optional[Tracer] = None,
span_processor: Optional[Callable[[LLMObsSpan], LLMObsSpan]] = None,
span_processor: Optional[Callable[[LLMObsSpan], Optional[LLMObsSpan]]] = None,
) -> None:
super(LLMObs, self).__init__()
self.tracer = tracer or ddtrace.tracer
Expand Down Expand Up @@ -204,6 +230,8 @@ def _submit_llmobs_span(self, span: Span) -> None:
span_event = None
try:
span_event = self._llmobs_span_event(span)
if span_event is None:
return
self._llmobs_span_writer.enqueue(span_event)
except (KeyError, TypeError, ValueError):
log.error(
Expand All @@ -215,7 +243,7 @@ def _submit_llmobs_span(self, span: Span) -> None:
if self._evaluator_runner:
self._evaluator_runner.enqueue(span_event, span)

def _llmobs_span_event(self, span: Span) -> LLMObsSpanEvent:
def _llmobs_span_event(self, span: Span) -> Optional[LLMObsSpanEvent]:
"""Span event object structure."""
span_kind = span._get_ctx_item(SPAN_KIND)
if not span_kind:
Expand Down Expand Up @@ -277,10 +305,15 @@ def _llmobs_span_event(self, span: Span) -> LLMObsSpanEvent:
if self._user_span_processor:
error = False
try:
llmobs_span._tags = cast(Dict[str, str], span._get_ctx_item(TAGS))
span_clone = copy.copy(span)
llmobs_span._span_context = span_clone
llmobs_span._tags = cast(Dict[str, str], span._get_ctx_item(TAGS) or {})

user_llmobs_span = self._user_span_processor(llmobs_span)
if user_llmobs_span is None:
return None
if not isinstance(user_llmobs_span, LLMObsSpan):
raise TypeError("User span processor must return an LLMObsSpan, got %r" % type(user_llmobs_span))
raise TypeError("User span processor must return an LLMObsSpan or None, got %r" % type(user_llmobs_span))
llmobs_span = user_llmobs_span
except Exception as e:
log.error("Error in LLMObs span processor (%r): %r", self._user_span_processor, e)
Expand Down Expand Up @@ -448,7 +481,7 @@ def enable(
api_key: Optional[str] = None,
env: Optional[str] = None,
service: Optional[str] = None,
span_processor: Optional[Callable[[LLMObsSpan], LLMObsSpan]] = None,
span_processor: Optional[Callable[[LLMObsSpan], Optional[LLMObsSpan]]] = None,
_tracer: Optional[Tracer] = None,
_auto: bool = False,
) -> None:
Expand All @@ -463,8 +496,8 @@ def enable(
:param str api_key: Your datadog api key.
:param str env: Your environment name.
:param str service: Your service name.
:param Callable[[LLMObsSpan], LLMObsSpan] span_processor: A function that takes an LLMObsSpan and returns an
LLMObsSpan.
:param Callable[[LLMObsSpan], Optional[LLMObsSpan]] span_processor: A function that takes an LLMObsSpan and returns an
LLMObsSpan or None. If None is returned, the span will be omitted and not sent to LLMObs.
"""
if cls.enabled:
log.debug("%s already enabled", cls.__name__)
Expand Down Expand Up @@ -548,14 +581,16 @@ def enable(
)

@classmethod
def register_processor(cls, processor: Optional[Callable[[LLMObsSpan], LLMObsSpan]] = None) -> None:
def register_processor(cls, processor: Optional[Callable[[LLMObsSpan], Optional[LLMObsSpan]]] = None) -> None:
"""Register a processor to be called on each LLMObs span.

This can be used to modify the span before it is sent to LLMObs. For example, you can modify the input/output.
You can also return None to omit the span entirely from being sent to LLMObs.

To deregister the processor, call `register_processor(None)`.

:param processor: A function that takes an LLMObsSpan and returns an LLMObsSpan.
:param processor: A function that takes an LLMObsSpan and returns an LLMObsSpan or None.
If None is returned, the span will be omitted and not sent to LLMObs.
"""
cls._instance._user_span_processor = processor

Expand Down
41 changes: 41 additions & 0 deletions tests/llmobs/test_llmobs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import os
from textwrap import dedent
from typing import Optional

import pytest

Expand Down Expand Up @@ -167,6 +168,46 @@ def test_processor_bad_return_type(self, llmobs, llmobs_enable_opts, llmobs_even
assert llmobs_events[0]["meta"]["input"] == {"messages": [{"content": "value", "role": ""}]}
assert llmobs_events[0]["meta"]["output"] == {"messages": [{"content": "value", "role": ""}]}

def _omit_span_processor(span: LLMObsSpan) -> Optional[LLMObsSpan]:
    """Span processor fixture: drop spans tagged ``omit_span == "true"``.

    :param span: The span handed to the processor.
    :return: None when the span carries the omit tag, otherwise the span unchanged.
    """
    return None if span.get_tag("omit_span") == "true" else span

@pytest.mark.parametrize("llmobs_enable_opts", [dict(span_processor=_omit_span_processor)])
def test_processor_omit_span(self, llmobs, llmobs_enable_opts, llmobs_events):
    """A processor returning None must keep that span out of the emitted events."""
    # Two spans in order: the first is tagged for omission, the second is tagged to be kept.
    for text, omit_flag in (("omit me", "true"), ("keep me", "false")):
        with llmobs.llm() as llm_span:
            llmobs.annotate(llm_span, input_data=text, output_data="response", tags={"omit_span": omit_flag})

    # Exactly one event is expected, and it must be the span that was kept.
    assert len(llmobs_events) == 1
    first_message = llmobs_events[0]["meta"]["input"]["messages"][0]
    assert first_message["content"] == "keep me"

def _context_aware_processor(span: LLMObsSpan) -> Optional[LLMObsSpan]:
    """Span processor fixture that consults the span context before redacting.

    The input is replaced with a single ``[REDACTED]`` user message only when a
    span context is available, its span kind is ``"llm"``, and the span is
    tagged ``redact_input == "true"``. The span is always returned.
    """
    ctx = span.get_span_context()
    if ctx is None:
        # No context to inspect: pass the span through untouched.
        return span

    is_llm_span = ctx._get_ctx_item("_ml_obs.span_kind") == "llm"
    if is_llm_span and span.get_tag("redact_input") == "true":
        span.input = [{"content": "[REDACTED]", "role": "user"}]
    return span

@pytest.mark.parametrize("llmobs_enable_opts", [dict(span_processor=_context_aware_processor)])
def test_processor_span_context_access(self, llmobs, llmobs_enable_opts, llmobs_events):
    """The processor can read the span context and redact the input based on it."""
    redact_tags = {"redact_input": "true"}
    with llmobs.llm() as llm_span:
        llmobs.annotate(llm_span, input_data="sensitive data", output_data="response", tags=redact_tags)

    # The single emitted event must carry the redacted input, not the original text.
    assert len(llmobs_events) == 1
    emitted_input = llmobs_events[0]["meta"]["input"]
    assert emitted_input["messages"][0]["content"] == "[REDACTED]"

def test_ddtrace_run_register_processor(self, ddtrace_run_python_code_in_subprocess, llmobs_backend):
"""Users using ddtrace-run can register a processor to be called on each LLMObs span."""
env = os.environ.copy()
Expand Down