7 changes: 6 additions & 1 deletion pydantic_ai_harness/__init__.py
@@ -3,12 +3,17 @@
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from .background_tools import BackgroundTools
    from .code_mode import CodeMode

-__all__ = ['CodeMode']
+__all__ = ['BackgroundTools', 'CodeMode']


def __getattr__(name: str) -> object:
    if name == 'BackgroundTools':
        from .background_tools import BackgroundTools

        return BackgroundTools
    if name == 'CodeMode':
        from .code_mode import CodeMode

111 changes: 111 additions & 0 deletions pydantic_ai_harness/background_tools/README.md
@@ -0,0 +1,111 @@
# Background Tools

Run selected tools as fire-and-forget asyncio tasks, so the agent can keep working while they finish.

## The problem

Some tools take seconds to minutes -- deep research, big aggregations, sub-agent delegation. With normal tool calls the agent is blocked: it makes the call, waits, then plans its next step. Over a long task the conversation effectively serializes.

## The solution

`BackgroundTools` spawns the matching tool calls as `asyncio.Task`s. The agent receives an immediate acknowledgment string and continues planning. When the task finishes, its result is enqueued as a follow-up message via [`RunContext.enqueue`][pydantic_ai.tools.RunContext.enqueue]; Pydantic AI's pending message queue redirects the agent into a fresh `ModelRequest` instead of ending, so the model sees the result and can use it.
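
The core mechanism can be illustrated with plain asyncio, independent of Pydantic AI (all names below are illustrative stand-ins, not the library's API): spawn the tool as a task, hand back an immediate ack, and deliver the real result later through a queue.

```python
import asyncio


async def slow_tool() -> str:
    await asyncio.sleep(0.01)  # stand-in for a long-running tool
    return 'research result'


async def main() -> list[str]:
    # Stand-in for Pydantic AI's pending message queue.
    follow_ups: asyncio.Queue[str] = asyncio.Queue()

    async def run_in_background() -> None:
        result = await slow_tool()
        await follow_ups.put(f'Background tool completed.\nResult: {result}')

    # Spawn the tool and return an ack immediately -- the agent keeps planning.
    task = asyncio.create_task(run_in_background())
    ack = 'Tool is running in background.'

    # Later, before the run would end, drain completed follow-ups.
    await task
    messages = [ack]
    while not follow_ups.empty():
        messages.append(follow_ups.get_nowait())
    return messages
```

Here the ack is available before the tool finishes; the follow-up message only appears once the background task has put its result on the queue.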

## Usage

```python
from pydantic_ai import Agent
from pydantic_ai_harness import BackgroundTools

agent = Agent('openai:gpt-5', capabilities=[BackgroundTools()])

@agent.tool_plain(metadata={'background': True})
async def slow_research(query: str) -> str:
    """Research a topic thoroughly. Runs in the background."""
    return await do_expensive_research(query)
```

By default, any tool with `metadata={'background': True}` runs in the background. The agent's instructions are augmented automatically so the model knows it shouldn't block waiting for the result.

## Selecting which tools run in the background

`BackgroundTools(tools=...)` accepts the standard [`ToolSelector`][pydantic_ai.tools.ToolSelector]:

```python
# By metadata key (default)
BackgroundTools() # tools with metadata={'background': True}
BackgroundTools(tools={'background': True}) # explicit form
BackgroundTools(tools={'kind': 'research'}) # custom metadata key

# By name
BackgroundTools(tools=['slow_research', 'deep_dig'])

# By predicate
BackgroundTools(tools=lambda ctx, td: td.name.startswith('research_'))
```

### Marking a whole MCP server or toolset

Combine with [`SetToolMetadata`][pydantic_ai.capabilities.SetToolMetadata] or `FunctionToolset.with_metadata(...)` to mark every tool from a source as background, without touching individual definitions:

```python
from pydantic_ai import Agent
from pydantic_ai.capabilities import MCP, SetToolMetadata
from pydantic_ai_harness import BackgroundTools

agent = Agent('openai:gpt-5', capabilities=[
    MCP('https://research.example/mcp/'),
    SetToolMetadata(predicate=lambda td: td.name.startswith('mcp_'), background=True),
    BackgroundTools(),
])
```

## Result delivery

Results are enqueued as `'follow_up'` priority messages on Pydantic AI's pending message queue. When the agent would otherwise produce a final result, the queue is drained and the agent continues with a fresh `ModelRequest` containing all completed background results.

The follow-up message format is a `SystemPromptPart` containing:

- On success: `Background tool 'X' (task <id>) completed.\nResult: <return value>`
- On failure: `Background tool 'X' (task <id>) failed: <error message>`

The model sees the task ID alongside the result so it can correlate against the ack string it received earlier.
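
For illustration, the two message shapes above can be built with simple format strings (hypothetical helper names, mirroring the formats documented here):

```python
def completion_message(tool_name: str, task_id: str, result: str) -> str:
    # Success format: ack-correlatable via the task id.
    return f"Background tool '{tool_name}' (task {task_id}) completed.\nResult: {result}"


def failure_message(tool_name: str, task_id: str, error: str) -> str:
    # Failure format: same correlation id, error message instead of a result.
    return f"Background tool '{tool_name}' (task {task_id}) failed: {error}"


msg = completion_message('slow_research', 'call_123', '42 sources found')
assert msg.startswith("Background tool 'slow_research' (task call_123) completed.")
```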

## Lifecycle and cancellation

- Each agent run gets fresh task state via the capability's `for_run` hook -- concurrent runs do not share tasks
- If the surrounding agent run is cancelled (e.g. via `asyncio.wait_for` timeout), all live background tasks are cancelled in the capability's `wrap_run` cleanup
- `asyncio.CancelledError` from a cancelled task does not produce a follow-up; it propagates as a normal task cancellation
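
The cleanup behavior follows the standard asyncio pattern of cancelling outstanding tasks and awaiting them with `return_exceptions=True`. A simplified, self-contained stand-in (not the capability's actual `wrap_run` implementation):

```python
import asyncio


async def run_with_cleanup() -> bool:
    tasks: dict[str, asyncio.Task[None]] = {}

    async def never_finishes() -> None:
        await asyncio.sleep(3600)  # a background tool that would outlive the run

    tasks['t1'] = asyncio.create_task(never_finishes())
    try:
        pass  # the surrounding agent run would execute here
    finally:
        # Cancel live background tasks, then await them so cancellation completes;
        # return_exceptions=True keeps the resulting CancelledError from propagating.
        for task in tasks.values():
            task.cancel()
        await asyncio.gather(*tasks.values(), return_exceptions=True)
    return all(t.cancelled() for t in tasks.values())
```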

## Limitations

- **Streaming**: follow-up delivery requires driving the run with `agent.run()` or explicit `agent_run.next()` calls. A bare `async for node in agent_run:` loop does not invoke `after_node_run`, so background results won't be delivered.
- **Temporal / DBOS**: tools run inside durable activities and don't share state with the surrounding workflow. Tool-side `ctx.enqueue` calls do not currently propagate back, so background results from durable tools are lost. If you need this, file an issue.

## API

```python
BackgroundTools(
    tools: ToolSelector = {'background': True},
)
```

## Agent spec (YAML/JSON)

```yaml
# agent.yaml
model: openai:gpt-5
capabilities:
  - BackgroundTools: {}
```

```python
from pydantic_ai import Agent
from pydantic_ai_harness import BackgroundTools

agent = Agent.from_file('agent.yaml', custom_capability_types=[BackgroundTools])
```

## Further reading

- [Pydantic AI message history -- injecting messages mid-run](https://ai.pydantic.dev/message-history/#injecting-messages-mid-run) -- the underlying primitive
- [Pydantic AI capabilities](https://ai.pydantic.dev/capabilities/)
5 changes: 5 additions & 0 deletions pydantic_ai_harness/background_tools/__init__.py
@@ -0,0 +1,5 @@
"""Run selected tools as background asyncio tasks with async result delivery."""

from ._capability import BackgroundTools

__all__ = ['BackgroundTools']
165 changes: 165 additions & 0 deletions pydantic_ai_harness/background_tools/_capability.py
@@ -0,0 +1,165 @@
"""Background tools capability that spawns selected tools as fire-and-forget tasks."""

from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any

from pydantic_ai.capabilities import AbstractCapability, CapabilityOrdering, PendingMessageDrainCapability
from pydantic_ai.messages import SystemPromptPart, ToolCallPart
from pydantic_ai.tools import (
    AgentDepsT,
    RunContext,
    ToolDefinition,
    ToolSelector,
    matches_tool_selector,
)

if TYPE_CHECKING:
    from pydantic_ai import _agent_graph
    from pydantic_ai.capabilities.abstract import WrapToolExecuteHandler
    from pydantic_ai.result import FinalResult
    from pydantic_graph import End


_DEFAULT_SELECTOR: dict[str, Any] = {'background': True}

_INSTRUCTIONS = """\
Some tools run in the background: when you call them you'll get an immediate \
acknowledgment, and the real result will be delivered automatically as a follow-up \
message when the task completes. Continue working on other things in the meantime; \
do not block waiting for the result.\
"""


@dataclass
class BackgroundTools(AbstractCapability[AgentDepsT]):
    """Run selected tools as fire-and-forget asyncio tasks.

    When the model calls a tool that matches the selector, the capability spawns the
    tool's handler in an `asyncio.Task` and immediately returns an acknowledgment
    string to the agent. When the task completes, its result (or error) is enqueued
    via [`RunContext.enqueue`][pydantic_ai.tools.RunContext.enqueue] as a `'follow_up'`
    message — Pydantic AI's pending message queue redirects the agent to a fresh
    `ModelRequest` instead of ending, so the model receives the result and can act on it.

    ```python
    from pydantic_ai import Agent
    from pydantic_ai_harness import BackgroundTools

    # Default: any tool with `metadata={'background': True}` runs in the background.
    agent = Agent('openai:gpt-5', capabilities=[BackgroundTools()])

    @agent.tool_plain(metadata={'background': True})
    async def slow_research(query: str) -> str:
        return await do_expensive_research(query)
    ```

    Combine with [`SetToolMetadata`][pydantic_ai.capabilities.SetToolMetadata] to mark
    every tool from a specific MCP server, or with `FunctionToolset.with_metadata(...)`
    to mark a whole toolset. Or pass a name list / predicate via `tools=...` to ignore
    metadata entirely.
    """

    tools: ToolSelector[AgentDepsT] = field(default_factory=lambda: dict(_DEFAULT_SELECTOR))
    """Which tools should run in the background.

    - `dict[str, Any]` (default `{'background': True}`): tools whose metadata deeply
      includes the given key-value pairs.
    - `'all'`: every tool in the agent's toolset (rarely what you want).
    - `Sequence[str]`: tools with matching names.
    - Callable `(ctx, tool_def) -> bool | Awaitable[bool]`: custom predicate.
    """

    _tasks: dict[str, asyncio.Task[None]] = field(
        default_factory=dict[str, 'asyncio.Task[None]'], init=False, repr=False
    )
    _completion_event: asyncio.Event = field(default_factory=asyncio.Event, init=False, repr=False)

    def get_ordering(self) -> CapabilityOrdering:
        # `after_node_run` runs in reverse order (outermost runs last). We need to
        # wait for at least one background task BEFORE the core
        # `PendingMessageDrainCapability` checks the queue for follow-ups, so
        # drain must be outermost relative to us.
        return CapabilityOrdering(wrapped_by=[PendingMessageDrainCapability])

    def get_instructions(self) -> str:
        return _INSTRUCTIONS

    async def for_run(self, ctx: RunContext[AgentDepsT]) -> BackgroundTools[AgentDepsT]:
        # Fresh per-run state so concurrent runs don't share tasks.
        return BackgroundTools(tools=self.tools)

    async def wrap_tool_execute(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        call: ToolCallPart,
        tool_def: ToolDefinition,
        args: dict[str, Any],
        handler: WrapToolExecuteHandler,
    ) -> Any:
        if not await matches_tool_selector(self.tools, ctx, tool_def):
            return await handler(args)

        task_id = call.tool_call_id
        tool_name = call.tool_name

        async def _run() -> None:
            try:
                result = await handler(args)
                ctx.enqueue(
                    SystemPromptPart(f"Background tool '{tool_name}' (task {task_id}) completed.\nResult: {result}"),
                    priority='follow_up',
                )
            except asyncio.CancelledError:
                # Run cleanup cancelled us; don't enqueue a spurious failure follow-up.
                raise
            except Exception as e:
                ctx.enqueue(
                    SystemPromptPart(f"Background tool '{tool_name}' (task {task_id}) failed: {e}"),
                    priority='follow_up',
                )
            finally:
                self._tasks.pop(task_id, None)
                self._completion_event.set()

        self._tasks[task_id] = asyncio.create_task(_run())
        return (
            f"Tool '{tool_name}' is running in background (task {task_id}). "
            f'You will receive the result automatically when it completes. '
            f'Continue with other work in the meantime.'
        )

    async def after_node_run(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        node: _agent_graph.AgentNode[AgentDepsT, Any],
        result: _agent_graph.AgentNode[AgentDepsT, Any] | End[FinalResult[Any]],
    ) -> _agent_graph.AgentNode[AgentDepsT, Any] | End[FinalResult[Any]]:
        from pydantic_graph import End

        if not isinstance(result, End) or not self._tasks:
            return result

        # Hold End until at least one task completes so the drain capability
        # (which runs after us in reverse order) has a follow-up to deliver.
        self._completion_event.clear()
        await self._completion_event.wait()
        return result

    async def wrap_run(
        self,
        ctx: RunContext[AgentDepsT],
        *,
        handler: Any,
    ) -> Any:
Comment on lines +157 to +158

🟡 handler: Any in wrap_run violates "no Any types" rule

The AGENTS.md coding standard explicitly requires "pyright strict mode -- no Any types, full type annotations". In wrap_run, the handler parameter and return type both use Any (lines 157-158), whereas the analogous wrap_tool_execute method correctly imports and uses the specific WrapToolExecuteHandler type from pydantic_ai.capabilities.abstract (_capability.py:21). There should be a corresponding WrapRunHandler type (or equivalent) imported and used here instead of Any.

Prompt for agents

In pydantic_ai_harness/background_tools/_capability.py, the wrap_run method uses `handler: Any` and `-> Any` on lines 157-158. The AGENTS.md rule mandates no Any types. The wrap_tool_execute method already correctly imports WrapToolExecuteHandler from pydantic_ai.capabilities.abstract. Check pydantic_ai.capabilities.abstract for a WrapRunHandler type (or similar) and import it in the TYPE_CHECKING block at line 21 to replace the Any usages. The return type should also be specified concretely (likely the agent run result type from the abstract base class signature).

        try:
            return await handler()
        finally:
            for task in self._tasks.values():
                task.cancel()
            if self._tasks:
                await asyncio.gather(*self._tasks.values(), return_exceptions=True)
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -62,7 +62,7 @@ lint = [
]

[tool.uv.sources]
-pydantic-ai-slim = { git = 'https://github.com/pydantic/pydantic-ai.git', branch = 'main', subdirectory = 'pydantic_ai_slim' }
+pydantic-ai-slim = { git = 'https://github.com/pydantic/pydantic-ai.git', branch = 'background-tools', subdirectory = 'pydantic_ai_slim' }
🚩 pydantic-ai-slim pinned to feature branch background-tools instead of main

The pyproject.toml source for pydantic-ai-slim was changed from branch = 'main' to branch = 'background-tools'. This is clearly intentional for development of this feature (the capability depends on APIs in that branch), but this PR should not be merged until the background-tools branch is merged into main in the pydantic-ai repo, otherwise the published package would depend on a feature branch that could be deleted or force-pushed.


[tool.hatch.version]
source = 'uv-dynamic-versioning'