diff --git a/backend/app/services/subscription/unified_executor.py b/backend/app/services/subscription/unified_executor.py index 6c1d39ae6..281ab2345 100644 --- a/backend/app/services/subscription/unified_executor.py +++ b/backend/app/services/subscription/unified_executor.py @@ -209,6 +209,47 @@ async def _execute_sse_sync( f"content_length={len(accumulated_content)}" ) + # Build result from accumulated content or final_event + # Check if final_event is an error event + from shared.models import EventType + + result = None + error_message = None + status = "COMPLETED" + + if final_event and final_event.type == EventType.ERROR.value: + # Handle error event + status = "FAILED" + error_message = final_event.error or "Unknown error" + logger.warning( + f"[_execute_sse_sync] Error event received: " + f"execution_id={execution_data.execution_id}, error={error_message}" + ) + elif accumulated_content: + result = {"value": accumulated_content} + elif final_event and final_event.result: + result = final_event.result + + # Publish TaskCompletedEvent for unified handling + logger.info( + f"[_execute_sse_sync] About to publish TaskCompletedEvent: " + f"task_id={execution_data.task_id}, subtask_id={execution_data.subtask_id}, " + f"status={status}, execution_id={execution_data.execution_id}" + ) + await event_bus.publish( + TaskCompletedEvent( + task_id=execution_data.task_id, + subtask_id=execution_data.subtask_id, + user_id=execution_data.user_id, + status=status, + result=result, + error=error_message, + ) + ) + logger.info( + f"[_execute_sse_sync] TaskCompletedEvent published successfully: " + f"execution_id={execution_data.execution_id}" + ) # TaskCompletedEvent is published by StatusUpdatingEmitter except Exception as e: @@ -275,6 +316,30 @@ async def _execute_http_callback( except Exception: pass # Error already handled via emitter + # Check if final_event is an error event + from shared.models import EventType + + if final_event and final_event.type == EventType.ERROR.value: + error_message = final_event.error or "Unknown error" + logger.error( + f"[_execute_http_callback] Error event received: " + f"execution_id={execution_data.execution_id}, error={error_message}" + ) + # Publish TaskCompletedEvent with FAILED status + from app.core.events import TaskCompletedEvent, get_event_bus + + event_bus = get_event_bus() + await event_bus.publish( + TaskCompletedEvent( + task_id=execution_data.task_id, + subtask_id=execution_data.subtask_id, + user_id=execution_data.user_id, + status="FAILED", + error=error_message, + ) + ) + return + logger.info( f"[_execute_http_callback] HTTP+Callback completed: " f"execution_id={execution_data.execution_id}, "