Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
f7538f4
Allow dynamic in Temporal workflows
qyinm Dec 19, 2025
bf07386
Add exports for MCP wrappers and improve toolset validation logic
qyinm Dec 20, 2025
ef09ace
Replace dynamic toolset test to `test_temporal.py`
qyinm Dec 20, 2025
908e71c
Make Temporal toolset wrapper parameters optional
qyinm Dec 20, 2025
500690b
Refactor Temporal toolset validation to use visit_and_replace and rem…
qyinm Dec 20, 2025
2d290c6
Refactor Temporal toolset validation to traverse toolset hierarchy
qyinm Dec 20, 2025
cef3192
Update Temporal toolset validation with CombinedToolset workflow test
qyinm Dec 20, 2025
14eced2
Fix TemporalAgent model override and make TemporalDynamicToolset para…
qyinm Dec 20, 2025
0e68ef7
Restore test for runtime toolset registration pattern
qyinm Dec 20, 2025
9cc0ce6
Add documentation for runtime toolset registration pattern
qyinm Dec 20, 2025
db55bb0
Support named toolset pre-registration
qyinm Dec 20, 2025
4ca08e4
Add DynamicToolset validation and export TemporalDynamicToolset
qyinm Dec 20, 2025
2064ceb
Fix import order in Temporal durable execution docs
qyinm Dec 20, 2025
c43ec0a
Fix Temporal agent coverage with pragma no cover comments
qyinm Dec 21, 2025
746d751
Add tests and pragmas to improve Temporal coverage
qyinm Dec 21, 2025
86b9068
Add pragma comments for unreachable workflow code in temporal tests
qyinm Dec 21, 2025
643f5e3
Fix pragma no cover comments to match actual coverage
qyinm Dec 21, 2025
44e4f38
Remove incorrect pragma comments flagged by strict-no-cover
qyinm Dec 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 173 additions & 0 deletions docs/durable_execution/temporal.md
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self: will review docs later

Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,179 @@ class MultiModelWorkflow:
return result.output
```

### Toolset Registration at Runtime

[`Agent.run(toolsets=...)`][pydantic_ai.agent.Agent.run] normally supports passing toolsets directly. However, `TemporalAgent` requires toolsets to be wrapped in a [`TemporalWrapperToolset`][pydantic_ai.durable_exec.temporal.TemporalWrapperToolset] (such as `TemporalFunctionToolset`, `TemporalMCPServer`, etc.) because Temporal activities must be registered with the worker before the workflow starts.

To use toolsets at runtime with `TemporalAgent`, you need to:

1. Wrap the toolset using the appropriate `TemporalWrapperToolset` class
2. Register its activities with the Temporal Worker
3. Pass the wrapped toolset at runtime via `run(toolsets=[...])`

This pattern allows multiple agents to share the same set of tools without duplicating activity registrations, enabling dynamic agent creation while maintaining proper tool registration.

Alternatively, you can pre-register toolsets with the `TemporalAgent` constructor and reference them by name at runtime. This is similar to how models are handled.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would present this as the preferred approach as it's a lot more convenient, and then explain manual wrapping as an alternative.


### Using Toolset Instances

Here's an example showing how to register and use shared toolsets across multiple agents using toolset instances:

```python {title="shared_toolset_temporal.py" test="skip"}
from datetime import timedelta

from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.workflow import ActivityConfig

from pydantic_ai import Agent, FunctionToolset
from pydantic_ai.durable_exec.temporal import TemporalAgent, TemporalFunctionToolset


# Define shared tools
def web_search(query: str) -> str:
"""Search the web for information."""
# Actual web search implementation
return f'Search results for: {query}'


def calculate(expression: str) -> str:
"""Evaluate a mathematical expression."""
# Actual calculation implementation
return f'Result: {expression}'


# Create a shared toolset
shared_toolset = FunctionToolset(
tools=[web_search, calculate],
id='shared_tools', # (1)!
)

# Wrap the toolset for Temporal
wrapped_shared_toolset = TemporalFunctionToolset(
shared_toolset,
activity_name_prefix='shared', # (2)!
activity_config=ActivityConfig(start_to_close_timeout=timedelta(minutes=2)),
deps_type=type(None), # (3)!
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be required

)

# Create multiple agents that can use the shared toolset
research_agent = Agent(
'openai:gpt-5',
name='research_agent',
)
math_agent = Agent(
'anthropic:claude-sonnet-4.5',
name='math_agent',
)

# Wrap agents for Temporal (without pre-registering toolsets)
temporal_research_agent = TemporalAgent(research_agent)
temporal_math_agent = TemporalAgent(math_agent)


@workflow.defn
class SharedToolsetWorkflow:
__pydantic_ai_agents__ = [temporal_research_agent, temporal_math_agent] # (4)!
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to have a convenient way to register the temporal toolsets' activities as well


@workflow.run
async def run(self, task: str, use_research: bool) -> str:
if use_research:
# Research agent uses shared toolset at runtime
result = await temporal_research_agent.run(
task,
toolsets=[wrapped_shared_toolset], # (5)!
)
else:
# Math agent also uses the same shared toolset
result = await temporal_math_agent.run(
task,
toolsets=[wrapped_shared_toolset],
)
return result.output


async def main():
client = await Client.connect('localhost:7233')

async with Worker(
client,
task_queue='shared-toolset-queue',
workflows=[SharedToolsetWorkflow],
activities=[
# Register shared toolset activities once
*wrapped_shared_toolset.temporal_activities, # (6)!
],
):
result = await client.execute_workflow(
SharedToolsetWorkflow.run,
args=['Search for Python tutorials', True],
id='shared-toolset-workflow',
task_queue='shared-toolset-queue',
)
print(result)
```

1. The toolset must have a unique `id` to be used with Temporal.
2. The `activity_name_prefix` ensures activity names don't conflict across different toolset registrations.
3. `deps_type` must be specified if your tools use dependencies. Use `type(None)` if no dependencies are needed.
4. The `__pydantic_ai_agents__` pattern automatically registers agent activities with the workflow.
5. Pass the wrapped toolset at runtime to any agent that needs it.
6. Register the shared toolset's activities once with the worker. Agent activities are automatically registered via `__pydantic_ai_agents__`.

6. Register the shared toolset's activities once with the worker. Agent activities are automatically registered via `__pydantic_ai_agents__`.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplication


### Using Named Toolsets

You can also pre-register toolsets with names and reference them by name at runtime:

```python {title="named_toolset_temporal.py" test="skip"}
from temporalio import workflow

from pydantic_ai import Agent, FunctionToolset
from pydantic_ai.durable_exec.temporal import TemporalAgent, TemporalFunctionToolset


# Define tools and toolset
def magic_trick(input: str) -> str:
return f'Magic: {input}'

magic_toolset = FunctionToolset(tools=[magic_trick], id='magic')

# Wrap toolset
wrapped_magic_toolset = TemporalFunctionToolset(
magic_toolset,
activity_name_prefix='magic',
deps_type=type(None),
)

# Create agent with pre-registered toolset
agent = Agent('openai:gpt-5', name='magic_agent')
temporal_agent = TemporalAgent(
agent,
toolsets={'magic_tools': wrapped_magic_toolset}, # (1)!
)

@workflow.defn
class MagicWorkflow:
__pydantic_ai_agents__ = [temporal_agent]

@workflow.run
async def run(self, input: str) -> str:
# Reference toolset by name
result = await temporal_agent.run(
input,
toolsets=['magic_tools'], # (2)!
)
return result.output
```

1. Pass a dictionary of toolsets to `TemporalAgent` to pre-register them. The keys are the names used to reference the toolsets at runtime.
2. Pass the listing of toolset names to `run(toolsets=[...])` to use the pre-registered toolsets.

You can also wrap toolsets at agent creation time by passing them to the wrapped agent's constructor, which will automatically temporalize them. The runtime pattern shown above is useful when you want to share toolsets across multiple agents or select toolsets dynamically based on workflow parameters.

## Activity Configuration

Temporal activity configuration, like timeouts and retry policies, can be customized by passing [`temporalio.workflow.ActivityConfig`](https://python.temporal.io/temporalio.workflow.ActivityConfig.html) objects to the `TemporalAgent` constructor:
Expand Down
22 changes: 22 additions & 0 deletions pydantic_ai_slim/pydantic_ai/durable_exec/temporal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

from ...exceptions import UserError
from ._agent import TemporalAgent
from ._dynamic_toolset import TemporalDynamicToolset
from ._function_toolset import TemporalFunctionToolset
from ._logfire import LogfirePlugin
from ._mcp import TemporalMCPToolset
from ._run_context import TemporalRunContext
from ._toolset import TemporalWrapperToolset
from ._workflow import PydanticAIWorkflow
Expand All @@ -26,9 +29,28 @@
'AgentPlugin',
'TemporalRunContext',
'TemporalWrapperToolset',
'TemporalDynamicToolset',
'TemporalFunctionToolset',
'TemporalMCPToolset',
'PydanticAIWorkflow',
]

try:
from . import _mcp_server as _mcp_server
except ImportError:
pass
else:
TemporalMCPServer = _mcp_server.TemporalMCPServer
__all__.append('TemporalMCPServer')

try:
from . import _fastmcp_toolset as _fastmcp_toolset
except ImportError:
pass
else:
TemporalFastMCPToolset = _fastmcp_toolset.TemporalFastMCPToolset
__all__.append('TemporalFastMCPToolset')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I'm a bit uncomfortable with this. Instead of exposing the types directly, what if we tell the user to use the temporalize_toolset function that intelligently wraps any toolset?


# We need eagerly import the anyio backends or it will happens inside workflow code and temporal has issues
# Note: It's difficult to add a test that covers this because pytest presumably does these imports itself
# when you have a @pytest.mark.anyio somewhere.
Expand Down
Loading
Loading