Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions tests/test_jinja_normalize_unwrapped_expression.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Author: Zhongkai Fu ([email protected])
# License: BSD 3-Clause License

from pathlib import Path
import sys

ROOT_DIR = Path(__file__).resolve().parents[1]
if str(ROOT_DIR) not in sys.path:
sys.path.insert(0, str(ROOT_DIR))

from velvetflow.verification.jinja_validation import (
normalize_condition_params_to_jinja,
normalize_params_to_jinja,
)


BASE_WORKFLOW = {
"workflow_name": "normalize_expr",
"description": "",
"nodes": [
{
"id": "fetch",
"type": "action",
"action_id": "hr.get_today_temperatures.v1",
"params": {"date": "2024-01-01"},
},
{
"id": "record",
"type": "action",
"action_id": "hr.record_health_event.v1",
"params": {
"event_type": "temperature",
"date": "result_of.fetch.date",
"abnormal_count": "result_of.fetch.data | length > 0",
},
},
{
"id": "check",
"type": "condition",
"params": {
"expression": "result_of.fetch.data | length > 0",
},
"true_to_node": None,
"false_to_node": None,
},
],
"edges": [
{"from": "fetch", "to": "record"},
{"from": "record", "to": "check"},
],
}


def test_normalize_params_wraps_unwrapped_jinja_expression():
normalized, summary, errors = normalize_params_to_jinja(BASE_WORKFLOW)

assert errors == []
assert summary.get("applied") is True

record_params = normalized["nodes"][1]["params"]
assert record_params["date"] == "{{ result_of.fetch.date }}"
assert record_params["abnormal_count"] == "{{ result_of.fetch.data | length > 0 }}"


def test_normalize_condition_params_wraps_unwrapped_expression():
normalized, summary, errors = normalize_condition_params_to_jinja(BASE_WORKFLOW)

assert errors == []
assert summary.get("applied") is True

condition_params = normalized["nodes"][2]["params"]
assert condition_params["expression"] == "{{ result_of.fetch.data | length > 0 }}"
63 changes: 63 additions & 0 deletions tests/test_jinja_template_reference_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Author: Zhongkai Fu ([email protected])
# License: BSD 3-Clause License

from pathlib import Path
import sys

ROOT_DIR = Path(__file__).resolve().parents[1]
if str(ROOT_DIR) not in sys.path:
sys.path.insert(0, str(ROOT_DIR))

from velvetflow.action_registry import BUSINESS_ACTIONS
from velvetflow.verification.validation import validate_completed_workflow

ACTION_REGISTRY = BUSINESS_ACTIONS


def _workflow_with_expression_param(expression: str):
return {
"workflow_name": "jinja_expression_param",
"description": "",
"nodes": [
{
"id": "fetch_temperatures",
"type": "action",
"action_id": "hr.get_today_temperatures.v1",
"params": {"date": "2024-01-01"},
},
{
"id": "record_event",
"type": "action",
"action_id": "hr.record_health_event.v1",
"params": {
"event_type": "temperature",
"date": "{{ result_of.fetch_temperatures.date }}",
"abnormal_count": expression,
},
},
],
"edges": [
{"from": "fetch_temperatures", "to": "record_event"},
],
}


def test_template_expression_with_missing_field_is_reported():
workflow = _workflow_with_expression_param(
"{{ (result_of.fetch_temperatures.data | length) > 0 and result_of.fetch_temperatures.data[0].missing }}"
)
errors = validate_completed_workflow(workflow, action_registry=ACTION_REGISTRY)

assert any(
err.code == "SCHEMA_MISMATCH" and err.field == "abnormal_count"
for err in errors
)


def test_template_expression_with_valid_field_passes():
workflow = _workflow_with_expression_param(
"{{ (result_of.fetch_temperatures.data | length) > 0 and result_of.fetch_temperatures.data[0].temperature }}"
)
errors = validate_completed_workflow(workflow, action_registry=ACTION_REGISTRY)

assert errors == []
29 changes: 29 additions & 0 deletions tests/test_rule_based_repair_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,32 @@ def test_rule_repairs_surface_missing_loop_body():
err.code == "INVALID_LOOP_BODY" and err.field == "body_subgraph"
for err in remaining_errors
)


def test_rule_repairs_drop_invalid_switch_defaults():
workflow = {
"nodes": [
{
"id": "route_by_label",
"type": "switch",
"params": {"source": "{{ result_of.start.label }}", "field": "label"},
"cases": [],
"default_to_node": "null",
},
{"id": "start", "type": "action", "action_id": "demo.start", "params": {}},
],
"edges": [],
}
errors = [
ValidationError(
code="UNDEFINED_REFERENCE",
node_id="route_by_label",
field="default_to_node",
message="switch default branch default_to_node points to nonexistent node 'null'",
)
]

patched, _remaining_errors = apply_rule_based_repairs(workflow, [], errors)

route = next(node for node in patched["nodes"] if node.get("id") == "route_by_label")
assert route.get("default_to_node") is None
38 changes: 38 additions & 0 deletions tests/test_workflow_builder_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,3 +190,41 @@ def test_builder_adds_switch_nodes_and_infers_edges():
assert any(cond == "default" for _, _, cond in edge_conditions)


def test_builder_replaces_loop_body_exports_references():
builder = WorkflowBuilder()
builder.add_node(
node_id="loop_node",
node_type="loop",
params={
"loop_kind": "for_each",
"source": "{{ result_of.start.items }}",
"item_alias": "item",
"exports": {"total": "{{ result_of.inner.total }}"},
},
)
builder.add_node(
node_id="inner",
node_type="action",
action_id="demo.inner",
params={"value": "{{ loop.item.value }}"},
parent_node_id="loop_node",
)
builder.add_node(
node_id="use_export",
node_type="action",
action_id="demo.use_export",
params={
"value": "{{ exports.total }}",
"raw_value": "exports.total",
},
parent_node_id="loop_node",
)

workflow = builder.to_workflow()
loop_node = _find(workflow["nodes"], "loop_node")
body_nodes = (loop_node.get("params") or {}).get("body_subgraph", {}).get("nodes", [])
use_export = _find(body_nodes, "use_export")

assert use_export["params"]["value"] == "{{ result_of.inner.total }}"
assert use_export["params"]["raw_value"] == "{{ result_of.inner.total }}"

65 changes: 65 additions & 0 deletions velvetflow/jinja_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,71 @@ def _iter_filter_nodes(node: nodes.Node) -> list[nodes.Filter]:
raise ValueError(f"{path} 使用了未注册的测试: {test_arg.value}")


def extract_jinja_reference_paths(expr: str) -> list[str]:
"""Extract dotted reference paths from a Jinja expression.

Returns a list of reference strings such as ``result_of.node.field`` or
``loop.item.name`` derived from attribute/item lookup chains.
"""

if not isinstance(expr, str) or not expr.strip():
return []

env = get_jinja_env()
try:
parsed = env.parse(f"{{{{ {expr} }}}}")
except TemplateError:
return []

def _build_path(node: nodes.Node) -> str | None:
if isinstance(node, nodes.Name):
return node.name
if isinstance(node, nodes.Getattr):
base = _build_path(node.node)
if base:
return f"{base}.{node.attr}"
if isinstance(node, nodes.Getitem):
base = _build_path(node.node)
if not base:
return None
arg = node.arg
if isinstance(arg, nodes.Const):
if isinstance(arg.value, int):
return f"{base}[{arg.value}]"
if isinstance(arg.value, str):
return f"{base}.{arg.value}"
Comment on lines +176 to +180
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve bracket string keys when extracting paths

This conversion treats any string index (e.g. foo['a.b'] or foo['0']) as dotted access, which loses the original bracket semantics. When a schema has property names with dots or numeric strings, extract_jinja_reference_paths will emit foo.a.b or foo.0, and parse_field_path will split or coerce those into nested fields/array indices. That can raise SCHEMA_MISMATCH for templates that are actually valid in Jinja. This regression shows up only when bracket string keys are used in expressions, but in that case the new validation will flag false errors.

Useful? React with 👍 / 👎.

return None
return None

def _collect(node: nodes.Node) -> list[str]:
collected: list[str] = []
path = _build_path(node)
if path:
collected.append(path)
for child in node.iter_child_nodes():
collected.extend(_collect(child))
return collected

raw_paths = _collect(parsed)
unique_paths = list(dict.fromkeys(raw_paths))

def _is_prefix(candidate: str, full: str) -> bool:
if not full.startswith(candidate) or len(full) <= len(candidate):
return False
next_char = full[len(candidate)]
return next_char in {".", "["}

filtered: list[str] = []
for path in unique_paths:
if any(_is_prefix(path, other) for other in unique_paths if other != path):
continue
if "." not in path and "[" not in path:
continue
filtered.append(path)

return filtered


def eval_jinja_expr(expr: str, context: Mapping[str, Any]) -> Any:
"""Evaluate a Jinja expression string with the provided context."""

Expand Down
6 changes: 5 additions & 1 deletion velvetflow/planner/repair.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,11 @@ def apply_rule_based_repairs(
)
if any(err.code == "UNDEFINED_REFERENCE" for err in validation_errors):
patched_workflow, drop_summary = apply_repair_tool(
"drop_invalid_references", patched_workflow, remove_edges=True
tool_name="drop_invalid_references",
args={"remove_edges": True},
workflow=patched_workflow,
validation_errors=validation_errors,
action_registry=action_registry,
)
log_info(
"[AutoRepair] UNDEFINED_REFERENCE was auto-cleaned; handing off to the LLM for analysis.",
Expand Down
Loading