Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions velvetflow/bindings.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,12 @@ def _validate_result_reference(self, src_path: str) -> None:

# 控制节点(condition 等)也允许被引用,缺少 action_id 时跳过 schema 校验
if node.type != "action" or not node.action_id:
output_schema = node.out_params_schema
field_path = parts[2:]
if output_schema and field_path and self._schema_has_path(output_schema, field_path):
return
if output_schema and not field_path:
return
return
Comment on lines +704 to 708
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Reject invalid from paths for data nodes

When a non-action node has out_params_schema, this branch returns even if the requested field_path is not in the schema. That means result_of.<dataNode>.missingField passes validation and only fails later at runtime when _get_from_context raises a KeyError. This is a regression from the stated goal (“allow … when out_params_schema provides the requested path”): it should error when the schema exists but the path is absent, otherwise broken bindings slip through plan-time checks and crash execution.

Useful? React with 👍 / 👎.


action_id = node.action_id
Expand Down
28 changes: 28 additions & 0 deletions velvetflow/executor/dynamic_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,34 @@ def _execute_graph(
)
continue

if ntype == "data":
resolved_params = eval_node_params(node_model, binding_ctx)
log_json("resolved params", resolved_params)

payload = {
"schema": resolved_params.get("schema"),
"dataset": resolved_params.get("dataset"),
}
results[nid] = payload
self._record_node_metrics(payload)
next_ids = self._next_nodes(edges, nid, nodes_data=nodes_data)
for nxt in next_ids:
if nxt not in visited:
reachable.add(nxt)
log_event(
"node_end",
{
"node_id": nid,
"type": ntype,
"resolved_params": resolved_params,
"result": payload,
"next_nodes": next_ids,
},
node_id=nid,
action_id=action_id,
)
continue

if ntype == "condition":
cond_eval = self._eval_condition(node, binding_ctx, include_debug=True)
if isinstance(cond_eval, tuple) and len(cond_eval) == 2:
Expand Down
35 changes: 34 additions & 1 deletion velvetflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ def model_validate(cls, data: Any) -> "Node":
if not isinstance(node_type, str):
errors.append({"loc": ("type",), "msg": "节点类型必须是字符串"})
else:
allowed = {"action", "condition", "switch", "loop", "parallel", "reasoning"}
allowed = {"action", "condition", "switch", "loop", "parallel", "reasoning", "data"}
if node_type not in allowed:
errors.append({
"loc": ("type",),
Expand Down Expand Up @@ -442,6 +442,26 @@ def model_validate(cls, data: Any) -> "Node":
depends_on=depends_on,
)

if node_type == "data":
out_params_schema = data.get("out_params_schema")
if out_params_schema is not None and not isinstance(out_params_schema, Mapping):
errors.append(
{
"loc": ("out_params_schema",),
"msg": "out_params_schema 必须是对象",
}
)
if errors:
raise PydanticValidationError(errors)

return DataNode(
id=node_id,
display_name=data.get("display_name"),
params=dict(params),
out_params_schema=out_params_schema if isinstance(out_params_schema, Mapping) else None,
depends_on=depends_on,
)

if node_type == "condition":
true_to_node = data.get("true_to_node")
false_to_node = data.get("false_to_node")
Expand Down Expand Up @@ -572,6 +592,19 @@ def model_dump(self, *, by_alias: bool = False) -> Dict[str, Any]:
return data


@dataclass
class DataNode(Node):
"""Data node holding schema and dataset."""

type: Literal["data"] = "data"
out_params_schema: Optional[Dict[str, Any]] = None

def model_dump(self, *, by_alias: bool = False) -> Dict[str, Any]:
data = super().model_dump(by_alias=by_alias)
data["out_params_schema"] = self.out_params_schema
return data


@dataclass
class Edge:
"""Directed edge between workflow nodes."""
Expand Down
Loading
Loading