Closed
Commits (21)
- `6c32588` Operate on a deepcopy of `$defs` in `JsonSchemaTransformer` instead o… (thomasst, Dec 17, 2025)
- `8b8a928` Ensure `type` field when converting `const` to `enum` in `GoogleJsonS…` (majiayu000, Dec 18, 2025)
- `4a5da9b` Bump `google-genai` to 1.56.0 (#3770) (petersli, Dec 18, 2025)
- `64a6fe4` Add AI SDK data chunk ID and tool approval types (#3760) (bendrucker, Dec 19, 2025)
- `1160591` Add tool approval integration for Vercel AI adapter (bendrucker, Dec 19, 2025)
- `85a415b` Add robustness improvements and tests for tool approval (bendrucker, Dec 19, 2025)
- `62f7262` Skip doc examples that are incomplete snippets (bendrucker, Dec 19, 2025)
- `f18cda6` Also skip linting for incomplete doc snippets (bendrucker, Dec 19, 2025)
- `cf6dad7` Add tests to cover edge cases for tool approval extraction (bendrucker, Dec 19, 2025)
- `2c8a0af` Use public interface for denied_tool_ids tests (bendrucker, Dec 19, 2025)
- `f731bc7` Fix coverage: add pragma comments and caching test (bendrucker, Dec 19, 2025)
- `296611a` Address PR review: add tests and fix documentation (bendrucker, Dec 20, 2025)
- `2359f4e` Address PR review comments for tool approval (bendrucker, Dec 20, 2025)
- `fb5bc2e` Use 'AI SDK UI v6' instead of 'AI SDK v6' for frontend requirement (bendrucker, Dec 20, 2025)
- `c077afb` Remove unnecessary tool_call_id checks (always present) (bendrucker, Dec 20, 2025)
- `e2e967c` Inline extraction into from_request and remove private method tests (bendrucker, Dec 20, 2025)
- `33eabc7` Consolidate tool approval tests to use from_request() (bendrucker, Dec 20, 2025)
- `e885d43` Remove trailing blank lines (ruff) (bendrucker, Dec 20, 2025)
- `676530b` Add edge case tests for tool approval extraction coverage (bendrucker, Dec 20, 2025)
- `9eebd80` Fix coverage: remove duplicate starlette check function (bendrucker, Dec 20, 2025)
- `a908ef7` Add test for explicit deferred_tool_results parameter (bendrucker, Dec 20, 2025)
29 changes: 27 additions & 2 deletions docs/ui/vercel-ai.md
@@ -1,6 +1,7 @@
# Vercel AI Data Stream Protocol

Pydantic AI natively supports the [Vercel AI Data Stream Protocol](https://ai-sdk.dev/docs/ai-sdk-ui/stream-protocol#data-stream-protocol) to receive agent run input from, and stream events to, a [Vercel AI Elements](https://ai-sdk.dev/elements) frontend.
Pydantic AI natively supports the [Vercel AI Data Stream Protocol](https://ai-sdk.dev/docs/ai-sdk-ui/stream-protocol#data-stream-protocol) to receive agent run input from, and stream events to, a frontend using [AI SDK UI](https://ai-sdk.dev/docs/ai-sdk-ui/overview) hooks like [`useChat`](https://ai-sdk.dev/docs/reference/ai-sdk-ui/use-chat). You can optionally use [AI Elements](https://ai-sdk.dev/elements) for pre-built UI components.


## Usage

@@ -36,7 +37,7 @@ async def chat(request: Request) -> Response:
If you're using a web framework not based on Starlette (e.g. Django or Flask) or need fine-grained control over the input or output, you can create a `VercelAIAdapter` instance and directly use its methods, which can be chained to accomplish the same thing as the `VercelAIAdapter.dispatch_request()` class method shown above:

1. The [`VercelAIAdapter.build_run_input()`][pydantic_ai.ui.vercel_ai.VercelAIAdapter.build_run_input] class method takes the request body as bytes and returns a Vercel AI [`RequestData`][pydantic_ai.ui.vercel_ai.request_types.RequestData] run input object, which you can then pass to the [`VercelAIAdapter()`][pydantic_ai.ui.vercel_ai.VercelAIAdapter] constructor along with the agent.
- You can also use the [`VercelAIAdapter.from_request()`][pydantic_ai.ui.UIAdapter.from_request] class method to build an adapter directly from a Starlette/FastAPI request.
- You can also use the [`VercelAIAdapter.from_request()`][pydantic_ai.ui.vercel_ai.VercelAIAdapter.from_request] class method to build an adapter directly from a Starlette/FastAPI request.
2. The [`VercelAIAdapter.run_stream()`][pydantic_ai.ui.UIAdapter.run_stream] method runs the agent and returns a stream of Vercel AI events. It supports the same optional arguments as [`Agent.run_stream_events()`](../agents.md#running-agents) and an optional `on_complete` callback function that receives the completed [`AgentRunResult`][pydantic_ai.agent.AgentRunResult] and can optionally yield additional Vercel AI events.
- You can also use [`VercelAIAdapter.run_stream_native()`][pydantic_ai.ui.UIAdapter.run_stream_native] to run the agent and return a stream of Pydantic AI events instead, which can then be transformed into Vercel AI events using [`VercelAIAdapter.transform_stream()`][pydantic_ai.ui.UIAdapter.transform_stream].
3. The [`VercelAIAdapter.encode_stream()`][pydantic_ai.ui.UIAdapter.encode_stream] method encodes the stream of Vercel AI events as SSE (HTTP Server-Sent Events) strings, which you can then return as a streaming response.
@@ -81,3 +82,27 @@ async def chat(request: Request) -> Response:
sse_event_stream = adapter.encode_stream(event_stream)
return StreamingResponse(sse_event_stream, media_type=accept)
```

## Tool Approval

!!! note
Tool approval requires AI SDK UI v6 or later on the frontend.

Pydantic AI supports human-in-the-loop tool approval workflows with AI SDK UI, allowing users to approve or deny tool executions before they run. See the [deferred tool calls documentation](../deferred-tools.md) for details on setting up tools that require approval.

To enable tool approval streaming, pass `tool_approval=True` when creating the adapter:

```py {test="skip" lint="skip"}
@app.post('/chat')
async def chat(request: Request) -> Response:
adapter = await VercelAIAdapter.from_request(request, agent=agent, tool_approval=True)
return adapter.streaming_response(adapter.run_stream())
```

When `tool_approval=True`, the adapter will:

1. Emit `tool-approval-request` chunks when tools with `requires_approval=True` are called
2. Automatically extract approval responses from follow-up requests
3. Emit `tool-output-denied` chunks for rejected tools

On the frontend, AI SDK UI's [`useChat`](https://ai-sdk.dev/docs/reference/ai-sdk-ui/use-chat) hook handles the approval flow. You can use the [`Confirmation`](https://ai-sdk.dev/elements/components/confirmation) component from AI Elements for a pre-built approval UI, or build your own using the hook's `addToolResult` function.
2 changes: 1 addition & 1 deletion pydantic_ai_slim/pydantic_ai/_json_schema.py
@@ -45,7 +45,7 @@ def __init__(
self.prefer_inlined_defs = prefer_inlined_defs
self.simplify_nullable_unions = simplify_nullable_unions

self.defs: dict[str, JsonSchema] = self.schema.get('$defs', {})
self.defs: dict[str, JsonSchema] = deepcopy(self.schema.get('$defs', {}))
self.refs_stack: list[str] = []
self.recursive_refs = set[str]()

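The motivation for the `deepcopy` change above can be shown with plain dicts: `dict.get` returns the same nested objects, so any mutation the transformer makes leaks back into the caller's schema. A minimal sketch (not the actual `JsonSchemaTransformer` code):

```python
from copy import deepcopy

# Without deepcopy: the transformer's defs alias the caller's schema
schema = {'$defs': {'Item': {'const': 'a'}}}
defs_shared = schema.get('$defs', {})
defs_shared['Item']['enum'] = ['a']  # mutates the original schema too
assert 'enum' in schema['$defs']['Item']

# With deepcopy: nested dicts are independent copies
schema2 = {'$defs': {'Item': {'const': 'a'}}}
defs_copied = deepcopy(schema2.get('$defs', {}))
defs_copied['Item']['enum'] = ['a']  # original is untouched
assert 'enum' not in schema2['$defs']['Item']
```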
11 changes: 11 additions & 0 deletions pydantic_ai_slim/pydantic_ai/profiles/google.py
@@ -44,6 +44,17 @@ def transform(self, schema: JsonSchema) -> JsonSchema:
if (const := schema.pop('const', None)) is not None:
# Gemini doesn't support const, but it does support enum with a single value
schema['enum'] = [const]
# If type is not present, infer it from the const value for Gemini API compatibility
if 'type' not in schema:
if isinstance(const, str):
schema['type'] = 'string'
elif isinstance(const, bool):
# bool must be checked before int since bool is a subclass of int in Python
schema['type'] = 'boolean'
elif isinstance(const, int):
schema['type'] = 'integer'
elif isinstance(const, float):
schema['type'] = 'number'
schema.pop('discriminator', None)
schema.pop('examples', None)

10 changes: 10 additions & 0 deletions pydantic_ai_slim/pydantic_ai/ui/_adapter.py
@@ -125,6 +125,12 @@ class UIAdapter(ABC, Generic[RunInputT, MessageT, EventT, AgentDepsT, OutputData
accept: str | None = None
"""The `Accept` header value of the request, used to determine how to encode the protocol-specific events for the streaming response."""

tool_approval: bool = False
"""Whether to enable tool approval streaming for human-in-the-loop workflows."""

deferred_tool_results: DeferredToolResults | None = None
"""Deferred tool results extracted from the request, used for tool approval workflows."""

@classmethod
async def from_request(
cls, request: Request, *, agent: AbstractAgent[AgentDepsT, OutputDataT]
@@ -237,6 +243,10 @@ def run_stream_native(
toolsets: Optional additional toolsets for this run.
builtin_tools: Optional additional builtin tools to use for this run.
"""
# Use instance field as fallback if not explicitly passed
if deferred_tool_results is None:
deferred_tool_results = self.deferred_tool_results

message_history = [*(message_history or []), *self.messages]

toolset = self.toolset
3 changes: 3 additions & 0 deletions pydantic_ai_slim/pydantic_ai/ui/_event_stream.py
@@ -70,6 +70,9 @@ class UIEventStream(ABC, Generic[RunInputT, EventT, AgentDepsT, OutputDataT]):
accept: str | None = None
"""The `Accept` header value of the request, used to determine how to encode the protocol-specific events for the streaming response."""

tool_approval: bool = False
"""Whether tool approval streaming is enabled for human-in-the-loop workflows."""

message_id: str = field(default_factory=lambda: str(uuid4()))
"""The message ID to use for the next event."""

51 changes: 48 additions & 3 deletions pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py
@@ -35,7 +35,7 @@
VideoUrl,
)
from ...output import OutputDataT
from ...tools import AgentDepsT
from ...tools import AgentDepsT, DeferredToolApprovalResult, DeferredToolResults
from .. import MessagesBuilder, UIAdapter, UIEventStream
from ._event_stream import VercelAIEventStream
from .request_types import (
@@ -51,6 +51,7 @@
SourceUrlUIPart,
StepStartUIPart,
TextUIPart,
ToolApprovalResponded,
ToolInputAvailablePart,
ToolOutputAvailablePart,
ToolOutputErrorPart,
@@ -61,7 +62,9 @@
from .response_types import BaseChunk

if TYPE_CHECKING:
pass
from starlette.requests import Request

from ...agent import AbstractAgent


__all__ = ['VercelAIAdapter']
@@ -78,9 +81,51 @@ def build_run_input(cls, body: bytes) -> RequestData:
"""Build a Vercel AI run input object from the request body."""
return request_data_ta.validate_json(body)

@classmethod
async def from_request(
cls,
request: Request,
*,
agent: AbstractAgent[AgentDepsT, OutputDataT],
tool_approval: bool = False,
) -> VercelAIAdapter[AgentDepsT, OutputDataT]:
"""Create a Vercel AI adapter from a request.

Args:
request: The incoming Starlette/FastAPI request.
agent: The Pydantic AI agent to run.
tool_approval: Whether to enable tool approval streaming for human-in-the-loop workflows.
"""
run_input = cls.build_run_input(await request.body())

# Extract deferred tool results from messages when tool_approval is enabled
deferred_tool_results: DeferredToolResults | None = None
if tool_approval:
approvals: dict[str, bool | DeferredToolApprovalResult] = {}
for msg in run_input.messages:
if msg.role != 'assistant':
continue
for part in msg.parts:
if not isinstance(part, ToolUIPart | DynamicToolUIPart):
continue
approval = part.approval
if not isinstance(approval, ToolApprovalResponded):
continue
approvals[part.tool_call_id] = approval.approved
if approvals:
deferred_tool_results = DeferredToolResults(approvals=approvals)

return cls(
agent=agent,
run_input=run_input,
accept=request.headers.get('accept'),
tool_approval=tool_approval,
deferred_tool_results=deferred_tool_results,
)
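The extraction loop in `from_request` can be sketched against plain dicts to show which messages contribute approvals. The message shapes below are hypothetical stand-ins for the real `ToolUIPart`/`ToolApprovalResponded` types, chosen only to mirror the control flow: non-assistant messages are skipped, and only approvals that have actually been responded to are collected:

```python
def extract_approvals(messages: list[dict]) -> dict[str, bool]:
    """Collect tool_call_id -> approved from responded approvals on assistant messages."""
    approvals: dict[str, bool] = {}
    for msg in messages:
        if msg['role'] != 'assistant':
            continue
        for part in msg.get('parts', []):
            approval = part.get('approval')
            # Only 'responded' approvals carry a user decision
            if approval and approval.get('state') == 'responded':
                approvals[part['tool_call_id']] = approval['approved']
    return approvals

messages = [
    {'role': 'user', 'parts': [{'text': 'please run the tool'}]},
    {'role': 'assistant', 'parts': [
        {'tool_call_id': 'call_1', 'approval': {'state': 'responded', 'approved': True}},
        {'tool_call_id': 'call_2', 'approval': {'state': 'requested'}},  # not yet answered
    ]},
]
assert extract_approvals(messages) == {'call_1': True}
```

As in the adapter, an empty result means no `DeferredToolResults` is constructed at all, so a first-turn request with no prior approvals behaves exactly as before.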

def build_event_stream(self) -> UIEventStream[RequestData, BaseChunk, AgentDepsT, OutputDataT]:
"""Build a Vercel AI event stream transformer."""
return VercelAIEventStream(self.run_input, accept=self.accept)
return VercelAIEventStream(self.run_input, accept=self.accept, tool_approval=self.tool_approval)

@cached_property
def messages(self) -> list[ModelMessage]:
68 changes: 62 additions & 6 deletions pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_event_stream.py
@@ -4,7 +4,9 @@

from collections.abc import AsyncIterator, Mapping
from dataclasses import dataclass
from functools import cached_property
from typing import Any
from uuid import uuid4

from pydantic_core import to_json

@@ -13,6 +15,7 @@
BuiltinToolCallPart,
BuiltinToolReturnPart,
FilePart,
FinishReason as PydanticFinishReason,
FunctionToolResultEvent,
RetryPromptPart,
TextPart,
@@ -23,15 +26,22 @@
ToolCallPartDelta,
)
from ...output import OutputDataT
from ...tools import AgentDepsT
from ...run import AgentRunResultEvent
from ...tools import AgentDepsT, DeferredToolRequests
from .. import UIEventStream
from .request_types import RequestData
from .request_types import (
DynamicToolUIPart,
RequestData,
ToolApprovalResponded,
ToolUIPart,
)
from .response_types import (
BaseChunk,
DoneChunk,
ErrorChunk,
FileChunk,
FinishChunk,
FinishReason,
FinishStepChunk,
ReasoningDeltaChunk,
ReasoningEndChunk,
@@ -41,13 +51,24 @@
TextDeltaChunk,
TextEndChunk,
TextStartChunk,
ToolApprovalRequestChunk,
ToolInputAvailableChunk,
ToolInputDeltaChunk,
ToolInputStartChunk,
ToolOutputAvailableChunk,
ToolOutputDeniedChunk,
ToolOutputErrorChunk,
)

# Map Pydantic AI finish reasons to Vercel AI format
_FINISH_REASON_MAP: dict[PydanticFinishReason, FinishReason] = {
'stop': 'stop',
'length': 'length',
'content_filter': 'content-filter',
'tool_call': 'tool-calls',
'error': 'error',
}

__all__ = ['VercelAIEventStream']

# See https://ai-sdk.dev/docs/ai-sdk-ui/stream-protocol#data-stream-protocol
@@ -64,6 +85,21 @@ class VercelAIEventStream(UIEventStream[RequestData, BaseChunk, AgentDepsT, Outp
"""UI event stream transformer for the Vercel AI protocol."""

_step_started: bool = False
_finish_reason: FinishReason = None

@cached_property
def _denied_tool_ids(self) -> set[str]:
"""Get the set of tool_call_ids that were denied by the user."""
denied_ids: set[str] = set()
for msg in self.run_input.messages:
if msg.role != 'assistant':
continue
for part in msg.parts:
if not isinstance(part, ToolUIPart | DynamicToolUIPart):
continue
if isinstance(part.approval, ToolApprovalResponded) and not part.approval.approved:
denied_ids.add(part.tool_call_id)
return denied_ids

@property
def response_headers(self) -> Mapping[str, str] | None:
@@ -85,10 +121,25 @@ async def before_response(self) -> AsyncIterator[BaseChunk]:
async def after_stream(self) -> AsyncIterator[BaseChunk]:
yield FinishStepChunk()

yield FinishChunk()
yield FinishChunk(finish_reason=self._finish_reason)
yield DoneChunk()

async def handle_run_result(self, event: AgentRunResultEvent) -> AsyncIterator[BaseChunk]:
pydantic_reason = event.result.response.finish_reason
if pydantic_reason:
self._finish_reason = _FINISH_REASON_MAP.get(pydantic_reason, 'unknown')

# Emit tool approval requests for deferred approvals (only when tool_approval is enabled)
output = event.result.output
if self.tool_approval and isinstance(output, DeferredToolRequests):
for tool_call in output.approvals:
yield ToolApprovalRequestChunk(
approval_id=str(uuid4()),
tool_call_id=tool_call.tool_call_id,
)

async def on_error(self, error: Exception) -> AsyncIterator[BaseChunk]:
self._finish_reason = 'error'
yield ErrorChunk(error_text=str(error))

async def handle_text_start(self, part: TextPart, follows_text: bool = False) -> AsyncIterator[BaseChunk]:
@@ -182,10 +233,15 @@ async def handle_file(self, part: FilePart) -> AsyncIterator[BaseChunk]:

async def handle_function_tool_result(self, event: FunctionToolResultEvent) -> AsyncIterator[BaseChunk]:
part = event.result
if isinstance(part, RetryPromptPart):
yield ToolOutputErrorChunk(tool_call_id=part.tool_call_id, error_text=part.model_response())
tool_call_id = part.tool_call_id

# Check if this tool was denied by the user (only when tool_approval is enabled)
if self.tool_approval and tool_call_id in self._denied_tool_ids:
yield ToolOutputDeniedChunk(tool_call_id=tool_call_id)
elif isinstance(part, RetryPromptPart):
yield ToolOutputErrorChunk(tool_call_id=tool_call_id, error_text=part.model_response())
else:
yield ToolOutputAvailableChunk(tool_call_id=part.tool_call_id, output=self._tool_return_output(part))
yield ToolOutputAvailableChunk(tool_call_id=tool_call_id, output=self._tool_return_output(part))

# ToolCallResultEvent.content may hold user parts (e.g. text, images) that Vercel AI does not currently have events for
