-
Notifications
You must be signed in to change notification settings - Fork 0
Fix orchestrator Redis resilience and warning cleanup #4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,7 +3,8 @@ | |
|
|
||
| import asyncio | ||
| import json | ||
| from datetime import datetime | ||
| import logging | ||
| from datetime import UTC, datetime | ||
| from pathlib import Path | ||
| from typing import Any, AsyncIterator | ||
|
|
||
|
|
@@ -18,6 +19,13 @@ | |
| from .workflow_state import WorkflowState | ||
|
|
||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| def _utc_iso_timestamp() -> str: | ||
| return datetime.now(UTC).isoformat() | ||
|
|
||
|
|
||
| def _load_system_prompt() -> str: | ||
| prompt_path = Path(__file__).parent / "prompts" / "system.md" | ||
| if not prompt_path.exists(): | ||
|
|
@@ -68,7 +76,7 @@ async def run(self, payload: dict[str, Any]) -> AgentResult: | |
| "type": "agent_started", | ||
| "launch_id": launch_id, | ||
| "agent_id": agent_id, | ||
| "timestamp": datetime.utcnow().isoformat(), | ||
| "timestamp": _utc_iso_timestamp(), | ||
| }, | ||
| ) | ||
|
|
||
|
|
@@ -98,7 +106,7 @@ async def run(self, payload: dict[str, Any]) -> AgentResult: | |
| "launch_id": launch_id, | ||
| "agent_id": agent_id, | ||
| "error": state.failure_reason, | ||
| "timestamp": datetime.utcnow().isoformat(), | ||
| "timestamp": _utc_iso_timestamp(), | ||
| }, | ||
| ) | ||
| break | ||
|
|
@@ -111,7 +119,7 @@ async def run(self, payload: dict[str, Any]) -> AgentResult: | |
| "launch_id": launch_id, | ||
| "agent_id": agent_id, | ||
| "output": output, | ||
| "timestamp": datetime.utcnow().isoformat(), | ||
| "timestamp": _utc_iso_timestamp(), | ||
| }, | ||
| ) | ||
|
|
||
|
|
@@ -134,7 +142,13 @@ async def run(self, payload: dict[str, Any]) -> AgentResult: | |
| break | ||
|
|
||
| state.resume_from_hitl(edits if isinstance(edits, dict) else None) | ||
| await self._hitl_store.clear(launch_id) | ||
| try: | ||
| await self._hitl_store.clear(launch_id) | ||
| except Exception: | ||
| logger.warning( | ||
| "Failed to clear HITL state", | ||
| extra={"launch_id": launch_id}, | ||
| ) | ||
|
Comment on lines
+145
to
+151
|
||
|
|
||
| if state.failed: | ||
| status = "failed" | ||
|
|
@@ -172,7 +186,7 @@ async def _pause_for_hitl( | |
| "checkpoint": checkpoint, | ||
| "agent_id": state.completed_agents[-1] if state.completed_agents else "unknown", | ||
| "output_preview": output, | ||
| "created_at": datetime.utcnow().isoformat(), | ||
| "created_at": _utc_iso_timestamp(), | ||
| } | ||
| await self._hitl_store.set_pending(state.launch_id, pending) | ||
| await self._publish_event( | ||
|
|
@@ -198,7 +212,14 @@ async def _wait_for_hitl_resolution( | |
| return None | ||
|
|
||
| async def _publish_event(self, launch_id: str, event: dict[str, Any]) -> None: | ||
| await self._session_store.publish(f"launch:{launch_id}:events", event) | ||
| # Event streaming should not block workflow completion if Redis is unavailable. | ||
| try: | ||
| await self._session_store.publish(f"launch:{launch_id}:events", event) | ||
| except Exception: | ||
|
Comment on lines
214
to
+218
|
||
| logger.warning( | ||
| "Failed to publish orchestrator event", | ||
| extra={"launch_id": launch_id, "event_type": event.get("type")}, | ||
| ) | ||
|
Comment on lines
214
to
+222
|
||
|
|
||
| async def stream(self, payload: dict[str, Any]) -> AsyncIterator[str]: | ||
| result = await self.run(payload) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PR description says `utcnow` usage was replaced with timezone-aware UTC timestamps, but there are still `datetime.utcnow()` call sites elsewhere (e.g., `src/apps/api/workers/tasks.py`, `src/apps/api/services/launch_service.py`, and several repositories). Either narrow the description to the orchestrator events/HITL timestamps or update the remaining occurrences for consistency.