diff --git a/velvetflow/planner/structure.py b/velvetflow/planner/structure.py index 3dce2148..9bc64067 100644 --- a/velvetflow/planner/structure.py +++ b/velvetflow/planner/structure.py @@ -412,6 +412,52 @@ def _find_nodes_without_upstream(workflow: Mapping[str, Any]) -> List[Dict[str, return dangling +def _find_isolated_nodes(workflow: Mapping[str, Any]) -> List[Dict[str, Any]]: + nodes = workflow.get("nodes") if isinstance(workflow.get("nodes"), list) else [] + inferred_edges = infer_edges_from_bindings(nodes) + + indegree: Dict[str, int] = {} + outdegree: Dict[str, int] = {} + for node in nodes: + if isinstance(node, Mapping) and isinstance(node.get("id"), str): + indegree[node["id"]] = 0 + outdegree[node["id"]] = 0 + + for edge in inferred_edges: + if not isinstance(edge, Mapping): + continue + source = edge.get("from") + target = edge.get("to") + if isinstance(source, str) and source in outdegree: + outdegree[source] += 1 + if isinstance(target, str) and target in indegree: + indegree[target] += 1 + + isolated: List[Dict[str, Any]] = [] + for node in nodes: + if not isinstance(node, Mapping): + continue + + node_id = node.get("id") + node_type = node.get("type") + if not isinstance(node_id, str) or not isinstance(node_type, str): + continue + + if indegree.get(node_id, 0) == 0 and outdegree.get(node_id, 0) == 0: + isolated.append( + { + "id": node_id, + "type": node_type, + "action_id": node.get("action_id") + if isinstance(node.get("action_id"), str) + else None, + "display_name": node.get("display_name"), + } + ) + + return isolated + + def _run_coverage_check( *, nl_requirement: str, @@ -467,6 +513,19 @@ def _build_dependency_feedback_message( f"{json.dumps(workflow, ensure_ascii=False)}" ) + +def _build_isolated_nodes_feedback_message( + *, workflow: Mapping[str, Any], isolated_nodes: List[Mapping[str, Any]] +) -> str: + return ( + "检测到以下节点是孤立节点(零入度且零出度)。请分析它们是否应引用 workflow 中其他节点字段," + "从而建立上下游依赖;若需要,请使用 update_action_node/update_condition_node/update_loop_node " + "等工具补充 params/depends_on 或分支指向以形成连接。\n" + f"- isolated_nodes: {json.dumps(isolated_nodes, ensure_ascii=False)}\n" + "当前 workflow 供参考(含推导的 edges):\n" + f"{json.dumps(workflow, ensure_ascii=False)}" + ) + def plan_workflow_structure_with_llm( nl_requirement: str, search_service: HybridActionSearchService, @@ -559,7 +618,9 @@ def _snapshot(label: str) -> Dict[str, Any]: "你必须确保工作流结构能够完全覆盖用户自然语言需求中的每个子任务,而不是只覆盖前半部分:\n" "例如,如果需求包含:触发 + 查询 + 筛选 + 总结 + 通知,你不能只实现触发 + 查询,\n" "必须在结构里显式包含筛选、总结、通知等对应节点和数据流。\n" - "调用 finalize_workflow 后系统会立即对照 nl_requirement 做覆盖度检查;如果发现 missing_points 会把缺失点和当前 workflow 反馈给你,请继续用规划工具修补后再次 finalize。" + "调用 finalize_workflow 后系统会立即对照 nl_requirement 做覆盖度检查;如果发现 missing_points 会把缺失点和当前 workflow 反馈给你,请继续用规划工具修补后再次 finalize。\n" + "调用 finalize_workflow 后系统也会检查是否存在孤立节点(零入度且零出度);若存在请分析是否应引用其他节点字段," + "如需建立依赖请使用更新工具补充参数或分支指向后再次 finalize。" ) def _build_validation_error(message: str, **extra: Any) -> Dict[str, Any]: @@ -1293,8 +1354,10 @@ def finalize_workflow(ready: bool = True, notes: Optional[str] = None) -> Mappin latest_skeleton = skeleton latest_coverage = coverage nodes_without_upstream = _find_nodes_without_upstream(skeleton) + isolated_nodes = _find_isolated_nodes(skeleton) is_covered = bool(coverage.get("is_covered", False)) needs_dependency_review = bool(nodes_without_upstream) + needs_isolation_review = bool(isolated_nodes) feedback_parts: List[str] = [] if not is_covered: @@ -1305,8 +1368,14 @@ def finalize_workflow(ready: bool = True, notes: Optional[str] = None) -> Mappin feedback_parts.append( _build_dependency_feedback_message(workflow=skeleton, nodes_without_upstream=nodes_without_upstream) ) + if isolated_nodes: + feedback_parts.append( + _build_isolated_nodes_feedback_message(workflow=skeleton, isolated_nodes=isolated_nodes) + ) - should_continue = not ready or not is_covered or needs_dependency_review + should_continue = ( + not ready or not is_covered or needs_dependency_review or needs_isolation_review + ) finalized_state["done"] = not should_continue _snapshot("finalize") return { @@ -1315,6 +1384,7 @@ def finalize_workflow(ready: bool = True, notes: Optional[str] = None) -> Mappin "notes": notes, "coverage": coverage, "nodes_without_upstream": nodes_without_upstream, + "isolated_nodes": isolated_nodes, "should_continue": should_continue, "feedback": "\n\n".join(feedback_parts) if feedback_parts else "结构已通过覆盖度与依赖检查。", "workflow": skeleton,