Conversation
```python
try:
    done, _ = await asyncio.wait(
        [guard_task, handler_task],
        return_when=asyncio.FIRST_COMPLETED,
```

---

We can simplify it :)
```python
try:
    done, _ = await asyncio.wait(
        {guard_task, handler_task},
        return_when=asyncio.FIRST_COMPLETED,
    )
    if guard_task in done:
        await guard_task
        return await handler_task
    response = await handler_task
    await guard_task
    return response
finally:
    for task in (guard_task, handler_task):
        if not task.done():
            task.cancel()
    await asyncio.gather(guard_task, handler_task, return_exceptions=True)
```
Sorry for the ugly snippet written in GitHub, but I hope it makes sense.
---
Getting back to this because I was wondering whether we are actually using this in a semantically correct way.
We do `return_when=asyncio.FIRST_COMPLETED`, but we don't use it in a way that warrants it. We always have to wait for the sibling task to finish anyway, so aren't we better off using a task group here?
We might lose the exceptions, but I think we can unwrap them; worth checking for sure. I'd love for this to be as readable as possible.
---
Kept `wait(FIRST_COMPLETED)` though; `asyncio.gather(return_exceptions=True)` waits for all tasks.
`TaskGroup` would be ideal, but it's 3.11+. So `wait(FIRST_COMPLETED)` is the simplest 3.10-compatible form that still gives us fail-fast. The "wait for the sibling" only kicks in on the success path.
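To make the fail-fast claim concrete, here is a self-contained sketch of that shape, with toy `guard`/`handler` coroutines standing in for the real ones (the real code uses different task factories; this only demonstrates the `wait(FIRST_COMPLETED)` + `finally` pattern):

```python
import asyncio

async def guard() -> None:
    await asyncio.sleep(0.01)
    raise RuntimeError("blocked")  # the guard trips

async def handler() -> str:
    await asyncio.sleep(10)  # stands in for a slow model round-trip
    return "response"

async def main() -> str:
    guard_task = asyncio.create_task(guard())
    handler_task = asyncio.create_task(handler())
    try:
        done, _ = await asyncio.wait(
            {guard_task, handler_task},
            return_when=asyncio.FIRST_COMPLETED,
        )
        if guard_task in done:
            await guard_task           # re-raises immediately if the guard failed
            return await handler_task  # guard passed: wait out the model call
        response = await handler_task
        await guard_task               # handler won the race: still check the guard
        return response
    finally:
        for task in (guard_task, handler_task):
            if not task.done():
                task.cancel()
        # reap cancellations without masking an in-flight exception
        await asyncio.gather(guard_task, handler_task, return_exceptions=True)

try:
    asyncio.run(main())
except RuntimeError as exc:
    print("guard tripped:", exc)  # the 10s handler is cancelled after ~0.01s
```

The point of the sketch: when the guard raises first, the handler is cancelled right away instead of running to completion, which is the token-saving behaviour the thread is after.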
---
What about the anyio equivalent? Is that also 3.11+?
---
Even in the cancel paths we would need to cancel the sibling, though?
If the guard fails, then the handler needs to stop.
If the handler fails, the guard has nothing left to do.
---
CC told me that:
Tradeoff is it wraps exceptions in `ExceptionGroup`, so a `SkipModelRequest` from the guard comes out as `ExceptionGroup([SkipModelRequest])`, which `_agent_graph.py`'s bare `except SkipModelRequest` won't catch. We'd have to manually unwrap before re-raising (no `except*` on 3.10 either). Net more code than the current `wait` + `finally`.
We would have to check it out
---
Yeah, that makes sense; `except*` would have been grand, the snippet could have collapsed to a fraction of its size.
We should check whether we can reasonably unwrap correctly; otherwise this is fine with me.
```python
async def run_handler() -> ModelResponse:
    return await handler(request_context)


guard_task: asyncio.Task[None] = asyncio.create_task(self._run_guard(prompt))
```
---
I am on the fence about whether we should call it `PromptGuard`, because we don't run this for subsequent model requests, only for the first user prompt step. What do you think?
---
To me it looks clear, considering that we also have `OutputBlocked`, and it's similar to what exists in the OpenAI Agents SDK: https://github.com/openai/openai-agents-python/blob/9a207b6938699d87d2d17dd67dd628ca3af0232d/src/agents/guardrail.py#L72
Summary
Adds two guardrail capabilities with a minimal, callable-based API:

- `InputGuard(guard, parallel=False, block_message=...)`: runs before each model request. A guard that returns `False` triggers a graceful refusal via `SkipModelRequest` (a canned `ModelResponse` becomes the step output); a guard that raises propagates as a hard failure.
- `OutputGuard(guard, block_message=...)`: runs in `after_run` against the final agent output. A guard that returns `False` raises `OutputBlocked`.

Both accept sync or async callables.
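A standalone sketch of the "sync or async callables" normalization (the names `call_guard`, `sync_guard`, and `async_guard` are assumptions for illustration, not the actual package code):

```python
import asyncio
import inspect

async def call_guard(guard, value) -> bool:
    """Run a guard that may be a plain function or a coroutine function."""
    result = guard(value)
    if inspect.isawaitable(result):  # async guard: await its verdict
        result = await result
    return bool(result)

def sync_guard(prompt: str) -> bool:
    return "secret" not in prompt

async def async_guard(prompt: str) -> bool:
    await asyncio.sleep(0)  # e.g. a moderation API call would go here
    return len(prompt) < 100

print(asyncio.run(call_guard(sync_guard, "hello")))   # True
print(asyncio.run(call_guard(async_guard, "hello")))  # True
```

Awaiting `inspect.isawaitable` results lets one code path serve both guard styles, which is what keeps the callable-based API surface minimal.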
`parallel=True` on `InputGuard` races the guard against the model call via `wrap_model_request` and cancels the handler as soon as the guard trips, saving tokens when the guard is slower than the provider round-trip.

Design notes:
- `parallel` is a flag on `InputGuard` rather than a separate `AsyncGuardrail` wrapper: keeps the API surface small and matches @Josh-blythe's feedback on Input/Output Guardrails capability #28 (fast guards don't need the cancellation wiring; let the capability declare whether it needs parallel).

Out of scope for this PR (tracked as follow-ups):
- `before_tool_call`/`after_tool_call` on AbstractCapability (#19).
- `transform`/`warn` actions: MVP is halt-only.
- `after_run` + `wrap_run_event_stream` are the follow-up hooks.
- `pydantic-ai-shields` on top of these primitives.

Files added:
- Top-level `pydantic_ai_harness/__init__.py` updated to export `InputGuard`, `OutputGuard`, `GuardrailFunc`, `GuardrailError`, `InputBlocked`, `OutputBlocked`.

Linked Issue
Fixes #28
Checklist
- [ ] `make lint && make typecheck && make test` passes locally (don't stress about CI -- we'll help)
- [ ] No changes to `pyproject.toml` or `uv.lock` (dependency changes require a separate issue)