Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions tests/test_jinja_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,39 @@ def test_condition_field_literal_wrapped_as_jinja():
assert errors == []
assert summary.get("applied") in {False, None}
assert normalized["nodes"][0]["params"]["expression"] == "{{ result_of.some_loop.exports.items | length > 0 }}"


def test_malformed_conditional_literal_is_repaired():
workflow = _workflow({"status": "abnormal' if loop.item.temperature > 38 else 'normal"})

normalized, summary, errors = normalize_params_to_jinja(workflow)

assert errors == []
assert summary["applied"] is True
assert summary["replacements"] == [
{
"path": "params.status",
"from": "abnormal' if loop.item.temperature > 38 else 'normal",
"to": "{{ 'abnormal' if loop.item.temperature > 38 else 'normal' }}",
}
]


def test_malformed_conditional_literal_with_filter_is_repaired():
workflow = _workflow(
{
"status": "alert' if (loop.item.temperature is not none and (loop.item.temperature | float) > 38.0) else 'normal",
}
)

normalized, summary, errors = normalize_params_to_jinja(workflow)

assert errors == []
assert summary["applied"] is True
assert summary["replacements"] == [
{
"path": "params.status",
"from": "alert' if (loop.item.temperature is not none and (loop.item.temperature | float) > 38.0) else 'normal",
"to": "{{ 'alert' if (loop.item.temperature is not none and (loop.item.temperature | float) > 38.0) else 'normal' }}",
}
]
153 changes: 76 additions & 77 deletions velvetflow/planner/repair.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,20 +366,20 @@ def _make_error_key(e: ValidationError) -> str:


_ERROR_TYPE_PROMPTS: Dict[str, str] = {
"MISSING_REQUIRED_PARAM": "补齐 params.<field>,优先绑定上游符合 arg_schema 的输出或使用 schema 默认值。示例:如果 action 需要 params.prompt 且缺失,可绑定上游文案生成节点的 result.output",
"UNKNOWN_ACTION_ID": "替换为 Action Registry 中存在的 action_id,保持节点含义最接近并同步校正 params。示例:将未知的 action_id 改为 registry 中的 text.generate,并保留 prompt/temperature 等参数。",
"UNKNOWN_PARAM": "移除或重命名 params 中未在 arg_schema 声明的字段,使其与 schema 对齐。示例:action schema 仅接受 prompt/temperature,应删除意外出现的 max_token 字段。",
"DISCONNECTED_GRAPH": "连接无入度/出度节点,确保每个节点都位于可达路径上。示例:为孤立的 summary 节点添加来自前置生成节点的边,并把结果导向下游汇总节点。",
"INVALID_EDGE": "修复边的 from/to 以引用存在的节点,并保持条件字段合法。示例:若 edge.to 指向不存在的 node-3,可改为实际存在的 reviewer 节点。",
"SCHEMA_MISMATCH": "调整绑定字段或聚合方式,使类型与上游 output_schema/arg_schema 兼容。示例:如果上游输出是 list 而当前节点期望 string,可改为 join 列表或选择单个元素。",
"INVALID_SCHEMA": "DSL 结构补齐/修正字段类型(nodes/edges/params),避免 JSON 结构缺失。示例:在 workflow 缺少 edges 字段时补空数组,并确保 nodes 为列表。",
"INVALID_LOOP_BODY": "补齐 loop.body_subgraph.nodes(至少一个 action),并确保 exports 使用 {key: Jinja表达式} 结构引用 body_subgraph 节点字段。",
"STATIC_RULES_SUMMARY": "遵循静态规则摘要中的提示逐项修复,优先处理连通性和 schema 不匹配。示例:按摘要要求先修复未连接的边,再补齐缺失的必填参数。",
"EMPTY_PARAM_VALUE": "为空的参数应填入非空值或绑定,避免空字符串/空对象。示例:将空的 params.prompt 改为具体提示词或绑定上游结果。",
"EMPTY_PARAMS": "params 不能是空对象,为必填字段提供值或引用,并使用工具完成不确定的填充。示例:若节点类型为 action params {},根据 arg_schema 填入 prompt/template 等必填字段。",
"SELF_REFERENCE": "移除节点对自身 result_of.<node> 的引用,改用上游输出或拆分节点并通过工具应用修改。示例:若 node-a.params.input 引用 result_of.node-a,可改为 result_of.previous-node",
"SYNTAX_ERROR": "修正 DSL 语法(括号、逗号、引号等),使解析器能生成 AST。示例:补全缺失的引号或逗号,确保 JSON 能被解析。",
"GRAMMAR_VIOLATION": "遵循文法约束调整节点/字段位置,按期望的 token/产生式重新组织。示例:将误放在 edges 中的节点定义移回 nodes 列表。",
"MISSING_REQUIRED_PARAM": "Fill in params.<field>; prefer binding upstream outputs that match arg_schema or use schema defaults. Example: if an action requires params.prompt but it is missing, bind result.output from an upstream copywriting node.",
"UNKNOWN_ACTION_ID": "Replace with an action_id that exists in the Action Registry, keeping the node's intent closest and fixing params accordingly. Example: change an unknown action_id to text.generate in the registry and keep parameters such as prompt/temperature.",
"UNKNOWN_PARAM": "Remove or rename params fields not declared in arg_schema so they align with the schema. Example: if the action schema only accepts prompt/temperature, delete an unexpected max_token field.",
"DISCONNECTED_GRAPH": "Connect nodes with no in-degree/out-degree so every node sits on a reachable path. Example: add an edge from a prior generation node to an isolated summary node and route its result to downstream aggregation nodes.",
"INVALID_EDGE": "Fix edge.from/edge.to to reference existing nodes and keep condition fields valid. Example: if edge.to points to missing node-3, change it to an existing reviewer node.",
"SCHEMA_MISMATCH": "Adjust bound fields or aggregation so types are compatible with upstream output_schema/arg_schema. Example: if upstream output is a list but the current node expects a string, join the list or pick a single element.",
"INVALID_SCHEMA": "Fill or correct field types per the DSL structure (nodes/edges/params) to avoid missing JSON pieces. Example: if the workflow lacks an edges field, add an empty array and ensure nodes is a list.",
"INVALID_LOOP_BODY": "Fill loop.body_subgraph.nodes (at least one action) and ensure exports use the {key: Jinja expression} shape referencing body_subgraph node fields.",
"STATIC_RULES_SUMMARY": "Follow the static rules summary step by step, prioritizing connectivity and schema mismatches. Example: fix disconnected edges first per the summary, then fill missing required params.",
"EMPTY_PARAM_VALUE": "Populate empty parameters with non-empty values or bindings; avoid empty strings/objects. Example: change an empty params.prompt to a concrete prompt or bind it to an upstream result.",
"EMPTY_PARAMS": "params cannot be an empty object—provide values or references for required fields and use tools to fill uncertain parts. Example: if a node type is action but params is {}, fill required fields like prompt/template according to arg_schema.",
"SELF_REFERENCE": "Remove a node's reference to its own result_of.<node>; use upstream outputs instead, or split the node and apply tools. Example: if node-a.params.input references result_of.node-a, change it to result_of.previous-node.",
"SYNTAX_ERROR": "Fix DSL syntax (brackets, commas, quotes, etc.) so the parser can build the AST. Example: add missing quotes or commas to make the JSON parseable.",
"GRAMMAR_VIOLATION": "Rearrange nodes/fields according to grammar constraints, respecting expected tokens/productions. Example: move a node definition mistakenly placed in edges back to the nodes list.",
}


Expand Down Expand Up @@ -569,69 +569,68 @@ def repair_workflow_with_llm(
}

system_prompt = """
你是一个工作流修复助手。
【Workflow DSL 语法与语义(务必遵守)】
- workflow = {workflow_name, description, nodes: []},只能返回合法 JSON(edges 会由系统基于节点绑定自动推导,不需要生成)。
- node 基本结构:{id, type, display_name, params, action_id?, out_params_schema?, loop/subgraph/branches?}。
type 仅允许 action/condition/loop/parallel。无需 start/end/exit 节点。
action 节点必须填写 action_id(来自动作库)与 params;只有 action 节点允许 out_params_schema。
condition 节点需包含返回布尔值的 params.expression(合法 Jinja 表达式),以及 true_to_node/false_to_node(字符串或 null)。
loop 节点包含 loop_kind/iter/source/body_subgraph/exports,exports 的 value 必须引用 body_subgraph 节点字段(如 {{ result_of.node.field }}),循环外部只能引用 exports.<key>,body_subgraph 仅包含 nodes 数组。
loop.body_subgraph 内不需要也不允许显式声明 edges、entry 或 exit 节点,如发现请直接删除。
- params 必须直接使用 Jinja 表达式引用上游结果(如 {{ result_of.<node_id>.<field_path> }} 或 {{ loop.item.xxx }}),不得输出对象式绑定。
<node_id> 必须存在且字段需与上游 output_schema 或 loop.exports 对齐。
当前有一个 workflow JSON 和一组结构化校验错误 validation_errors。
validation_errors 是 JSON 数组,元素包含 code/node_id/field/message。
这些错误来自:
- action 参数缺失或不符合 arg_schema
- condition 条件不完整
- source 路径引用了不存在的节点
- source 路径与上游 action 的 output_schema 不匹配
- source 指向的数组元素 schema 中不存在某个字段

- previous_failed_attempts 会记录同一错误的历史修复尝试,请避免重复失败的方法,尝试不同的修复路径。

- workflow 结构不符合 DSL schema(例如节点 type 非法)

总体目标:在“尽量不改变工作流整体结构”的前提下,修复这些错误,使 workflow 通过静态校验。

具体要求(很重要,请严格遵守):
1. 结构保持稳定:
- 不要增加或删除节点;
- edges 会由系统根据节点引用自动推导,不需要手动增删或调整 condition。

2. action 节点修复优先级:
- 首先根据 action_schemas[action_id].arg_schema 补齐 params 里缺失的必填字段,或修正错误类型;
- 如果 action_id 本身是合法的(存在于 action_schemas 中),优先“修 params”,不要改 action_id;
- 只有当 validation_errors 明确指出 action_id 不存在时,才考虑把 action_id 改成一个更合理的候选,
并同步更新该节点的 params 使之符合新的 arg_schema。

3. condition 节点修复:
- 确保 params.expression 存在且是合法的 Jinja 布尔表达式;
- expression 应直接编写判断逻辑,引用上游结果请使用 {{ result_of.<node>.<field_path> }} 或 {{ loop.item.xxx }};
- 不要生成 kind/field/op/value 等字段,所有条件均需融入 expression。

4. loop 节点:确保 loop_kind(for_each/while)和 source 字段合法,并使用 exports 暴露 body 结果(表达式需引用 body_subgraph 节点字段),下游只能引用 result_of.<loop_id>.exports.<key>。
5. parallel 节点:branches 必须是非空数组。

6. 参数绑定修复:
- 仅输出 Jinja 表达式或字面量,禁止回退到对象式绑定;
- 聚合/过滤/拼接逻辑请直接写在 Jinja 表达式或过滤器里,并确保字段路径与上游 schema 对齐;
- 常见错误:下游引用缺少 exports 段,请补齐 result_of.<loop_id>.exports.<key>。

7. 修改范围尽量最小化:
- 当有多种修复方式时,优先选择改动最小、语义最接近原意的方案(如只改一个字段名,而不是重写整个 params)。
- 当 validation_error_summary 提供 Schema 提示或路径信息时,优先按提示矫正字段类型/结构,避免多轮重复犯错。

8. 修复回合有限且必须闭环:
- 系统会在有限轮次内重新校验;如果你忽略任何一条 validation_error,流程将直接进入下一轮甚至终止。
- 请逐条对照 validation_errors,把所有问题修到为 0 再输出结果,避免留存隐患。

9. 输出要求:
- 保持顶层结构:workflow_name/description/nodes 不变(仅节点内部内容可调整,edges 由系统推导);
- 节点的 id/type 不变;
- 必须使用工具链完成修改:通过修复工具调整 workflow 并直接使用最新结果,避免自然语言或 Markdown 代码块。
10. 可用工具:当你需要结构化修改时,优先调用提供的工具(无 LLM 依赖、结果确定),包含 fix_loop_body_references/fill_action_required_params/update_node_field/normalize_binding_paths/replace_reference_paths/drop_invalid_references。
You are a workflow repair assistant.
[Workflow DSL syntax and semantics (must follow)]
- workflow = {workflow_name, description, nodes: []}. Only return valid JSON (edges will be auto-inferred by the system from node bindings; do not generate them).
- Node base structure: {id, type, display_name, params, action_id?, out_params_schema?, loop/subgraph/branches?}.
type only allows action/condition/loop/parallel. No need for start/end/exit nodes.
action nodes must include action_id (from the action registry) and params; only action nodes may have out_params_schema.
condition nodes must include params.expression that returns a boolean (valid Jinja expression) and true_to_node/false_to_node (string or null).
loop nodes include loop_kind/iter/source/body_subgraph/exports. The value of exports must reference fields of nodes inside body_subgraph (e.g., {{ result_of.node.field }}). Outside the loop, only exports.<key> can be referenced. body_subgraph only contains a nodes array.
loop.body_subgraph does not need and must not explicitly declare edges, entry, or exit nodes—remove them if present.
- params must directly use Jinja expressions to reference upstream results (e.g., {{ result_of.<node_id>.<field_path> }} or {{ loop.item.xxx }}); do not output object-style bindings.
<node_id> must exist and the fields must align with upstream output_schema or loop.exports.
You have a workflow JSON and a set of structured validation_errors.
validation_errors is a JSON array whose elements contain code/node_id/field/message.
These errors come from:
- Missing action parameters or mismatches with arg_schema
- Incomplete condition definitions
- source paths referencing nonexistent nodes
- source paths not matching the upstream action's output_schema
- Fields missing from the array element schema referenced by source

- previous_failed_attempts records past repair attempts for the same error. Avoid repeating failed methods and try different fix paths.

- workflow structure not conforming to DSL schema (e.g., invalid node type)

Overall goal: repair these errors so the workflow passes static validation while “changing the overall workflow structure as little as possible.”

Specific requirements (very important, follow strictly):
1. Keep the structure stable:
- Do not add or delete nodes;
- edges will be auto-derived by the system based on node references. Do not manually add/remove or adjust conditions.

2. Action node repair priority:
- First, fill missing required fields in params or fix incorrect types based on action_schemas[action_id].arg_schema;
- If the action_id itself is valid (exists in action_schemas), prioritize “fixing params” instead of changing action_id;
- Only when validation_errors explicitly indicate the action_id does not exist should you consider changing action_id to a more reasonable candidate and simultaneously update that node’s params to match the new arg_schema.

3. Condition node repair:
- Ensure params.expression exists and is a valid Jinja boolean expression;
- expression should directly express the logic. To reference upstream results use {{ result_of.<node>.<field_path> }} or {{ loop.item.xxx }};
- Do not generate kind/field/op/value fields; all conditions must be within expression.

4. loop nodes: ensure loop_kind (for_each/while) and source are valid, and expose body results via exports (expressions must reference body_subgraph node fields). Downstream may only reference result_of.<loop_id>.exports.<key>.
5. parallel nodes: branches must be a non-empty array.

6. Parameter binding repairs:
- Only output Jinja expressions or literals; do not fall back to object-style bindings;
- Put aggregation/filtering/concatenation logic directly in Jinja expressions or filters and ensure field paths align with upstream schemas;
- Common error: downstream references missing the exports segment—add result_of.<loop_id>.exports.<key>.

7. Minimize the scope of changes:
- When multiple fixes exist, prefer the minimal change closest to the original intent (e.g., rename a single field instead of rewriting the entire params).
- When validation_error_summary provides schema hints or path information, prioritize adjusting field types/structures per the hint to avoid repeating mistakes.

8. Repair rounds are limited and must close the loop:
- The system revalidates within limited iterations; if you ignore any validation_error, the process may proceed to the next round or terminate.
- Address every item in validation_errors until there are zero issues before outputting results to avoid leaving problems behind.

9. Output requirements:
- Keep the top-level structure workflow_name/description/nodes unchanged (only adjust node internals; edges are inferred by the system);
- Node id/type must remain unchanged;
- You must use the toolchain to apply modifications: adjust the workflow through repair tools and directly use the latest result—avoid natural language or Markdown code blocks.
10. Available tools: when you need structured modifications, prioritize the provided tools (deterministic, no LLM dependency): fix_loop_body_references/fill_action_required_params/update_node_field/normalize_binding_paths/replace_reference_paths/drop_invalid_references.
"""

working_workflow: Dict[str, Any] = broken_workflow
Expand Down
Loading
Loading