-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathgraph_executor.py
More file actions
30 lines (23 loc) · 894 Bytes
/
graph_executor.py
File metadata and controls
30 lines (23 loc) · 894 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from typing import Dict, Any
import traceback
def execute_workflow(graph, input_data: Dict[str, Any]) -> Dict[str, Any]:
try:
initial_state = {
"input": input_data,
"messages": [],
"current_output": None,
"final_output": None,
"execution_log": []
}
print(f"[Executor] Starting workflow with input: {input_data}")
result = graph.invoke(initial_state)
print(f"[Executor] Workflow completed successfully")
return {
"final_output": result.get("final_output"),
"execution_log": result.get("execution_log", []),
"messages": result.get("messages", [])
}
except Exception as e:
print(f"[Executor] Error: {str(e)}")
print(f"[Executor] Traceback: {traceback.format_exc()}")
raise