diff --git a/pyproject.toml b/pyproject.toml index 48fa239..cdcdfa2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,3 +38,6 @@ strict = true testpaths = ["src"] python_files = ["test_*.py"] asyncio_mode = "auto" +markers = [ + "integration: marks integration tests that require running services", +] diff --git a/src/agents/orchestrator/agent.py b/src/agents/orchestrator/agent.py index 6c48d0a..50512d3 100644 --- a/src/agents/orchestrator/agent.py +++ b/src/agents/orchestrator/agent.py @@ -3,7 +3,8 @@ import asyncio import json -from datetime import datetime +import logging +from datetime import UTC, datetime from pathlib import Path from typing import Any, AsyncIterator @@ -18,6 +19,13 @@ from .workflow_state import WorkflowState +logger = logging.getLogger(__name__) + + +def _utc_iso_timestamp() -> str: + return datetime.now(UTC).isoformat() + + def _load_system_prompt() -> str: prompt_path = Path(__file__).parent / "prompts" / "system.md" if not prompt_path.exists(): @@ -68,7 +76,7 @@ async def run(self, payload: dict[str, Any]) -> AgentResult: "type": "agent_started", "launch_id": launch_id, "agent_id": agent_id, - "timestamp": datetime.utcnow().isoformat(), + "timestamp": _utc_iso_timestamp(), }, ) @@ -98,7 +106,7 @@ async def run(self, payload: dict[str, Any]) -> AgentResult: "launch_id": launch_id, "agent_id": agent_id, "error": state.failure_reason, - "timestamp": datetime.utcnow().isoformat(), + "timestamp": _utc_iso_timestamp(), }, ) break @@ -111,7 +119,7 @@ async def run(self, payload: dict[str, Any]) -> AgentResult: "launch_id": launch_id, "agent_id": agent_id, "output": output, - "timestamp": datetime.utcnow().isoformat(), + "timestamp": _utc_iso_timestamp(), }, ) @@ -134,7 +142,13 @@ async def run(self, payload: dict[str, Any]) -> AgentResult: break state.resume_from_hitl(edits if isinstance(edits, dict) else None) - await self._hitl_store.clear(launch_id) + try: + await self._hitl_store.clear(launch_id) + except Exception: + logger.warning( + "Failed to clear HITL state", + extra={"launch_id": launch_id}, + ) if state.failed: status = "failed" @@ -172,7 +186,7 @@ async def _pause_for_hitl( "checkpoint": checkpoint, "agent_id": state.completed_agents[-1] if state.completed_agents else "unknown", "output_preview": output, - "created_at": datetime.utcnow().isoformat(), + "created_at": _utc_iso_timestamp(), } await self._hitl_store.set_pending(state.launch_id, pending) await self._publish_event( @@ -198,7 +212,14 @@ async def _wait_for_hitl_resolution( return None async def _publish_event(self, launch_id: str, event: dict[str, Any]) -> None: - await self._session_store.publish(f"launch:{launch_id}:events", event) + # Event streaming should not block workflow completion if Redis is unavailable. + try: + await self._session_store.publish(f"launch:{launch_id}:events", event) + except Exception: + logger.warning( + "Failed to publish orchestrator event", + extra={"launch_id": launch_id, "event_type": event.get("type")}, + ) async def stream(self, payload: dict[str, Any]) -> AsyncIterator[str]: result = await self.run(payload)