2 changes: 1 addition & 1 deletion README.md
@@ -238,7 +238,7 @@ Notes on LLM / Agent SDK nodes:
- **Structure-planning Agent**: requirement decomposition is finished in an earlier phase, so planning directly reuses the structured task list, then completes params (mostly Jinja expressions) via node add/update/delete and the `update_node_params` tool. `WorkflowBuilder` writes the derived edges, condition branches, and `depends_on` back into the skeleton so downstream validation shares the same context; node fields are also filtered by node type or action schema to keep the Agent from producing unrecognized parameters.【F:velvetflow/planner/structure.py†L373-L1168】【F:velvetflow/planner/workflow_builder.py†L20-L222】
- **Action-legality guard**: if an `action_id` is missing or unregistered, the guard first tries to find a replacement by display_name or the original action_id, then hands the remaining problems to the LLM to repair, keeping hallucinated actions out of the final Workflow.【F:velvetflow/planner/orchestrator.py†L104-L343】
- **Jinja normalization and parameter consistency**: the planning/validation phases convert simple paths into Jinja strings and emit fix suggestions for non-template parameters; in the parameter-completion phase the schema constraints come from `params_tools.py`, keeping node params aligned with each action's arg_schema.【F:velvetflow/planner/params_tools.py†L1-L193】【F:velvetflow/verification/jinja_validation.py†L10-L189】
- **Self-repair Agent**: when static validation or local repair still fails, the model is prompted with the current workflow dict and the list of `ValidationError`s. The Agent can call named repair tools (such as replacing references, filling required params, or normalizing binding paths) or submit patch text, iterating until validation passes or `max_repair_rounds` is reached, and keeps the most recent valid version along the way so it can roll back.【F:velvetflow/planner/repair.py†L616-L756】【F:velvetflow/planner/orchestrator.py†L664-L940】
- **Self-repair Agent**: when static validation or local repair still fails, the model is prompted with the current workflow dict and the list of `ValidationError`s. The Agent can call named repair tools (such as replacing references, filling required params, or normalizing binding paths) or submit patch text, iterating until validation passes, and keeps the most recent valid version along the way so it can roll back.【F:velvetflow/planner/repair.py†L616-L756】【F:velvetflow/planner/orchestrator.py†L664-L940】

### How the Agent tools are designed and run
- **Session-scoped tools and closure state**: the structure-planning toolset (requirement decomposition, search, meta setting, node add/update/delete, parameter completion) is declared inside `plan_workflow_structure_with_llm` with `@function_tool(strict_mode=False)`, relying on closures to hold the `WorkflowBuilder`, action candidates, search results, and other context; `planner/agent_runtime.py` re-exports `Agent`/`Runner`/`function_tool` in one place so the Agent SDK version can be swapped easily.【F:velvetflow/planner/structure.py†L373-L1836】【F:velvetflow/planner/agent_runtime.py†L4-L26】
1 change: 0 additions & 1 deletion build_workflow.py
@@ -57,7 +57,6 @@ def plan_workflow(user_nl: str, search_service: Optional[HybridActionSearchServi
search_service=hybrid_searcher,
action_registry=BUSINESS_ACTIONS,
max_rounds=50,
max_repair_rounds=3,
)


5 changes: 3 additions & 2 deletions docs/workflow_dsl_schema.en.md
@@ -4,8 +4,9 @@

- **Workflow fields**: Metadata, nodes list, and optional edges (often derived from bindings). Ensures DAG semantics and reachability before execution.
- **Node types**:
- `action`: Executes a registered action (`action_id`) with parameters/bindings and declares exports.
- `action`: Executes a registered action (`action_id`) with parameters/bindings and optionally declares `out_params_schema`.
- `condition`: Evaluates a Jinja `params.expression` to decide branch traversal.
- `loop`: Iterates over a collection, executing `body_subgraph` and collecting `params.exports` each round.
- `loop`: Iterates over a collection, executing `body_subgraph` (must include at least one action node) and collecting `params.exports` each round.
- **Bindings**: Must use Jinja expressions, referencing `result_of.<node>.<field>` or loop exports. Normalization resolves aliases and ensures consistent paths.
- **`out_params_schema` format**: Use a full JSON schema (`type`/`properties`), or a shorthand nested map where string values map to `{"type": "<type>"}` and nested objects are wrapped as `{"type": "object", "properties": ...}`.
- **Example**: A full workflow JSON example (see Chinese section above) shows nodes, bindings, exports, and edges as produced by the planner and validated before execution.
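The shorthand-to-full-schema expansion described above works roughly like the following sketch; `expand_shorthand` is an illustrative name, not part of the library's API:

```python
from typing import Any, Mapping

# Keys whose presence marks a value as already being a full JSON Schema.
SCHEMA_KEYS = ("type", "properties", "$schema")

def expand_shorthand(value: Any) -> Mapping[str, Any]:
    """Expand the shorthand out_params_schema form into a full JSON Schema."""
    if isinstance(value, Mapping):
        # Already a full schema: pass it through unchanged.
        if any(key in value for key in SCHEMA_KEYS):
            return dict(value)
        # Otherwise treat it as a nested map of field name -> shorthand.
        return {
            "type": "object",
            "properties": {k: expand_shorthand(v) for k, v in value.items()},
        }
    if isinstance(value, str):
        # A bare string names the field's type, e.g. "string" -> {"type": "string"}.
        return {"type": value}
    return {}

shorthand = {"profile": {"name": "string", "contact": {"email": "string"}}}
expanded = expand_shorthand(shorthand)
```

Given the shorthand above, `expanded` is the equivalent full schema with `{"type": "object", "properties": ...}` wrappers at every nesting level.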
17 changes: 17 additions & 0 deletions docs/workflow_dsl_schema.md
@@ -69,6 +69,9 @@

### 2. `action`
- **Fields**: `action_id` points at a tool in the registry, `params` follows that tool's `arg_schema`, and `out_params_schema` is optional (it overrides or supplements the action's output schema).
- **`out_params_schema` forms**:
  - A full JSON Schema (with `type`/`properties`, etc.) may be used.
  - A shorthand nested map is also supported: string values are treated as `{"type": "<type>"}` and nested objects are converted to `{"type": "object", "properties": ...}`, which makes deep fields quick to declare.
- **Examples**:
```json
{
@@ -82,6 +85,20 @@
"out_params_schema": {"type": "object", "properties": {"summary": {"type": "string"}}}
}
```
```json
{
"id": "extract_profile",
"type": "action",
"action_id": "crm.extract_profile",
"params": {"source": "{{ result_of.fetch_user.raw }}"},
"out_params_schema": {
"profile": {
"name": "string",
"contact": {"email": "string", "phone": "string"}
}
}
}
```

### 3. `condition`
- **Fields**: `params.expression` is a boolean Jinja expression; `true_to_node` and `false_to_node` name the branch targets, and the branch that is not taken is automatically blocked.
2 changes: 0 additions & 2 deletions tests/test_update_workflow_planner.py
@@ -88,7 +88,6 @@ def _fake_plan(
search_service=_DummySearch(),
action_registry=[{"action_id": "demo.start", "params_schema": {}}, {"action_id": "demo.summary", "params_schema": {}}],
max_rounds=1,
max_repair_rounds=0,
)

assert captured_existing["value"]["workflow_name"] == "demo"
@@ -99,4 +98,3 @@
summarize = next(node for node in result_nodes if node.get("id") == "summarize")
assert summarize.get("depends_on") == ["start"]
assert summarize.get("params", {}).get("source") == "{{ result_of.start.message }}"

7 changes: 0 additions & 7 deletions update_workflow.py
@@ -90,12 +90,6 @@ def main(argv: list[str] | None = None) -> int:
default=100,
help="结构规划阶段的最大迭代轮次(默认: 100)",
)
parser.add_argument(
"--max-repair-rounds",
type=int,
default=3,
help="LLM 修复阶段的最大迭代轮次(默认: 3)",
)
args = parser.parse_args(argv)

# Parse requirement from CLI/file input before reading the workflow file.
@@ -133,7 +127,6 @@ def main(argv: list[str] | None = None) -> int:
search_service=search_service,
action_registry=action_registry,
max_rounds=args.max_rounds,
max_repair_rounds=args.max_repair_rounds,
model=args.model or OPENAI_MODEL,
)
except Exception as exc: # noqa: BLE001 - surface model/IO issues
29 changes: 25 additions & 4 deletions velvetflow/planner/orchestrator.py
@@ -21,6 +21,8 @@
log_warn,
use_trace_context,
)
from dataclasses import asdict, is_dataclass

from velvetflow.models import PydanticValidationError, ValidationError, Workflow
from velvetflow.planner.action_guard import ensure_registered_actions
from velvetflow.planner.requirement_analysis import analyze_user_requirement
@@ -39,6 +41,19 @@ def _validate_workflow(
)


def _serialize_validation_error(error: ValidationError) -> Dict[str, Any]:
if hasattr(error, "model_dump"):
return error.model_dump() # type: ignore[no-any-return]
if is_dataclass(error):
return asdict(error)
return {
"code": getattr(error, "code", None),
"node_id": getattr(error, "node_id", None),
"field": getattr(error, "field", None),
"message": getattr(error, "message", str(error)),
}
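The serialization fallback above can be exercised in isolation. This sketch mirrors its three code paths (pydantic `model_dump`, dataclass `asdict`, attribute fallback); `serialize_error` and `DummyError` are illustrative names, not part of the codebase:

```python
from dataclasses import asdict, dataclass, is_dataclass
from typing import Any, Dict

def serialize_error(error: Any) -> Dict[str, Any]:
    """Serialize a validation error via pydantic, dataclass, or attribute fallback."""
    if hasattr(error, "model_dump"):  # pydantic v2 models
        return error.model_dump()
    if is_dataclass(error):  # plain dataclasses
        return asdict(error)
    return {  # last resort: best-effort attribute lookup
        "code": getattr(error, "code", None),
        "node_id": getattr(error, "node_id", None),
        "field": getattr(error, "field", None),
        "message": getattr(error, "message", str(error)),
    }

@dataclass
class DummyError:
    code: str
    node_id: str
    field: str
    message: str

err = serialize_error(DummyError("E1", "n1", "params", "missing"))
```

Note that a bare exception (no `model_dump`, not a dataclass, no `message` attribute) falls through to `str(error)`, so arbitrary error objects still serialize to something loggable.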


def plan_workflow_with_two_pass(
nl_requirement: str,
search_service: HybridActionSearchService,
@@ -104,8 +119,11 @@ def plan_workflow_with_two_pass(
validation_errors = _validate_workflow(guarded, action_registry)
if validation_errors:
log_warn(
"[plan_workflow_with_two_pass] Validation produced errors; returning workflow with warnings.",
json.dumps([err.model_dump() for err in validation_errors], ensure_ascii=False),
"[plan_workflow_with_two_pass] Validation produced errors; returning workflow with warnings. "
+ json.dumps(
[_serialize_validation_error(err) for err in validation_errors],
ensure_ascii=False,
),
)

return guarded
@@ -173,8 +191,11 @@ def update_workflow_with_two_pass(
validation_errors = _validate_workflow(updated_workflow, action_registry)
if validation_errors:
log_warn(
"[update_workflow_with_two_pass] Validation produced errors; returning workflow with warnings.",
json.dumps([err.model_dump() for err in validation_errors], ensure_ascii=False),
"[update_workflow_with_two_pass] Validation produced errors; returning workflow with warnings. "
+ json.dumps(
[_serialize_validation_error(err) for err in validation_errors],
ensure_ascii=False,
),
)

return updated_workflow
67 changes: 65 additions & 2 deletions velvetflow/planner/structure.py
@@ -1426,6 +1426,26 @@ def add_condition_node(
if false_to_node is not None and not isinstance(false_to_node, str):
result = _build_validation_error("false_to_node 只能是节点 id 或 null。", invalid_fields=["false_to_node"])
return _return_tool_result("add_condition_node", result)
workflow_nodes = builder.to_workflow().get("nodes", [])
known_node_ids = {
node.get("id")
for node in workflow_nodes
if isinstance(node, Mapping) and isinstance(node.get("id"), str)
}
missing_branches = [
node_id
for node_id in (true_to_node, false_to_node)
if isinstance(node_id, str) and node_id not in known_node_ids
]
if missing_branches:
result = _build_validation_error(
"condition 节点的分支目标必须先存在。"
"请先创建分支节点后再创建或更新 condition 节点;"
"如果分支不需要,请将 true_to_node/false_to_node 设为 null。",
invalid_fields=["true_to_node", "false_to_node"],
missing_nodes=missing_branches,
)
return _return_tool_result("add_condition_node", result)
normalized_params: Dict[str, Any] = dict(params or {})
expr_val = normalized_params.get("expression")
if isinstance(expr_val, str):
@@ -1727,6 +1747,27 @@ def update_condition_node(
if false_to_node is not None and not isinstance(false_to_node, str):
result = _build_validation_error("false_to_node 只能是节点 id 或 null。", invalid_fields=["false_to_node"])
return _return_tool_result("update_condition_node", result)
if true_to_node is not None or false_to_node is not None:
workflow_nodes = builder.to_workflow().get("nodes", [])
known_node_ids = {
node.get("id")
for node in workflow_nodes
if isinstance(node, Mapping) and isinstance(node.get("id"), str)
}
missing_branches = [
node_id
for node_id in (true_to_node, false_to_node)
if isinstance(node_id, str) and node_id not in known_node_ids
]
if missing_branches:
result = _build_validation_error(
"condition 节点的分支目标必须先存在。"
"请先创建分支节点后再更新 condition 节点;"
"如果分支不需要,请将 true_to_node/false_to_node 设为 null。",
invalid_fields=["true_to_node", "false_to_node"],
missing_nodes=missing_branches,
)
return _return_tool_result("update_condition_node", result)
if params is not None and not isinstance(params, Mapping):
result = _build_validation_error("condition 节点的 params 需要是对象。")
return _return_tool_result("update_condition_node", result)
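The branch-target precheck added to both condition tools can be distilled into one standalone helper; `missing_branch_targets` is an illustrative name, and the node dicts assume only the `id` field shown in the diff:

```python
from typing import Iterable, List, Mapping, Optional

def missing_branch_targets(
    nodes: Iterable[Mapping[str, object]],
    true_to_node: Optional[str],
    false_to_node: Optional[str],
) -> List[str]:
    """Return branch targets that do not yet exist among the workflow's nodes."""
    # Collect ids of nodes that already exist (string ids only).
    known = {
        node.get("id")
        for node in nodes
        if isinstance(node, Mapping) and isinstance(node.get("id"), str)
    }
    # A None branch means "no branch", so only string targets are checked.
    return [
        node_id
        for node_id in (true_to_node, false_to_node)
        if isinstance(node_id, str) and node_id not in known
    ]

nodes = [{"id": "start", "type": "action"}, {"id": "finish", "type": "action"}]
```

A non-empty return triggers the validation error above, forcing the Agent to create the branch nodes first or set the branch to null.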
@@ -2038,7 +2079,6 @@ def _evaluate_workflow_requirements(
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_payload},
],
temperature=0,
response_format={"type": "json_object"},
)
except Exception as exc:
@@ -2115,7 +2155,23 @@ def check_workflow() -> Mapping[str, Any]:
if not has_incoming and not has_param_refs:
nodes_without_refs.append(node_id)

has_issues = bool(nodes_without_refs) or not llm_satisfies
loop_nodes_missing_actions: List[str] = []
for node in nodes:
if not isinstance(node, Mapping):
continue
if node.get("type") != "loop":
continue
node_id = node.get("id")
params = node.get("params") if isinstance(node.get("params"), Mapping) else {}
body_nodes = (params.get("body_subgraph") or {}).get("nodes", [])
has_action = any(
isinstance(body_node, Mapping) and body_node.get("type") == "action"
for body_node in body_nodes
)
if not has_action and isinstance(node_id, str):
loop_nodes_missing_actions.append(node_id)

has_issues = bool(nodes_without_refs) or bool(loop_nodes_missing_actions) or not llm_satisfies
status = "needs_more_work" if has_issues else "ok"
feedback_parts = []
if nodes_without_refs:
@@ -2124,6 +2180,12 @@
"Update params to reference upstream outputs for nodes: "
f"{', '.join(nodes_without_refs)}."
)
if loop_nodes_missing_actions:
feedback_parts.append(
"Loop nodes must include at least one action node in body_subgraph. "
"Add an action node to the subgraph or remove the loop if it is unnecessary: "
f"{', '.join(loop_nodes_missing_actions)}."
)
if not llm_satisfies:
missing_text = "; ".join(llm_missing) if llm_missing else "未满足的需求未明确。"
suggestion_text = (
@@ -2145,6 +2207,7 @@
"status": status,
"type": "check_workflow",
"nodes_without_references": nodes_without_refs,
"loop_nodes_missing_actions": loop_nodes_missing_actions,
"requirements_missing": llm_missing,
"requirements_suggestions": llm_suggestions,
"llm_requirement_check": llm_eval,
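The new loop-body check in `check_workflow` can be sketched as a standalone function (illustrative name, assuming the node dict shapes shown in the diff):

```python
from typing import Any, List, Mapping

def loops_missing_actions(nodes: List[Mapping[str, Any]]) -> List[str]:
    """Return ids of loop nodes whose body_subgraph contains no action node."""
    missing: List[str] = []
    for node in nodes:
        if not isinstance(node, Mapping) or node.get("type") != "loop":
            continue
        params = node.get("params") if isinstance(node.get("params"), Mapping) else {}
        # body_subgraph may be absent or null; fall back to an empty node list.
        body_nodes = (params.get("body_subgraph") or {}).get("nodes", [])
        has_action = any(
            isinstance(body, Mapping) and body.get("type") == "action"
            for body in body_nodes
        )
        node_id = node.get("id")
        if not has_action and isinstance(node_id, str):
            missing.append(node_id)
    return missing

nodes = [
    {"id": "loop_ok", "type": "loop",
     "params": {"body_subgraph": {"nodes": [{"id": "a", "type": "action"}]}}},
    {"id": "loop_empty", "type": "loop", "params": {"body_subgraph": {"nodes": []}}},
]
```

Any id in the result feeds the `loop_nodes_missing_actions` feedback, steering the Agent to add an action to the subgraph or drop the loop.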
23 changes: 16 additions & 7 deletions velvetflow/verification/binding_checks.py
@@ -23,6 +23,19 @@ def _index_actions_by_id(action_registry: List[Dict[str, Any]]) -> Dict[str, Dic
return {a["action_id"]: a for a in action_registry}


def _coerce_out_param_schema(value: Any) -> Mapping[str, Any]:
if isinstance(value, Mapping):
if any(key in value for key in ("type", "properties", "$schema")):
return value
properties = {key: _coerce_out_param_schema(item) for key, item in value.items()}
return {"type": "object", "properties": properties}
Review comment on lines +26 to +31:

**P2: Treat non-type schema keywords as schemas, not objects**

The new `_coerce_out_param_schema` treats any mapping that lacks `type`/`properties`/`$schema` as a nested object map. This changes the previous behavior for field-level schemas that only use other JSON Schema keywords (e.g. `{"enum": [...]}`, `{"oneOf": [...]}`, `{"items": {...}}`): those used to be passed through as-is, but are now coerced into `{"type": "object", "properties": {"enum": ...}}`, so reference validation will interpret `foo.enum` as a property instead of applying the schema to `foo`. This breaks existing `out_params_schema` definitions that rely on keyword-only schemas; consider broadening the "already a schema" check to include other JSON Schema keywords.


if isinstance(value, str):
return {"type": value}

return {}


def _schema_from_out_params_schema(
out_params_schema: Mapping[str, Any] | None,
) -> Optional[Mapping[str, Any]]:
@@ -35,13 +48,9 @@ def _schema_from_out_params_schema(
if any(key in out_params_schema for key in ("type", "properties", "$schema")):
return out_params_schema

properties: Dict[str, Any] = {}
for key, value in out_params_schema.items():
if isinstance(value, Mapping):
properties[key] = value
else:
# Best-effort wrapper when only scalar types are provided.
properties[key] = {"type": value} if isinstance(value, str) else {}
properties: Dict[str, Any] = {
key: _coerce_out_param_schema(value) for key, value in out_params_schema.items()
}

return {"type": "object", "properties": properties}

2 changes: 0 additions & 2 deletions webapp/server.py
@@ -154,7 +154,6 @@ def _run_planner_for_request(
search_service=search_service,
action_registry=BUSINESS_ACTIONS,
max_rounds=100,
max_repair_rounds=3,
progress_callback=progress_callback,
)

@@ -163,7 +162,6 @@ def _run_planner_for_request(
search_service=search_service,
action_registry=BUSINESS_ACTIONS,
max_rounds=100,
max_repair_rounds=3,
progress_callback=progress_callback,
)
