Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions backend/app/services/subscription/unified_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,47 @@ async def _execute_sse_sync(
f"content_length={len(accumulated_content)}"
)

# Build result from accumulated content or final_event
# Check if final_event is an error event
from shared.models import EventType

result = None
error_message = None
status = "COMPLETED"

if final_event and final_event.type == EventType.ERROR.value:
# Handle error event
status = "FAILED"
error_message = final_event.error or "Unknown error"
logger.warning(
f"[_execute_sse_sync] Error event received: "
f"execution_id={execution_data.execution_id}, error={error_message}"
)
elif accumulated_content:
result = {"value": accumulated_content}
elif final_event and final_event.result:
result = final_event.result

# Publish TaskCompletedEvent for unified handling
logger.info(
f"[_execute_sse_sync] About to publish TaskCompletedEvent: "
f"task_id={execution_data.task_id}, subtask_id={execution_data.subtask_id}, "
f"status={status}, execution_id={execution_data.execution_id}"
)
await event_bus.publish(
TaskCompletedEvent(
task_id=execution_data.task_id,
subtask_id=execution_data.subtask_id,
user_id=execution_data.user_id,
status=status,
result=result,
error=error_message,
)
)
logger.info(
f"[_execute_sse_sync] TaskCompletedEvent published successfully: "
f"execution_id={execution_data.execution_id}"
)
# TaskCompletedEvent is published by StatusUpdatingEmitter

except Exception as e:
Expand Down Expand Up @@ -275,6 +316,30 @@ async def _execute_http_callback(
except Exception:
pass # Error already handled via emitter

# Check if final_event is an error event
from shared.models import EventType

if final_event and final_event.type == EventType.ERROR.value:
error_message = final_event.error or "Unknown error"
logger.error(
f"[_execute_http_callback] Error event received: "
f"execution_id={execution_data.execution_id}, error={error_message}"
)
# Publish TaskCompletedEvent with FAILED status
from app.core.events import TaskCompletedEvent, get_event_bus

event_bus = get_event_bus()
await event_bus.publish(
TaskCompletedEvent(
task_id=execution_data.task_id,
subtask_id=execution_data.subtask_id,
user_id=execution_data.user_id,
status="FAILED",
error=error_message,
)
)
return

logger.info(
f"[_execute_http_callback] HTTP+Callback completed: "
f"execution_id={execution_data.execution_id}, "
Expand Down