Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 72 additions & 2 deletions velvetflow/planner/structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,52 @@ def _find_nodes_without_upstream(workflow: Mapping[str, Any]) -> List[Dict[str,
return dangling


def _find_isolated_nodes(workflow: Mapping[str, Any]) -> List[Dict[str, Any]]:
nodes = workflow.get("nodes") if isinstance(workflow.get("nodes"), list) else []
inferred_edges = infer_edges_from_bindings(nodes)

indegree: Dict[str, int] = {}
outdegree: Dict[str, int] = {}
for node in nodes:
if isinstance(node, Mapping) and isinstance(node.get("id"), str):
indegree[node["id"]] = 0
outdegree[node["id"]] = 0

for edge in inferred_edges:
if not isinstance(edge, Mapping):
continue
source = edge.get("from")
target = edge.get("to")
if isinstance(source, str) and source in outdegree:
outdegree[source] += 1
if isinstance(target, str) and target in indegree:
indegree[target] += 1

isolated: List[Dict[str, Any]] = []
for node in nodes:
if not isinstance(node, Mapping):
continue

node_id = node.get("id")
node_type = node.get("type")
if not isinstance(node_id, str) or not isinstance(node_type, str):
continue

if indegree.get(node_id, 0) == 0 and outdegree.get(node_id, 0) == 0:
isolated.append(
{
"id": node_id,
"type": node_type,
"action_id": node.get("action_id")
if isinstance(node.get("action_id"), str)
else None,
"display_name": node.get("display_name"),
}
)

return isolated


def _run_coverage_check(
*,
nl_requirement: str,
Expand Down Expand Up @@ -467,6 +513,19 @@ def _build_dependency_feedback_message(
f"{json.dumps(workflow, ensure_ascii=False)}"
)


def _build_isolated_nodes_feedback_message(
*, workflow: Mapping[str, Any], isolated_nodes: List[Mapping[str, Any]]
) -> str:
return (
"检测到以下节点是孤立节点(零入度且零出度)。请分析它们是否应引用 workflow 中其他节点字段,"
"从而建立上下游依赖;若需要,请使用 update_action_node/update_condition_node/update_loop_node "
"等工具补充 params/depends_on 或分支指向以形成连接。\n"
f"- isolated_nodes: {json.dumps(isolated_nodes, ensure_ascii=False)}\n"
"当前 workflow 供参考(含推导的 edges):\n"
f"{json.dumps(workflow, ensure_ascii=False)}"
)

def plan_workflow_structure_with_llm(
nl_requirement: str,
search_service: HybridActionSearchService,
Expand Down Expand Up @@ -559,7 +618,9 @@ def _snapshot(label: str) -> Dict[str, Any]:
"你必须确保工作流结构能够完全覆盖用户自然语言需求中的每个子任务,而不是只覆盖前半部分:\n"
"例如,如果需求包含:触发 + 查询 + 筛选 + 总结 + 通知,你不能只实现触发 + 查询,\n"
"必须在结构里显式包含筛选、总结、通知等对应节点和数据流。\n"
"调用 finalize_workflow 后系统会立即对照 nl_requirement 做覆盖度检查;如果发现 missing_points 会把缺失点和当前 workflow 反馈给你,请继续用规划工具修补后再次 finalize。"
"调用 finalize_workflow 后系统会立即对照 nl_requirement 做覆盖度检查;如果发现 missing_points 会把缺失点和当前 workflow 反馈给你,请继续用规划工具修补后再次 finalize。\n"
"调用 finalize_workflow 后系统也会检查是否存在孤立节点(零入度且零出度);若存在请分析是否应引用其他节点字段,"
"如需建立依赖请使用更新工具补充参数或分支指向后再次 finalize。"
)

def _build_validation_error(message: str, **extra: Any) -> Dict[str, Any]:
Expand Down Expand Up @@ -1293,8 +1354,10 @@ def finalize_workflow(ready: bool = True, notes: Optional[str] = None) -> Mappin
latest_skeleton = skeleton
latest_coverage = coverage
nodes_without_upstream = _find_nodes_without_upstream(skeleton)
isolated_nodes = _find_isolated_nodes(skeleton)
is_covered = bool(coverage.get("is_covered", False))
needs_dependency_review = bool(nodes_without_upstream)
needs_isolation_review = bool(isolated_nodes)

feedback_parts: List[str] = []
if not is_covered:
Expand All @@ -1305,8 +1368,14 @@ def finalize_workflow(ready: bool = True, notes: Optional[str] = None) -> Mappin
feedback_parts.append(
_build_dependency_feedback_message(workflow=skeleton, nodes_without_upstream=nodes_without_upstream)
)
if isolated_nodes:
feedback_parts.append(
_build_isolated_nodes_feedback_message(workflow=skeleton, isolated_nodes=isolated_nodes)
)

should_continue = not ready or not is_covered or needs_dependency_review
should_continue = (
not ready or not is_covered or needs_dependency_review or needs_isolation_review
)
finalized_state["done"] = not should_continue
_snapshot("finalize")
return {
Expand All @@ -1315,6 +1384,7 @@ def finalize_workflow(ready: bool = True, notes: Optional[str] = None) -> Mappin
"notes": notes,
"coverage": coverage,
"nodes_without_upstream": nodes_without_upstream,
"isolated_nodes": isolated_nodes,
"should_continue": should_continue,
"feedback": "\n\n".join(feedback_parts) if feedback_parts else "结构已通过覆盖度与依赖检查。",
"workflow": skeleton,
Expand Down