Merged
40 changes: 40 additions & 0 deletions posthog/ai/anthropic/anthropic_converter.py
@@ -163,6 +163,32 @@ def format_anthropic_streaming_content(
return formatted


def extract_anthropic_web_search_count(response: Any) -> int:
"""
Extract web search count from Anthropic response.

Anthropic provides exact web search counts via usage.server_tool_use.web_search_requests.

Args:
response: The response from Anthropic API

Returns:
Number of web search requests (0 if none)
"""
if not hasattr(response, "usage"):
return 0

if not hasattr(response.usage, "server_tool_use"):
return 0

server_tool_use = response.usage.server_tool_use

if hasattr(server_tool_use, "web_search_requests"):
return max(0, int(getattr(server_tool_use, "web_search_requests", 0)))

return 0
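
For illustration, a minimal sketch of this helper against stand-in objects built with types.SimpleNamespace (hypothetical substitutes for the SDK's real response types, not the actual Anthropic classes):

from types import SimpleNamespace
from posthog.ai.anthropic.anthropic_converter import extract_anthropic_web_search_count

# A response whose usage reports two server-side web searches.
response = SimpleNamespace(
    usage=SimpleNamespace(server_tool_use=SimpleNamespace(web_search_requests=2))
)
assert extract_anthropic_web_search_count(response) == 2

# A response whose usage has no server_tool_use attribute falls back to 0.
assert extract_anthropic_web_search_count(SimpleNamespace(usage=SimpleNamespace())) == 0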


def extract_anthropic_usage_from_response(response: Any) -> TokenUsage:
"""
Extract usage from a full Anthropic response (non-streaming).
@@ -191,6 +217,10 @@ def extract_anthropic_usage_from_response(response: Any) -> TokenUsage:
if cache_creation and cache_creation > 0:
result["cache_creation_input_tokens"] = cache_creation

web_search_count = extract_anthropic_web_search_count(response)
if web_search_count > 0:
result["web_search_count"] = web_search_count

return result


@@ -222,6 +252,16 @@ def extract_anthropic_usage_from_event(event: Any) -> TokenUsage:
if hasattr(event, "usage") and event.usage:
usage["output_tokens"] = getattr(event.usage, "output_tokens", 0)

# Extract web search count from usage
if hasattr(event.usage, "server_tool_use"):
server_tool_use = event.usage.server_tool_use
if hasattr(server_tool_use, "web_search_requests"):
web_search_count = int(
getattr(server_tool_use, "web_search_requests", 0)
)
if web_search_count > 0:
usage["web_search_count"] = web_search_count

return usage


54 changes: 53 additions & 1 deletion posthog/ai/gemini/gemini_converter.py
@@ -338,6 +338,46 @@ def format_gemini_input(contents: Any) -> List[FormattedMessage]:
return [_format_object_message(contents)]


def extract_gemini_web_search_count(response: Any) -> int:
"""
Extract web search count from Gemini response.

Gemini bills per request that uses grounding, not per query.
    Returns 1 if grounding_metadata or a grounding/google_search
    function call is present, 0 otherwise.

Args:
response: The response from Gemini API

Returns:
1 if web search/grounding was used, 0 otherwise
"""

# Check for grounding_metadata in candidates
    if hasattr(response, "candidates") and response.candidates:
for candidate in response.candidates:
if (
hasattr(candidate, "grounding_metadata")
and candidate.grounding_metadata
):
return 1

# Also check for google_search or grounding in function call names
if hasattr(candidate, "content") and candidate.content:
if hasattr(candidate.content, "parts") and candidate.content.parts:
for part in candidate.content.parts:
if hasattr(part, "function_call") and part.function_call:
function_name = getattr(
part.function_call, "name", ""
).lower()
if (
"google_search" in function_name
or "grounding" in function_name
):
return 1

return 0
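
A rough sketch of both detection paths with SimpleNamespace stand-ins (the field shapes here are assumptions for illustration, not the SDK's actual types):

from types import SimpleNamespace
from posthog.ai.gemini.gemini_converter import extract_gemini_web_search_count

# Grounded response: any candidate carrying grounding_metadata counts as one request.
grounded = SimpleNamespace(
    candidates=[SimpleNamespace(grounding_metadata=SimpleNamespace(web_search_queries=["q"]))]
)
assert extract_gemini_web_search_count(grounded) == 1

# Fallback path: a function call whose name mentions google_search also counts.
tool_call = SimpleNamespace(function_call=SimpleNamespace(name="google_search_retrieval"))
candidate = SimpleNamespace(
    grounding_metadata=None,
    content=SimpleNamespace(parts=[tool_call]),
)
assert extract_gemini_web_search_count(SimpleNamespace(candidates=[candidate])) == 1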


def _extract_usage_from_metadata(metadata: Any) -> TokenUsage:
"""
Common logic to extract usage from Gemini metadata.
@@ -382,7 +422,14 @@ def extract_gemini_usage_from_response(response: Any) -> TokenUsage:
if not hasattr(response, "usage_metadata") or not response.usage_metadata:
return TokenUsage(input_tokens=0, output_tokens=0)

return _extract_usage_from_metadata(response.usage_metadata)
usage = _extract_usage_from_metadata(response.usage_metadata)

# Add web search count if present
web_search_count = extract_gemini_web_search_count(response)
if web_search_count > 0:
usage["web_search_count"] = web_search_count

return usage


def extract_gemini_usage_from_chunk(chunk: Any) -> TokenUsage:
@@ -404,6 +451,11 @@ def extract_gemini_usage_from_chunk(chunk: Any) -> TokenUsage:
# Use the shared helper to extract usage
usage = _extract_usage_from_metadata(chunk.usage_metadata)

# Add web search count if present
web_search_count = extract_gemini_web_search_count(chunk)
if web_search_count > 0:
usage["web_search_count"] = web_search_count

return usage


92 changes: 92 additions & 0 deletions posthog/ai/openai/openai_converter.py
@@ -255,6 +255,83 @@ def format_openai_streaming_content(
return formatted


def extract_openai_web_search_count(response: Any) -> int:
"""
Extract web search count from OpenAI response.

Uses a two-tier detection strategy:
1. Priority 1 (exact count): Check for output[].type == "web_search_call" (Responses API)
2. Priority 2 (binary detection): Check for various web search indicators:
- Root-level citations, search_results, or usage.search_context_size (Perplexity)
- Annotations with type "url_citation" in choices/output

Args:
response: The response from OpenAI API

Returns:
Number of web search requests (exact count or binary 1/0)
"""

# Priority 1: Check for exact count in Responses API output
if hasattr(response, "output"):
web_search_count = 0
for item in response.output:
if hasattr(item, "type") and item.type == "web_search_call":
web_search_count += 1

web_search_count = max(0, web_search_count)

if web_search_count > 0:
return web_search_count

# Priority 2: Binary detection (returns 1 or 0)

# Check root-level indicators (Perplexity)
if hasattr(response, "citations"):
citations = getattr(response, "citations")
if citations and len(citations) > 0:
return 1

if hasattr(response, "search_results"):
search_results = getattr(response, "search_results")
if search_results and len(search_results) > 0:
return 1

if hasattr(response, "usage") and hasattr(response.usage, "search_context_size"):
if response.usage.search_context_size:
return 1

# Check for url_citation annotations in choices (Chat Completions)
if hasattr(response, "choices"):
for choice in response.choices:
if hasattr(choice, "message") and hasattr(choice.message, "annotations"):
annotations = choice.message.annotations
if annotations:
for annotation in annotations:
if (
hasattr(annotation, "type")
and annotation.type == "url_citation"
):
return 1

# Check for url_citation annotations in output (Responses API)
if hasattr(response, "output"):
for item in response.output:
if hasattr(item, "content") and isinstance(item.content, list):
for content_item in item.content:
if hasattr(content_item, "annotations"):
annotations = content_item.annotations
if annotations:
for annotation in annotations:
if (
hasattr(annotation, "type")
and annotation.type == "url_citation"
):
return 1

return 0
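
A minimal sketch of the two tiers with SimpleNamespace stand-ins (hypothetical shapes, not the SDK's real response classes):

from types import SimpleNamespace
from posthog.ai.openai.openai_converter import extract_openai_web_search_count

# Responses API: each output item of type "web_search_call" is counted exactly.
responses_style = SimpleNamespace(
    output=[
        SimpleNamespace(type="web_search_call"),
        SimpleNamespace(type="web_search_call"),
        SimpleNamespace(type="message"),
    ]
)
assert extract_openai_web_search_count(responses_style) == 2

# Perplexity-style response: root-level citations trigger binary detection.
perplexity_style = SimpleNamespace(citations=["https://example.com"])
assert extract_openai_web_search_count(perplexity_style) == 1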


def extract_openai_usage_from_response(response: Any) -> TokenUsage:
"""
Extract usage statistics from a full OpenAI response (non-streaming).
@@ -312,6 +389,10 @@ def extract_openai_usage_from_response(response: Any) -> TokenUsage:
if reasoning_tokens > 0:
result["reasoning_tokens"] = reasoning_tokens

web_search_count = extract_openai_web_search_count(response)
if web_search_count > 0:
result["web_search_count"] = web_search_count

return result


@@ -358,6 +439,11 @@ def extract_openai_usage_from_chunk(
chunk.usage.completion_tokens_details.reasoning_tokens
)

# Extract web search count from the chunk (available in final streaming chunks)
web_search_count = extract_openai_web_search_count(chunk)
if web_search_count > 0:
usage["web_search_count"] = web_search_count

elif provider_type == "responses":
# For Responses API, usage is only in chunk.response.usage for completed events
if hasattr(chunk, "type") and chunk.type == "response.completed":
@@ -386,6 +472,12 @@ def extract_openai_usage_from_chunk(
response_usage.output_tokens_details.reasoning_tokens
)

# Extract web search count from the complete response
if hasattr(chunk, "response"):
web_search_count = extract_openai_web_search_count(chunk.response)
if web_search_count > 0:
usage["web_search_count"] = web_search_count

return usage


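The binary tier also covers annotation-based signals; a sketch for a Chat Completions-style response (shapes are stand-ins, assumed for illustration):

from types import SimpleNamespace
from posthog.ai.openai.openai_converter import extract_openai_web_search_count

# A message annotated with a url_citation marks the response as having searched.
annotation = SimpleNamespace(type="url_citation")
message = SimpleNamespace(annotations=[annotation])
chat_style = SimpleNamespace(choices=[SimpleNamespace(message=message)])
assert extract_openai_web_search_count(chat_style) == 1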
1 change: 1 addition & 0 deletions posthog/ai/types.py
@@ -63,6 +63,7 @@ class TokenUsage(TypedDict, total=False):
cache_read_input_tokens: Optional[int]
cache_creation_input_tokens: Optional[int]
reasoning_tokens: Optional[int]
web_search_count: Optional[int]
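
Since TokenUsage is a total=False TypedDict, the new key is optional like the others; a quick sketch of how callers handle its absence:

from posthog.ai.types import TokenUsage

usage: TokenUsage = TokenUsage(input_tokens=10, output_tokens=5, web_search_count=2)
# Absent keys simply stay unset, so .get() is the safe accessor.
assert usage.get("web_search_count") == 2
assert TokenUsage(input_tokens=1).get("web_search_count") is None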


class ProviderResponse(TypedDict, total=False):
30 changes: 30 additions & 0 deletions posthog/ai/utils.py
@@ -53,6 +53,12 @@ def merge_usage_stats(
if source_reasoning is not None:
current = target.get("reasoning_tokens") or 0
target["reasoning_tokens"] = current + source_reasoning

source_web_search = source.get("web_search_count")
if source_web_search is not None:
current = target.get("web_search_count") or 0
target["web_search_count"] = current + source_web_search

elif mode == "cumulative":
# Replace with latest values (already cumulative)
if source.get("input_tokens") is not None:
@@ -67,6 +73,9 @@ def merge_usage_stats(
]
if source.get("reasoning_tokens") is not None:
target["reasoning_tokens"] = source["reasoning_tokens"]
if source.get("web_search_count") is not None:
target["web_search_count"] = source["web_search_count"]

else:
raise ValueError(f"Invalid mode: {mode}. Must be 'incremental' or 'cumulative'")
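
A sketch of the incremental path, assuming the signature is merge_usage_stats(target, source, mode=...) (the full signature is folded out of this diff, so the parameter names are an assumption):

from posthog.ai.types import TokenUsage
from posthog.ai.utils import merge_usage_stats

target: TokenUsage = TokenUsage(input_tokens=10, web_search_count=1)
source: TokenUsage = TokenUsage(input_tokens=5, web_search_count=2)

# Incremental mode adds counts, so searches from successive streamed events accumulate.
merge_usage_stats(target, source, mode="incremental")
assert target.get("web_search_count") == 3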

@@ -311,6 +320,10 @@ def call_llm_and_track_usage(
if reasoning is not None and reasoning > 0:
event_properties["$ai_reasoning_tokens"] = reasoning

web_search_count = usage.get("web_search_count")
if web_search_count is not None and web_search_count > 0:
event_properties["$ai_web_search_count"] = web_search_count

if posthog_distinct_id is None:
event_properties["$process_person_profile"] = False

@@ -414,6 +427,14 @@ async def call_llm_and_track_usage_async(
if cache_creation is not None and cache_creation > 0:
event_properties["$ai_cache_creation_input_tokens"] = cache_creation

reasoning = usage.get("reasoning_tokens")
if reasoning is not None and reasoning > 0:
event_properties["$ai_reasoning_tokens"] = reasoning

web_search_count = usage.get("web_search_count")
if web_search_count is not None and web_search_count > 0:
event_properties["$ai_web_search_count"] = web_search_count

if posthog_distinct_id is None:
event_properties["$process_person_profile"] = False

@@ -535,6 +556,15 @@ def capture_streaming_event(
if value is not None and isinstance(value, int) and value > 0:
event_properties[f"$ai_{field}"] = value

# Add web search count if present (all providers)
web_search_count = event_data["usage_stats"].get("web_search_count")
if (
web_search_count is not None
and isinstance(web_search_count, int)
and web_search_count > 0
):
event_properties["$ai_web_search_count"] = web_search_count

# Handle provider-specific fields
if (
event_data["provider"] == "openai"
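End to end, a nonzero count surfaces as the $ai_web_search_count property on the captured event, while a zero or missing count adds no property at all; the guard in plain form, using a dict stand-in for the usage stats:

usage = {"input_tokens": 10, "output_tokens": 5, "web_search_count": 2}
event_properties = {}

# Only positive counts are emitted; 0 or None leaves the property unset.
web_search_count = usage.get("web_search_count")
if web_search_count is not None and web_search_count > 0:
    event_properties["$ai_web_search_count"] = web_search_count

assert event_properties == {"$ai_web_search_count": 2}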