From 1b92a7448c8812c49d8cf7706e4b12f939d7534a Mon Sep 17 00:00:00 2001 From: Zhongkai Fu Date: Sun, 18 Jan 2026 23:27:23 -0800 Subject: [PATCH 1/3] Remove validate_workflow_data tooling --- README.en.md | 3 +- README.md | 16 +- docs/quickstart.en.md | 6 +- docs/quickstart.md | 6 +- docs/troubleshooting.md | 4 - tests/test_loop_body_validation.py | 388 ----------------------------- tests/test_workflow_parser.py | 45 ---- update_workflow.py | 8 +- validate_workflow.py | 241 ------------------ 9 files changed, 15 insertions(+), 702 deletions(-) delete mode 100644 tests/test_loop_body_validation.py delete mode 100644 tests/test_workflow_parser.py delete mode 100644 validate_workflow.py diff --git a/README.en.md b/README.en.md index c0bac879..6fd040ad 100644 --- a/README.en.md +++ b/README.en.md @@ -103,8 +103,7 @@ Key highlights: python execute_workflow.py --workflow-json workflow_output.json python render_workflow_image.py --workflow-json workflow_output.json --output workflow_dag.jpg ``` -6. **Validate or incrementally update**: +6. **Incrementally update an existing workflow**: ```bash - python validate_workflow.py path/to/workflow.json --print-normalized python update_workflow.py path/to/workflow.json --requirement "add an approval step" --output workflow_updated.json ``` diff --git a/README.md b/README.md index bd28f52b..3de663c8 100644 --- a/README.md +++ b/README.md @@ -160,12 +160,7 @@ VelvetFlow (repo root) python render_workflow_image.py --workflow-json workflow_output.json --output workflow_dag.jpg ``` - 读取已有的 workflow JSON,将 DAG 渲染成 JPEG。对于 action 节点,会额外显示调用的工具名称和输入参数。 -7. **校验任意 workflow JSON(可选)** - ```bash - python validate_workflow.py path/to/workflow.json --action-registry tools/business_actions --print-normalized - ``` - - 复用语法/语义解析、`Workflow.model_validate` 与静态规则输出详细错误;`--print-normalized` 可打印归一化后的 DSL。 -8. **在现有 workflow 上迭代需求(可选)** +7. **在现有 workflow 上迭代需求(可选)** ```bash python update_workflow.py path/to/workflow.json --requirement "新增审批环节" --output workflow_updated.json ``` @@ -305,14 +300,13 @@ LLM / Agent SDK 相关节点说明: - 执行器解析 condition 或绑定聚合时,会在运行时填充上下文后求值,并复用相同的语法校验逻辑,异常会带上参数路径便于追踪。【F:velvetflow/executor/conditions.py†L14-L208】【F:velvetflow/bindings.py†L334-L532】 ### 手动调试与排错建议 -1. **先跑校验**:使用 `python validate_workflow.py your_workflow.json --print-normalized`,可以立刻发现重复节点、边引用不存在、loop 子图 schema 不合法等问题。【F:validate_workflow.py†L1-L58】 -2. **检查绑定路径**:如果 Jinja 引用报错,确认 `result_of..` 中的节点是否存在且有对应字段;loop 节点需检查 `exports` 中是否声明了该字段。 +1. **检查绑定路径**:如果 Jinja 引用报错,确认 `result_of..` 中的节点是否存在且有对应字段;loop 节点需检查 `exports` 中是否声明了该字段。 ### 常见绑定警告示例 - **引用了 loop 未导出的字段**:loop 节点的输出仅包含 `params.exports` 声明的字段,不会直接暴露子图节点的字段。引用 loop 内部节点字段会触发校验错误,应改为引用 loop 导出的数组(如 `result_of.loop_each_news.exports.summary`)。 -3. **最小化修改面**:调试时优先修改 `params` 中的绑定表达式或 condition 节点的跳转(`true_to_node`/`false_to_node`),避免破坏整体拓扑。 -4. **模拟执行观察输出**:用 `python execute_workflow.py --workflow-json your_workflow.json`,日志会标明每个节点解析后的参数值,便于确认聚合逻辑是否符合预期。 -5. **可视化辅助**:通过 `python render_workflow_image.py --workflow-json your_workflow.json --output tmp.jpg` 生成 DAG,快速核对节点/边连通性与显示名称。 +2. **最小化修改面**:调试时优先修改 `params` 中的绑定表达式或 condition 节点的跳转(`true_to_node`/`false_to_node`),避免破坏整体拓扑。 +3. **模拟执行观察输出**:用 `python execute_workflow.py --workflow-json your_workflow.json`,日志会标明每个节点解析后的参数值,便于确认聚合逻辑是否符合预期。 +4. **可视化辅助**:通过 `python render_workflow_image.py --workflow-json your_workflow.json --output tmp.jpg` 生成 DAG,快速核对节点/边连通性与显示名称。 ## 示例与 Demo 下面的资源可以帮助快速理解 VelvetFlow 的输入、输出与运行效果: diff --git a/docs/quickstart.en.md b/docs/quickstart.en.md index 2a5d8f96..fe12f8b9 100644 --- a/docs/quickstart.en.md +++ b/docs/quickstart.en.md @@ -49,11 +49,7 @@ python build_action_index.py --output tools/action_index.json --model text-embed python render_workflow_image.py --workflow-json workflow_output.json --output workflow_dag.jpg ``` -### Validate or Incrementally Update -- Validate and print normalized DSL: - ```bash - python validate_workflow.py path/to/workflow.json --print-normalized - ``` +### Incrementally Update - Append new requirements to an existing workflow: ```bash python update_workflow.py path/to/workflow.json --requirement "add an approval step" --output workflow_updated.json diff --git a/docs/quickstart.md b/docs/quickstart.md index 47240dec..384e4154 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -50,11 +50,7 @@ python build_action_index.py --output tools/action_index.json --model text-embed python render_workflow_image.py --workflow-json workflow_output.json --output workflow_dag.jpg ``` -## 校验或增量更新 -- 校验并打印归一化 DSL: - ```bash - python validate_workflow.py path/to/workflow.json --print-normalized - ``` +## 增量更新 - 在已有 workflow 上追加需求: ```bash python update_workflow.py path/to/workflow.json --requirement "新增审批环节" --output workflow_updated.json diff --git a/docs/troubleshooting.md b/docs/troubleshooting.md index e73f1fb3..b231c86b 100644 --- a/docs/troubleshooting.md +++ b/docs/troubleshooting.md @@ -15,10 +15,6 @@ - **parallel 分支未执行**:`parallel` 节点当前仅用于前端分组与可视化,执行器不会调度 `branches` 内的节点;请将实际节点放在顶层 `nodes` 并使用 `depends_on`/绑定推导控制顺序。 ## 定位手段 -- **打印归一化 DSL**: - ```bash - python validate_workflow.py path/to/workflow.json --print-normalized - ``` - **查看规划日志**:`build_workflow.py` 会输出需求拆解、Agent 工具调用与返回结果,便于定位 LLM 生成阶段的问题。 - **执行期事件**:`execute_workflow.py` 默认打印节点起止、条件结果、loop 聚合等日志,异步挂起时会输出 `WorkflowSuspension` 细节。 diff --git a/tests/test_loop_body_validation.py b/tests/test_loop_body_validation.py deleted file mode 100644 index 1711f262..00000000 --- a/tests/test_loop_body_validation.py +++ /dev/null @@ -1,388 +0,0 @@ -# Author: Zhongkai Fu (fuzhongkai@gmail.com) -# License: BSD 3-Clause License - -import sys -from pathlib import Path - -import pytest - -ROOT_DIR = Path(__file__).parent.parent -if str(ROOT_DIR) not in sys.path: - sys.path.insert(0, str(ROOT_DIR)) - -from validate_workflow import validate_workflow_data -from velvetflow.action_registry import BUSINESS_ACTIONS -from velvetflow.loop_dsl import index_loop_body_nodes -from velvetflow.models import PydanticValidationError, Workflow -from velvetflow.verification import precheck_loop_body_graphs - -ACTION_REGISTRY = BUSINESS_ACTIONS - - -def test_loop_requires_body_subgraph(): - """Missing body_subgraph should be surfaced during workflow validation.""" - - workflow = { - "workflow_name": "missing_body_subgraph", - "nodes": [ - { - "id": "fetch_items", - "type": "action", - "action_id": "common.search_news.v1", - "params": {"query": "AI"}, - }, - { - "id": "loop_over_items", - "type": "loop", - "params": { - "loop_kind": "for_each", - "source": "result_of.fetch_items.results", - "item_alias": "item", - }, - }, - ], - "edges": [{"from": "fetch_items", "to": "loop_over_items"}], - } - - errors = validate_workflow_data(workflow, ACTION_REGISTRY) - - assert errors, "Expected validation errors for missing body_subgraph" - assert any( - err.code == "INVALID_LOOP_BODY" and err.field == "body_subgraph" for err in errors - ) - - -def test_model_validation_rejects_missing_body_subgraph(): - """Workflow.model_validate should fail fast when loop body is absent.""" - - workflow = { - "workflow_name": "missing_body_subgraph_model", - "nodes": [ - { - "id": "loop_warning_summary", - "type": "loop", - "params": { - "loop_kind": "for_each", - "source": "result_of.loop_check_temperature.exports.employee_ids", - "item_alias": "warning_employee", - }, - } - ], - } - - with pytest.raises(PydanticValidationError): - Workflow.model_validate(workflow) - -def test_exports_disallowed_outside_loop_body(): - """Non-loop nodes carrying exports should be rejected early.""" - - workflow = { - "workflow_name": "invalid_exports_location", - "nodes": [ - { - "id": "search_news", - "type": "action", - "action_id": "common.search_news.v1", - "params": { - "query": "AI", - "exports": {"items": "{{ result_of.search_news.results }}"}, - }, - } - ], - "edges": [], - } - - errors = validate_workflow_data(workflow, ACTION_REGISTRY) - - assert errors, "Expected validation errors for misplaced exports" - assert any(err.field == "exports" and err.code == "INVALID_SCHEMA" for err in errors) - - -def test_precheck_is_available_for_planner_users(): - """Planner 层的 precheck 导出应可独立调用。""" - - workflow = { - "workflow_name": "news_summary", - "nodes": [ - { - "id": "loop_summarize", - "type": "loop", - "params": { - "loop_kind": "for_each", - "source": "result_of.search_news.results", - "body_subgraph": { - "nodes": [ - { - "id": "summarize", - "type": "action", - "action_id": "common.summarize.v1", - "params": {"text": "placeholder"}, - } - ], - "entry": "summarize", - "exit": "missing", - }, - }, - } - ], - } - - errors = precheck_loop_body_graphs(workflow) - - assert errors == [] - - -def test_loop_body_requires_action_node_for_planning(): - """Precheck should surface loop bodies that lack actionable steps.""" - - workflow = { - "workflow_name": "news_summary", - "nodes": [ - { - "id": "loop_summarize", - "type": "loop", - "params": { - "loop_kind": "for_each", - "source": "result_of.search_news.results", - "item_alias": "news_item", - "body_subgraph": { - "nodes": [ - {"id": "start", "type": "start"}, - { - "id": "guard_branch", - "type": "condition", - "params": {"expression": "{{ (loop.item or []) | length > 0 }}"}, - }, - {"id": "exit", "type": "end"}, - ], - "entry": "guard_branch", - "exit": "exit", - }, - }, - } - ], - } - - errors = precheck_loop_body_graphs(workflow) - - assert any( - err.code == "INVALID_LOOP_BODY" and err.field == "body_subgraph.nodes" for err in errors - ) - - -def test_loop_body_pydantic_errors_are_preserved(): - """Pydantic 校验错误需要原样向上传递,避免被包装成 ValueError。""" - - workflow = { - "workflow_name": "news_summary", - "nodes": [ - { - "id": "loop_summarize", - "type": "loop", - "params": { - "loop_kind": "for_each", - "source": "result_of.search_news.results", - # body_subgraph 缺少节点 id,会触发 Pydantic 校验错误 - "body_subgraph": {"nodes": [{"type": "action"}], "edges": []}, - }, - } - ], - "edges": [], - } - - with pytest.raises(PydanticValidationError) as exc_info: - Workflow.model_validate(workflow) - - # 错误位置信息应该保留,并标注在 body_subgraph 下,方便修复逻辑使用 - error = exc_info.value.errors()[0] - assert error.get("loc", ())[:3] == ("body_subgraph", "nodes", 0) - - -def test_condition_field_on_loop_item_alias_should_be_allowed(): - """Loop body aliases represent single objects and should allow field access.""" - - workflow = { - "workflow_name": "员工体温健康预警工作流", - "nodes": [ - { - "id": "get_temperatures", - "type": "action", - "action_id": "hr.get_today_temperatures.v1", - "params": {"date": "today"}, - }, - { - "id": "loop_check_each_employee", - "type": "loop", - "params": { - "loop_kind": "for_each", - "source": "result_of.get_temperatures.data", - "item_alias": "employee_temp", - "body_subgraph": { - "nodes": [ - { - "id": "condition_temp_high", - "type": "condition", - "display_name": "体温是否超过38度", - "params": { - "expression": "{{ employee_temp.temperature > 38 }}", - }, - "true_to_node": "add_to_warning_list", - "false_to_node": None, - }, - { - "id": "add_to_warning_list", - "type": "action", - "display_name": "加入健康预警列表", - "action_id": "hr.update_employee_health_profile.v1", - "params": { - "employee_id": "employee_temp.employee_id", - "last_temperature": "employee_temp.temperature", - "status": "预警", - }, - }, - ], - "entry": "condition_temp_high", - "exit": "add_to_warning_list", - }, - }, - }, - ], - "edges": [{"from": "get_temperatures", "to": "loop_check_each_employee"}], - } - - errors = validate_workflow_data(workflow, ACTION_REGISTRY) - - assert not any( - e.code == "SCHEMA_MISMATCH" - and e.node_id == "condition_temp_high" - and e.field == "field" - for e in errors - ) - - -def test_loop_exports_allow_body_node_reference_without_field(): - """Loop exports may collect entire body nodes, not just their output fields.""" - - workflow = { - "workflow_name": "loop_body_node_exports", - "nodes": [ - { - "id": "search_news", - "type": "action", - "action_id": "common.search_news.v1", - "params": {"query": "AI"}, - }, - { - "id": "loop_collect_summaries", - "type": "loop", - "params": { - "loop_kind": "for_each", - "source": "result_of.search_news.results", - "item_alias": "item", - "body_subgraph": { - "nodes": [ - { - "id": "summarize_news", - "type": "action", - "action_id": "common.summarize.v1", - "params": {"text": "{{ item.title }}"}, - }, - {"id": "exit", "type": "end"}, - ], - "edges": [{"from": "summarize_news", "to": "exit"}], - "entry": "summarize_news", - "exit": "exit", - }, - "exports": {"summaries": "{{ result_of.summarize_news }}"}, - }, - }, - ], - "edges": [{"from": "search_news", "to": "loop_collect_summaries"}], - } - - errors = validate_workflow_data(workflow, ACTION_REGISTRY) - - assert errors == [] - - -def test_loop_source_allows_constant_template_list(): - """Loops may iterate over literal sequences provided via templated constants.""" - - workflow = { - "workflow_name": "loop_over_constant_contacts", - "nodes": [ - { - "id": "notify_contacts", - "type": "loop", - "params": { - "loop_kind": "for_each", - "source": "{{ [{'contact': 'user1@example.com', 'preference': 'dingding'}] }}", - "item_alias": "recipient", - "body_subgraph": { - "nodes": [ - { - "id": "send_digest", - "type": "action", - "action_id": "common.summarize.v1", - "params": {"text": "{{ recipient.contact }}"}, - }, - {"id": "exit", "type": "end"}, - ], - "edges": [{"from": "send_digest", "to": "exit"}], - "entry": "send_digest", - "exit": "exit", - }, - }, - } - ], - "edges": [], - } - - errors = validate_workflow_data(workflow, ACTION_REGISTRY) - - assert errors == [] - - -def test_index_loop_body_nodes_includes_nested_loops(): - workflow = { - "workflow_name": "nested_loop_mapping", - "nodes": [ - { - "id": "outer_loop", - "type": "loop", - "params": { - "loop_kind": "for_each", - "source": [], - "item_alias": "item", - "body_subgraph": { - "nodes": [ - { - "id": "inner_loop", - "type": "loop", - "params": { - "loop_kind": "for_each", - "source": [], - "item_alias": "sub_item", - "body_subgraph": { - "nodes": [ - { - "id": "inner_action", - "type": "action", - "action_id": "demo.action", - "params": {}, - } - ] - }, - }, - } - ] - }, - }, - } - ], - } - - mapping = index_loop_body_nodes(workflow) - - assert mapping["inner_loop"] == "outer_loop" - assert mapping["inner_action"] == "inner_loop" diff --git a/tests/test_workflow_parser.py b/tests/test_workflow_parser.py deleted file mode 100644 index 736c24ff..00000000 --- a/tests/test_workflow_parser.py +++ /dev/null @@ -1,45 +0,0 @@ -import sys -from pathlib import Path - -import pytest - -PROJECT_ROOT = Path(__file__).resolve().parents[1] -if str(PROJECT_ROOT) not in sys.path: - sys.path.insert(0, str(PROJECT_ROOT)) - -from validate_workflow import validate_workflow_data -from velvetflow.workflow_parser import IncrementalWorkflowParser, parse_workflow_source - - -def test_parser_reports_syntax_error_and_recovery(): - text = '{"workflow_name": "demo" "nodes": [], "edges": []}' - result = parse_workflow_source(text) - - assert result.syntax_errors - assert result.recovery_edits - assert result.recovered is True - assert result.ast is not None - - -def test_validate_workflow_data_reports_grammar_issues(): - parser_result = parse_workflow_source("{}") - errors = validate_workflow_data(parser_result.ast or {}, [], parser_result=parser_result) - - assert errors - assert errors[0].code == "GRAMMAR_VIOLATION" - assert "nodes" in errors[0].message - - -def test_incremental_parser_reuses_cached_issues(): - parser = IncrementalWorkflowParser() - base = {"workflow_name": "demo", "nodes": [{"id": "n1"}], "edges": []} - - first = parser.parse(base) - assert any(issue.path == "nodes[0].type" for issue in first.grammar_issues) - - updated = {**base, "description": "added"} - second = parser.parse(updated) - - assert second.reused_issues >= 1 - assert "description" in second.changed_paths - assert any(issue.path == "nodes[0].type" for issue in second.grammar_issues) diff --git a/update_workflow.py b/update_workflow.py index a25e5bd5..4a5aa3de 100644 --- a/update_workflow.py +++ b/update_workflow.py @@ -9,13 +9,19 @@ import json import sys from pathlib import Path +from typing import Any +from velvetflow.action_registry import load_actions_from_path, validate_actions from velvetflow.config import OPENAI_MODEL from velvetflow.models import Workflow from velvetflow.planner import update_workflow_with_two_pass from velvetflow.search import build_search_service_from_actions -from validate_workflow import _load_action_registry + + +def _load_action_registry(path: Path) -> list[dict[str, Any]]: + actions = load_actions_from_path(path) + return validate_actions(actions) def _resolve_requirement(args: argparse.Namespace) -> str: diff --git a/validate_workflow.py b/validate_workflow.py deleted file mode 100644 index d4c35591..00000000 --- a/validate_workflow.py +++ /dev/null @@ -1,241 +0,0 @@ -# Author: Zhongkai Fu (fuzhongkai@gmail.com) -# License: BSD 3-Clause License - -"""Standalone Workflow DSL validation tool. - -This module exposes a small CLI that validates a workflow JSON file against -pydantic schema rules and the planner's static validation rules. It reuses the -existing DSL validation logic and produces readable error messages when issues -are found. -""" - -from __future__ import annotations - -import argparse -import json -import sys -from pathlib import Path -from typing import Any, Iterable, List, Mapping - -from velvetflow.action_registry import load_actions_from_path, validate_actions -from velvetflow.models import PydanticValidationError, ValidationError, Workflow -from velvetflow.workflow_parser import WorkflowParseResult, parse_workflow_source -from velvetflow.verification import precheck_loop_body_graphs, validate_completed_workflow -from velvetflow.verification import generate_repair_suggestions -from velvetflow.verification.semantic_analysis import analyze_workflow_semantics - - -def _convert_pydantic_errors( - workflow_raw: Any, error: PydanticValidationError -) -> List[ValidationError]: - """Map Pydantic validation errors to generic ``ValidationError`` objects.""" - - nodes = [] - if isinstance(workflow_raw, dict): - nodes = workflow_raw.get("nodes") or [] - - def _node_id_from_index(index: int): - if 0 <= index < len(nodes): - node = nodes[index] - if isinstance(node, dict): - return node.get("id") - if hasattr(node, "id"): - return getattr(node, "id") - return None - - validation_errors: List[ValidationError] = [] - for err in error.errors(): - loc = err.get("loc", ()) or () - msg = err.get("msg", "") - - node_id = None - field = None - - if loc: - if loc[0] == "nodes" and len(loc) >= 2 and isinstance(loc[1], int): - node_id = _node_id_from_index(loc[1]) - if len(loc) >= 3: - field = str(loc[2]) - elif loc[0] == "edges" and len(loc) >= 2 and isinstance(loc[1], int): - if len(loc) >= 3 and isinstance(loc[-1], str): - field = str(loc[-1]) - else: - field = "edges" - else: - field = ".".join(str(part) for part in loc) - - validation_errors.append( - ValidationError( - code="INVALID_SCHEMA", - node_id=node_id, - field=field, - message=msg, - ) - ) - - return validation_errors - - -def _load_action_registry(path: Path) -> List[dict[str, Any]]: - actions = load_actions_from_path(path) - return validate_actions(actions) - - -def _format_errors(errors: Iterable[ValidationError]) -> str: - lines = [] - for idx, err in enumerate(errors, start=1): - location_bits = [] - if err.node_id: - location_bits.append(f"node={err.node_id}") - if err.field: - location_bits.append(f"field={err.field}") - location = f" ({', '.join(location_bits)})" if location_bits else "" - lines.append(f"{idx}. [{err.code}]{location} {err.message}") - return "\n".join(lines) - - -def _convert_parser_issues(parse_result: WorkflowParseResult) -> List[ValidationError]: - parser_errors: List[ValidationError] = [] - - for syntax_err in parse_result.syntax_errors: - message = f"语法错误(行 {syntax_err.line}, 列 {syntax_err.column}):{syntax_err.message}" - if syntax_err.expected: - message += f";期望: {', '.join(syntax_err.expected)}" - parser_errors.append( - ValidationError( - code="SYNTAX_ERROR", - node_id=None, - field=None, - message=message, - ) - ) - - for issue in parse_result.grammar_issues: - message = issue.message - if issue.expected: - message += f";期望: {', '.join(issue.expected)}" - if issue.recovery: - message += f"(建议 {issue.recovery.description})" - parser_errors.append( - ValidationError( - code="GRAMMAR_VIOLATION", - node_id=issue.node_id, - field=issue.path, - message=message, - ) - ) - - return parser_errors - - -def validate_workflow_data( - workflow_raw: Any, action_registry: List[dict[str, Any]], *, parser_result: WorkflowParseResult | None = None -) -> List[ValidationError]: - """Validate workflow DSL structure and static rules. - - The function first runs grammar-aware parsing, then Pydantic schema - validation, then applies the planner's static rules (action references, - graph connectivity, bindings, etc.). All found errors are returned for - reporting by the caller. - """ - - errors: List[ValidationError] = [] - - parse_result = parser_result or parse_workflow_source(workflow_raw) - errors.extend(_convert_parser_issues(parse_result)) - if errors: - return errors - - workflow_parsed = parse_result.ast if parse_result.ast is not None else workflow_raw - - semantic_errors = analyze_workflow_semantics(workflow_parsed, action_registry) - errors.extend(semantic_errors) - - precheck_errors = precheck_loop_body_graphs(workflow_parsed) - errors.extend(precheck_errors) - - try: - workflow_model = Workflow.model_validate(workflow_parsed) - except PydanticValidationError as exc: # pragma: no cover - exercised via unit test - errors.extend(_convert_pydantic_errors(workflow_parsed, exc)) - return errors - except Exception as exc: # pragma: no cover - errors.append( - ValidationError( - code="INVALID_SCHEMA", - node_id=None, - field=None, - message=str(exc), - ) - ) - return errors - - workflow_dict = workflow_model.model_dump(by_alias=True) - errors.extend(validate_completed_workflow(workflow_dict, action_registry)) - return errors - - -def main(argv: list[str] | None = None) -> int: - parser = argparse.ArgumentParser(description="Validate a workflow DSL JSON file.") - parser.add_argument("workflow", type=Path, help="Path to workflow JSON file") - parser.add_argument( - "--action-registry", - type=Path, - default=Path(__file__).resolve().parent / "tools" / "business_actions", - help="Path to action registry directory or JSON file (defaults to built-in registry).", - ) - parser.add_argument( - "--print-normalized", - action="store_true", - help="Print normalized workflow JSON after successful validation.", - ) - parser.add_argument( - "--suggest-fixes", - action="store_true", - help="在校验失败时输出基于 AST 模板/约束求解的修复建议。", - ) - args = parser.parse_args(argv) - - try: - workflow_text = args.workflow.read_text(encoding="utf-8") - except FileNotFoundError: - print(f"找不到 workflow 文件: {args.workflow}", file=sys.stderr) - return 2 - - try: - action_registry = _load_action_registry(args.action_registry) - except Exception as exc: # pragma: no cover - CLI convenience - print(f"加载 Action Registry 失败: {exc}", file=sys.stderr) - return 2 - - parser_result = parse_workflow_source(workflow_text) - errors = validate_workflow_data(parser_result.ast or {}, action_registry, parser_result=parser_result) - if errors: - print("校验未通过,发现以下问题:", file=sys.stderr) - print(_format_errors(errors), file=sys.stderr) - if args.suggest_fixes: - patched, suggestions = generate_repair_suggestions( - parser_result.ast or {}, action_registry, errors=errors - ) - if suggestions: - print("\n自动修复建议:", file=sys.stderr) - for idx, suggestion in enumerate(suggestions, start=1): - print( - f"{idx}. [{suggestion.strategy}] {suggestion.description}" - f"(路径: {suggestion.path}, 置信度: {suggestion.confidence:.2f})", - file=sys.stderr, - ) - print(f" 建议补丁: {suggestion.patch}", file=sys.stderr) - else: - print("未生成自动修复建议。", file=sys.stderr) - return 1 - - print("workflow DSL 校验通过。") - if args.print_normalized: - normalized = Workflow.model_validate(parser_result.ast or {}).model_dump(by_alias=True) - print(json.dumps(normalized, ensure_ascii=False, indent=2)) - return 0 - - -if __name__ == "__main__": - sys.exit(main()) From ef11094ea510546b6c0db4ae372fe258d5814d7d Mon Sep 17 00:00:00 2001 From: Zhongkai Fu Date: Sun, 18 Jan 2026 23:38:54 -0800 Subject: [PATCH 2/3] Add strftime helper to Jinja environment --- velvetflow/jinja_utils.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/velvetflow/jinja_utils.py b/velvetflow/jinja_utils.py index f2ae9fce..7931aa43 100644 --- a/velvetflow/jinja_utils.py +++ b/velvetflow/jinja_utils.py @@ -72,7 +72,29 @@ def _format_date(value: Any, fmt: str = "yyyy-MM-dd") -> str: ) return dt.strftime(format_str) + def _strftime(value: Any = None, fmt: str = "%Y-%m-%d") -> str: + if value in {None, ""}: + dt = datetime.now() + elif isinstance(value, datetime): + dt = value + elif isinstance(value, date_cls): + dt = datetime.combine(value, datetime.min.time()) + elif isinstance(value, str) and value.strip().lower() in {"now", "today"}: + dt = datetime.now() + elif isinstance(value, str): + try: + dt = datetime.fromisoformat(value) + except Exception: + try: + dt = datetime.combine(date_cls.fromisoformat(value), datetime.min.time()) + except Exception: + dt = datetime.now() + else: + dt = datetime.now() + return dt.strftime(fmt) + env.filters.setdefault("date", _format_date) + env.globals.setdefault("strftime", _strftime) env.tests.setdefault("truthy", lambda value: bool(value)) return env @@ -158,6 +180,8 @@ def render_jinja_template(template: str, context: Mapping[str, Any]) -> str: env = get_jinja_env() return env.from_string(template).render(_prepare_context(context)) + + class _AttrDict(dict): def __getattribute__(self, item: str) -> Any: # pragma: no cover - small wrapper if item in ("__class__", "__iter__", "__len__", "__getitem__", "__setitem__"): From 865aae7ead5d8a822ef997437ba3034ebf2c622d Mon Sep 17 00:00:00 2001 From: Zhongkai Fu Date: Sun, 18 Jan 2026 23:41:23 -0800 Subject: [PATCH 3/3] Register strftime as a Jinja filter --- velvetflow/jinja_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/velvetflow/jinja_utils.py b/velvetflow/jinja_utils.py index 7931aa43..0f1422a9 100644 --- a/velvetflow/jinja_utils.py +++ b/velvetflow/jinja_utils.py @@ -94,6 +94,7 @@ def _strftime(value: Any = None, fmt: str = "%Y-%m-%d") -> str: return dt.strftime(fmt) env.filters.setdefault("date", _format_date) + env.filters.setdefault("strftime", _strftime) env.globals.setdefault("strftime", _strftime) env.tests.setdefault("truthy", lambda value: bool(value)) return env