Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 136 additions & 23 deletions velvetflow/planner/structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,22 +371,41 @@ def _prepare_skeleton_for_coverage(
return _attach_inferred_edges(skeleton)


def _find_nodes_without_upstream(workflow: Mapping[str, Any]) -> List[Dict[str, Any]]:
nodes = workflow.get("nodes") if isinstance(workflow.get("nodes"), list) else []
inferred_edges = infer_edges_from_bindings(nodes)
def _summarize_node(node: Mapping[str, Any]) -> Dict[str, Any]:
return {
"id": node.get("id"),
"type": node.get("type"),
"action_id": node.get("action_id") if isinstance(node.get("action_id"), str) else None,
"display_name": node.get("display_name"),
}


indegree = {}
def _build_degree_maps(nodes: List[Mapping[str, Any]]) -> tuple[Dict[str, int], Dict[str, int]]:
inferred_edges = infer_edges_from_bindings(nodes)
indegree: Dict[str, int] = {}
outdegree: Dict[str, int] = {}
for node in nodes:
if isinstance(node, Mapping) and isinstance(node.get("id"), str):
indegree[node["id"]] = 0
outdegree[node["id"]] = 0

for edge in inferred_edges:
if not isinstance(edge, Mapping):
continue
source = edge.get("from")
target = edge.get("to")
if isinstance(source, str) and source in outdegree:
outdegree[source] += 1
if isinstance(target, str) and target in indegree:
indegree[target] += 1

return indegree, outdegree


def _find_nodes_without_upstream(workflow: Mapping[str, Any]) -> List[Dict[str, Any]]:
nodes = workflow.get("nodes") if isinstance(workflow.get("nodes"), list) else []
indegree, _ = _build_degree_maps(nodes)

dangling: List[Dict[str, Any]] = []
for node in nodes:
if not isinstance(node, Mapping):
Expand All @@ -398,20 +417,31 @@ def _find_nodes_without_upstream(workflow: Mapping[str, Any]) -> List[Dict[str,
continue

if indegree.get(node_id, 0) == 0:
dangling.append(
{
"id": node_id,
"type": node_type,
"action_id": node.get("action_id")
if isinstance(node.get("action_id"), str)
else None,
"display_name": node.get("display_name"),
}
)
dangling.append(_summarize_node(node))

return dangling


def _find_isolated_nodes(workflow: Mapping[str, Any]) -> List[Dict[str, Any]]:
nodes = workflow.get("nodes") if isinstance(workflow.get("nodes"), list) else []
indegree, outdegree = _build_degree_maps(nodes)

isolated: List[Dict[str, Any]] = []
for node in nodes:
if not isinstance(node, Mapping):
continue

node_id = node.get("id")
node_type = node.get("type")
if not isinstance(node_id, str) or not isinstance(node_type, str):
continue

if indegree.get(node_id, 0) == 0 and outdegree.get(node_id, 0) == 0:
isolated.append(_summarize_node(node))

return isolated


def _run_coverage_check(
*,
nl_requirement: str,
Expand Down Expand Up @@ -467,6 +497,20 @@ def _build_dependency_feedback_message(
f"{json.dumps(workflow, ensure_ascii=False)}"
)


def _build_isolated_nodes_feedback_message(
*, workflow: Mapping[str, Any], isolated_nodes: List[Mapping[str, Any]]
) -> str:
return (
"检测到以下节点没有任何入度与出度(孤立节点),"
"请判断它们是否应该挂接到现有 DAG 中;"
"如果可以,请使用规划工具更新节点的 depends_on 或相关参数以建立依赖;"
"如果确认需要保留独立节点,请在 finalize_workflow.notes 中说明原因。\n"
f"- isolated_nodes: {json.dumps(isolated_nodes, ensure_ascii=False)}\n"
"当前 workflow 供参考(含推导的 edges):\n"
f"{json.dumps(workflow, ensure_ascii=False)}"
)

def plan_workflow_structure_with_llm(
nl_requirement: str,
search_service: HybridActionSearchService,
Expand All @@ -485,6 +529,10 @@ def plan_workflow_structure_with_llm(
all_action_candidates: List[str] = []
latest_skeleton: Dict[str, Any] = {}
latest_coverage: Dict[str, Any] = {}
latest_finalize_feedback: str = ""
latest_nodes_without_upstream: List[Mapping[str, Any]] = []
latest_isolated_nodes: List[Mapping[str, Any]] = []
latest_finalize_called = False
finalized_state: Dict[str, bool] = {"done": False}

def _emit_progress(label: str, workflow_obj: Mapping[str, Any]) -> None:
Expand Down Expand Up @@ -1283,18 +1331,28 @@ def finalize_workflow(ready: bool = True, notes: Optional[str] = None) -> Mappin
校验结果与反馈的字典,包含 coverage、workflow 与 should_continue 等字段。
"""
_log_tool_call("finalize_workflow", {"ready": ready, "notes": notes})
nonlocal latest_skeleton, latest_coverage
nonlocal latest_skeleton
nonlocal latest_coverage
nonlocal latest_finalize_feedback
nonlocal latest_nodes_without_upstream
nonlocal latest_isolated_nodes
nonlocal latest_finalize_called
skeleton, coverage = _run_coverage_check(
nl_requirement=nl_requirement,
builder=builder,
action_registry=action_registry,
search_service=search_service,
)
latest_finalize_called = True
latest_skeleton = skeleton
latest_coverage = coverage
nodes_without_upstream = _find_nodes_without_upstream(skeleton)
isolated_nodes = _find_isolated_nodes(skeleton)
latest_nodes_without_upstream = nodes_without_upstream
latest_isolated_nodes = isolated_nodes
is_covered = bool(coverage.get("is_covered", False))
needs_dependency_review = bool(nodes_without_upstream)
needs_isolated_review = bool(isolated_nodes)

feedback_parts: List[str] = []
if not is_covered:
Expand All @@ -1305,18 +1363,24 @@ def finalize_workflow(ready: bool = True, notes: Optional[str] = None) -> Mappin
feedback_parts.append(
_build_dependency_feedback_message(workflow=skeleton, nodes_without_upstream=nodes_without_upstream)
)
if isolated_nodes:
feedback_parts.append(
_build_isolated_nodes_feedback_message(workflow=skeleton, isolated_nodes=isolated_nodes)
)

should_continue = not ready or not is_covered or needs_dependency_review
should_continue = not ready or not is_covered or needs_dependency_review or needs_isolated_review
finalized_state["done"] = not should_continue
latest_finalize_feedback = "\n\n".join(feedback_parts) if feedback_parts else "结构已通过覆盖度与依赖检查。"
_snapshot("finalize")
return {
"status": "ok" if not should_continue else "needs_more_work",
"type": "finalized",
"notes": notes,
"coverage": coverage,
"nodes_without_upstream": nodes_without_upstream,
"isolated_nodes": isolated_nodes,
"should_continue": should_continue,
"feedback": "\n\n".join(feedback_parts) if feedback_parts else "结构已通过覆盖度与依赖检查。",
"feedback": latest_finalize_feedback,
"workflow": skeleton,
}

Expand Down Expand Up @@ -1443,19 +1507,68 @@ def validate_node_params(id: str, params: Optional[Dict[str, Any]] = None) -> Ma
model=OPENAI_MODEL
)

total_rounds = max_rounds + max_coverage_refine_rounds + max_dependency_refine_rounds
log_section("结构规划 - Agent SDK")

def _run_agent(input_payload: Any, max_turns: int) -> Any:
try:
return Runner.run_sync(agent, input_payload, max_turns=max_turns) # type: ignore[arg-type]
except TypeError:
coro = Runner.run(agent, input_payload) # type: ignore[call-arg]
return asyncio.run(coro) if asyncio.iscoroutine(coro) else coro

run_input: Any = nl_requirement
try:
result = Runner.run_sync(agent, run_input, max_turns=total_rounds) # type: ignore[arg-type]
except TypeError:
coro = Runner.run(agent, run_input) # type: ignore[call-arg]
result = asyncio.run(coro) if asyncio.iscoroutine(coro) else coro
coverage_refine_rounds = 0
dependency_refine_rounds = 0
while True:
latest_finalize_called = False
latest_coverage = {}
latest_nodes_without_upstream = []
latest_isolated_nodes = []
latest_finalize_feedback = ""
result = _run_agent(run_input, max_rounds)
if finalized_state.get("done"):
break

needs_coverage_refine = bool(latest_coverage) and not latest_coverage.get("is_covered", False)
needs_dependency_refine = bool(latest_nodes_without_upstream) or bool(latest_isolated_nodes)
needs_finalize_call = not latest_finalize_called
can_refine = (
(needs_coverage_refine and coverage_refine_rounds < max_coverage_refine_rounds)
or (needs_dependency_refine and dependency_refine_rounds < max_dependency_refine_rounds)
or (needs_finalize_call and coverage_refine_rounds < max_coverage_refine_rounds)
)

if not can_refine:
break

if needs_coverage_refine or needs_finalize_call:
coverage_refine_rounds += 1
if needs_dependency_refine:
dependency_refine_rounds += 1

if needs_finalize_call:
latest_finalize_feedback = (
"尚未调用 finalize_workflow,请在结构规划完成后调用 finalize_workflow 以触发覆盖度与依赖检查。"
)

log_info(
"[StructurePlanner] 触发自动补全轮次",
json.dumps(
{
"coverage_round": coverage_refine_rounds,
"dependency_round": dependency_refine_rounds,
"needs_finalize_call": needs_finalize_call,
},
ensure_ascii=False,
),
)
run_input = f"{nl_requirement}\n\n[系统反馈]\n{latest_finalize_feedback}"

if not finalized_state.get("done"):
if latest_coverage and not latest_coverage.get("is_covered", False):
log_warn("[Planner] 规划结束但覆盖度未通过,返回当前骨架。")
elif not latest_coverage:
log_warn("[Planner] 未调用 finalize_workflow,使用当前骨架继续后续阶段。")
else:
log_warn("[Planner] 未收到 finalize_workflow,使用当前骨架继续后续阶段。")

Expand Down