From d0bce800f53ba40abcffbe09af60401c16481a97 Mon Sep 17 00:00:00 2001 From: Zhongkai Fu Date: Sun, 18 Jan 2026 23:59:39 -0800 Subject: [PATCH 1/2] Validate loop depends_on against subgraph --- velvetflow/planner/structure.py | 50 +++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/velvetflow/planner/structure.py b/velvetflow/planner/structure.py index 5ff2855..06fce43 100644 --- a/velvetflow/planner/structure.py +++ b/velvetflow/planner/structure.py @@ -1048,6 +1048,31 @@ def _build_validation_error(message: str, **extra: Any) -> Dict[str, Any]: payload.update(extra) return payload + def _build_loop_depends_on_subgraph_conflict( + *, loop_id: str, depends_on: List[str], sub_graph_nodes: List[str] + ) -> Dict[str, Any] | None: + depends_on_set = {dep for dep in depends_on if isinstance(dep, str)} + sub_graph_set = {nid for nid in sub_graph_nodes if isinstance(nid, str)} + overlap = sorted(depends_on_set.intersection(sub_graph_set)) + if not overlap: + return None + suggestion = ( + "Loop node depends_on includes nodes inside its body_subgraph. " + f"loop_id={loop_id}, depends_on={sorted(depends_on_set)}, " + f"sub_graph_nodes={sorted(sub_graph_set)}. " + "Please rename the node name either in depends_on or in the loop subgraph " + "to avoid this overlap." + ) + return { + "status": "needs_more_work", + "message": "loop 节点的 depends_on 不能包含 body_subgraph 内的节点,请调整后重试。", + "node_id": loop_id, + "depends_on": sorted(depends_on_set), + "sub_graph_nodes": sorted(sub_graph_set), + "overlap_nodes": overlap, + "requirements_suggestions": [suggestion], + } + def _collect_result_of_node_ids(params: Mapping[str, Any]) -> set[str]: node_ids: set[str] = set() @@ -1387,6 +1412,13 @@ def add_loop_node( if sub_graph_error: result = {"status": "error", **sub_graph_error} return _return_tool_result("add_loop_node", result) + conflict_error = _build_loop_depends_on_subgraph_conflict( + loop_id=id, + depends_on=depends_on or [], + sub_graph_nodes=normalized_nodes, + ) + if conflict_error: + return _return_tool_result("add_loop_node", conflict_error) merged_params = dict(params or {}) merged_params.update({"loop_kind": loop_kind, "source": source, "item_alias": item_alias}) @@ -2007,6 +2039,24 @@ def update_loop_node( result = {"status": "error", **sub_graph_error} return _return_tool_result("update_loop_node", result) + existing_depends_on = builder.nodes.get(id, {}).get("depends_on") if isinstance(builder.nodes.get(id), dict) else [] + if not isinstance(existing_depends_on, list): + existing_depends_on = [] + effective_depends_on = depends_on if depends_on is not None else existing_depends_on + current_body_nodes = [ + node_id + for node_id, node in builder.nodes.items() + if isinstance(node, dict) and node.get("parent_node_id") == id + ] + effective_sub_graph_nodes = normalized_nodes if sub_graph_nodes is not None else current_body_nodes + conflict_error = _build_loop_depends_on_subgraph_conflict( + loop_id=id, + depends_on=effective_depends_on or [], + sub_graph_nodes=effective_sub_graph_nodes, + ) + if conflict_error: + return _return_tool_result("update_loop_node", conflict_error) + updates: Dict[str, Any] = {} if display_name is not None: updates["display_name"] = display_name From 4131e83e9d7e3e0cdd2b9d7451b0ae52256cba51 Mon Sep 17 00:00:00 2001 From: Zhongkai Fu Date: Mon, 19 Jan 2026 00:26:45 -0800 Subject: [PATCH 2/2] Require loop source on create and update --- velvetflow/planner/structure.py | 37 +++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/velvetflow/planner/structure.py b/velvetflow/planner/structure.py index 06fce43..d20ed59 100644 --- a/velvetflow/planner/structure.py +++ b/velvetflow/planner/structure.py @@ -1073,6 +1073,27 @@ def _build_loop_depends_on_subgraph_conflict( "requirements_suggestions": [suggestion], } + def _build_loop_source_required( + *, loop_id: str, source: Any, params: Mapping[str, Any] | None = None + ) -> Dict[str, Any] | None: + is_empty_string = isinstance(source, str) and not source.strip() + is_empty_mapping = isinstance(source, Mapping) and not source + if source is None or is_empty_string or is_empty_mapping: + suggestion = ( + "Loop node requires a non-empty source value. " + f"loop_id={loop_id}, current_source={source}, params={params or {}}. " + "Please re-call add_loop_node/update_loop_node with a valid source." + ) + return { + "status": "needs_more_work", + "message": "loop 节点的 source 字段不能为空,请补充后重试。", + "node_id": loop_id, + "source": source, + "params": params or {}, + "requirements_suggestions": [suggestion], + } + return None + def _collect_result_of_node_ids(params: Mapping[str, Any]) -> set[str]: node_ids: set[str] = set() @@ -1407,6 +1428,9 @@ def add_loop_node( invalid_fields=invalid_fields, ) return _return_tool_result("add_loop_node", result) + source_error = _build_loop_source_required(loop_id=id, source=source, params=params or {}) + if source_error: + return _return_tool_result("add_loop_node", source_error) normalized_nodes, sub_graph_error = _normalize_sub_graph_nodes(sub_graph_nodes, builder=builder) if sub_graph_error: @@ -2034,6 +2058,19 @@ def update_loop_node( result = _build_validation_error("loop 节点的 params 需要是对象。") return _return_tool_result("update_loop_node", result) + existing_params = builder.nodes.get(id, {}).get("params") if isinstance(builder.nodes.get(id), dict) else {} + if not isinstance(existing_params, Mapping): + existing_params = {} + effective_params = params if params is not None else dict(existing_params) + effective_source = ( + effective_params.get("source") if isinstance(effective_params, Mapping) else None + ) + source_error = _build_loop_source_required( + loop_id=id, source=effective_source, params=effective_params + ) + if source_error: + return _return_tool_result("update_loop_node", source_error) + normalized_nodes, sub_graph_error = _normalize_sub_graph_nodes(sub_graph_nodes, builder=builder) if sub_graph_error: result = {"status": "error", **sub_graph_error}