From 61a3447c14a53980dbf37786cd37820cea259274 Mon Sep 17 00:00:00 2001 From: Zhongkai Fu Date: Sat, 20 Dec 2025 01:02:54 -0800 Subject: [PATCH 1/5] Add auto refine loop for workflow planning --- velvetflow/planner/structure.py | 64 ++++++++++++++++++++++++++++----- 1 file changed, 56 insertions(+), 8 deletions(-) diff --git a/velvetflow/planner/structure.py b/velvetflow/planner/structure.py index 3dce2148..69242472 100644 --- a/velvetflow/planner/structure.py +++ b/velvetflow/planner/structure.py @@ -485,6 +485,8 @@ def plan_workflow_structure_with_llm( all_action_candidates: List[str] = [] latest_skeleton: Dict[str, Any] = {} latest_coverage: Dict[str, Any] = {} + latest_finalize_feedback: str = "" + latest_nodes_without_upstream: List[Mapping[str, Any]] = [] finalized_state: Dict[str, bool] = {"done": False} def _emit_progress(label: str, workflow_obj: Mapping[str, Any]) -> None: @@ -1283,7 +1285,7 @@ def finalize_workflow(ready: bool = True, notes: Optional[str] = None) -> Mappin 校验结果与反馈的字典,包含 coverage、workflow 与 should_continue 等字段。 """ _log_tool_call("finalize_workflow", {"ready": ready, "notes": notes}) - nonlocal latest_skeleton, latest_coverage + nonlocal latest_skeleton, latest_coverage, latest_finalize_feedback, latest_nodes_without_upstream skeleton, coverage = _run_coverage_check( nl_requirement=nl_requirement, builder=builder, @@ -1293,6 +1295,7 @@ def finalize_workflow(ready: bool = True, notes: Optional[str] = None) -> Mappin latest_skeleton = skeleton latest_coverage = coverage nodes_without_upstream = _find_nodes_without_upstream(skeleton) + latest_nodes_without_upstream = nodes_without_upstream is_covered = bool(coverage.get("is_covered", False)) needs_dependency_review = bool(nodes_without_upstream) @@ -1308,6 +1311,7 @@ def finalize_workflow(ready: bool = True, notes: Optional[str] = None) -> Mappin should_continue = not ready or not is_covered or needs_dependency_review finalized_state["done"] = not should_continue + latest_finalize_feedback = "\n\n".join(feedback_parts) if feedback_parts else "结构已通过覆盖度与依赖检查。" _snapshot("finalize") return { "status": "ok" if not should_continue else "needs_more_work", @@ -1316,7 +1320,7 @@ def finalize_workflow(ready: bool = True, notes: Optional[str] = None) -> Mappin "coverage": coverage, "nodes_without_upstream": nodes_without_upstream, "should_continue": should_continue, - "feedback": "\n\n".join(feedback_parts) if feedback_parts else "结构已通过覆盖度与依赖检查。", + "feedback": latest_finalize_feedback, "workflow": skeleton, } @@ -1443,19 +1447,63 @@ def validate_node_params(id: str, params: Optional[Dict[str, Any]] = None) -> Ma model=OPENAI_MODEL ) - total_rounds = max_rounds + max_coverage_refine_rounds + max_dependency_refine_rounds log_section("结构规划 - Agent SDK") + def _run_agent(input_payload: Any, max_turns: int) -> Any: + try: + return Runner.run_sync(agent, input_payload, max_turns=max_turns) # type: ignore[arg-type] + except TypeError: + coro = Runner.run(agent, input_payload) # type: ignore[call-arg] + return asyncio.run(coro) if asyncio.iscoroutine(coro) else coro + run_input: Any = nl_requirement - try: - result = Runner.run_sync(agent, run_input, max_turns=total_rounds) # type: ignore[arg-type] - except TypeError: - coro = Runner.run(agent, run_input) # type: ignore[call-arg] - result = asyncio.run(coro) if asyncio.iscoroutine(coro) else coro + coverage_refine_rounds = 0 + dependency_refine_rounds = 0 + while True: + result = _run_agent(run_input, max_rounds) + if finalized_state.get("done"): + break + + needs_coverage_refine = bool(latest_coverage) and not latest_coverage.get("is_covered", False) + needs_dependency_refine = bool(latest_nodes_without_upstream) + needs_finalize_call = not latest_coverage + can_refine = ( + (needs_coverage_refine and coverage_refine_rounds < max_coverage_refine_rounds) + or (needs_dependency_refine and dependency_refine_rounds < max_dependency_refine_rounds) + or (needs_finalize_call and coverage_refine_rounds < max_coverage_refine_rounds) + ) + + if not can_refine: + break + + if needs_coverage_refine or needs_finalize_call: + coverage_refine_rounds += 1 + if needs_dependency_refine: + dependency_refine_rounds += 1 + + if needs_finalize_call: + latest_finalize_feedback = ( + "尚未调用 finalize_workflow,请在结构规划完成后调用 finalize_workflow 以触发覆盖度与依赖检查。" + ) + + log_info( + "[StructurePlanner] 触发自动补全轮次", + json.dumps( + { + "coverage_round": coverage_refine_rounds, + "dependency_round": dependency_refine_rounds, + "needs_finalize_call": needs_finalize_call, + }, + ensure_ascii=False, + ), + ) + run_input = f"{nl_requirement}\n\n[系统反馈]\n{latest_finalize_feedback}" if not finalized_state.get("done"): if latest_coverage and not latest_coverage.get("is_covered", False): log_warn("[Planner] 规划结束但覆盖度未通过,返回当前骨架。") + elif not latest_coverage: + log_warn("[Planner] 未调用 finalize_workflow,使用当前骨架继续后续阶段。") else: log_warn("[Planner] 未收到 finalize_workflow,使用当前骨架继续后续阶段。") From 03cc72fd9f7e94638553cc42aacc6b9f01920195 Mon Sep 17 00:00:00 2001 From: Zhongkai Fu Date: Sat, 20 Dec 2025 01:13:22 -0800 Subject: [PATCH 2/5] Refine planner for isolated nodes --- velvetflow/planner/structure.py | 93 +++++++++++++++++++++++++++------ 1 file changed, 76 insertions(+), 17 deletions(-) diff --git a/velvetflow/planner/structure.py b/velvetflow/planner/structure.py index 69242472..5028eb68 100644 --- a/velvetflow/planner/structure.py +++ b/velvetflow/planner/structure.py @@ -371,22 +371,41 @@ def _prepare_skeleton_for_coverage( return _attach_inferred_edges(skeleton) -def _find_nodes_without_upstream(workflow: Mapping[str, Any]) -> List[Dict[str, Any]]: - nodes = workflow.get("nodes") if isinstance(workflow.get("nodes"), list) else [] - inferred_edges = infer_edges_from_bindings(nodes) +def _summarize_node(node: Mapping[str, Any]) -> Dict[str, Any]: + return { + "id": node.get("id"), + "type": node.get("type"), + "action_id": node.get("action_id") if isinstance(node.get("action_id"), str) else None, + "display_name": node.get("display_name"), + } - indegree = {} + +def _build_degree_maps(nodes: List[Mapping[str, Any]]) -> tuple[Dict[str, int], Dict[str, int]]: + inferred_edges = infer_edges_from_bindings(nodes) + indegree: Dict[str, int] = {} + outdegree: Dict[str, int] = {} for node in nodes: if isinstance(node, Mapping) and isinstance(node.get("id"), str): indegree[node["id"]] = 0 + outdegree[node["id"]] = 0 for edge in inferred_edges: if not isinstance(edge, Mapping): continue + source = edge.get("from") target = edge.get("to") + if isinstance(source, str) and source in outdegree: + outdegree[source] += 1 if isinstance(target, str) and target in indegree: indegree[target] += 1 + return indegree, outdegree + + +def _find_nodes_without_upstream(workflow: Mapping[str, Any]) -> List[Dict[str, Any]]: + nodes = workflow.get("nodes") if isinstance(workflow.get("nodes"), list) else [] + indegree, _ = _build_degree_maps(nodes) + dangling: List[Dict[str, Any]] = [] for node in nodes: if not isinstance(node, Mapping): @@ -398,20 +417,31 @@ def _find_nodes_without_upstream(workflow: Mapping[str, Any]) -> List[Dict[str, continue if indegree.get(node_id, 0) == 0: - dangling.append( - { - "id": node_id, - "type": node_type, - "action_id": node.get("action_id") - if isinstance(node.get("action_id"), str) - else None, - "display_name": node.get("display_name"), - } - ) + dangling.append(_summarize_node(node)) return dangling +def _find_isolated_nodes(workflow: Mapping[str, Any]) -> List[Dict[str, Any]]: + nodes = workflow.get("nodes") if isinstance(workflow.get("nodes"), list) else [] + indegree, outdegree = _build_degree_maps(nodes) + + isolated: List[Dict[str, Any]] = [] + for node in nodes: + if not isinstance(node, Mapping): + continue + + node_id = node.get("id") + node_type = node.get("type") + if not isinstance(node_id, str) or not isinstance(node_type, str): + continue + + if indegree.get(node_id, 0) == 0 and outdegree.get(node_id, 0) == 0: + isolated.append(_summarize_node(node)) + + return isolated + + def _run_coverage_check( *, nl_requirement: str, @@ -467,6 +497,20 @@ def _build_dependency_feedback_message( f"{json.dumps(workflow, ensure_ascii=False)}" ) + +def _build_isolated_nodes_feedback_message( + *, workflow: Mapping[str, Any], isolated_nodes: List[Mapping[str, Any]] +) -> str: + return ( + "检测到以下节点没有任何入度与出度(孤立节点)," + "请判断它们是否应该挂接到现有 DAG 中;" + "如果可以,请使用规划工具更新节点的 depends_on 或相关参数以建立依赖;" + "如果确认需要保留独立节点,请在 finalize_workflow.notes 中说明原因。\n" + f"- isolated_nodes: {json.dumps(isolated_nodes, ensure_ascii=False)}\n" + "当前 workflow 供参考(含推导的 edges):\n" + f"{json.dumps(workflow, ensure_ascii=False)}" + ) + def plan_workflow_structure_with_llm( nl_requirement: str, search_service: HybridActionSearchService, @@ -487,6 +531,7 @@ def plan_workflow_structure_with_llm( latest_coverage: Dict[str, Any] = {} latest_finalize_feedback: str = "" latest_nodes_without_upstream: List[Mapping[str, Any]] = [] + latest_isolated_nodes: List[Mapping[str, Any]] = [] finalized_state: Dict[str, bool] = {"done": False} def _emit_progress(label: str, workflow_obj: Mapping[str, Any]) -> None: @@ -1285,7 +1330,13 @@ def finalize_workflow(ready: bool = True, notes: Optional[str] = None) -> Mappin 校验结果与反馈的字典,包含 coverage、workflow 与 should_continue 等字段。 """ _log_tool_call("finalize_workflow", {"ready": ready, "notes": notes}) - nonlocal latest_skeleton, latest_coverage, latest_finalize_feedback, latest_nodes_without_upstream + nonlocal ( + latest_skeleton, + latest_coverage, + latest_finalize_feedback, + latest_nodes_without_upstream, + latest_isolated_nodes, + ) skeleton, coverage = _run_coverage_check( nl_requirement=nl_requirement, builder=builder, @@ -1295,9 +1346,12 @@ def finalize_workflow(ready: bool = True, notes: Optional[str] = None) -> Mappin latest_skeleton = skeleton latest_coverage = coverage nodes_without_upstream = _find_nodes_without_upstream(skeleton) + isolated_nodes = _find_isolated_nodes(skeleton) latest_nodes_without_upstream = nodes_without_upstream + latest_isolated_nodes = isolated_nodes is_covered = bool(coverage.get("is_covered", False)) needs_dependency_review = bool(nodes_without_upstream) + needs_isolated_review = bool(isolated_nodes) feedback_parts: List[str] = [] if not is_covered: @@ -1308,8 +1362,12 @@ def finalize_workflow(ready: bool = True, notes: Optional[str] = None) -> Mappin feedback_parts.append( _build_dependency_feedback_message(workflow=skeleton, nodes_without_upstream=nodes_without_upstream) ) + if isolated_nodes: + feedback_parts.append( + _build_isolated_nodes_feedback_message(workflow=skeleton, isolated_nodes=isolated_nodes) + ) - should_continue = not ready or not is_covered or needs_dependency_review + should_continue = not ready or not is_covered or needs_dependency_review or needs_isolated_review finalized_state["done"] = not should_continue latest_finalize_feedback = "\n\n".join(feedback_parts) if feedback_parts else "结构已通过覆盖度与依赖检查。" _snapshot("finalize") @@ -1319,6 +1377,7 @@ def finalize_workflow(ready: bool = True, notes: Optional[str] = None) -> Mappin "notes": notes, "coverage": coverage, "nodes_without_upstream": nodes_without_upstream, + "isolated_nodes": isolated_nodes, "should_continue": should_continue, "feedback": latest_finalize_feedback, "workflow": skeleton, @@ -1465,7 +1524,7 @@ def _run_agent(input_payload: Any, max_turns: int) -> Any: break needs_coverage_refine = bool(latest_coverage) and not latest_coverage.get("is_covered", False) - needs_dependency_refine = bool(latest_nodes_without_upstream) + needs_dependency_refine = bool(latest_nodes_without_upstream) or bool(latest_isolated_nodes) needs_finalize_call = not latest_coverage can_refine = ( (needs_coverage_refine and coverage_refine_rounds < max_coverage_refine_rounds) From 48c56afdbb4c0e61137d2fbdcca01439990db1b1 Mon Sep 17 00:00:00 2001 From: Zhongkai Fu Date: Sat, 20 Dec 2025 01:16:19 -0800 Subject: [PATCH 3/5] Fix nonlocal syntax in structure planner --- velvetflow/planner/structure.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/velvetflow/planner/structure.py b/velvetflow/planner/structure.py index 5028eb68..4e57e1d7 100644 --- a/velvetflow/planner/structure.py +++ b/velvetflow/planner/structure.py @@ -1330,13 +1330,11 @@ def finalize_workflow(ready: bool = True, notes: Optional[str] = None) -> Mappin 校验结果与反馈的字典,包含 coverage、workflow 与 should_continue 等字段。 """ _log_tool_call("finalize_workflow", {"ready": ready, "notes": notes}) - nonlocal ( - latest_skeleton, - latest_coverage, - latest_finalize_feedback, - latest_nodes_without_upstream, - latest_isolated_nodes, - ) + nonlocal latest_skeleton + nonlocal latest_coverage + nonlocal latest_finalize_feedback + nonlocal latest_nodes_without_upstream + nonlocal latest_isolated_nodes skeleton, coverage = _run_coverage_check( nl_requirement=nl_requirement, builder=builder, From 0330ac9ec436823ed40edeb0aee98be0e9507b3e Mon Sep 17 00:00:00 2001 From: Zhongkai Fu Date: Sat, 20 Dec 2025 01:27:08 -0800 Subject: [PATCH 4/5] Track finalize calls for auto-refine --- velvetflow/planner/structure.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/velvetflow/planner/structure.py b/velvetflow/planner/structure.py index 4e57e1d7..877a1407 100644 --- a/velvetflow/planner/structure.py +++ b/velvetflow/planner/structure.py @@ -532,6 +532,7 @@ def plan_workflow_structure_with_llm( latest_finalize_feedback: str = "" latest_nodes_without_upstream: List[Mapping[str, Any]] = [] latest_isolated_nodes: List[Mapping[str, Any]] = [] + latest_finalize_called = False finalized_state: Dict[str, bool] = {"done": False} def _emit_progress(label: str, workflow_obj: Mapping[str, Any]) -> None: @@ -1335,12 +1336,14 @@ def finalize_workflow(ready: bool = True, notes: Optional[str] = None) -> Mappin nonlocal latest_finalize_feedback nonlocal latest_nodes_without_upstream nonlocal latest_isolated_nodes + nonlocal latest_finalize_called skeleton, coverage = _run_coverage_check( nl_requirement=nl_requirement, builder=builder, action_registry=action_registry, search_service=search_service, ) + latest_finalize_called = True latest_skeleton = skeleton latest_coverage = coverage nodes_without_upstream = _find_nodes_without_upstream(skeleton) @@ -1523,7 +1526,7 @@ def _run_agent(input_payload: Any, max_turns: int) -> Any: needs_coverage_refine = bool(latest_coverage) and not latest_coverage.get("is_covered", False) needs_dependency_refine = bool(latest_nodes_without_upstream) or bool(latest_isolated_nodes) - needs_finalize_call = not latest_coverage + needs_finalize_call = not latest_finalize_called can_refine = ( (needs_coverage_refine and coverage_refine_rounds < max_coverage_refine_rounds) or (needs_dependency_refine and dependency_refine_rounds < max_dependency_refine_rounds) From 3640fe405637483c39115949cd8f271c4eb62445 Mon Sep 17 00:00:00 2001 From: Zhongkai Fu Date: Sat, 20 Dec 2025 01:41:16 -0800 Subject: [PATCH 5/5] Reset finalize state per planning run --- velvetflow/planner/structure.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/velvetflow/planner/structure.py b/velvetflow/planner/structure.py index 877a1407..73c0d718 100644 --- a/velvetflow/planner/structure.py +++ b/velvetflow/planner/structure.py @@ -1520,6 +1520,11 @@ def _run_agent(input_payload: Any, max_turns: int) -> Any: coverage_refine_rounds = 0 dependency_refine_rounds = 0 while True: + latest_finalize_called = False + latest_coverage = {} + latest_nodes_without_upstream = [] + latest_isolated_nodes = [] + latest_finalize_feedback = "" result = _run_agent(run_input, max_rounds) if finalized_state.get("done"): break