From a066be779d8c0cf105402b18e0c03397358156ec Mon Sep 17 00:00:00 2001
From: Douwe Maan
Date: Thu, 2 Apr 2026 05:19:17 +0000
Subject: [PATCH 1/3] Add VerificationLoop capability for automated
 verify-fix-retry loops

Implements a capability that runs configurable verification checks
(e.g. lint, test, build) after agent completion and automatically
retries with failure feedback if any check fails, up to a configurable
maximum number of retries.

Closes #79

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 PLAN.md                                   |  46 ++++
 src/pydantic_harness/__init__.py          |   8 +-
 src/pydantic_harness/verification_loop.py | 185 +++++++++++++++
 tests/test_verification_loop.py           | 277 ++++++++++++++++++++++
 4 files changed, 515 insertions(+), 1 deletion(-)
 create mode 100644 PLAN.md
 create mode 100644 src/pydantic_harness/verification_loop.py
 create mode 100644 tests/test_verification_loop.py

diff --git a/PLAN.md b/PLAN.md
new file mode 100644
index 0000000..b8756b6
--- /dev/null
+++ b/PLAN.md
@@ -0,0 +1,46 @@
+# VerificationLoop Capability
+
+## Problem
+
+Coding agents that make changes and hope they work are unreliable. The most
+successful coding agents (Aider, Spotify's agent fleet, etc.) converge on
+correctness by running an automated **verify-fix-retry** loop after changes.
+Without this, verification only happens when the agent remembers to check.
+
+## Design
+
+A `VerificationLoop` capability that uses the `wrap_run` hook to:
+
+1. Run the agent normally via `handler()`
+2. Execute a list of `Verifier` checks (e.g. lint, test, build)
+3. If any verifier fails, re-run the agent with failure feedback appended to
+   the conversation, so the model can fix the issues
+4. Repeat until all verifiers pass or `max_retries` is exhausted
+
+### Key types
+
+- **`VerificationResult(passed: bool, message: str)`** -- outcome of a single
+  check
+- **`Verifier(name: str, check_fn: async () -> VerificationResult)`** -- a
+  named check
+- **`VerificationLoop(verifiers, max_retries=3)`** -- the capability
+
+### Retry mechanics
+
+Retries call `ctx.agent.run()` with the previous run's `message_history` plus
+a feedback prompt containing the verifier names and failure messages. An
+`_in_retry` flag prevents recursive verification when the retry run triggers
+`wrap_run` again on the same capability instance.
+
+If all retries are exhausted, the last result is returned and a
+`UserWarning` is emitted.
+
+## Files
+
+- `src/pydantic_harness/verification_loop.py` -- capability implementation
+- `src/pydantic_harness/__init__.py` -- public exports
+- `tests/test_verification_loop.py` -- 14 tests, 100% coverage
+
+## References
+
+- pydantic-harness #79
diff --git a/src/pydantic_harness/__init__.py b/src/pydantic_harness/__init__.py
index 9d728b6..adf8074 100644
--- a/src/pydantic_harness/__init__.py
+++ b/src/pydantic_harness/__init__.py
@@ -7,4 +7,10 @@
 # Each capability module is imported and re-exported here.
 # Capabilities are listed alphabetically.
 
-__all__: list[str] = []
+from pydantic_harness.verification_loop import VerificationLoop, VerificationResult, Verifier
+
+__all__: list[str] = [
+    'VerificationLoop',
+    'VerificationResult',
+    'Verifier',
+]
diff --git a/src/pydantic_harness/verification_loop.py b/src/pydantic_harness/verification_loop.py
new file mode 100644
index 0000000..e0e3260
--- /dev/null
+++ b/src/pydantic_harness/verification_loop.py
@@ -0,0 +1,185 @@
+"""Verification loop capability for PydanticAI agents.
+
+Runs configurable verification checks after the agent completes and retries
+with failure feedback if any check fails, up to a configurable maximum number
+of retries.
+
+Example::
+
+    from pydantic_ai import Agent
+    from pydantic_harness import VerificationLoop, Verifier, VerificationResult
+
+    async def check_lint() -> VerificationResult:
+        # Run linting, return pass/fail
+        return VerificationResult(passed=True, message='No lint errors.')
+
+    agent = Agent(
+        'openai:gpt-4o',
+        capabilities=[
+            VerificationLoop(
+                verifiers=[Verifier(name='lint', check_fn=check_lint)],
+                max_retries=3,
+            ),
+        ],
+    )
+"""
+
+from __future__ import annotations
+
+import logging
+import warnings
+from collections.abc import Awaitable, Callable
+from dataclasses import dataclass, field
+from typing import Any
+
+from pydantic_ai.capabilities.abstract import AbstractCapability, WrapRunHandler
+from pydantic_ai.run import AgentRunResult
+from pydantic_ai.tools import RunContext
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class VerificationResult:
+    """The outcome of a single verification check.
+
+    Attributes:
+        passed: Whether the check passed.
+        message: A human-readable description of the outcome.
+    """
+
+    passed: bool
+    message: str
+
+
+@dataclass
+class Verifier:
+    """A named verification check to run after agent completion.
+
+    Attributes:
+        name: A short identifier for this verifier (e.g. ``'lint'``, ``'test'``).
+        check_fn: An async callable that returns a :class:`VerificationResult`.
+    """
+
+    name: str
+    check_fn: Callable[[], Awaitable[VerificationResult]]
+
+
+@dataclass
+class VerificationLoop(AbstractCapability[Any]):
+    """Runs verification checks after agent completion and retries on failure.
+
+    After the agent produces a result, each :class:`Verifier` is run in order.
+    If any verifier fails, the agent is re-run with the failure messages
+    appended as context so the model can fix the issues. This repeats up to
+    ``max_retries`` times. If all retries are exhausted the last result is
+    returned and a warning is emitted.
+
+    Example::
+
+        from pydantic_ai import Agent
+        from pydantic_harness import VerificationLoop, Verifier, VerificationResult
+
+        async def check_tests() -> VerificationResult:
+            ...
+
+        agent = Agent(
+            'openai:gpt-4o',
+            capabilities=[
+                VerificationLoop(
+                    verifiers=[Verifier(name='tests', check_fn=check_tests)],
+                ),
+            ],
+        )
+    """
+
+    verifiers: list[Verifier] = field(default_factory=lambda: list[Verifier]())
+    """Verifiers to run after each agent completion."""
+
+    max_retries: int = 3
+    """Maximum number of retry attempts when verification fails."""
+
+    # --- Per-run state ---
+
+    _in_retry: bool = field(default=False, repr=False)
+    """When ``True``, :meth:`wrap_run` skips verification (retry pass-through)."""
+
+    async def wrap_run(
+        self,
+        ctx: RunContext[Any],
+        *,
+        handler: WrapRunHandler,
+    ) -> AgentRunResult[Any]:
+        """Run the agent, then verify. Retry with feedback on failure.
+
+        When the agent is re-run for a retry, this hook fires again on
+        the new run. The ``_in_retry`` flag prevents recursive verification:
+        retry runs pass straight through to the handler.
+        """
+        result = await handler()
+
+        # Retry runs skip verification to avoid infinite recursion.
+        if self._in_retry:
+            return result
+
+        agent = ctx.agent
+
+        for attempt in range(1, self.max_retries + 1):
+            failures = await self._run_verifiers()
+            if not failures:
+                return result
+
+            failure_summary = '; '.join(f'{name}: {msg}' for name, msg in failures)
+            feedback = self._build_feedback(failures, attempt)
+            logger.info(
+                'Verification failed (attempt %d/%d): %s',
+                attempt,
+                self.max_retries,
+                failure_summary,
+            )
+
+            if agent is None:  # pragma: no cover — defensive; agent is always set in practice
+                warnings.warn(
+                    'Verification failed but agent is not available on RunContext for retry. Returning last result.',
+                    stacklevel=2,
+                )
+                return result
+
+            # Mark that the next run is a retry so wrap_run passes through.
+            self._in_retry = True
+            try:
+                result = await agent.run(
+                    feedback,
+                    message_history=result.all_messages(),
+                )
+            finally:
+                self._in_retry = False
+
+        # Final verification after last retry.
+        failures = await self._run_verifiers()
+        if not failures:
+            return result
+
+        warnings.warn(
+            f'Verification still failing after {self.max_retries} retries: '
+            + '; '.join(f'{name}: {msg}' for name, msg in failures),
+            stacklevel=2,
+        )
+        return result
+
+    async def _run_verifiers(self) -> list[tuple[str, str]]:
+        """Run all verifiers and return a list of ``(name, message)`` for failures."""
+        failures: list[tuple[str, str]] = []
+        for verifier in self.verifiers:
+            vr = await verifier.check_fn()
+            if not vr.passed:
+                failures.append((verifier.name, vr.message))
+        return failures
+
+    @staticmethod
+    def _build_feedback(failures: list[tuple[str, str]], attempt: int) -> str:
+        """Build a feedback prompt from verification failures."""
+        parts = [f'Verification failed (attempt {attempt}). Please fix the issues:']
+        for name, message in failures:
+            parts.append(f'- {name}: {message}')
+        return '\n'.join(parts)
diff --git a/tests/test_verification_loop.py b/tests/test_verification_loop.py
new file mode 100644
index 0000000..dbd1ea6
--- /dev/null
+++ b/tests/test_verification_loop.py
@@ -0,0 +1,277 @@
+"""Tests for the VerificationLoop capability."""
+# pyright: reportPrivateUsage=false
+
+from __future__ import annotations
+
+import warnings
+
+import pytest
+from pydantic_ai import Agent
+from pydantic_ai.models.test import TestModel
+
+from pydantic_harness.verification_loop import (
+    VerificationLoop,
+    VerificationResult,
+    Verifier,
+)
+
+# ---------------------------------------------------------------------------
+# Unit tests for helpers
+# ---------------------------------------------------------------------------
+
+
+class TestBuildFeedback:
+    def test_single_failure(self):
+        feedback = VerificationLoop._build_feedback([('lint', 'unused import on line 5')], attempt=1)
+        assert 'attempt 1' in feedback
+        assert '- lint: unused import on line 5' in feedback
+
+    def test_multiple_failures(self):
+        failures = [('lint', 'error A'), ('test', 'error B')]
+        feedback = VerificationLoop._build_feedback(failures, attempt=2)
+        assert 'attempt 2' in feedback
+        assert '- lint: error A' in feedback
+        assert '- test: error B' in feedback
+
+
+class TestRunVerifiers:
+    @pytest.mark.anyio()
+    async def test_all_pass(self):
+        cap = VerificationLoop(
+            verifiers=[
+                Verifier(name='lint', check_fn=_pass_verifier),
+                Verifier(name='test', check_fn=_pass_verifier),
+            ],
+        )
+        failures = await cap._run_verifiers()
+        assert failures == []
+
+    @pytest.mark.anyio()
+    async def test_one_fails(self):
+        cap = VerificationLoop(
+            verifiers=[
+                Verifier(name='lint', check_fn=_pass_verifier),
+                Verifier(name='test', check_fn=_fail_verifier('2 tests failed')),
+            ],
+        )
+        failures = await cap._run_verifiers()
+        assert len(failures) == 1
+        assert failures[0] == ('test', '2 tests failed')
+
+    @pytest.mark.anyio()
+    async def test_all_fail(self):
+        cap = VerificationLoop(
+            verifiers=[
+                Verifier(name='lint', check_fn=_fail_verifier('lint error')),
+                Verifier(name='test', check_fn=_fail_verifier('test error')),
+            ],
+        )
+        failures = await cap._run_verifiers()
+        assert len(failures) == 2
+
+    @pytest.mark.anyio()
+    async def test_empty_verifiers(self):
+        cap = VerificationLoop(verifiers=[])
+        failures = await cap._run_verifiers()
+        assert failures == []
+
+
+# ---------------------------------------------------------------------------
+# Defaults
+# ---------------------------------------------------------------------------
+
+
+def test_defaults():
+    cap = VerificationLoop()
+    assert cap.verifiers == []
+    assert cap.max_retries == 3
+
+
+# ---------------------------------------------------------------------------
+# Integration tests with a real agent
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.anyio()
+async def test_all_pass_no_retry():
+    """When all verifiers pass on the first run, the result is returned without retries."""
+    call_count = 0
+
+    async def always_pass() -> VerificationResult:
+        nonlocal call_count
+        call_count += 1
+        return VerificationResult(passed=True, message='OK')
+
+    cap = VerificationLoop(
+        verifiers=[Verifier(name='check', check_fn=always_pass)],
+        max_retries=3,
+    )
+    agent = Agent(TestModel(), output_type=str, capabilities=[cap])
+    result = await agent.run('Do something')
+    assert isinstance(result.output, str)
+    # Verifiers called once (after the initial run).
+    assert call_count == 1
+
+
+@pytest.mark.anyio()
+async def test_retry_on_failure_then_pass():
+    """When verification fails once, the agent retries and succeeds."""
+    attempts = 0
+
+    async def pass_on_second() -> VerificationResult:
+        nonlocal attempts
+        attempts += 1
+        if attempts <= 1:
+            return VerificationResult(passed=False, message='lint error on line 5')
+        return VerificationResult(passed=True, message='OK')
+
+    cap = VerificationLoop(
+        verifiers=[Verifier(name='lint', check_fn=pass_on_second)],
+        max_retries=3,
+    )
+    agent = Agent(TestModel(), output_type=str, capabilities=[cap])
+    result = await agent.run('Fix the code')
+    assert isinstance(result.output, str)
+    # Verifier called twice: once after initial run (fail), once after retry (pass).
+    assert attempts == 2
+
+
+@pytest.mark.anyio()
+async def test_max_retries_exceeded():
+    """When verification keeps failing, a warning is emitted and last result is returned."""
+    call_count = 0
+
+    async def always_fail() -> VerificationResult:
+        nonlocal call_count
+        call_count += 1
+        return VerificationResult(passed=False, message='still broken')
+
+    cap = VerificationLoop(
+        verifiers=[Verifier(name='test', check_fn=always_fail)],
+        max_retries=2,
+    )
+    agent = Agent(TestModel(), output_type=str, capabilities=[cap])
+
+    with warnings.catch_warnings(record=True) as w:
+        warnings.simplefilter('always')
+        result = await agent.run('Fix the code')
+
+    assert isinstance(result.output, str)
+    # 2 (in-loop checks, one per retry attempt) + 1 (final check after loop) = 3
+    assert call_count == 3
+    assert len(w) == 1
+    assert 'after 2 retries' in str(w[0].message)
+    assert 'test: still broken' in str(w[0].message)
+
+
+@pytest.mark.anyio()
+async def test_multiple_verifiers_partial_failure():
+    """Only failing verifiers appear in the retry feedback."""
+    lint_calls = 0
+    test_calls = 0
+
+    async def lint_check() -> VerificationResult:
+        nonlocal lint_calls
+        lint_calls += 1
+        return VerificationResult(passed=True, message='OK')
+
+    async def test_check() -> VerificationResult:
+        nonlocal test_calls
+        test_calls += 1
+        if test_calls <= 1:
+            return VerificationResult(passed=False, message='1 test failed')
+        return VerificationResult(passed=True, message='OK')
+
+    cap = VerificationLoop(
+        verifiers=[
+            Verifier(name='lint', check_fn=lint_check),
+            Verifier(name='test', check_fn=test_check),
+        ],
+        max_retries=3,
+    )
+    agent = Agent(TestModel(), output_type=str, capabilities=[cap])
+    result = await agent.run('Fix things')
+    assert isinstance(result.output, str)
+    # lint called twice (initial + after retry), test called twice (initial fail + retry pass).
+    assert lint_calls == 2
+    assert test_calls == 2
+
+
+@pytest.mark.anyio()
+async def test_no_verifiers_passthrough():
+    """With no verifiers configured, the run proceeds without any verification."""
+    cap = VerificationLoop(verifiers=[], max_retries=3)
+    agent = Agent(TestModel(), output_type=str, capabilities=[cap])
+    result = await agent.run('Hello')
+    assert isinstance(result.output, str)
+
+
+@pytest.mark.anyio()
+async def test_feedback_message_contains_verifier_info():
+    """Verify that the feedback message sent on retry contains the verifier name and error."""
+    check_calls = 0
+
+    async def fail_once() -> VerificationResult:
+        nonlocal check_calls
+        check_calls += 1
+        if check_calls <= 1:
+            return VerificationResult(passed=False, message='type error on line 10')
+        return VerificationResult(passed=True, message='OK')
+
+    cap = VerificationLoop(
+        verifiers=[Verifier(name='typecheck', check_fn=fail_once)],
+        max_retries=3,
+    )
+    agent = Agent(TestModel(), output_type=str, capabilities=[cap])
+
+    result = await agent.run('Fix the code')
+    # The retry run produces a new message history that includes the feedback prompt.
+    # Serialize to JSON and check the feedback string is present.
+    history_json = result.all_messages_json().decode()
+    assert 'typecheck' in history_json
+    assert 'type error on line 10' in history_json
+
+
+@pytest.mark.anyio()
+async def test_passes_on_final_check_after_loop():
+    """When verification fails during retries but passes on the final check, no warning is emitted."""
+    check_calls = 0
+
+    async def pass_on_third() -> VerificationResult:
+        nonlocal check_calls
+        check_calls += 1
+        # Fail on calls 1 and 2 (in-loop), pass on call 3 (final check after loop).
+        if check_calls < 3:
+            return VerificationResult(passed=False, message='still failing')
+        return VerificationResult(passed=True, message='OK')
+
+    cap = VerificationLoop(
+        verifiers=[Verifier(name='build', check_fn=pass_on_third)],
+        max_retries=2,
+    )
+    agent = Agent(TestModel(), output_type=str, capabilities=[cap])
+
+    with warnings.catch_warnings(record=True) as w:
+        warnings.simplefilter('always')
+        result = await agent.run('Fix the build')
+
+    assert isinstance(result.output, str)
+    assert check_calls == 3
+    # No warning should have been emitted since the final check passed.
+    assert len(w) == 0
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+async def _pass_verifier() -> VerificationResult:
+    return VerificationResult(passed=True, message='OK')
+
+
+def _fail_verifier(message: str):
+    async def _check() -> VerificationResult:
+        return VerificationResult(passed=False, message=message)
+
+    return _check

From 5a41785a000c22c2b4abd2e8beb883681788d6cf Mon Sep 17 00:00:00 2001
From: Douwe Maan
Date: Thu, 2 Apr 2026 05:36:08 +0000
Subject: [PATCH 2/3] Fix trio compatibility: restrict async tests to asyncio
 backend

---
 tests/test_verification_loop.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/tests/test_verification_loop.py b/tests/test_verification_loop.py
index dbd1ea6..4b16dcf 100644
--- a/tests/test_verification_loop.py
+++ b/tests/test_verification_loop.py
@@ -15,6 +15,12 @@
     Verifier,
 )
 
+
+@pytest.fixture(params=['asyncio'])
+def anyio_backend(request: pytest.FixtureRequest) -> str:
+    return request.param  # type: ignore[no-any-return]
+
+
 # ---------------------------------------------------------------------------
 # Unit tests for helpers
 # ---------------------------------------------------------------------------

From 41cf0457c5becc76e6fa6c985a119870576dc0a1 Mon Sep 17 00:00:00 2001
From: Douwe Maan
Date: Thu, 2 Apr 2026 05:54:46 +0000
Subject: [PATCH 3/3] Run verifiers in parallel and improve retry prompt

- Add `parallel: bool = True` parameter to run verifiers concurrently via
  `asyncio.gather` (falls back to sequential for single verifier)
- Improve retry feedback prompt to explicitly say "ONLY fix the failing
  checks, do not make other changes"

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 src/pydantic_harness/verification_loop.py |  14 ++-
 tests/test_verification_loop.py           | 102 ++++++++++++++++++++++
 2 files changed, 115 insertions(+), 1 deletion(-)

diff --git a/src/pydantic_harness/verification_loop.py b/src/pydantic_harness/verification_loop.py
index e0e3260..2c9b19e 100644
--- a/src/pydantic_harness/verification_loop.py
+++ b/src/pydantic_harness/verification_loop.py
@@ -26,6 +26,7 @@
 
 from __future__ import annotations
 
+import asyncio
 import logging
 import warnings
 from collections.abc import Awaitable, Callable
@@ -99,6 +100,13 @@ async def check_tests() -> VerificationResult:
     max_retries: int = 3
     """Maximum number of retry attempts when verification fails."""
 
+    parallel: bool = True
+    """Whether to run verifiers in parallel via ``asyncio.gather``.
+
+    When ``True`` (the default), all verifiers execute concurrently.
+    Set to ``False`` to run them sequentially in list order.
+    """
+
     # --- Per-run state ---
 
     _in_retry: bool = field(default=False, repr=False)
     """When ``True``, :meth:`wrap_run` skips verification (retry pass-through)."""
@@ -169,6 +177,10 @@ async def wrap_run(
 
     async def _run_verifiers(self) -> list[tuple[str, str]]:
         """Run all verifiers and return a list of ``(name, message)`` for failures."""
+        if self.parallel and len(self.verifiers) > 1:
+            results = await asyncio.gather(*(v.check_fn() for v in self.verifiers))
+            return [(verifier.name, vr.message) for verifier, vr in zip(self.verifiers, results) if not vr.passed]
+
         failures: list[tuple[str, str]] = []
         for verifier in self.verifiers:
             vr = await verifier.check_fn()
@@ -179,7 +191,7 @@ async def _run_verifiers(self) -> list[tuple[str, str]]:
     @staticmethod
     def _build_feedback(failures: list[tuple[str, str]], attempt: int) -> str:
         """Build a feedback prompt from verification failures."""
-        parts = [f'Verification failed (attempt {attempt}). Please fix the issues:']
+        parts = [f'Verification failed (attempt {attempt}). ONLY fix the failing checks, do not make other changes.']
         for name, message in failures:
             parts.append(f'- {name}: {message}')
         return '\n'.join(parts)
diff --git a/tests/test_verification_loop.py b/tests/test_verification_loop.py
index 4b16dcf..8682e92 100644
--- a/tests/test_verification_loop.py
+++ b/tests/test_verification_loop.py
@@ -3,6 +3,7 @@
 
 from __future__ import annotations
 
+import asyncio
 import warnings
 
 import pytest
@@ -31,6 +32,7 @@ def test_single_failure(self):
         feedback = VerificationLoop._build_feedback([('lint', 'unused import on line 5')], attempt=1)
         assert 'attempt 1' in feedback
         assert '- lint: unused import on line 5' in feedback
+        assert 'ONLY fix the failing checks' in feedback
 
     def test_multiple_failures(self):
         failures = [('lint', 'error A'), ('test', 'error B')]
@@ -39,6 +41,10 @@ def test_multiple_failures(self):
         assert '- lint: error A' in feedback
         assert '- test: error B' in feedback
 
+    def test_does_not_encourage_other_changes(self):
+        feedback = VerificationLoop._build_feedback([('test', 'fail')], attempt=1)
+        assert 'do not make other changes' in feedback
+
 
 class TestRunVerifiers:
@@ -267,6 +273,102 @@ async def pass_on_third() -> VerificationResult:
     assert isinstance(result.output, str)
     assert check_calls == 3
     # No warning should have been emitted since the final check passed.
     assert len(w) == 0
 
 
+# ---------------------------------------------------------------------------
+# Parallel execution
+# ---------------------------------------------------------------------------
+
+
+class TestParallelVerifiers:
+    @pytest.mark.anyio()
+    async def test_parallel_default(self):
+        cap = VerificationLoop()
+        assert cap.parallel is True
+
+    @pytest.mark.anyio()
+    async def test_parallel_runs_concurrently(self):
+        """Verify that parallel execution actually runs checks concurrently."""
+        execution_log: list[str] = []
+
+        async def slow_check_a() -> VerificationResult:
+            execution_log.append('a_start')
+            await asyncio.sleep(0.01)
+            execution_log.append('a_end')
+            return VerificationResult(passed=True, message='OK')
+
+        async def slow_check_b() -> VerificationResult:
+            execution_log.append('b_start')
+            await asyncio.sleep(0.01)
+            execution_log.append('b_end')
+            return VerificationResult(passed=True, message='OK')
+
+        cap = VerificationLoop(
+            verifiers=[
+                Verifier(name='a', check_fn=slow_check_a),
+                Verifier(name='b', check_fn=slow_check_b),
+            ],
+            parallel=True,
+        )
+        failures = await cap._run_verifiers()
+        assert failures == []
+        # Both should start before either finishes.
+        assert execution_log[:2] == ['a_start', 'b_start']
+
+    @pytest.mark.anyio()
+    async def test_sequential_runs_in_order(self):
+        """When parallel=False, verifiers run one at a time."""
+        execution_log: list[str] = []
+
+        async def check_a() -> VerificationResult:
+            execution_log.append('a_start')
+            await asyncio.sleep(0.01)
+            execution_log.append('a_end')
+            return VerificationResult(passed=True, message='OK')
+
+        async def check_b() -> VerificationResult:
+            execution_log.append('b_start')
+            await asyncio.sleep(0.01)
+            execution_log.append('b_end')
+            return VerificationResult(passed=True, message='OK')
+
+        cap = VerificationLoop(
+            verifiers=[
+                Verifier(name='a', check_fn=check_a),
+                Verifier(name='b', check_fn=check_b),
+            ],
+            parallel=False,
+        )
+        failures = await cap._run_verifiers()
+        assert failures == []
+        # Sequential: a finishes before b starts.
+        assert execution_log == ['a_start', 'a_end', 'b_start', 'b_end']
+
+    @pytest.mark.anyio()
+    async def test_parallel_collects_failures(self):
+        """Parallel mode still correctly collects failures from all verifiers."""
+        cap = VerificationLoop(
+            verifiers=[
+                Verifier(name='lint', check_fn=_fail_verifier('lint error')),
+                Verifier(name='test', check_fn=_pass_verifier),
+                Verifier(name='build', check_fn=_fail_verifier('build error')),
+            ],
+            parallel=True,
+        )
+        failures = await cap._run_verifiers()
+        assert len(failures) == 2
+        assert failures[0] == ('lint', 'lint error')
+        assert failures[1] == ('build', 'build error')
+
+    @pytest.mark.anyio()
+    async def test_single_verifier_skips_gather(self):
+        """With only one verifier, gather is not used even in parallel mode."""
+        cap = VerificationLoop(
+            verifiers=[Verifier(name='lint', check_fn=_pass_verifier)],
+            parallel=True,
+        )
+        failures = await cap._run_verifiers()
+        assert failures == []
+
+
 # ---------------------------------------------------------------------------
 # Helpers
 # ---------------------------------------------------------------------------
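
Usage sketch (illustrative, not part of the patch): the docstring examples stub
out `check_fn`, so here is one way a real verifier could shell out to a tool.
It assumes `ruff` and `pytest` are on PATH and treats a zero exit code as a
pass; the helper name `_run_command` is hypothetical::

    import asyncio

    from pydantic_harness import VerificationLoop, VerificationResult, Verifier

    async def _run_command(*cmd: str) -> VerificationResult:
        # Run the tool in a subprocess; exit code 0 counts as a pass (assumption).
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
        )
        output, _ = await proc.communicate()
        return VerificationResult(
            passed=proc.returncode == 0,
            message=output.decode(errors='replace').strip() or 'OK',
        )

    # The lambdas return coroutines, matching Verifier.check_fn's
    # Callable[[], Awaitable[VerificationResult]] signature.
    loop = VerificationLoop(
        verifiers=[
            Verifier(name='lint', check_fn=lambda: _run_command('ruff', 'check', '.')),
            Verifier(name='test', check_fn=lambda: _run_command('pytest', '-q')),
        ],
        max_retries=3,
    )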