Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ flowchart LR
```mermaid
stateDiagram-v2
[*] --> Ready
Ready --> Running: 选择 start 节点
Ready --> Running: 选择入口节点
Running --> Suspended: 节点返回 async_pending
Suspended --> Running: resume_from_suspension()
Running --> Completed: pending 为空
Expand Down Expand Up @@ -213,8 +213,8 @@ VelvetFlow (repo root)
- **条件/引用类型矫正**:根据 condition kind 需求与输出 Schema 自动转换数值/正则类型,并为绑定引用与目标 Schema 之间的类型不匹配提供自动包装或错误提示。【F:velvetflow/planner/orchestrator.py†L444-L580】
- **loop.exports 补全**:自动填充缺失的 `params.exports` 映射并规范化导出字段,避免循环节点留空导致 LLM 返工。【F:velvetflow/planner/orchestrator.py†L589-L662】
- **Schema 感知修复**:移除动作 Schema 未定义的字段、为空字段写入默认值或尝试按类型转换,再进入正式校验;无法修复的错误会打包为 `ValidationError` 供后续 LLM 使用。【F:velvetflow/planner/repair_tools.py†L63-L215】【F:velvetflow/planner/orchestrator.py†L664-L817】
5. **静态校验 + LLM 自修复循环**:`validate_completed_workflow` 会在每轮本地修复后运行,若仍有错误则将错误分布与上下文交给 `_repair_with_llm_and_fallback`,在限定轮次内迭代直至通过或返回最后一个合法版本。【F:velvetflow/planner/orchestrator.py†L664-L940】
6. **持久化与可视化**:通过校验后写出 `workflow_output.json`,并可用 `render_workflow_image.py` 生成 `workflow_dag.jpg`,同时日志保留所有 LLM 对话与自动修复记录便于审计。
5. **LLM 自修复循环**:若本地修复仍未通过,将错误分布与上下文交给 `_repair_with_llm_and_fallback`,在限定轮次内迭代直至通过或返回最后一个合法版本。【F:velvetflow/planner/orchestrator.py†L664-L940】
6. **持久化与可视化**:写出 `workflow_output.json`,并可用 `render_workflow_image.py` 生成 `workflow_dag.jpg`,同时日志保留所有 LLM 对话与自动修复记录便于审计。

下方流程图将关键输入/输出、自动修复节点与 LLM 交互标出:

Expand All @@ -228,7 +228,7 @@ flowchart TD
F["Jinja 规范化 + 本地修复\n类型对齐、exports/alias 修复、路径归一化"] --> G
G{{"LLM: 自修复 (_repair_with_llm_and_fallback)\n按需多轮"}} --> H
F -->|仍有错误| G
H["静态校验 (validate_completed_workflow)\n输出: 完整 Workflow 模型"] --> J["持久化与可视化\nworkflow_output.json + workflow_dag.jpg"]
H["LLM 自修复\n输出: 完整 Workflow 模型"] --> J["持久化与可视化\nworkflow_output.json + workflow_dag.jpg"]

classDef llm fill:#fff6e6,stroke:#e67e22,stroke-width:2px;
class C,G llm;
Expand All @@ -238,7 +238,7 @@ LLM / Agent SDK 相关节点说明:
- **结构规划 Agent**:需求拆解在前置阶段完成,规划时直接复用结构化任务清单,再通过节点增删改与 `update_node_params` 工具补全 params(Jinja 表达式为主)。`WorkflowBuilder` 会把推导出的 edges、condition 分支与 `depends_on` 写回骨架,方便下游校验共享上下文;节点字段也会按节点类型或 action schema 过滤无关字段,避免 Agent 生成不可识别的参数。【F:velvetflow/planner/structure.py†L373-L1168】【F:velvetflow/planner/workflow_builder.py†L20-L222】
- **动作合法性守卫**:若发现 `action_id` 缺失或未注册,会先尝试基于 display_name/原 action_id 检索替换,再将剩余问题交给 LLM 修复,避免幻觉动作进入最终 Workflow。【F:velvetflow/planner/orchestrator.py†L104-L343】
- **Jinja 规范化与参数一致性**:规划/校验阶段会将简单路径转换为 Jinja 字符串,并对非模板参数给出修复建议;参数补全阶段的 schema 约束来自 `params_tools.py`,确保节点 params 与动作 arg_schema 对齐。【F:velvetflow/planner/params_tools.py†L1-L193】【F:velvetflow/verification/jinja_validation.py†L10-L189】
- **自修复 Agent**:当静态校验或本地修复仍未通过时,使用当前 workflow 字典与 `ValidationError` 列表提示模型修复。Agent 可以调用命名修复工具(如替换引用、补必填参数、规范化绑定路径)或提交补丁文本,直至通过或达到 `max_repair_rounds`,并在过程中保留最近一次合法版本以确保可回退。【F:velvetflow/planner/repair.py†L616-L756】【F:velvetflow/planner/orchestrator.py†L664-L940】
- **自修复 Agent**:当静态校验或本地修复仍未通过时,使用当前 workflow 字典与 `ValidationError` 列表提示模型修复。Agent 可以调用命名修复工具(如替换引用、补必填参数、规范化绑定路径)或提交补丁文本,直至通过或达到预设轮次,并在过程中保留最近一次合法版本以确保可回退。【F:velvetflow/planner/repair.py†L616-L756】【F:velvetflow/planner/orchestrator.py†L664-L940】

### Agent 工具的设计与运行方式
- **会话级工具与闭包状态**:结构规划的工具集(需求拆解、检索、设置 meta、节点增删改、参数补全)在 `plan_workflow_structure_with_llm` 内使用 `@function_tool(strict_mode=False)` 声明,并依托闭包保存 `WorkflowBuilder`、动作候选与检索结果等上下文,`planner/agent_runtime.py` 统一导出 `Agent`/`Runner`/`function_tool` 便于切换 Agent SDK 版本。【F:velvetflow/planner/structure.py†L373-L1836】【F:velvetflow/planner/agent_runtime.py†L4-L26】
Expand Down Expand Up @@ -278,14 +278,13 @@ LLM / Agent SDK 相关节点说明:
```jsonc
{
"id": "唯一字符串",
"type": "start|end|action|condition|switch|loop|parallel",
"type": "action|condition|switch|loop|parallel",
"action_id": "仅 action 节点需要,对应 tools/business_actions/ 中的 id",
"display_name": "可选: 用于可视化/日志的友好名称",
"params": { /* 取决于节点类型的参数,下文详述 */ }
}
```

- **start/end**:只需 `id` 与 `type`,`params` 可为空。常作为入口/出口。
- **action**:`action_id` 必填;`params` 按动作的 `arg_schema` 填写,支持绑定 DSL(见下文)。
- **condition**:`params.expression` 为布尔 Jinja 表达式,并需要通过 `true_to_node`/`false_to_node` 显式指向下游节点(或设置为 `null` 表示分支终止)。
- **switch**:支持多分支匹配,`params` 中可携带 `source/field` 并在 `cases` 指定 `value` 与 `to_node` 的映射,未命中时走 `default_to_node`。
Expand Down
1 change: 0 additions & 1 deletion build_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ def plan_workflow(user_nl: str, search_service: Optional[HybridActionSearchServi
search_service=hybrid_searcher,
action_registry=BUSINESS_ACTIONS,
max_rounds=50,
max_repair_rounds=3,
)


Expand Down
2 changes: 1 addition & 1 deletion docs/core_concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
- `condition`/`switch` 节点按布尔或多分支结果选择下游,未命中的分支会被标记为阻断以避免重复执行。
- `loop` 节点使用 `source` 定义循环集合,`item_alias` 指定循环体内引用当前元素的名称,`body_subgraph` 执行子图,`exports` 使用 `{key: Jinja 表达式}` 收集逐轮结果列表,表达式必须引用 body_subgraph 节点字段(例如 `{{ result_of.node.field }}`),最终通过 `result_of.<loop_id>.exports.<key>` 读取。
- 循环体可使用 `loop.item`/`loop.index`/`loop.size`/`loop.accumulator`,或 `item_alias` 指定的别名。
- `parallel/start/end` 仍可作为结构辅助节点存在(`parallel` 目前只用于 UI 分组,执行器会将其视为无副作用节点)。
- `parallel` 仍可作为结构辅助节点存在(`parallel` 目前只用于 UI 分组,执行器会将其视为无副作用节点)。
- `velvetflow.models` 内置 `model_validate` 强类型校验节点字段、loop 子图完整性与引用合法性,失败时抛出统一的 `ValidationError`(接口形态与 Pydantic 类似但不依赖其运行时)。

## 绑定与上下文
Expand Down
2 changes: 1 addition & 1 deletion docs/internal_design.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ flowchart TD
V1[Workflow.model_validate] --> V2[infer_depends_on_from_edges]
end
subgraph runtime[动态执行]
start[选择 start 节点] --> topo[拓扑排序 + depends map]
entry[选择入口节点] --> topo[拓扑排序 + depends map]
topo --> loop[循环 body 执行]
topo --> cond[条件/开关分支]
topo --> act[业务动作调用]
Expand Down
29 changes: 10 additions & 19 deletions docs/workflow_dsl_schema.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
"workflow_name": "send_newsletter",
"description": "向 CRM 客户发送新品资讯并记录审批",
"nodes": [
{"id": "start", "type": "start"},
{
"id": "search_users",
"type": "action",
Expand All @@ -30,7 +29,7 @@
"type": "condition",
"params": {"expression": "{{ result_of.search_users.count >= 10 }}"},
"true_to_node": "send_email",
"false_to_node": "end"
"false_to_node": null
},
{
"id": "send_email",
Expand All @@ -41,15 +40,15 @@
"template_id": "spring_launch"
}
},
{"id": "end", "type": "end"}
{"id": "send_email_audit", "type": "action", "action_id": "crm.log_email", "params": {"campaign_id": "spring_launch"}}
]
}
```
上述 workflow 的 edges 会自动推导为 `start → search_users → approve → send_email/end`,无需显式维护。
上述 workflow 的 edges 会自动推导为 `search_users → approve → send_email` 等有向连线,无需显式维护。

## 通用节点字段
- `id`:必填字符串且在同一 graph 内唯一。
- `type`:必填枚举,支持 `start`、`end`、`action`、`condition`、`switch`、`loop`、`parallel`。
- `type`:必填枚举,支持 `action`、`condition`、`switch`、`loop`、`parallel`。
- `display_name`:可选友好名称,便于可视化。
- `depends_on`:可选显式依赖数组,用于覆盖自动推导或串联未被绑定引用捕获的顺序约束。
- `params`:节点专属参数,结构取决于节点类型。
Expand All @@ -60,14 +59,7 @@
- **模板语法校验**:params 字符串支持 Jinja 表达式,校验/执行时会折叠常量并报出语法错误。

## 节点类型与示例
### 1. `start` / `end`
结构化入口或终点,通常只需 `id` 与 `type`:
```json
{"id": "start", "type": "start"}
{"id": "end", "type": "end"}
```

### 2. `action`
### 1. `action`
- **字段**:`action_id` 指向注册表中的工具,`params` 对应工具的 `arg_schema`,`out_params_schema` 可选(覆盖/补充动作的输出 Schema)。
- **示例**:
```json
Expand All @@ -83,7 +75,7 @@
}
```

### 3. `condition`
### 2. `condition`
- **字段**:`params.expression` 为布尔 Jinja 表达式;`true_to_node`、`false_to_node` 指定分支去向,未命中分支自动阻断。
- **示例**:
```json
Expand All @@ -96,7 +88,7 @@
}
```

### 4. `switch`
### 3. `switch`
- **字段**:`params.source` 定位被匹配的对象(可用绑定或 `result_of.*` 路径);`params.field` 可选,用于从对象中取子字段;`cases` 为数组,支持 `match`/`value`、`field`(进一步取子字段)、`to_node`;`default_to_node` 为兜底分支。
- **示例**:
```json
Expand All @@ -112,7 +104,7 @@
}
```

### 5. `loop`
### 4. `loop`
- **字段**:
- `loop_kind`:必填,支持 `for_each`/`foreach`/`while`。
- `source`:数组/序列的引用路径(如 `{{ result_of.search.items }}`),仅支持字符串/Jinja 模板。
Expand Down Expand Up @@ -185,14 +177,13 @@
{
"workflow_name": "async_order_pipeline",
"nodes": [
{"id": "start", "type": "start"},
{"id": "search_orders", "type": "action", "action_id": "crm.list_orders", "params": {"days": 7}},
{
"id": "if_has_orders",
"type": "condition",
"params": {"expression": "{{ result_of.search_orders.count > 0 }}"},
"true_to_node": "for_each_order",
"false_to_node": "end"
"false_to_node": null
},
{
"id": "for_each_order",
Expand Down Expand Up @@ -228,7 +219,7 @@
"action_id": "finance.sync_erp",
"params": {"orders": "{{ result_of.for_each_order.exports.orders }}"}
},
{"id": "end", "type": "end"}
{"id": "notify_summary", "type": "action", "action_id": "crm.send_summary", "params": {"summary": "{{ result_of.for_each_order.exports.summary }}"}}
]
}
```
Expand Down
2 changes: 0 additions & 2 deletions tests/test_update_workflow_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ def _fake_plan(
search_service=_DummySearch(),
action_registry=[{"action_id": "demo.start", "params_schema": {}}, {"action_id": "demo.summary", "params_schema": {}}],
max_rounds=1,
max_repair_rounds=0,
)

assert captured_existing["value"]["workflow_name"] == "demo"
Expand All @@ -99,4 +98,3 @@ def _fake_plan(
summarize = next(node for node in result_nodes if node.get("id") == "summarize")
assert summarize.get("depends_on") == ["start"]
assert summarize.get("params", {}).get("source") == "{{ result_of.start.message }}"

7 changes: 0 additions & 7 deletions update_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,6 @@ def main(argv: list[str] | None = None) -> int:
default=100,
help="结构规划阶段的最大迭代轮次(默认: 100)",
)
parser.add_argument(
"--max-repair-rounds",
type=int,
default=3,
help="LLM 修复阶段的最大迭代轮次(默认: 3)",
)
args = parser.parse_args(argv)

# Parse requirement from CLI/file input before reading the workflow file.
Expand Down Expand Up @@ -133,7 +127,6 @@ def main(argv: list[str] | None = None) -> int:
search_service=search_service,
action_registry=action_registry,
max_rounds=args.max_rounds,
max_repair_rounds=args.max_repair_rounds,
model=args.model or OPENAI_MODEL,
)
except Exception as exc: # noqa: BLE001 - surface model/IO issues
Expand Down
2 changes: 1 addition & 1 deletion velvetflow/bindings.py
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ def _validate_result_reference(self, src_path: str) -> None:
)
return

# 控制节点(condition/start/end 等)也允许被引用,缺少 action_id 时跳过 schema 校验
# 控制节点(condition 等)也允许被引用,缺少 action_id 时跳过 schema 校验
if node.type != "action" or not node.action_id:
return

Expand Down
2 changes: 1 addition & 1 deletion velvetflow/executor/dynamic_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def _execute_graph(
if not start_nodes:
start_nodes = zero_indegree
if not start_nodes and sorted_nodes:
log_warn("未找到 start 节点,将从任意一个节点开始(仅 demo)。")
log_warn("未找到入口节点,将从任意一个节点开始(仅 demo)。")
start_nodes = [sorted_nodes[0].id]

visited = set()
Expand Down
7 changes: 1 addition & 6 deletions velvetflow/executor/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def _derive_edges(self, workflow: Workflow) -> List[Dict[str, Any]]:
def _find_start_nodes(
self, nodes: Mapping[str, Dict[str, Any]], edges: List[Dict[str, Any]]
) -> List[str]:
"""Locate start nodes by combining explicit标记与“无入度”节点。"""
"""Locate entry nodes by combining condition starts and inbound-free nodes."""

all_ids = set(nodes.keys())
to_ids = set()
Expand All @@ -42,17 +42,12 @@ def _find_start_nodes(

inbound_free = list(all_ids - to_ids)

starts = [nid for nid, n in nodes.items() if n.get("type") == "start"]
condition_starts = [
nid for nid, n in nodes.items() if n.get("type") == "condition" and nid in inbound_free
]

outbound_roots = [nid for nid in inbound_free if nid in from_ids]

if starts:
return list(dict.fromkeys(starts + condition_starts + outbound_roots))

# 若没有显式 start 节点,则退化为无入度节点集合以保证流程仍可执行。
merged = list(dict.fromkeys(condition_starts + outbound_roots + inbound_free))
return merged

Expand Down
2 changes: 1 addition & 1 deletion velvetflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ def model_validate(cls, data: Any) -> "Node":
if not isinstance(node_type, str):
errors.append({"loc": ("type",), "msg": "节点类型必须是字符串"})
else:
allowed = {"start", "end", "action", "condition", "switch", "loop", "parallel"}
allowed = {"action", "condition", "switch", "loop", "parallel"}
if node_type not in allowed:
errors.append({
"loc": ("type",),
Expand Down
33 changes: 4 additions & 29 deletions velvetflow/planner/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@

This module orchestrates structure planning and parameter completion. It runs a
two-pass reasoning flow with an LLM to draft the workflow structure and fill
parameters, then validates the result before returning.
parameters, then returns the result.
"""

import json
from typing import Any, Callable, Dict, List, Mapping

from velvetflow.logging_utils import (
Expand All @@ -21,22 +20,12 @@
log_warn,
use_trace_context,
)
from velvetflow.models import PydanticValidationError, ValidationError, Workflow
from velvetflow.models import PydanticValidationError, Workflow
from velvetflow.planner.action_guard import ensure_registered_actions
from velvetflow.planner.requirement_analysis import analyze_user_requirement
from velvetflow.planner.structure import plan_workflow_structure_with_llm
from velvetflow.search import HybridActionSearchService
from velvetflow.verification import precheck_loop_body_graphs, validate_completed_workflow


def _validate_workflow(
workflow: Workflow, action_registry: List[Dict[str, Any]]
) -> List[ValidationError]:
"""Run lightweight validation and return any errors."""

return validate_completed_workflow(
workflow.model_dump(by_alias=True), action_registry=action_registry
)
from velvetflow.verification import precheck_loop_body_graphs


def plan_workflow_with_two_pass(
Expand All @@ -49,7 +38,7 @@ def plan_workflow_with_two_pass(
trace_id: str | None = None,
progress_callback: Callable[[str, Mapping[str, Any]], None] | None = None,
) -> Workflow:
"""Plan a workflow in two passes and validate the result."""
"""Plan a workflow in two passes and return the result."""

parsed_requirement = analyze_user_requirement(
nl_requirement,
Expand Down Expand Up @@ -101,13 +90,6 @@ def plan_workflow_with_two_pass(
)
guarded = guarded if isinstance(guarded, Workflow) else Workflow.model_validate(guarded)

validation_errors = _validate_workflow(guarded, action_registry)
if validation_errors:
log_warn(
"[plan_workflow_with_two_pass] Validation produced errors; returning workflow with warnings.",
json.dumps([err.model_dump() for err in validation_errors], ensure_ascii=False),
)

return guarded


Expand Down Expand Up @@ -170,13 +152,6 @@ def update_workflow_with_two_pass(
"[update_workflow_with_two_pass] progress_callback failed during update stage and was ignored."
)

validation_errors = _validate_workflow(updated_workflow, action_registry)
if validation_errors:
log_warn(
"[update_workflow_with_two_pass] Validation produced errors; returning workflow with warnings.",
json.dumps([err.model_dump() for err in validation_errors], ensure_ascii=False),
)

return updated_workflow


Expand Down
Loading
Loading