Skip to content

Conversation

@Danipulok
Copy link
Contributor

Closes #3624

@Danipulok
Copy link
Contributor Author

Danipulok commented Dec 14, 2025

When I was working on tests, I discovered that agent.run_stream + result.stream_text(delta=False) (default behavior) does not call TextOutput(my_tool)

It seems this is what was discussed in #3393 (comment), so it does not deserve a new issue?

Here are the tests that demonstrate bugged behavior: link

I think I would like to include the failing streaming test with xfail, but what issue/comment should I link? Or maybe don't commit it at all?

@Danipulok
Copy link
Contributor Author

About tests: I created a new class designed to test partial_output and separated tests into tests for sync and streaming mode.
The tests were not deleted, just moved and formatted to look alike

Here are the changes:

test_output_validator_partial_sync

before: test_agent.py::test_output_validator_partial_sync

def test_output_validator_partial_sync():
"""Test that output validators receive correct value for `partial_output` in sync mode."""
call_log: list[tuple[str, bool]] = []
agent = Agent[None, str](TestModel(custom_output_text='test output'))
@agent.output_validator
def validate_output(ctx: RunContext[None], output: str) -> str:
call_log.append((output, ctx.partial_output))
return output
result = agent.run_sync('Hello')
assert result.output == 'test output'
assert call_log == snapshot([('test output', False)])

after: test_agent.py::TestPartialOutput::test_output_validator_text
https://github.com/Danipulok/pydantic-ai/blob/d08191ee0e84ea2ae54fd4c28e8d5076986c281e/tests/test_agent.py#L370-L387

test_output_validator_partial_stream_text
before: test_agent.py::test_output_validator_partial_stream_text

async def test_output_validator_partial_stream_text():
"""Test that output validators receive correct value for `partial_output` when using stream_text()."""
call_log: list[tuple[str, bool]] = []
async def stream_text(messages: list[ModelMessage], info: AgentInfo) -> AsyncIterator[str]:
for chunk in ['Hello', ' ', 'world', '!']:
yield chunk
agent = Agent(FunctionModel(stream_function=stream_text))
@agent.output_validator
def validate_output(ctx: RunContext[None], output: str) -> str:
call_log.append((output, ctx.partial_output))
return output
async with agent.run_stream('Hello') as result:
text_parts = []
async for chunk in result.stream_text(debounce_by=None):
text_parts.append(chunk)
assert text_parts[-1] == 'Hello world!'
assert call_log == snapshot(
[
('Hello', True),
('Hello ', True),
('Hello world', True),
('Hello world!', True),
('Hello world!', False),
]
)

after: test_streaming.py::TestPartialOutput::test_output_validator_text
https://github.com/Danipulok/pydantic-ai/blob/d08191ee0e84ea2ae54fd4c28e8d5076986c281e/tests/test_streaming.py#L762-L789

test_output_validator_partial_stream_output

before: test_agent.py::test_output_validator_partial_stream_output

async def test_output_validator_partial_stream_output():
"""Test that output validators receive correct value for `partial_output` when using stream_output()."""
call_log: list[tuple[Foo, bool]] = []
async def stream_model(messages: list[ModelMessage], info: AgentInfo) -> AsyncIterator[DeltaToolCalls]:
assert info.output_tools is not None
yield {0: DeltaToolCall(name=info.output_tools[0].name, json_args='{"a": 42')}
yield {0: DeltaToolCall(json_args=', "b": "f')}
yield {0: DeltaToolCall(json_args='oo"}')}
agent = Agent(FunctionModel(stream_function=stream_model), output_type=Foo)
@agent.output_validator
def validate_output(ctx: RunContext[None], output: Foo) -> Foo:
call_log.append((output, ctx.partial_output))
return output
async with agent.run_stream('Hello') as result:
outputs = [output async for output in result.stream_output(debounce_by=None)]
assert outputs[-1] == Foo(a=42, b='foo')
assert call_log == snapshot(
[
(Foo(a=42, b='f'), True),
(Foo(a=42, b='foo'), True),
(Foo(a=42, b='foo'), False),
]
)

after: test_streaming.py::TestPartialOutput::test_output_validator_structured
https://github.com/Danipulok/pydantic-ai/blob/d08191ee0e84ea2ae54fd4c28e8d5076986c281e/tests/test_streaming.py#L791-L818

@Danipulok Danipulok changed the title Docs: add docs and tests for RunContext.partial_output in output tools Add docs and tests for RunContext.partial_output in output tools Dec 15, 2025

_(This example is complete, it can be run "as is")_

#### Handling partial output in output functions
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's rename this Handling partial output and also drop in output validators from the other section as it's already under "Output validators"


#### Handling partial output in output functions

!!! warning "Output functions are called multiple times during streaming"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer for this to be top-level text rather than the entire docs section to be an indented warning block

#### Handling partial output in output functions

!!! warning "Output functions are called multiple times during streaming"
When using streaming mode (`run_stream()`), output functions are called **multiple times** — once for each partial output received from the model, and once for the final complete output.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should say this in the "Handling partial output in output validators" section as well. Maybe the 2 sections can be merged as output validators and functions are very similar?


**How `partial_output` works:**

- **In sync mode** (`run_sync()`):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned on the other PR, it's just specific to sync mode or run_sync: run (async), iter and run_stream_events have the same behavior. It's really just "run_stream vs the rest"

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather this be a paragraph than a list btw


_(This example is complete, it can be run "as is")_

**Example without side effects (transformation only):**
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this example as it's the "base case" already covered above


class DatabaseRecord(BaseModel):
name: str
value: int
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually not a great example because both of these fields are required, meaning the partial data wouldn't validate anyway until the output is complete, so there would not be any partial output in this case.


agent = Agent('openai:gpt-5', output_type=save_to_database)

result = agent.run_sync('Create a record with name "test" and value 42')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're not using run_stream here 😄

This example needs some work to actually show the problem + use case


For output functions with **side effects** (e.g., sending notifications, logging, database updates), you should check the [`RunContext.partial_output`][pydantic_ai.tools.RunContext.partial_output] flag to avoid executing side effects on partial data.

**How `partial_output` works:**
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No "fake" subheadings please, I'd rather have paragraphs

**Best practices:**

- If your output function **has** side effects (database writes, API calls, notifications) → use `ctx.partial_output` to guard them
- If your output function only **transforms** data (formatting, validation, normalization) → no need to check the flag
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is duplicative with For output functions with **side effects** (e.g., sending notifications, logging, database updates), you should check the [RunContext.partial_output][pydantic_ai.tools.RunContext.partial_output] flag to avoid executing side effects on partial data. above. I'd rather have one authoritative paragraph

]
)

async def test_output_function_structured(self):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add that test_output_function_text test with TextOutput(func), create a new issue for that bug, and then skip the test

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Output functions are called twice or more times in streaming mode

2 participants