diff --git a/README.md b/README.md index bbaa04bd..bd28f52b 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ flowchart LR ```mermaid stateDiagram-v2 [*] --> Ready - Ready --> Running: 选择 start 节点 + Ready --> Running: 选择入口节点 Running --> Suspended: 节点返回 async_pending Suspended --> Running: resume_from_suspension() Running --> Completed: pending 为空 @@ -213,8 +213,8 @@ VelvetFlow (repo root) - **条件/引用类型矫正**:根据 condition kind 需求与输出 Schema 自动转换数值/正则类型,并为绑定引用与目标 Schema 之间的类型不匹配提供自动包装或错误提示。【F:velvetflow/planner/orchestrator.py†L444-L580】 - **loop.exports 补全**:自动填充缺失的 `params.exports` 映射并规范化导出字段,避免循环节点留空导致 LLM 返工。【F:velvetflow/planner/orchestrator.py†L589-L662】 - **Schema 感知修复**:移除动作 Schema 未定义的字段、为空字段写入默认值或尝试按类型转换,再进入正式校验;无法修复的错误会打包为 `ValidationError` 供后续 LLM 使用。【F:velvetflow/planner/repair_tools.py†L63-L215】【F:velvetflow/planner/orchestrator.py†L664-L817】 -5. **静态校验 + LLM 自修复循环**:`validate_completed_workflow` 会在每轮本地修复后运行,若仍有错误则将错误分布与上下文交给 `_repair_with_llm_and_fallback`,在限定轮次内迭代直至通过或返回最后一个合法版本。【F:velvetflow/planner/orchestrator.py†L664-L940】 -6. **持久化与可视化**:通过校验后写出 `workflow_output.json`,并可用 `render_workflow_image.py` 生成 `workflow_dag.jpg`,同时日志保留所有 LLM 对话与自动修复记录便于审计。 +5. **LLM 自修复循环**:若本地修复仍未通过,将错误分布与上下文交给 `_repair_with_llm_and_fallback`,在限定轮次内迭代直至通过或返回最后一个合法版本。【F:velvetflow/planner/orchestrator.py†L664-L940】 +6. 
**持久化与可视化**:写出 `workflow_output.json`,并可用 `render_workflow_image.py` 生成 `workflow_dag.jpg`,同时日志保留所有 LLM 对话与自动修复记录便于审计。 下方流程图将关键输入/输出、自动修复节点与 LLM 交互标出: @@ -228,7 +228,7 @@ flowchart TD F["Jinja 规范化 + 本地修复\n类型对齐、exports/alias 修复、路径归一化"] --> G G{{"LLM: 自修复 (_repair_with_llm_and_fallback)\n按需多轮"}} --> H F -->|仍有错误| G - H["静态校验 (validate_completed_workflow)\n输出: 完整 Workflow 模型"] --> J["持久化与可视化\nworkflow_output.json + workflow_dag.jpg"] + H["LLM 自修复\n输出: 完整 Workflow 模型"] --> J["持久化与可视化\nworkflow_output.json + workflow_dag.jpg"] classDef llm fill:#fff6e6,stroke:#e67e22,stroke-width:2px; class C,G llm; @@ -238,7 +238,7 @@ LLM / Agent SDK 相关节点说明: - **结构规划 Agent**:需求拆解在前置阶段完成,规划时直接复用结构化任务清单,再通过节点增删改与 `update_node_params` 工具补全 params(Jinja 表达式为主)。`WorkflowBuilder` 会把推导出的 edges、condition 分支与 `depends_on` 写回骨架,方便下游校验共享上下文;节点字段也会按节点类型或 action schema 过滤无关字段,避免 Agent 生成不可识别的参数。【F:velvetflow/planner/structure.py†L373-L1168】【F:velvetflow/planner/workflow_builder.py†L20-L222】 - **动作合法性守卫**:若发现 `action_id` 缺失或未注册,会先尝试基于 display_name/原 action_id 检索替换,再将剩余问题交给 LLM 修复,避免幻觉动作进入最终 Workflow。【F:velvetflow/planner/orchestrator.py†L104-L343】 - **Jinja 规范化与参数一致性**:规划/校验阶段会将简单路径转换为 Jinja 字符串,并对非模板参数给出修复建议;参数补全阶段的 schema 约束来自 `params_tools.py`,确保节点 params 与动作 arg_schema 对齐。【F:velvetflow/planner/params_tools.py†L1-L193】【F:velvetflow/verification/jinja_validation.py†L10-L189】 -- **自修复 Agent**:当静态校验或本地修复仍未通过时,使用当前 workflow 字典与 `ValidationError` 列表提示模型修复。Agent 可以调用命名修复工具(如替换引用、补必填参数、规范化绑定路径)或提交补丁文本,直至通过或达到 `max_repair_rounds`,并在过程中保留最近一次合法版本以确保可回退。【F:velvetflow/planner/repair.py†L616-L756】【F:velvetflow/planner/orchestrator.py†L664-L940】 +- **自修复 Agent**:当静态校验或本地修复仍未通过时,使用当前 workflow 字典与 `ValidationError` 列表提示模型修复。Agent 可以调用命名修复工具(如替换引用、补必填参数、规范化绑定路径)或提交补丁文本,直至通过或达到预设轮次,并在过程中保留最近一次合法版本以确保可回退。【F:velvetflow/planner/repair.py†L616-L756】【F:velvetflow/planner/orchestrator.py†L664-L940】 ### Agent 工具的设计与运行方式 - **会话级工具与闭包状态**:结构规划的工具集(需求拆解、检索、设置 meta、节点增删改、参数补全)在 `plan_workflow_structure_with_llm` 内使用 
`@function_tool(strict_mode=False)` 声明,并依托闭包保存 `WorkflowBuilder`、动作候选与检索结果等上下文,`planner/agent_runtime.py` 统一导出 `Agent`/`Runner`/`function_tool` 便于切换 Agent SDK 版本。【F:velvetflow/planner/structure.py†L373-L1836】【F:velvetflow/planner/agent_runtime.py†L4-L26】 @@ -278,14 +278,13 @@ LLM / Agent SDK 相关节点说明: ```jsonc { "id": "唯一字符串", - "type": "start|end|action|condition|switch|loop|parallel", + "type": "action|condition|switch|loop|parallel", "action_id": "仅 action 节点需要,对应 tools/business_actions/ 中的 id", "display_name": "可选: 用于可视化/日志的友好名称", "params": { /* 取决于节点类型的参数,下文详述 */ } } ``` -- **start/end**:只需 `id` 与 `type`,`params` 可为空。常作为入口/出口。 - **action**:`action_id` 必填;`params` 按动作的 `arg_schema` 填写,支持绑定 DSL(见下文)。 - **condition**:`params.expression` 为布尔 Jinja 表达式,并需要通过 `true_to_node`/`false_to_node` 显式指向下游节点(或设置为 `null` 表示分支终止)。 - **switch**:支持多分支匹配,`params` 中可携带 `source/field` 并在 `cases` 指定 `value` 与 `to_node` 的映射,未命中时走 `default_to_node`。 diff --git a/build_workflow.py b/build_workflow.py index 3c64be4e..6ae25bb3 100644 --- a/build_workflow.py +++ b/build_workflow.py @@ -57,7 +57,6 @@ def plan_workflow(user_nl: str, search_service: Optional[HybridActionSearchServi search_service=hybrid_searcher, action_registry=BUSINESS_ACTIONS, max_rounds=50, - max_repair_rounds=3, ) diff --git a/docs/core_concepts.md b/docs/core_concepts.md index ce87e881..e360c1a2 100644 --- a/docs/core_concepts.md +++ b/docs/core_concepts.md @@ -10,7 +10,7 @@ - `condition`/`switch` 节点按布尔或多分支结果选择下游,未命中的分支会被标记为阻断以避免重复执行。 - `loop` 节点使用 `source` 定义循环集合,`item_alias` 指定循环体内引用当前元素的名称,`body_subgraph` 执行子图,`exports` 使用 `{key: Jinja 表达式}` 收集逐轮结果列表,表达式必须引用 body_subgraph 节点字段(例如 `{{ result_of.node.field }}`),最终通过 `result_of..exports.` 读取。 - 循环体可使用 `loop.item`/`loop.index`/`loop.size`/`loop.accumulator`,或 `item_alias` 指定的别名。 - - `parallel/start/end` 仍可作为结构辅助节点存在(`parallel` 目前只用于 UI 分组,执行器会将其视为无副作用节点)。 + - `parallel` 仍可作为结构辅助节点存在(`parallel` 目前只用于 UI 分组,执行器会将其视为无副作用节点)。 - `velvetflow.models` 内置 `model_validate` 
强类型校验节点字段、loop 子图完整性与引用合法性,失败时抛出统一的 `ValidationError`(接口形态与 Pydantic 类似但不依赖其运行时)。 ## 绑定与上下文 diff --git a/docs/internal_design.md b/docs/internal_design.md index 81d38f20..3b18c85b 100644 --- a/docs/internal_design.md +++ b/docs/internal_design.md @@ -15,7 +15,7 @@ flowchart TD V1[Workflow.model_validate] --> V2[infer_depends_on_from_edges] end subgraph runtime[动态执行] - start[选择 start 节点] --> topo[拓扑排序 + depends map] + entry[选择入口节点] --> topo[拓扑排序 + depends map] topo --> loop[循环 body 执行] topo --> cond[条件/开关分支] topo --> act[业务动作调用] diff --git a/docs/workflow_dsl_schema.md b/docs/workflow_dsl_schema.md index d694ad84..b6d91638 100644 --- a/docs/workflow_dsl_schema.md +++ b/docs/workflow_dsl_schema.md @@ -18,7 +18,6 @@ "workflow_name": "send_newsletter", "description": "向 CRM 客户发送新品资讯并记录审批", "nodes": [ - {"id": "start", "type": "start"}, { "id": "search_users", "type": "action", @@ -30,7 +29,7 @@ "type": "condition", "params": {"expression": "{{ result_of.search_users.count >= 10 }}"}, "true_to_node": "send_email", - "false_to_node": "end" + "false_to_node": null }, { "id": "send_email", @@ -41,15 +40,15 @@ "template_id": "spring_launch" } }, - {"id": "end", "type": "end"} + {"id": "send_email_audit", "type": "action", "action_id": "crm.log_email", "params": {"campaign_id": "spring_launch"}} ] } ``` -上述 workflow 的 edges 会自动推导为 `start → search_users → approve → send_email/end`,无需显式维护。 +上述 workflow 的 edges 会自动推导为 `search_users → approve → send_email` 等有向连线,无需显式维护。 ## 通用节点字段 - `id`:必填字符串且在同一 graph 内唯一。 -- `type`:必填枚举,支持 `start`、`end`、`action`、`condition`、`switch`、`loop`、`parallel`。 +- `type`:必填枚举,支持 `action`、`condition`、`switch`、`loop`、`parallel`。 - `display_name`:可选友好名称,便于可视化。 - `depends_on`:可选显式依赖数组,用于覆盖自动推导或串联未被绑定引用捕获的顺序约束。 - `params`:节点专属参数,结构取决于节点类型。 @@ -60,14 +59,7 @@ - **模板语法校验**:params 字符串支持 Jinja 表达式,校验/执行时会折叠常量并报出语法错误。 ## 节点类型与示例 -### 1. 
`start` / `end` -结构化入口或终点,通常只需 `id` 与 `type`: -```json -{"id": "start", "type": "start"} -{"id": "end", "type": "end"} -``` - -### 2. `action` +### 1. `action` - **字段**:`action_id` 指向注册表中的工具,`params` 对应工具的 `arg_schema`,`out_params_schema` 可选(覆盖/补充动作的输出 Schema)。 - **示例**: ```json @@ -83,7 +75,7 @@ } ``` -### 3. `condition` +### 2. `condition` - **字段**:`params.expression` 为布尔 Jinja 表达式;`true_to_node`、`false_to_node` 指定分支去向,未命中分支自动阻断。 - **示例**: ```json @@ -96,7 +88,7 @@ } ``` -### 4. `switch` +### 3. `switch` - **字段**:`params.source` 定位被匹配的对象(可用绑定或 `result_of.*` 路径);`params.field` 可选,用于从对象中取子字段;`cases` 为数组,支持 `match`/`value`、`field`(进一步取子字段)、`to_node`;`default_to_node` 为兜底分支。 - **示例**: ```json @@ -112,7 +104,7 @@ } ``` -### 5. `loop` +### 4. `loop` - **字段**: - `loop_kind`:必填,支持 `for_each`/`foreach`/`while`。 - `source`:数组/序列的引用路径(如 `{{ result_of.search.items }}`),仅支持字符串/Jinja 模板。 @@ -185,14 +177,13 @@ { "workflow_name": "async_order_pipeline", "nodes": [ - {"id": "start", "type": "start"}, {"id": "search_orders", "type": "action", "action_id": "crm.list_orders", "params": {"days": 7}}, { "id": "if_has_orders", "type": "condition", "params": {"expression": "{{ result_of.search_orders.count > 0 }}"}, "true_to_node": "for_each_order", - "false_to_node": "end" + "false_to_node": null }, { "id": "for_each_order", @@ -228,7 +219,7 @@ "action_id": "finance.sync_erp", "params": {"orders": "{{ result_of.for_each_order.exports.orders }}"} }, - {"id": "end", "type": "end"} + {"id": "notify_summary", "type": "action", "action_id": "crm.send_summary", "params": {"summary": "{{ result_of.for_each_order.exports.summary }}"}} ] } ``` diff --git a/tests/test_update_workflow_planner.py b/tests/test_update_workflow_planner.py index 21eacf62..ae1bed67 100644 --- a/tests/test_update_workflow_planner.py +++ b/tests/test_update_workflow_planner.py @@ -88,7 +88,6 @@ def _fake_plan( search_service=_DummySearch(), action_registry=[{"action_id": "demo.start", "params_schema": {}}, {"action_id": 
"demo.summary", "params_schema": {}}], max_rounds=1, - max_repair_rounds=0, ) assert captured_existing["value"]["workflow_name"] == "demo" @@ -99,4 +98,3 @@ def _fake_plan( summarize = next(node for node in result_nodes if node.get("id") == "summarize") assert summarize.get("depends_on") == ["start"] assert summarize.get("params", {}).get("source") == "{{ result_of.start.message }}" - diff --git a/update_workflow.py b/update_workflow.py index e0378953..a25e5bd5 100644 --- a/update_workflow.py +++ b/update_workflow.py @@ -90,12 +90,6 @@ def main(argv: list[str] | None = None) -> int: default=100, help="结构规划阶段的最大迭代轮次(默认: 100)", ) - parser.add_argument( - "--max-repair-rounds", - type=int, - default=3, - help="LLM 修复阶段的最大迭代轮次(默认: 3)", - ) args = parser.parse_args(argv) # Parse requirement from CLI/file input before reading the workflow file. @@ -133,7 +127,6 @@ def main(argv: list[str] | None = None) -> int: search_service=search_service, action_registry=action_registry, max_rounds=args.max_rounds, - max_repair_rounds=args.max_repair_rounds, model=args.model or OPENAI_MODEL, ) except Exception as exc: # noqa: BLE001 - surface model/IO issues diff --git a/velvetflow/bindings.py b/velvetflow/bindings.py index c1ae9eac..333f0da3 100644 --- a/velvetflow/bindings.py +++ b/velvetflow/bindings.py @@ -697,7 +697,7 @@ def _validate_result_reference(self, src_path: str) -> None: ) return - # 控制节点(condition/start/end 等)也允许被引用,缺少 action_id 时跳过 schema 校验 + # 控制节点(condition 等)也允许被引用,缺少 action_id 时跳过 schema 校验 if node.type != "action" or not node.action_id: return diff --git a/velvetflow/executor/dynamic_executor.py b/velvetflow/executor/dynamic_executor.py index 93b2687f..74631e07 100644 --- a/velvetflow/executor/dynamic_executor.py +++ b/velvetflow/executor/dynamic_executor.py @@ -162,7 +162,7 @@ def _execute_graph( if not start_nodes: start_nodes = zero_indegree if not start_nodes and sorted_nodes: - log_warn("未找到 start 节点,将从任意一个节点开始(仅 demo)。") + log_warn("未找到入口节点,将从任意一个节点开始(仅 
demo)。") start_nodes = [sorted_nodes[0].id] visited = set() diff --git a/velvetflow/executor/graph.py b/velvetflow/executor/graph.py index fa5e3d13..2d57e7da 100644 --- a/velvetflow/executor/graph.py +++ b/velvetflow/executor/graph.py @@ -24,7 +24,7 @@ def _derive_edges(self, workflow: Workflow) -> List[Dict[str, Any]]: def _find_start_nodes( self, nodes: Mapping[str, Dict[str, Any]], edges: List[Dict[str, Any]] ) -> List[str]: - """Locate start nodes by combining explicit标记与“无入度”节点。""" + """Locate entry nodes by combining condition starts and inbound-free nodes.""" all_ids = set(nodes.keys()) to_ids = set() @@ -42,17 +42,12 @@ def _find_start_nodes( inbound_free = list(all_ids - to_ids) - starts = [nid for nid, n in nodes.items() if n.get("type") == "start"] condition_starts = [ nid for nid, n in nodes.items() if n.get("type") == "condition" and nid in inbound_free ] outbound_roots = [nid for nid in inbound_free if nid in from_ids] - if starts: - return list(dict.fromkeys(starts + condition_starts + outbound_roots)) - - # 若没有显式 start 节点,则退化为无入度节点集合以保证流程仍可执行。 merged = list(dict.fromkeys(condition_starts + outbound_roots + inbound_free)) return merged diff --git a/velvetflow/models.py b/velvetflow/models.py index dfd8819d..9e91265c 100644 --- a/velvetflow/models.py +++ b/velvetflow/models.py @@ -369,7 +369,7 @@ def model_validate(cls, data: Any) -> "Node": if not isinstance(node_type, str): errors.append({"loc": ("type",), "msg": "节点类型必须是字符串"}) else: - allowed = {"start", "end", "action", "condition", "switch", "loop", "parallel"} + allowed = {"action", "condition", "switch", "loop", "parallel"} if node_type not in allowed: errors.append({ "loc": ("type",), diff --git a/velvetflow/planner/orchestrator.py b/velvetflow/planner/orchestrator.py index fd4eb5f0..77f28281 100644 --- a/velvetflow/planner/orchestrator.py +++ b/velvetflow/planner/orchestrator.py @@ -5,10 +5,9 @@ This module orchestrates structure planning and parameter completion. 
It runs a two-pass reasoning flow with an LLM to draft the workflow structure and fill -parameters, then validates the result before returning. +parameters, then returns the result. """ -import json from typing import Any, Callable, Dict, List, Mapping from velvetflow.logging_utils import ( @@ -21,22 +20,12 @@ log_warn, use_trace_context, ) -from velvetflow.models import PydanticValidationError, ValidationError, Workflow +from velvetflow.models import PydanticValidationError, Workflow from velvetflow.planner.action_guard import ensure_registered_actions from velvetflow.planner.requirement_analysis import analyze_user_requirement from velvetflow.planner.structure import plan_workflow_structure_with_llm from velvetflow.search import HybridActionSearchService -from velvetflow.verification import precheck_loop_body_graphs, validate_completed_workflow - - -def _validate_workflow( - workflow: Workflow, action_registry: List[Dict[str, Any]] -) -> List[ValidationError]: - """Run lightweight validation and return any errors.""" - - return validate_completed_workflow( - workflow.model_dump(by_alias=True), action_registry=action_registry - ) +from velvetflow.verification import precheck_loop_body_graphs def plan_workflow_with_two_pass( @@ -49,7 +38,7 @@ def plan_workflow_with_two_pass( trace_id: str | None = None, progress_callback: Callable[[str, Mapping[str, Any]], None] | None = None, ) -> Workflow: - """Plan a workflow in two passes and validate the result.""" + """Plan a workflow in two passes and return the result.""" parsed_requirement = analyze_user_requirement( nl_requirement, @@ -101,13 +90,6 @@ def plan_workflow_with_two_pass( ) guarded = guarded if isinstance(guarded, Workflow) else Workflow.model_validate(guarded) - validation_errors = _validate_workflow(guarded, action_registry) - if validation_errors: - log_warn( - "[plan_workflow_with_two_pass] Validation produced errors; returning workflow with warnings.", - json.dumps([err.model_dump() for err in 
validation_errors], ensure_ascii=False), - ) - return guarded @@ -170,13 +152,6 @@ def update_workflow_with_two_pass( "[update_workflow_with_two_pass] progress_callback failed during update stage and was ignored." ) - validation_errors = _validate_workflow(updated_workflow, action_registry) - if validation_errors: - log_warn( - "[update_workflow_with_two_pass] Validation produced errors; returning workflow with warnings.", - json.dumps([err.model_dump() for err in validation_errors], ensure_ascii=False), - ) - return updated_workflow diff --git a/velvetflow/planner/structure.py b/velvetflow/planner/structure.py index 50f9cb2e..5ff28558 100644 --- a/velvetflow/planner/structure.py +++ b/velvetflow/planner/structure.py @@ -22,7 +22,7 @@ ) from velvetflow.loop_dsl import iter_workflow_and_loop_body_nodes from velvetflow.planner.requirement_analysis import _normalize_requirements_payload -from velvetflow.search import HybridActionSearchService, get_openai_client +from velvetflow.search import HybridActionSearchService from velvetflow.models import ( ALLOWED_PARAM_AGGREGATORS, Node, @@ -196,23 +196,6 @@ def _attach_inferred_edges(workflow: Dict[str, Any]) -> Dict[str, Any]: return attach_condition_branches(copied) -def _ensure_start_node(builder: WorkflowBuilder) -> str: - for node in builder.nodes.values(): - if isinstance(node, Mapping) and node.get("type") == "start": - node_id = node.get("id") - return node_id if isinstance(node_id, str) else "start" - - base_id = "start" - node_id = base_id - counter = 1 - while node_id in builder.nodes: - node_id = f"{base_id}_{counter}" - counter += 1 - - builder.add_node(node_id=node_id, node_type="start", display_name="Start", params={}) - return node_id - - def _hydrate_builder_from_workflow( *, builder: WorkflowBuilder, workflow: Mapping[str, Any] ) -> None: @@ -620,7 +603,7 @@ def _build_combined_prompt() -> str: "[Workflow DSL syntax and semantics (must follow)]\n" "- workflow = {workflow_name, description, nodes: []}; only 
return valid JSON (edges will be automatically inferred by the system based on node bindings, no need to generate them).\n" "- Node basic structure: {id, type, display_name, params, depends_on, action_id?, out_params_schema?, loop/subgraph/branches?}.\n" - " type allows start/action/condition/loop/parallel; include a start node to mark the workflow entry.\n" + " type allows action/condition/switch/loop/parallel.\n" " Action nodes must specify action_id (from the action library) and params; only action nodes allow out_params_schema.\n" " Condition node params can only include expression (a single Jinja expression returning a boolean); true_to_node/false_to_node must be top-level fields (string or null), not inside params.\n" " Loop nodes contain loop_kind/iter/source/body_subgraph/exports. Values in exports must reference fields of nodes inside body_subgraph (e.g., {{ result_of.node.field }}); outside the loop you may only reference exports.. body_subgraph only needs the nodes array—no entry/exit/edges.\n" @@ -632,8 +615,9 @@ def _build_combined_prompt() -> str: "Construction steps:\n" "1) Use set_workflow_meta to set the workflow name and description.\n" "2) When business actions are needed, you must first call search_business_actions to query candidates; add_action_node.action_id must come from the most recent candidates.id.\n" - "3) When condition/switch/loop nodes are needed, you must first create them with add_condition_node/add_switch_node/add_loop_node; expressions and references in params must strictly follow Jinja template syntax.\n" - "4) Call update_node_params to complete and validate params for created nodes.\n" + "3) Before adding a new node, check whether a similar node already exists; if so, do not add a duplicate node.\n" + "4) When condition/switch/loop nodes are needed, you must first create them with add_condition_node/add_switch_node/add_loop_node; expressions and references in params must strictly follow Jinja template syntax.\n" + "5) Call 
update_node_params to complete and validate params for created nodes.\n" "6) If an existing node needs to be modified (adding display_name/params/branch targets/parent nodes, etc.), call update_action_node or update_condition_node with the fields to overwrite; after calling, be sure to check whether related upstream/downstream nodes also need updates to stay consistent.\n" "7) Condition nodes must explicitly provide true_to_node and false_to_node. Values can be a node id (continue execution) or null (indicating that branch ends); express dependencies through input/output references in node params—no need to draw edges explicitly.\n" "8) Maintain depends_on (array of strings) for each node, listing its direct upstream dependencies; when a node is targeted by condition.true_to_node/false_to_node, you must add that condition node to the target node's depends_on.\n" @@ -673,11 +657,7 @@ def _build_combined_prompt() -> str: return f"{structure_prompt}\n\n{param_prompt}" -def _find_start_nodes_for_params(workflow: Workflow) -> List[str]: - starts = [n.id for n in workflow.nodes if n.type == "start"] - if starts: - return starts - +def _find_entry_nodes_for_params(workflow: Workflow) -> List[str]: to_ids = {e.to_node for e in workflow.edges} candidates = [n.id for n in workflow.nodes if n.id not in to_ids] if candidates: @@ -687,7 +667,7 @@ def _find_start_nodes_for_params(workflow: Workflow) -> List[str]: def _traverse_order(workflow: Workflow) -> List[str]: - """按 start -> downstream 的顺序遍历,保证上游节点先被处理。""" + """按 entry -> downstream 的顺序遍历,保证上游节点先被处理。""" adj: Dict[str, List[str]] = {} for e in workflow.edges: @@ -695,7 +675,7 @@ def _traverse_order(workflow: Workflow) -> List[str]: visited: set[str] = set() order: List[str] = [] - dq: deque[str] = deque(_find_start_nodes_for_params(workflow)) + dq: deque[str] = deque(_find_entry_nodes_for_params(workflow)) while dq: nid = dq.popleft() @@ -996,7 +976,6 @@ def plan_workflow_structure_with_llm( builder = WorkflowBuilder() if 
existing_workflow: _hydrate_builder_from_workflow(builder=builder, workflow=existing_workflow) - _ensure_start_node(builder) action_schemas = _build_action_schema_map(action_registry) last_action_candidates: List[str] = [] all_action_candidates: List[str] = [] @@ -1069,6 +1048,54 @@ def _build_validation_error(message: str, **extra: Any) -> Dict[str, Any]: payload.update(extra) return payload + def _collect_result_of_node_ids(params: Mapping[str, Any]) -> set[str]: + node_ids: set[str] = set() + + def _walk(val: Any) -> None: + if isinstance(val, Mapping): + for item in val.values(): + _walk(item) + elif isinstance(val, list): + for item in val: + _walk(item) + elif isinstance(val, str): + for expr in _iter_template_references(val): + for path in _extract_result_of_paths(expr): + node_ids.add(path.split(".", 2)[1]) + for path in _extract_result_of_paths(val): + node_ids.add(path.split(".", 2)[1]) + + _walk(params) + return node_ids + + def _validate_existing_references( + *, node_id: str, params: Mapping[str, Any] | None = None, depends_on: List[str] | None = None + ) -> Dict[str, Any] | None: + missing_nodes: set[str] = set() + if depends_on: + for dep in depends_on: + if isinstance(dep, str) and dep not in builder.nodes: + missing_nodes.add(dep) + if params: + for ref in _collect_result_of_node_ids(params): + if ref not in builder.nodes: + missing_nodes.add(ref) + if missing_nodes: + return _build_validation_error( + "params 或 depends_on 引用了不存在的节点。请先创建这些节点再继续当前节点的创建/更新。", + node_id=node_id, + missing_nodes=sorted(missing_nodes), + ) + return None + + def _reject_duplicate_node_id(node_id: str) -> Dict[str, Any] | None: + if node_id in builder.nodes: + return _build_validation_error( + f"节点 id 已存在: {node_id}。请为新节点重命名并使用新的 id 后重试。", + duplicate_node_id=node_id, + ) + return None + def _build_workflow_snapshot() -> Workflow: workflow_dict = _attach_inferred_edges(builder.to_workflow()) return Workflow.model_validate(workflow_dict) @@ -1228,6 +1255,9 @@ def 
add_action_node( "parent_node_id": parent_node_id, }, ) + duplicate_error = _reject_duplicate_node_id(id) + if duplicate_error: + return _return_tool_result("add_action_node", duplicate_error) if not all_action_candidates: result = _build_validation_error("action 节点必须在调用 search_business_actions 之后创建。") return _return_tool_result("add_action_node", result) @@ -1255,6 +1285,11 @@ def add_action_node( action_schemas=action_schemas, action_id=action_id, ) + ref_error = _validate_existing_references( + node_id=id, params=cleaned_params, depends_on=depends_on or [] + ) + if ref_error: + return _return_tool_result("add_action_node", ref_error) reference_issues, available_nodes = _collect_param_reference_issues(cleaned_params) if reference_issues: result = _build_validation_error( @@ -1328,6 +1363,9 @@ def add_loop_node( "parent_node_id": parent_node_id, }, ) + duplicate_error = _reject_duplicate_node_id(id) + if duplicate_error: + return _return_tool_result("add_loop_node", duplicate_error) if parent_node_id is not None and not isinstance(parent_node_id, str): result = _build_validation_error("parent_node_id 需要是字符串或 null。") return _return_tool_result("add_loop_node", result) @@ -1355,6 +1393,11 @@ def add_loop_node( cleaned_params, removed_fields = _filter_supported_params( node_type="loop", params=merged_params, action_schemas=action_schemas ) + ref_error = _validate_existing_references( + node_id=id, params=cleaned_params, depends_on=depends_on or [] + ) + if ref_error: + return _return_tool_result("add_loop_node", ref_error) builder.add_node( node_id=id, node_type="loop", @@ -1417,6 +1460,9 @@ def add_condition_node( "parent_node_id": parent_node_id, }, ) + duplicate_error = _reject_duplicate_node_id(id) + if duplicate_error: + return _return_tool_result("add_condition_node", duplicate_error) if parent_node_id is not None and not isinstance(parent_node_id, str): result = _build_validation_error("parent_node_id 需要是字符串或 null。") return 
_return_tool_result("add_condition_node", result) @@ -1441,6 +1487,11 @@ def add_condition_node( cleaned_params, removed_fields = _filter_supported_params( node_type="condition", params=normalized_params, action_schemas=action_schemas ) + ref_error = _validate_existing_references( + node_id=id, params=cleaned_params, depends_on=depends_on or [] + ) + if ref_error: + return _return_tool_result("add_condition_node", ref_error) builder.add_node( node_id=id, node_type="condition", @@ -1501,6 +1552,9 @@ def add_switch_node( "parent_node_id": parent_node_id, }, ) + duplicate_error = _reject_duplicate_node_id(id) + if duplicate_error: + return _return_tool_result("add_switch_node", duplicate_error) if parent_node_id is not None and not isinstance(parent_node_id, str): result = _build_validation_error("parent_node_id 需要是字符串或 null。") return _return_tool_result("add_switch_node", result) @@ -1537,6 +1591,11 @@ def add_switch_node( cleaned_params, removed_fields = _filter_supported_params( node_type="switch", params=params or {}, action_schemas=action_schemas ) + ref_error = _validate_existing_references( + node_id=id, params=cleaned_params, depends_on=depends_on or [] + ) + if ref_error: + return _return_tool_result("add_switch_node", ref_error) builder.add_node( node_id=id, node_type="switch", @@ -1663,6 +1722,13 @@ def update_action_node( if depends_on is not None: updates["depends_on"] = depends_on + ref_error = _validate_existing_references( + node_id=id, + params=cleaned_params if params is not None else None, + depends_on=depends_on or [], + ) + if ref_error: + return _return_tool_result("update_action_node", ref_error) builder.update_node(id, **updates) _reset_workflow_check_state() removed_param_fields.extend(_sanitize_builder_node_params(builder, id, action_schemas)) @@ -1758,6 +1824,13 @@ def update_condition_node( if depends_on is not None: updates["depends_on"] = depends_on + ref_error = _validate_existing_references( + node_id=id, + params=cleaned_params if 
params is not None else None, + depends_on=depends_on or [], + ) + if ref_error: + return _return_tool_result("update_condition_node", ref_error) builder.update_node(id, **updates) _reset_workflow_check_state() removed_param_fields.extend(_sanitize_builder_node_params(builder, id, action_schemas)) @@ -1867,6 +1940,13 @@ def update_switch_node( if depends_on is not None: updates["depends_on"] = depends_on + ref_error = _validate_existing_references( + node_id=id, + params=cleaned_params if params is not None else None, + depends_on=depends_on or [], + ) + if ref_error: + return _return_tool_result("update_switch_node", ref_error) builder.update_node(id, **updates) _reset_workflow_check_state() removed_param_fields.extend(_sanitize_builder_node_params(builder, id, action_schemas)) @@ -1942,6 +2022,13 @@ def update_loop_node( if depends_on is not None: updates["depends_on"] = depends_on + ref_error = _validate_existing_references( + node_id=id, + params=cleaned_params if params is not None else None, + depends_on=depends_on or [], + ) + if ref_error: + return _return_tool_result("update_loop_node", ref_error) builder.update_node(id, **updates) _reset_workflow_check_state() _attach_sub_graph_nodes(builder, id, normalized_nodes) @@ -1959,6 +2046,73 @@ def update_loop_node( result = {"status": "ok", "type": "node_updated", "node_id": id} return _return_tool_result("update_loop_node", result) + @function_tool(strict_mode=False) + def remove_node(id: str) -> Mapping[str, Any]: + """Remove a node and report any remaining references to it.""" + _log_tool_call("remove_node", {"id": id}) + if not isinstance(id, str): + result = {"status": "error", "message": "remove_node 需要提供字符串类型的 id。"} + return _return_tool_result("remove_node", result) + if id not in builder.nodes: + result = {"status": "error", "message": "节点不存在,无法删除。"} + return _return_tool_result("remove_node", result) + + builder.nodes.pop(id, None) + filled_params.pop(id, None) + if id in validated_node_ids: + 
validated_node_ids.remove(id) + + referencing_nodes: List[str] = [] + for node_id, node in builder.nodes.items(): + if not isinstance(node, Mapping): + continue + params = node.get("params") if isinstance(node.get("params"), Mapping) else {} + if id in _collect_result_of_node_ids(params): + referencing_nodes.append(node_id) + continue + depends_on = node.get("depends_on") if isinstance(node.get("depends_on"), list) else [] + if id in depends_on: + referencing_nodes.append(node_id) + continue + if node.get("true_to_node") == id or node.get("false_to_node") == id: + referencing_nodes.append(node_id) + continue + if node.get("default_to_node") == id: + referencing_nodes.append(node_id) + continue + cases = node.get("cases") if isinstance(node.get("cases"), list) else [] + if any(isinstance(case, Mapping) and case.get("to_node") == id for case in cases): + referencing_nodes.append(node_id) + continue + + _reset_workflow_check_state() + snapshot = _snapshot(f"remove_node_{id}") + referencing_nodes = sorted(set(referencing_nodes)) + if referencing_nodes: + result = { + "status": "needs_more_work", + "type": "node_removed", + "node_id": id, + "referencing_nodes": referencing_nodes, + "message": ( + "Removed node still referenced by other nodes. " + "Please update references in these nodes: " + + ", ".join(referencing_nodes) + + "." + ), + "workflow": snapshot, + } + return _return_tool_result("remove_node", result) + + result = { + "status": "ok", + "type": "node_removed", + "node_id": id, + "referencing_nodes": [], + "workflow": snapshot, + } + return _return_tool_result("remove_node", result) + @function_tool(strict_mode=False) def dump_model() -> Mapping[str, Any]: """Export the current workflow snapshot for debugging or display. 
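The dangling-reference sweep that `remove_node` performs above can be illustrated outside the planner. This is a minimal sketch, assuming nodes are plain dicts keyed by id; `find_dangling_references` and the regex-based `result_of` extraction are simplified stand-ins for the builder state and `_collect_result_of_node_ids` used in the tool, not the repo's actual helpers:

```python
import re

# Simplified stand-in for _collect_result_of_node_ids: pull node ids out of
# "result_of.<node>.<field>" paths found anywhere in a node's params.
RESULT_OF = re.compile(r"result_of\.([A-Za-z0-9_]+)")

def find_dangling_references(nodes: dict, removed_id: str) -> list[str]:
    """Ids of surviving nodes that still point at a removed node via
    params templates, depends_on, condition branches, or switch cases."""
    referencing = []
    for node_id, node in nodes.items():
        # Any result_of.<removed_id> path inside params counts as a reference.
        if removed_id in RESULT_OF.findall(str(node.get("params", {}))):
            referencing.append(node_id)
            continue
        if removed_id in (node.get("depends_on") or []):
            referencing.append(node_id)
            continue
        if removed_id in (node.get("true_to_node"), node.get("false_to_node"),
                          node.get("default_to_node")):
            referencing.append(node_id)
            continue
        cases = node.get("cases") or []
        if any(isinstance(c, dict) and c.get("to_node") == removed_id for c in cases):
            referencing.append(node_id)
    return sorted(set(referencing))

nodes = {
    "approve": {"type": "condition", "params": {},
                "true_to_node": "send_email", "false_to_node": None},
    "send_email": {"type": "action", "depends_on": ["approve"],
                   "params": {"to": "{{ result_of.search_users.emails }}"}},
}
nodes.pop("search_users", None)  # the node being removed (already absent here)
print(find_dangling_references(nodes, "search_users"))
```

A condition branch target, a `depends_on` entry, a switch case, and a `result_of` template all count as references here, matching the categories the tool reports back to the agent in `referencing_nodes`.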
@@ -1999,75 +2153,6 @@ def _walk(val: Any) -> None:
         _walk(obj)
         return refs
 
-    def _parse_llm_json_payload(payload: str) -> Dict[str, Any]:
-        try:
-            return json.loads(payload)
-        except json.JSONDecodeError:
-            match = re.search(r"\{.*\}", payload, flags=re.DOTALL)
-            if match:
-                try:
-                    return json.loads(match.group(0))
-                except json.JSONDecodeError:
-                    pass
-        log_warn("[StructurePlanner] check_workflow LLM 返回非 JSON 内容。")
-        return {}
-
-    def _evaluate_workflow_requirements(
-        *,
-        workflow_snapshot: Mapping[str, Any],
-        user_requirements: Mapping[str, Any],
-    ) -> Dict[str, Any]:
-        client = get_openai_client()
-        system_prompt = (
-            "你是工作流质量审查助手。请判断给定 workflow 是否能够完全满足用户需求。"
-            "必须输出 JSON 对象,字段:"
-            "satisfies (布尔值), missing_requirements (字符串数组), "
-            "update_suggestions (字符串数组)。"
-            "若 workflow 已满足需求,missing_requirements 和 update_suggestions 为空数组。"
-            "若不满足,missing_requirements 描述缺失的需求点,"
-            "update_suggestions 需要说明应如何新增或更新节点来满足需求。"
-        )
-        user_payload = json.dumps(
-            {"user_requirements": user_requirements, "workflow": workflow_snapshot},
-            ensure_ascii=False,
-        )
-        try:
-            response = client.chat.completions.create(
-                model=OPENAI_MODEL,
-                messages=[
-                    {"role": "system", "content": system_prompt},
-                    {"role": "user", "content": user_payload},
-                ],
-                temperature=0,
-                response_format={"type": "json_object"},
-            )
-        except Exception as exc:
-            log_warn(f"[StructurePlanner] check_workflow LLM 调用失败: {exc}")
-            return {
-                "satisfies": False,
-                "missing_requirements": ["LLM 校验失败,无法确认工作流满足所有需求。"],
-                "update_suggestions": ["请检查 OpenAI 调用并重试 check_workflow。"],
-                "error": str(exc),
-            }
-
-        content = ""
-        if response.choices and response.choices[0].message:
-            content = response.choices[0].message.content or ""
-        parsed = _parse_llm_json_payload(content)
-        satisfies = bool(parsed.get("satisfies"))
-        missing = parsed.get("missing_requirements")
-        suggestions = parsed.get("update_suggestions")
-        if not isinstance(missing, list):
-            missing = []
-        if not isinstance(suggestions, list):
-            suggestions = []
-        return {
-            "satisfies": satisfies,
-            "missing_requirements": [item for item in missing if isinstance(item, str)],
-            "update_suggestions": [item for item in suggestions if isinstance(item, str)],
-            "raw_response": content,
-        }
-
     @function_tool(strict_mode=False)
     def check_workflow() -> Mapping[str, Any]:
         """Check whether action nodes reference upstream nodes via Jinja templates.
@@ -2076,67 +2161,103 @@ def check_workflow() -> Mapping[str, Any]:
         """
         _log_tool_call("check_workflow")
         snapshot = _attach_inferred_edges(builder.to_workflow())
-        llm_eval = _evaluate_workflow_requirements(
-            workflow_snapshot=snapshot,
-            user_requirements=parsed_requirement,
-        )
-        llm_satisfies = bool(llm_eval.get("satisfies"))
-        llm_missing = llm_eval.get("missing_requirements", [])
-        llm_suggestions = llm_eval.get("update_suggestions", [])
+        llm_suggestions: List[str] = []
         nodes = list(iter_workflow_and_loop_body_nodes(snapshot))
-        edges = infer_edges_from_bindings(nodes)
         node_ids = {
             node.get("id")
             for node in nodes
             if isinstance(node, Mapping) and isinstance(node.get("id"), str)
         }
-        incoming_counts: Dict[str, int] = {nid: 0 for nid in node_ids if isinstance(nid, str)}
-        for edge in edges:
-            if not isinstance(edge, Mapping):
-                continue
-            to_node = edge.get("to") if "to" in edge else edge.get("to_node")
-            if isinstance(to_node, str) and to_node in incoming_counts:
-                incoming_counts[to_node] += 1
         nodes_without_refs: List[str] = []
+        condition_missing_targets: List[Dict[str, str]] = []
+        loop_missing_actions: List[Dict[str, Any]] = []
         for node in nodes:
             if not isinstance(node, Mapping):
                 continue
-            if node.get("type") != "action":
-                continue
+            if node.get("type") == "condition":
+                condition_id = node.get("id")
+                if isinstance(condition_id, str):
+                    for branch_key in ("true_to_node", "false_to_node"):
+                        target = node.get(branch_key)
+                        if isinstance(target, str) and target not in node_ids:
+                            condition_missing_targets.append(
+                                {"condition_id": condition_id, "branch": branch_key, "target": target}
+                            )
             node_id = node.get("id")
             if not isinstance(node_id, str):
                 continue
-            has_incoming = incoming_counts.get(node_id, 0) > 0
+            if node.get("type") == "loop":
+                params = node.get("params") if isinstance(node.get("params"), Mapping) else {}
+                body = params.get("body_subgraph") if isinstance(params, Mapping) else None
+                body_nodes = []
+                if isinstance(body, Mapping):
+                    body_nodes = body.get("nodes") if isinstance(body.get("nodes"), list) else []
+                elif isinstance(body, list):
+                    body_nodes = body
+                has_action = any(
+                    isinstance(sub_node, Mapping) and sub_node.get("type") == "action"
+                    for sub_node in body_nodes
+                )
+                if not has_action:
+                    loop_missing_actions.append(
+                        {
+                            "loop_id": node_id,
+                            "body_subgraph": body if body is not None else {"nodes": body_nodes},
+                        }
+                    )
+            depends_on = node.get("depends_on") if isinstance(node.get("depends_on"), list) else []
             param_refs = _collect_template_node_refs(node.get("params", {}))
             has_param_refs = any(
                 ref in node_ids and ref != node_id
                 for ref in param_refs
                 if isinstance(ref, str)
            )
-            if not has_incoming and not has_param_refs:
+            if depends_on and not has_param_refs:
                 nodes_without_refs.append(node_id)
-        has_issues = bool(nodes_without_refs) or not llm_satisfies
+        has_issues = bool(nodes_without_refs) or bool(condition_missing_targets) or bool(loop_missing_actions)
         status = "needs_more_work" if has_issues else "ok"
         feedback_parts = []
         if nodes_without_refs:
+            llm_suggestions.append(
+                "The following nodes declare depends_on but do not reference any upstream outputs in params. "
+                "Please connect them by adding Jinja references to upstream node outputs: "
+                + ", ".join(nodes_without_refs)
+                + "."
+            )
             feedback_parts.append(
-                "Some action nodes have no upstream references. "
+                "Some nodes declare depends_on but have no upstream references in params. "
                 "Update params to reference upstream outputs for nodes: "
                 f"{', '.join(nodes_without_refs)}."
             )
-        if not llm_satisfies:
-            missing_text = "; ".join(llm_missing) if llm_missing else "未满足的需求未明确。"
-            suggestion_text = (
-                "; ".join(llm_suggestions)
-                if llm_suggestions
-                else "请新增或更新节点以覆盖缺失需求。"
+        if loop_missing_actions:
+            loop_summaries = [
+                f"{item['loop_id']}: {item.get('body_subgraph')}"
+                for item in loop_missing_actions
+                if item.get("loop_id")
+            ]
+            llm_suggestions.append(
+                "Loop sub-graphs must include at least one action node. "
+                "Please add a user-requirement-related action node to the loop body, "
+                "or remove the loop if it's unnecessary or can be replaced by other nodes: "
+                + "; ".join(loop_summaries)
+                + "."
             )
-            feedback_parts.append(
-                "Workflow does not fully satisfy user requirements. "
-                f"Missing: {missing_text}. Suggestions: {suggestion_text}."
+            feedback_parts.append(llm_suggestions[-1])
+        if condition_missing_targets:
+            missing_descriptions = [
+                f"{item['condition_id']} ({item['branch']} -> {item['target']})"
+                for item in condition_missing_targets
+                if item.get("condition_id") and item.get("branch") and item.get("target")
+            ]
+            llm_suggestions.append(
+                "Condition node branches reference missing nodes. "
+                "Please create the referenced nodes or update true_to_node/false_to_node to valid node ids: "
+                + "; ".join(missing_descriptions)
+                + "."
             )
+            feedback_parts.append(llm_suggestions[-1])
         feedback = (
-            "All action nodes reference upstream outputs and the workflow matches user requirements."
+            "All nodes with depends_on reference upstream outputs and condition branches are valid."
             if not feedback_parts
             else " ".join(feedback_parts)
         )
@@ -2145,9 +2266,9 @@ def check_workflow() -> Mapping[str, Any]:
             "status": status,
             "type": "check_workflow",
             "nodes_without_references": nodes_without_refs,
-            "requirements_missing": llm_missing,
+            "condition_nodes_missing_targets": condition_missing_targets,
+            "loop_nodes_missing_actions": loop_missing_actions,
             "requirements_suggestions": llm_suggestions,
-            "llm_requirement_check": llm_eval,
             "has_issues": has_issues,
             "feedback": feedback,
             "workflow": snapshot,
@@ -2264,6 +2385,9 @@ def update_node_params(id: str, params: Dict[str, Any]) -> Mapping[str, Any]:
             return _return_tool_result("update_node_params", result)
 
         normalized_params, _ = _normalize_params_templates(dict(params))
+        ref_error = _validate_existing_references(node_id=id, params=normalized_params)
+        if ref_error:
+            return _return_tool_result("update_node_params", ref_error)
 
         workflow = _build_workflow_for_params()
         nodes_by_id = {n.id: n for n in workflow.nodes}
@@ -2314,6 +2438,7 @@ def update_node_params(id: str, params: Dict[str, Any]) -> Mapping[str, Any]:
         update_condition_node,
         update_switch_node,
         update_loop_node,
+        remove_node,
         get_param_context,
         update_node_params,
         check_workflow,
diff --git a/velvetflow/verification/workflow_validation.py b/velvetflow/verification/workflow_validation.py
index 534d7529..07f3780c 100644
--- a/velvetflow/verification/workflow_validation.py
+++ b/velvetflow/verification/workflow_validation.py
@@ -328,7 +328,6 @@ def validate_completed_workflow(
     errors.set_context(None)
 
     # ---------- Graph connectivity validation ----------
-    start_nodes = [n["id"] for n in nodes if n.get("type") == "start"]
     reachable: set = set()
     if nodes:
         adj: Dict[str, List[str]] = {}
@@ -357,14 +356,14 @@ def validate_completed_workflow(
                     code="DISCONNECTED_GRAPH",
                     node_id=None,
                     field=None,
-                    message="No node with indegree 0 found; unable to determine topological start.",
+                    message="No node with indegree 0 found; unable to determine entry point.",
                 )
             )
 
         # Treat nodes without inbound references as additional roots so workflows
         # that rely solely on parameter bindings remain connected.
         inbound_free = zero_indegree
-        candidate_roots = list(dict.fromkeys((start_nodes or []) + inbound_free))
+        candidate_roots = list(dict.fromkeys(inbound_free))
 
         dq = deque(candidate_roots)
         while dq:
@@ -403,7 +402,7 @@ def validate_completed_workflow(
                 code="DISCONNECTED_GRAPH",
                 node_id=nid,
                 field=None,
-                message=f"Node '{nid}' is unreachable from start nodes.",
+                message=f"Node '{nid}' is unreachable from entry nodes.",
             )
         )
     errors.set_context(None)
diff --git a/velvetflow/visualization.py b/velvetflow/visualization.py
index 3097ccd2..89a7afe7 100644
--- a/velvetflow/visualization.py
+++ b/velvetflow/visualization.py
@@ -18,8 +18,6 @@ RGB = Tuple[int, int, int]
 
 NODE_COLORS: Dict[str, RGB] = {
-    "start": (76, 175, 80),
-    "end": (156, 39, 176),
     "action": (33, 150, 243),
     "condition": (255, 152, 0),
     "switch": (121, 85, 72),
diff --git a/webapp/server.py b/webapp/server.py
index 4dcc6e1b..03239765 100644
--- a/webapp/server.py
+++ b/webapp/server.py
@@ -154,7 +153,6 @@ def _run_planner_for_request(
         search_service=search_service,
         action_registry=BUSINESS_ACTIONS,
         max_rounds=100,
-        max_repair_rounds=3,
         progress_callback=progress_callback,
     )
@@ -163,7 +162,6 @@
         search_service=search_service,
         action_registry=BUSINESS_ACTIONS,
         max_rounds=100,
-        max_repair_rounds=3,
         progress_callback=progress_callback,
     )
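For context on the connectivity change in `validate_completed_workflow`: with dedicated `start`/`end` node types removed, the validator now treats every node with indegree 0 as an entry point and flags anything unreachable from those roots. A minimal standalone sketch of that rule (node/edge shapes simplified; `find_unreachable` is a hypothetical helper, not VelvetFlow's API):

```python
from collections import deque
from typing import Dict, List, Tuple


def find_unreachable(node_ids: List[str], edges: List[Tuple[str, str]]) -> List[str]:
    """Return node ids not reachable from any zero-indegree entry node."""
    adj: Dict[str, List[str]] = {nid: [] for nid in node_ids}
    indegree: Dict[str, int] = {nid: 0 for nid in node_ids}
    for src, dst in edges:
        adj[src].append(dst)
        indegree[dst] += 1

    # Entry points are simply the nodes nothing points at.
    roots = [nid for nid in node_ids if indegree[nid] == 0]

    reachable = set(roots)
    dq = deque(roots)
    while dq:
        cur = dq.popleft()
        for nxt in adj[cur]:
            if nxt not in reachable:
                reachable.add(nxt)
                dq.append(nxt)
    return [nid for nid in node_ids if nid not in reachable]


if __name__ == "__main__":
    # "d" only has a self-loop, so it has no zero-indegree root and is unreachable.
    print(find_unreachable(["a", "b", "c", "d"], [("a", "b"), ("b", "c"), ("d", "d")]))
```

Note the edge case the removed `start_nodes` logic used to mask: a cycle with no external inbound edge has no zero-indegree member, so it is reported as disconnected rather than silently accepted.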
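Similarly, the reworked `check_workflow` no longer asks an LLM whether requirements are met; one of its local checks flags nodes that declare `depends_on` but never reference an upstream node inside their Jinja params. The real tool walks params recursively via `_collect_template_node_refs`; this self-contained sketch approximates that with a regex over the params repr, and `nodes_missing_upstream_refs` is a hypothetical name for illustration only:

```python
import re
from typing import Any, Dict, List

# Crude approximation: capture the first identifier after "{{" in a Jinja template.
JINJA_REF = re.compile(r"{{\s*([A-Za-z_][A-Za-z0-9_]*)")


def nodes_missing_upstream_refs(nodes: List[Dict[str, Any]]) -> List[str]:
    """Flag nodes that declare depends_on but whose params reference no other node."""
    node_ids = {n["id"] for n in nodes}
    flagged: List[str] = []
    for node in nodes:
        if not node.get("depends_on"):
            continue  # only nodes claiming a dependency are checked
        blob = str(node.get("params", {}))
        refs = {m for m in JINJA_REF.findall(blob) if m in node_ids and m != node["id"]}
        if not refs:
            flagged.append(node["id"])
    return flagged
```

For example, a `notify` node with `depends_on: ["summarize"]` but a purely static `params.msg` would be flagged, prompting the planner to wire in something like `{{ summarize.output }}`.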