46 changes: 46 additions & 0 deletions PLAN.md
@@ -0,0 +1,46 @@
# VerificationLoop Capability

## Problem

Coding agents that make changes and hope they work are unreliable. The most
successful coding agents (Aider, Spotify's agent fleet, etc.) converge on
correctness by running an automated **verify-fix-repeat** loop after changes.
Without this, verification only happens when the agent remembers to check.

## Design

A `VerificationLoop` capability that uses the `wrap_run` hook to:

1. Run the agent normally via `handler()`
2. Execute a list of `Verifier` checks (e.g. lint, test, build)
3. If any verifier fails, re-run the agent with failure feedback appended to
the conversation, so the model can fix the issues
4. Repeat until all verifiers pass or `max_retries` is exhausted

### Key types

- **`VerificationResult(passed: bool, message: str)`** -- outcome of a single
check
- **`Verifier(name: str, check_fn: async () -> VerificationResult)`** -- a
named check
- **`VerificationLoop(verifiers, max_retries=3)`** -- the capability
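
For concreteness, here is a minimal sketch of a `check_fn`, assuming `ruff` is installed and on PATH; the subprocess-based check is illustrative, not part of this PR:

```python
import asyncio

from pydantic_harness import Verifier, VerificationResult


async def check_lint() -> VerificationResult:
    # Run ruff in a subprocess and map its exit code to a result.
    proc = await asyncio.create_subprocess_exec(
        'ruff', 'check', '.',
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    output, _ = await proc.communicate()
    passed = proc.returncode == 0
    return VerificationResult(
        passed=passed,
        message='No lint errors.' if passed else output.decode(),
    )


lint_verifier = Verifier(name='lint', check_fn=check_lint)
```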

### Retry mechanics

Retries call `ctx.agent.run()` with the previous run's `message_history` plus
a feedback prompt containing the verifier names and failure messages. An
`_in_retry` flag prevents recursive verification when the retry run triggers
`wrap_run` again on the same capability instance.

If all retries are exhausted, the last result is returned and a
`UserWarning` is emitted.
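
For illustration, the feedback prompt for two failing checks looks like this (format produced by `_build_feedback` in the implementation below; the failure messages are made up):

```python
from pydantic_harness import VerificationLoop

# Hypothetical failures, to show the prompt the retry run receives.
failures = [('lint', '2 errors in src/app.py'), ('test', 'test_parse failed')]
print(VerificationLoop._build_feedback(failures, attempt=1))
# Verification failed (attempt 1). ONLY fix the failing checks, do not make other changes.
# - lint: 2 errors in src/app.py
# - test: test_parse failed
```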

## Files

- `src/pydantic_harness/verification_loop.py` -- capability implementation
- `src/pydantic_harness/__init__.py` -- public exports
- `tests/test_verification_loop.py` -- 15 tests, 100% coverage

## References

- pydantic-harness #79
8 changes: 7 additions & 1 deletion src/pydantic_harness/__init__.py
@@ -7,4 +7,10 @@
# Each capability module is imported and re-exported here.
# Capabilities are listed alphabetically.

__all__: list[str] = []
from pydantic_harness.verification_loop import VerificationLoop, VerificationResult, Verifier

__all__: list[str] = [
'VerificationLoop',
'VerificationResult',
'Verifier',
]
197 changes: 197 additions & 0 deletions src/pydantic_harness/verification_loop.py
@@ -0,0 +1,197 @@
"""Verification loop capability for PydanticAI agents.

Runs configurable verification checks after the agent completes and retries
with failure feedback if any check fails, up to a configurable maximum number
of retries.

Example::

    from pydantic_ai import Agent
    from pydantic_harness import VerificationLoop, Verifier, VerificationResult

    async def check_lint() -> VerificationResult:
        # Run linting, return pass/fail
        return VerificationResult(passed=True, message='No lint errors.')

    agent = Agent(
        'openai:gpt-4o',
        capabilities=[
            VerificationLoop(
                verifiers=[Verifier(name='lint', check_fn=check_lint)],
                max_retries=3,
            ),
        ],
    )
"""

from __future__ import annotations

import asyncio
import logging
import warnings
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from typing import Any

from pydantic_ai.capabilities.abstract import AbstractCapability, WrapRunHandler
from pydantic_ai.run import AgentRunResult
from pydantic_ai.tools import RunContext

logger = logging.getLogger(__name__)


@dataclass
class VerificationResult:
    """The outcome of a single verification check.

    Attributes:
        passed: Whether the check passed.
        message: A human-readable description of the outcome.
    """

    passed: bool
    message: str


@dataclass
class Verifier:
    """A named verification check to run after agent completion.

    Attributes:
        name: A short identifier for this verifier (e.g. ``'lint'``, ``'test'``).
        check_fn: An async callable that returns a :class:`VerificationResult`.
    """

    name: str
    check_fn: Callable[[], Awaitable[VerificationResult]]


@dataclass
class VerificationLoop(AbstractCapability[Any]):
    """Runs verification checks after agent completion and retries on failure.

    After the agent produces a result, each :class:`Verifier` is run
    (concurrently by default; set ``parallel=False`` to run them in list
    order). If any verifier fails, the agent is re-run with the failure
    messages appended as context so the model can fix the issues. This
    repeats up to ``max_retries`` times. If all retries are exhausted the
    last result is returned and a warning is emitted.

    Example::

        from pydantic_ai import Agent
        from pydantic_harness import VerificationLoop, Verifier, VerificationResult

        async def check_tests() -> VerificationResult:
            ...

        agent = Agent(
            'openai:gpt-4o',
            capabilities=[
                VerificationLoop(
                    verifiers=[Verifier(name='tests', check_fn=check_tests)],
                ),
            ],
        )
    """

    verifiers: list[Verifier] = field(default_factory=list)
    """Verifiers to run after each agent completion."""

    max_retries: int = 3
    """Maximum number of retry attempts when verification fails."""

    parallel: bool = True
    """Whether to run verifiers in parallel via ``asyncio.gather``.

    When ``True`` (the default), all verifiers execute concurrently.
    Set to ``False`` to run them sequentially in list order.
    """

    # --- Per-run state ---

    _in_retry: bool = field(default=False, repr=False)
    """When ``True``, :meth:`wrap_run` skips verification (retry pass-through)."""

    async def wrap_run(
        self,
        ctx: RunContext[Any],
        *,
        handler: WrapRunHandler,
    ) -> AgentRunResult[Any]:
        """Run the agent, then verify. Retry with feedback on failure.

        When the agent is re-run for a retry, this hook fires again on
        the new run. The ``_in_retry`` flag prevents recursive verification:
        retry runs pass straight through to the handler.
        """
        result = await handler()

        # Retry runs skip verification to avoid infinite recursion.
        if self._in_retry:
            return result

        agent = ctx.agent

        for attempt in range(1, self.max_retries + 1):
            failures = await self._run_verifiers()
            if not failures:
                return result

            failure_summary = '; '.join(f'{name}: {msg}' for name, msg in failures)
            feedback = self._build_feedback(failures, attempt)
            logger.info(
                'Verification failed (attempt %d/%d): %s',
                attempt,
                self.max_retries,
                failure_summary,
            )

            if agent is None:  # pragma: no cover - defensive; agent is always set in practice
                warnings.warn(
                    'Verification failed but agent is not available on RunContext for retry. '
                    'Returning last result.',
                    stacklevel=2,
                )
                return result

            # Mark that the next run is a retry so wrap_run passes through.
            self._in_retry = True
            try:
                result = await agent.run(
                    feedback,
                    message_history=result.all_messages(),
                )
            finally:
                self._in_retry = False
Comment on lines +157 to +164

🔴 Instance-level _in_retry flag causes verification to be silently skipped during concurrent agent runs

The _in_retry boolean is mutable shared state on the VerificationLoop instance. When the agent retries, it sets self._in_retry = True (line 157) and then await agent.run(...) which yields control. If another concurrent agent.run() call enters wrap_run on the same agent (and thus the same capability instance) while the first run is retrying, it will see _in_retry = True at line 130 and skip all verification, returning the result unchecked. After the first run finishes its retry, _in_retry is reset to False (line 164), but the damage is done — the second run silently bypassed verification entirely. Since PydanticAI agents are designed to be reusable across concurrent calls, this is a realistic scenario. A per-call token (e.g., using contextvars or checking the RunContext identity) would avoid this.

Prompt for agents
The _in_retry flag at line 112 is instance-level mutable state that is shared across all concurrent wrap_run invocations on the same VerificationLoop instance. When one run sets _in_retry = True and awaits agent.run() (yielding control), another concurrent run entering wrap_run will see _in_retry as True and skip verification entirely.

To fix this, use a per-call mechanism instead of a shared boolean. Options include:
1. Use a contextvars.ContextVar to track whether the current execution context is a retry, so each async task has its own value.
2. Pass a unique run identifier through the RunContext and track which run IDs are retries in a set.
3. Use an asyncio.Lock or counter (e.g., an integer tracking nested retry depth per task) instead of a plain boolean.

The fix needs to ensure that (a) retry runs on the same call chain still skip verification (to prevent infinite recursion), while (b) unrelated concurrent agent.run() calls on the same agent are not affected.
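
A minimal sketch of option 1, using `contextvars` so each asyncio task gets its own flag; the names here are illustrative, not part of this PR:

```python
from contextvars import ContextVar

# Module-level flag; each async call chain sees its own value.
_in_retry: ContextVar[bool] = ContextVar('verification_loop_in_retry', default=False)

# In wrap_run, the pass-through check becomes:
#     if _in_retry.get():
#         return result
#
# ...and the retry call is bracketed with set/reset instead of a shared bool:
#     token = _in_retry.set(True)
#     try:
#         result = await agent.run(feedback, message_history=result.all_messages())
#     finally:
#         _in_retry.reset(token)
```

Assuming the nested `agent.run()` executes in the same context, its `wrap_run` sees `True` and passes through, while unrelated concurrent runs in other tasks still see the `False` default.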

        # Final verification after last retry.
        failures = await self._run_verifiers()
        if not failures:
            return result

        warnings.warn(
            f'Verification still failing after {self.max_retries} retries: '
            + '; '.join(f'{name}: {msg}' for name, msg in failures),
            stacklevel=2,
        )
        return result
Comment on lines +135 to +176

🚩 Verification still runs even with max_retries=0

With max_retries=0, the for loop at line 135 is range(1, 1) which is empty, so no retries happen. However, the final verification block at lines 167-176 still executes, running verifiers and potentially emitting a warning like 'Verification still failing after 0 retries'. This means max_retries=0 does not disable verification — it disables retries but still verifies once. This may be the intended behavior (verify but don't retry), but it's worth documenting since a user might expect max_retries=0 to skip verification entirely.
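
If verify-once-without-retry is the intended use, the configuration would look like this (a usage sketch based on the behavior described above):

```python
# Verifies once after the run; on failure it warns but never re-runs the agent.
loop = VerificationLoop(
    verifiers=[Verifier(name='lint', check_fn=check_lint)],
    max_retries=0,
)
```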

    async def _run_verifiers(self) -> list[tuple[str, str]]:
        """Run all verifiers and return a list of ``(name, message)`` for failures."""
        if self.parallel and len(self.verifiers) > 1:
            results = await asyncio.gather(*(v.check_fn() for v in self.verifiers))
            return [
                (verifier.name, vr.message)
                for verifier, vr in zip(self.verifiers, results)
                if not vr.passed
            ]

        failures: list[tuple[str, str]] = []
        for verifier in self.verifiers:
            vr = await verifier.check_fn()
            if not vr.passed:
                failures.append((verifier.name, vr.message))
        return failures

    @staticmethod
    def _build_feedback(failures: list[tuple[str, str]], attempt: int) -> str:
        """Build a feedback prompt from verification failures."""
        parts = [
            f'Verification failed (attempt {attempt}). '
            'ONLY fix the failing checks, do not make other changes.'
        ]
        for name, message in failures:
            parts.append(f'- {name}: {message}')
        return '\n'.join(parts)