diff --git a/velvetflow/planner/node_types/__init__.py b/velvetflow/planner/node_types/__init__.py new file mode 100644 index 0000000..a9c7532 --- /dev/null +++ b/velvetflow/planner/node_types/__init__.py @@ -0,0 +1,46 @@ +# Author: Zhongkai Fu (fuzhongkai@gmail.com) +# License: BSD 3-Clause License + +"""Node-type specific definitions and helper utilities.""" + +from velvetflow.planner.node_types.action import ACTION_NODE_FIELDS, build_action_schema_map +from velvetflow.planner.node_types.condition import CONDITION_NODE_FIELDS, CONDITION_PARAM_FIELDS +from velvetflow.planner.node_types.data import DATA_NODE_FIELDS, DATA_PARAM_FIELDS, build_data_node_output_schema +from velvetflow.planner.node_types.loop import ( + LOOP_NODE_FIELDS, + LOOP_PARAM_FIELDS, + ensure_loop_items_fields, + extract_loop_body_context, + fallback_loop_exports, + validate_loop_exports, +) +from velvetflow.planner.node_types.reasoning import REASONING_NODE_FIELDS, REASONING_PARAM_FIELDS +from velvetflow.planner.node_types.sanitizers import ( + filter_supported_params, + sanitize_builder_node_fields, + sanitize_builder_node_params, +) +from velvetflow.planner.node_types.switch import SWITCH_NODE_FIELDS, SWITCH_PARAM_FIELDS + +__all__ = [ + "ACTION_NODE_FIELDS", + "build_action_schema_map", + "CONDITION_NODE_FIELDS", + "CONDITION_PARAM_FIELDS", + "DATA_NODE_FIELDS", + "DATA_PARAM_FIELDS", + "LOOP_NODE_FIELDS", + "LOOP_PARAM_FIELDS", + "REASONING_NODE_FIELDS", + "REASONING_PARAM_FIELDS", + "SWITCH_NODE_FIELDS", + "SWITCH_PARAM_FIELDS", + "build_data_node_output_schema", + "ensure_loop_items_fields", + "extract_loop_body_context", + "fallback_loop_exports", + "filter_supported_params", + "sanitize_builder_node_fields", + "sanitize_builder_node_params", + "validate_loop_exports", +] diff --git a/velvetflow/planner/node_types/action.py b/velvetflow/planner/node_types/action.py new file mode 100644 index 0000000..af2ac87 --- /dev/null +++ b/velvetflow/planner/node_types/action.py @@ -0,0 +1,35 @@ +# Author: Zhongkai Fu (fuzhongkai@gmail.com) +# License: BSD 3-Clause License + +"""Action node definitions.""" + +from typing import Any, Dict, List + +ACTION_NODE_FIELDS = { + "id", + "type", + "action_id", + "display_name", + "params", + "out_params_schema", + "parent_node_id", + "depends_on", +} + + +def build_action_schema_map(action_registry: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]: + """Build a lookup map for action schemas keyed by action_id.""" + + action_schemas: Dict[str, Dict[str, Any]] = {} + for action in action_registry: + aid = action.get("action_id") + if not aid: + continue + action_schemas[aid] = { + "name": action.get("name", ""), + "description": action.get("description", ""), + "domain": action.get("domain", ""), + "arg_schema": action.get("arg_schema"), + "output_schema": action.get("output_schema"), + } + return action_schemas diff --git a/velvetflow/planner/node_types/condition.py b/velvetflow/planner/node_types/condition.py new file mode 100644 index 0000000..82a99a7 --- /dev/null +++ b/velvetflow/planner/node_types/condition.py @@ -0,0 +1,17 @@ +# Author: Zhongkai Fu (fuzhongkai@gmail.com) +# License: BSD 3-Clause License + +"""Condition node definitions.""" + +CONDITION_PARAM_FIELDS = {"expression"} + +CONDITION_NODE_FIELDS = { + "id", + "type", + "display_name", + "params", + "true_to_node", + "false_to_node", + "parent_node_id", + "depends_on", +} diff --git a/velvetflow/planner/node_types/data.py b/velvetflow/planner/node_types/data.py new file mode 100644 index 0000000..a6d3318 --- /dev/null +++ b/velvetflow/planner/node_types/data.py @@ -0,0 +1,52 @@ +# Author: Zhongkai Fu (fuzhongkai@gmail.com) +# License: BSD 3-Clause License + +"""Data node definitions and helpers.""" + +from typing import Any, Dict, Mapping + +DATA_PARAM_FIELDS = { + "schema", + "dataset", +} + +DATA_NODE_FIELDS = { + "id", + "type", + "display_name", + "params", + "out_params_schema", + "parent_node_id", + "depends_on", +} + + +def build_data_node_output_schema(schema: Any) -> Dict[str, Any]: + properties: Dict[str, Any] = {} + if isinstance(schema, list): + for field in schema: + if not isinstance(field, Mapping): + continue + name = field.get("name") + if not isinstance(name, str) or not name: + continue + field_type = field.get("type") if isinstance(field.get("type"), str) else "string" + description = field.get("description") if isinstance(field.get("description"), str) else "" + properties[name] = {"type": field_type, "description": description} + + return { + "type": "object", + "properties": { + "dataset": { + "type": "array", + "items": { + "type": "object", + "properties": properties, + }, + }, + "schema": { + "type": "array", + "items": {"type": "object"}, + }, + }, + } diff --git a/velvetflow/planner/node_types/loop.py b/velvetflow/planner/node_types/loop.py new file mode 100644 index 0000000..7df52d6 --- /dev/null +++ b/velvetflow/planner/node_types/loop.py @@ -0,0 +1,147 @@ +# Author: Zhongkai Fu (fuzhongkai@gmail.com) +# License: BSD 3-Clause License + +"""Loop node definitions and helpers.""" + +from typing import Any, Dict, List, Mapping, Optional + +from velvetflow.reference_utils import parse_field_path +from velvetflow.verification.binding_checks import _iter_template_references + +LOOP_PARAM_FIELDS = { + "loop_kind", + "source", + "condition", + "item_alias", + "body_subgraph", + "exports", +} + +LOOP_NODE_FIELDS = { + "id", + "type", + "display_name", + "params", + "parent_node_id", + "depends_on", +} + + +def extract_loop_body_context( + loop_node: Mapping[str, Any], action_schemas: Mapping[str, Mapping[str, Any]] +) -> Dict[str, Any]: + params = loop_node.get("params") if isinstance(loop_node, Mapping) else None + body = params.get("body_subgraph") if isinstance(params, Mapping) else None + if not isinstance(body, Mapping): + return {"nodes": []} + + context_nodes = [] + for child in body.get("nodes", []) or []: + if not isinstance(child, Mapping): + continue + action_id = child.get("action_id") + schema = action_schemas.get(action_id, {}) if isinstance(action_id, str) else {} + context_nodes.append( + { + "id": child.get("id"), + "type": child.get("type"), + "action_id": action_id, + "display_name": child.get("display_name"), + "output_schema": schema.get("output_schema"), + } + ) + + return {"nodes": context_nodes} + + +def validate_loop_exports(*, loop_node: Mapping[str, Any], exports: Mapping[str, Any]) -> List[str]: + params = loop_node.get("params") if isinstance(loop_node.get("params"), Mapping) else {} + body = params.get("body_subgraph") if isinstance(params, Mapping) else None + if not isinstance(body, Mapping): + body = {} + + body_nodes = [bn for bn in body.get("nodes", []) or [] if isinstance(bn, Mapping)] + body_ids = {bn.get("id") for bn in body_nodes if isinstance(bn.get("id"), str)} + + errors: List[str] = [] + + if not isinstance(exports, Mapping): + return ["exports 必须是对象"] + + for key, value in exports.items(): + if not isinstance(key, str): + errors.append("exports 的 key 必须是字符串") + continue + if not isinstance(value, str) or not value.strip(): + errors.append(f"exports.{key} 必须是非空 Jinja 表达式字符串") + continue + + refs = list(_iter_template_references(value)) + if not refs: + errors.append(f"exports.{key} 必须引用 loop body 节点的输出字段") + continue + + has_body_ref = False + for ref in refs: + try: + tokens = parse_field_path(ref) + except Exception: + continue + if len(tokens) < 3 or tokens[0] != "result_of": + continue + ref_node = tokens[1] + if isinstance(ref_node, str) and ref_node in body_ids: + has_body_ref = True + else: + errors.append(f"exports.{key} 只能引用 body_subgraph 内的节点输出") + break + if not has_body_ref and not errors: + errors.append(f"exports.{key} 必须引用 loop body 节点的输出字段") + + return errors + + +def fallback_loop_exports( + loop_node: Mapping[str, Any], action_schemas: Mapping[str, Mapping[str, Any]] +) -> Optional[Dict[str, Any]]: + params = loop_node.get("params") if isinstance(loop_node, Mapping) else None + if not isinstance(params, Mapping): + return None + body = params.get("body_subgraph") + if not isinstance(body, Mapping): + return None + + body_nodes = [bn for bn in body.get("nodes", []) or [] if isinstance(bn, Mapping)] + body_ids = [bn.get("id") for bn in body_nodes if isinstance(bn.get("id"), str)] + exit_node = body.get("exit") if isinstance(body.get("exit"), str) else None + from_node = exit_node if exit_node in body_ids else (body_ids[0] if body_ids else None) + if not from_node: + return None + + target_node = next((bn for bn in body_nodes if bn.get("id") == from_node), None) + field_name = "status" + if isinstance(target_node, Mapping): + action_id = target_node.get("action_id") + schema = action_schemas.get(action_id, {}) if isinstance(action_id, str) else {} + props = ( + schema.get("output_schema", {}).get("properties") + if isinstance(schema.get("output_schema"), Mapping) + else None + ) + if isinstance(props, Mapping): + field_name = next((k for k in props.keys() if isinstance(k, str)), field_name) + + return { + "items": f"{{{{ result_of.{from_node}.{field_name} }}}}", + } + + +def ensure_loop_items_fields( + *, + exports: Mapping[str, Any], + loop_node: Mapping[str, Any], + action_schemas: Mapping[str, Mapping[str, Any]], +) -> Dict[str, Any]: + """Placeholder pass-through for loop exports.""" + + return dict(exports) diff --git a/velvetflow/planner/node_types/reasoning.py b/velvetflow/planner/node_types/reasoning.py new file mode 100644 index 0000000..92149c8 --- /dev/null +++ b/velvetflow/planner/node_types/reasoning.py @@ -0,0 +1,22 @@ +# Author: Zhongkai Fu (fuzhongkai@gmail.com) +# License: BSD 3-Clause License + +"""Reasoning node definitions.""" + +REASONING_PARAM_FIELDS = { + "system_prompt", + "task_prompt", + "context", + "expected_output_format", + "toolset", +} + +REASONING_NODE_FIELDS = { + "id", + "type", + "display_name", + "params", + "out_params_schema", + "parent_node_id", + "depends_on", +} diff --git a/velvetflow/planner/node_types/sanitizers.py b/velvetflow/planner/node_types/sanitizers.py new file mode 100644 index 0000000..cf9d32c --- /dev/null +++ b/velvetflow/planner/node_types/sanitizers.py @@ -0,0 +1,103 @@ +# Author: Zhongkai Fu (fuzhongkai@gmail.com) +# License: BSD 3-Clause License + +"""Node field/param sanitization helpers.""" + +from typing import Any, Dict, List, Mapping, Optional + +from velvetflow.planner.workflow_builder import WorkflowBuilder +from velvetflow.planner.node_types.action import ACTION_NODE_FIELDS +from velvetflow.planner.node_types.condition import CONDITION_NODE_FIELDS, CONDITION_PARAM_FIELDS +from velvetflow.planner.node_types.data import DATA_NODE_FIELDS, DATA_PARAM_FIELDS +from velvetflow.planner.node_types.loop import LOOP_NODE_FIELDS, LOOP_PARAM_FIELDS +from velvetflow.planner.node_types.reasoning import REASONING_NODE_FIELDS, REASONING_PARAM_FIELDS +from velvetflow.planner.node_types.switch import SWITCH_NODE_FIELDS, SWITCH_PARAM_FIELDS + + +def filter_supported_params( + *, + node_type: str, + params: Any, + action_schemas: Mapping[str, Mapping[str, Any]], + action_id: Optional[str] = None, +) -> tuple[Dict[str, Any], List[str]]: + """Keep only supported param fields for the given node type.""" + + if not isinstance(params, Mapping): + return {}, [] + + allowed_fields: Optional[set[str]] = None + if node_type == "condition": + allowed_fields = set(CONDITION_PARAM_FIELDS) + elif node_type == "reasoning": + allowed_fields = set(REASONING_PARAM_FIELDS) + elif node_type == "data": + allowed_fields = set(DATA_PARAM_FIELDS) + elif node_type == "switch": + allowed_fields = set(SWITCH_PARAM_FIELDS) + elif node_type == "loop": + allowed_fields = set(LOOP_PARAM_FIELDS) + elif node_type == "action" and action_id: + schema = action_schemas.get(action_id, {}) if isinstance(action_id, str) else {} + properties = schema.get("arg_schema", {}).get("properties") if isinstance(schema.get("arg_schema"), Mapping) else None + if isinstance(properties, Mapping): + allowed_fields = set(properties.keys()) + + if not allowed_fields: + return dict(params), [] + + cleaned: Dict[str, Any] = {k: v for k, v in params.items() if k in allowed_fields} + removed = [k for k in params if k not in allowed_fields] + + return cleaned, removed + + +def sanitize_builder_node_params( + builder: WorkflowBuilder, node_id: str, action_schemas: Mapping[str, Mapping[str, Any]] +) -> List[str]: + node = builder.nodes.get(node_id) + if not isinstance(node, Mapping): + return [] + + params = node.get("params") or {} + cleaned, removed = filter_supported_params( + node_type=str(node.get("type")), + params=params, + action_schemas=action_schemas, + action_id=node.get("action_id") if isinstance(node.get("action_id"), str) else None, + ) + + if removed: + node["params"] = cleaned + + return removed + + +def sanitize_builder_node_fields(builder: WorkflowBuilder, node_id: str) -> List[str]: + node = builder.nodes.get(node_id) + if not isinstance(node, Mapping): + return [] + + node_type = node.get("type") + allowed_fields: Optional[set[str]] = None + if node_type == "action": + allowed_fields = set(ACTION_NODE_FIELDS) + elif node_type == "reasoning": + allowed_fields = set(REASONING_NODE_FIELDS) + elif node_type == "condition": + allowed_fields = set(CONDITION_NODE_FIELDS) + elif node_type == "switch": + allowed_fields = set(SWITCH_NODE_FIELDS) + elif node_type == "loop": + allowed_fields = set(LOOP_NODE_FIELDS) + elif node_type == "data": + allowed_fields = set(DATA_NODE_FIELDS) + + if not allowed_fields: + return [] + + removed_keys = [key for key in list(node.keys()) if key not in allowed_fields] + for key in removed_keys: + node.pop(key, None) + + return removed_keys diff --git a/velvetflow/planner/node_types/switch.py b/velvetflow/planner/node_types/switch.py new file mode 100644 index 0000000..03c34e0 --- /dev/null +++ b/velvetflow/planner/node_types/switch.py @@ -0,0 +1,20 @@ +# Author: Zhongkai Fu (fuzhongkai@gmail.com) +# License: BSD 3-Clause License + +"""Switch node definitions.""" + +SWITCH_PARAM_FIELDS = { + "source", + "field", +} + +SWITCH_NODE_FIELDS = { + "id", + "type", + "display_name", + "params", + "cases", + "default_to_node", + "parent_node_id", + "depends_on", +} diff --git a/velvetflow/planner/structure.py b/velvetflow/planner/structure.py index 4daae4e..4733a11 100644 --- a/velvetflow/planner/structure.py +++ b/velvetflow/planner/structure.py @@ -31,7 +31,14 @@ infer_edges_from_bindings, ) from velvetflow.planner.relations import get_referenced_nodes -from velvetflow.reference_utils import normalize_reference_path, parse_field_path +from velvetflow.planner.node_types import ( + build_action_schema_map, + build_data_node_output_schema, + filter_supported_params, + sanitize_builder_node_fields, + sanitize_builder_node_params, +) +from velvetflow.reference_utils import normalize_reference_path from velvetflow.verification.validation import ( _check_array_item_field, _check_output_path_against_schema, @@ -39,77 +46,6 @@ from velvetflow.verification.binding_checks import _iter_template_references -CONDITION_PARAM_FIELDS = {"expression"} - -REASONING_PARAM_FIELDS = { - "system_prompt", - "task_prompt", - "context", - "expected_output_format", - "toolset", -} - -DATA_PARAM_FIELDS = { - "schema", - "dataset", -} - -SWITCH_PARAM_FIELDS = { - "source", - "field", -} - -LOOP_PARAM_FIELDS = { - "loop_kind", - "source", - "condition", - "item_alias", - "body_subgraph", - "exports", -} - -ACTION_NODE_FIELDS = { - "id", - "type", - "action_id", - "display_name", - "params", - "out_params_schema", - "parent_node_id", - "depends_on", -} - -REASONING_NODE_FIELDS = { - "id", - "type", - "display_name", - "params", - "out_params_schema", - "parent_node_id", - "depends_on", -} - -DATA_NODE_FIELDS = { - "id", - "type", - "display_name", - "params", - "out_params_schema", - "parent_node_id", - "depends_on", -} - -CONDITION_NODE_FIELDS = { - "id", - "type", - "display_name", - "params", - "true_to_node", - "false_to_node", - "parent_node_id", - "depends_on", -} - _JINJA_ENV = Environment() @@ -200,25 +136,6 @@ def _walk(val: Any) -> Any: return _walk(obj), changed -SWITCH_NODE_FIELDS = { - "id", - "type", - "display_name", - "params", - "cases", - "default_to_node", - "parent_node_id", - "depends_on", -} - -LOOP_NODE_FIELDS = { - "id", - "type", - "display_name", - "params", - "parent_node_id", - "depends_on", -} def _attach_inferred_edges(workflow: Dict[str, Any]) -> Dict[str, Any]: """Rebuild derived edges so LLMs can see the implicit wiring.""" @@ -392,267 +309,6 @@ def _attach_sub_graph_nodes(builder: WorkflowBuilder, loop_id: str, node_ids: Li node["parent_node_id"] = loop_id -def _build_data_node_output_schema(schema: Any) -> Dict[str, Any]: - properties: Dict[str, Any] = {} - if isinstance(schema, list): - for field in schema: - if not isinstance(field, Mapping): - continue - name = field.get("name") - if not isinstance(name, str) or not name: - continue - field_type = field.get("type") if isinstance(field.get("type"), str) else "string" - description = field.get("description") if isinstance(field.get("description"), str) else "" - properties[name] = {"type": field_type, "description": description} - - return { - "type": "object", - "properties": { - "dataset": { - "type": "array", - "items": { - "type": "object", - "properties": properties, - }, - }, - "schema": { - "type": "array", - "items": {"type": "object"}, - }, - }, - } - - -def _filter_supported_params( - *, - node_type: str, - params: Any, - action_schemas: Mapping[str, Mapping[str, Any]], - action_id: Optional[str] = None, -) -> tuple[Dict[str, Any], List[str]]: - """Keep only supported param fields for the given node type. - - Returns the sanitized params dict and a list of removed field names. - """ - - if not isinstance(params, Mapping): - return {}, [] - - allowed_fields: Optional[set[str]] = None - if node_type == "condition": - allowed_fields = set(CONDITION_PARAM_FIELDS) - elif node_type == "reasoning": - allowed_fields = set(REASONING_PARAM_FIELDS) - elif node_type == "data": - allowed_fields = set(DATA_PARAM_FIELDS) - elif node_type == "switch": - allowed_fields = set(SWITCH_PARAM_FIELDS) - elif node_type == "loop": - allowed_fields = set(LOOP_PARAM_FIELDS) - elif node_type == "action" and action_id: - schema = action_schemas.get(action_id, {}) if isinstance(action_id, str) else {} - properties = schema.get("arg_schema", {}).get("properties") if isinstance(schema.get("arg_schema"), Mapping) else None - if isinstance(properties, Mapping): - allowed_fields = set(properties.keys()) - - if not allowed_fields: - return dict(params), [] - - cleaned: Dict[str, Any] = {k: v for k, v in params.items() if k in allowed_fields} - removed = [k for k in params if k not in allowed_fields] - - return cleaned, removed - - -def _sanitize_builder_node_params( - builder: WorkflowBuilder, node_id: str, action_schemas: Mapping[str, Mapping[str, Any]] -) -> List[str]: - node = builder.nodes.get(node_id) - if not isinstance(node, Mapping): - return [] - - params = node.get("params") or {} - cleaned, removed = _filter_supported_params( - node_type=str(node.get("type")), - params=params, - action_schemas=action_schemas, - action_id=node.get("action_id") if isinstance(node.get("action_id"), str) else None, - ) - - if removed: - node["params"] = cleaned - - return removed - - -def _sanitize_builder_node_fields(builder: WorkflowBuilder, node_id: str) -> List[str]: - node = builder.nodes.get(node_id) - if not isinstance(node, Mapping): - return [] - - node_type = node.get("type") - allowed_fields: Optional[set[str]] = None - if node_type == "action": - allowed_fields = set(ACTION_NODE_FIELDS) - elif node_type == "reasoning": - allowed_fields = set(REASONING_NODE_FIELDS) - elif node_type == "condition": - allowed_fields = set(CONDITION_NODE_FIELDS) - elif node_type == "switch": - allowed_fields = set(SWITCH_NODE_FIELDS) - elif node_type == "loop": - allowed_fields = set(LOOP_NODE_FIELDS) - elif node_type == "data": - allowed_fields = set(DATA_NODE_FIELDS) - - if not allowed_fields: - return [] - - removed_keys = [key for key in list(node.keys()) if key not in allowed_fields] - for key in removed_keys: - node.pop(key, None) - - return removed_keys - - -def _build_action_schema_map(action_registry: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]: - action_schemas: Dict[str, Dict[str, Any]] = {} - for action in action_registry: - aid = action.get("action_id") - if not aid: - continue - action_schemas[aid] = { - "name": action.get("name", ""), - "description": action.get("description", ""), - "domain": action.get("domain", ""), - "arg_schema": action.get("arg_schema"), - "output_schema": action.get("output_schema"), - } - return action_schemas - - -def _extract_loop_body_context( - loop_node: Mapping[str, Any], action_schemas: Mapping[str, Mapping[str, Any]] -) -> Dict[str, Any]: - params = loop_node.get("params") if isinstance(loop_node, Mapping) else None - body = params.get("body_subgraph") if isinstance(params, Mapping) else None - if not isinstance(body, Mapping): - return {"nodes": []} - - context_nodes = [] - for child in body.get("nodes", []) or []: - if not isinstance(child, Mapping): - continue - action_id = child.get("action_id") - schema = action_schemas.get(action_id, {}) if isinstance(action_id, str) else {} - context_nodes.append( - { - "id": child.get("id"), - "type": child.get("type"), - "action_id": action_id, - "display_name": child.get("display_name"), - "output_schema": schema.get("output_schema"), - } - ) - - return {"nodes": context_nodes} - - -def _validate_loop_exports( - *, loop_node: Mapping[str, Any], exports: Mapping[str, Any] -) -> List[str]: - params = loop_node.get("params") if isinstance(loop_node.get("params"), Mapping) else {} - body = params.get("body_subgraph") if isinstance(params, Mapping) else None - if not isinstance(body, Mapping): - body = {} - - body_nodes = [bn for bn in body.get("nodes", []) or [] if isinstance(bn, Mapping)] - body_ids = {bn.get("id") for bn in body_nodes if isinstance(bn.get("id"), str)} - - errors: List[str] = [] - - if not isinstance(exports, Mapping): - return ["exports 必须是对象"] - - for key, value in exports.items(): - if not isinstance(key, str): - errors.append("exports 的 key 必须是字符串") - continue - if not isinstance(value, str) or not value.strip(): - errors.append(f"exports.{key} 必须是非空 Jinja 表达式字符串") - continue - - refs = list(_iter_template_references(value)) - if not refs: - errors.append(f"exports.{key} 必须引用 loop body 节点的输出字段") - continue - - has_body_ref = False - for ref in refs: - try: - tokens = parse_field_path(ref) - except Exception: - continue - if len(tokens) < 3 or tokens[0] != "result_of": - continue - ref_node = tokens[1] - if isinstance(ref_node, str) and ref_node in body_ids: - has_body_ref = True - else: - errors.append(f"exports.{key} 只能引用 body_subgraph 内的节点输出") - break - if not has_body_ref and not errors: - errors.append(f"exports.{key} 必须引用 loop body 节点的输出字段") - - return errors - - -def _fallback_loop_exports( - loop_node: Mapping[str, Any], action_schemas: Mapping[str, Mapping[str, Any]] -) -> Optional[Dict[str, Any]]: - params = loop_node.get("params") if isinstance(loop_node, Mapping) else None - if not isinstance(params, Mapping): - return None - body = params.get("body_subgraph") - if not isinstance(body, Mapping): - return None - - body_nodes = [bn for bn in body.get("nodes", []) or [] if isinstance(bn, Mapping)] - body_ids = [bn.get("id") for bn in body_nodes if isinstance(bn.get("id"), str)] - exit_node = body.get("exit") if isinstance(body.get("exit"), str) else None - from_node = exit_node if exit_node in body_ids else (body_ids[0] if body_ids else None) - if not from_node: - return None - - target_node = next((bn for bn in body_nodes if bn.get("id") == from_node), None) - field_name = "status" - if isinstance(target_node, Mapping): - action_id = target_node.get("action_id") - schema = action_schemas.get(action_id, {}) if isinstance(action_id, str) else {} - props = ( - schema.get("output_schema", {}).get("properties") - if isinstance(schema.get("output_schema"), Mapping) - else None - ) - if isinstance(props, Mapping): - field_name = next((k for k in props.keys() if isinstance(k, str)), field_name) - - return { - "items": f"{{{{ result_of.{from_node}.{field_name} }}}}", - } - - -def _ensure_loop_items_fields( - *, - exports: Mapping[str, Any], - loop_node: Mapping[str, Any], - action_schemas: Mapping[str, Mapping[str, Any]], -) -> Dict[str, Any]: - """Placeholder pass-through for loop exports.""" - - return dict(exports) - - def _prepare_skeleton_for_next_stage( *, builder: WorkflowBuilder, @@ -1052,7 +708,7 @@ def plan_workflow_structure_with_llm( builder = WorkflowBuilder() if existing_workflow: _hydrate_builder_from_workflow(builder=builder, workflow=existing_workflow) - action_schemas = _build_action_schema_map(action_registry) + action_schemas = build_action_schema_map(action_registry) last_action_candidates: List[str] = [] all_action_candidates: List[str] = [] all_action_candidates_info: List[Dict[str, Any]] = [] @@ -1440,7 +1096,7 @@ def add_action_node( ) return _return_tool_result("add_action_node", result) - cleaned_params, removed_fields = _filter_supported_params( + cleaned_params, removed_fields = filter_supported_params( node_type="action", params=params or {}, action_schemas=action_schemas, @@ -1470,7 +1126,7 @@ def add_action_node( depends_on=depends_on or [], ) _reset_workflow_check_state() - removed_node_fields = _sanitize_builder_node_fields(builder, id) + removed_node_fields = sanitize_builder_node_fields(builder, id) _snapshot(f"add_action_{id}") if removed_fields or removed_node_fields: result = _build_validation_error( @@ -1522,7 +1178,7 @@ def add_reasoning_node( result = _build_validation_error("reasoning 节点的 params 需要是对象。") return _return_tool_result("add_reasoning_node", result) - cleaned_params, removed_fields = _filter_supported_params( + cleaned_params, removed_fields = filter_supported_params( node_type="reasoning", params=params or {}, action_schemas=action_schemas, @@ -1589,7 +1245,7 @@ def add_reasoning_node( depends_on=depends_on or [], ) _reset_workflow_check_state() - removed_node_fields = _sanitize_builder_node_fields(builder, id) + removed_node_fields = sanitize_builder_node_fields(builder, id) _snapshot(f"add_reasoning_{id}") if removed_fields or removed_node_fields: result = _build_validation_error( @@ -1644,12 +1300,12 @@ def add_data_node( if dataset is not None: params["dataset"] = dataset - cleaned_params, removed_fields = _filter_supported_params( + cleaned_params, removed_fields = filter_supported_params( node_type="data", params=params or {}, action_schemas=action_schemas, ) - out_params_schema = _build_data_node_output_schema(cleaned_params.get("schema")) + out_params_schema = build_data_node_output_schema(cleaned_params.get("schema")) ref_error = _validate_existing_references( node_id=id, params=cleaned_params, depends_on=depends_on or [] ) @@ -1665,7 +1321,7 @@ def add_data_node( depends_on=depends_on or [], ) _reset_workflow_check_state() - removed_node_fields = _sanitize_builder_node_fields(builder, id) + removed_node_fields = sanitize_builder_node_fields(builder, id) _snapshot(f"add_data_{id}") if removed_fields or removed_node_fields: result = _build_validation_error( @@ -1756,7 +1412,7 @@ def add_loop_node( merged_params = dict(params or {}) merged_params.update({"loop_kind": loop_kind, "source": source, "item_alias": item_alias}) - cleaned_params, removed_fields = _filter_supported_params( + cleaned_params, removed_fields = filter_supported_params( node_type="loop", params=merged_params, action_schemas=action_schemas ) ref_error = _validate_existing_references( @@ -1776,7 +1432,7 @@ def add_loop_node( ) _reset_workflow_check_state() _attach_sub_graph_nodes(builder, id, normalized_nodes) - removed_node_fields = _sanitize_builder_node_fields(builder, id) + removed_node_fields = sanitize_builder_node_fields(builder, id) _snapshot(f"add_loop_{id}") if removed_fields or removed_node_fields: result = _build_validation_error( @@ -1850,7 +1506,7 @@ def add_condition_node( ) return _return_tool_result("add_condition_node", result) - cleaned_params, removed_fields = _filter_supported_params( + cleaned_params, removed_fields = filter_supported_params( node_type="condition", params=normalized_params, action_schemas=action_schemas ) ref_error = _validate_existing_references( @@ -1869,7 +1525,7 @@ def add_condition_node( depends_on=depends_on or [], ) _reset_workflow_check_state() - removed_node_fields = _sanitize_builder_node_fields(builder, id) + removed_node_fields = sanitize_builder_node_fields(builder, id) _snapshot(f"add_condition_{id}") if removed_fields or removed_node_fields: result = _build_validation_error( @@ -1954,7 +1610,7 @@ def add_switch_node( result = _build_validation_error("switch 节点的 params 需要是对象。") return _return_tool_result("add_switch_node", result) - cleaned_params, removed_fields = _filter_supported_params( + cleaned_params, removed_fields = filter_supported_params( node_type="switch", params=params or {}, action_schemas=action_schemas ) ref_error = _validate_existing_references( @@ -1973,7 +1629,7 @@ def add_switch_node( depends_on=depends_on or [], ) _reset_workflow_check_state() - removed_node_fields = _sanitize_builder_node_fields(builder, id) + removed_node_fields = sanitize_builder_node_fields(builder, id) _snapshot(f"add_switch_{id}") if removed_fields or removed_node_fields: result = _build_validation_error( @@ -2067,7 +1723,7 @@ def update_action_node( if parent_node_id is not None: updates["parent_node_id"] = parent_node_id if params is not None: - cleaned_params, removed_param_fields = _filter_supported_params( + cleaned_params, removed_param_fields = filter_supported_params( node_type="action", params=params or {}, action_schemas=action_schemas, @@ -2097,8 +1753,8 @@ def update_action_node( return _return_tool_result("update_action_node", ref_error) builder.update_node(id, **updates) _reset_workflow_check_state() - removed_param_fields.extend(_sanitize_builder_node_params(builder, id, action_schemas)) - removed_node_fields = _sanitize_builder_node_fields(builder, id) + removed_param_fields.extend(sanitize_builder_node_params(builder, id, action_schemas)) + removed_node_fields = sanitize_builder_node_fields(builder, id) _snapshot(f"update_action_{id}") if removed_param_fields or removed_node_fields: result = _build_validation_error( @@ -2155,7 +1811,7 @@ def update_reasoning_node( if parent_node_id is not None: updates["parent_node_id"] = parent_node_id if params is not None: - cleaned_params, removed_param_fields = _filter_supported_params( + cleaned_params, removed_param_fields = filter_supported_params( node_type="reasoning", params=params or {}, action_schemas=action_schemas, @@ -2222,8 +1878,8 @@ def update_reasoning_node( return _return_tool_result("update_reasoning_node", ref_error) builder.update_node(id, **updates) _reset_workflow_check_state() - removed_param_fields.extend(_sanitize_builder_node_params(builder, id, action_schemas)) - removed_node_fields = _sanitize_builder_node_fields(builder, id) + removed_param_fields.extend(sanitize_builder_node_params(builder, id, action_schemas)) + removed_node_fields = sanitize_builder_node_fields(builder, id) _snapshot(f"update_reasoning_{id}") if removed_param_fields or removed_node_fields: result = _build_validation_error( @@ -2284,13 +1940,13 @@ def update_data_node( if dataset is not None: params["dataset"] = dataset if params: - cleaned_params, removed_param_fields = _filter_supported_params( + cleaned_params, removed_param_fields = filter_supported_params( node_type="data", params=params, action_schemas=action_schemas, ) updates["params"] = cleaned_params - updates["out_params_schema"] = _build_data_node_output_schema(cleaned_params.get("schema")) + updates["out_params_schema"] = build_data_node_output_schema(cleaned_params.get("schema")) else: removed_param_fields = [] @@ -2306,8 +1962,8 @@ def update_data_node( return _return_tool_result("update_data_node", ref_error) builder.update_node(id, **updates) _reset_workflow_check_state() - removed_param_fields.extend(_sanitize_builder_node_params(builder, id, action_schemas)) - removed_node_fields = _sanitize_builder_node_fields(builder, id) + removed_param_fields.extend(sanitize_builder_node_params(builder, id, action_schemas)) + removed_node_fields = sanitize_builder_node_fields(builder, id) _snapshot(f"update_data_{id}") if removed_param_fields or removed_node_fields: result = _build_validation_error( @@ -2390,7 +2046,7 @@ def update_condition_node( invalid_fields=["expression"], ) return _return_tool_result("update_condition_node", result) - cleaned_params, removed_param_fields = _filter_supported_params( + cleaned_params, removed_param_fields = filter_supported_params( node_type="condition", params=normalized_params, action_schemas=action_schemas ) updates["params"] = cleaned_params @@ -2408,8 +2064,8 @@ def update_condition_node( return _return_tool_result("update_condition_node", ref_error) builder.update_node(id, **updates) _reset_workflow_check_state() - removed_param_fields.extend(_sanitize_builder_node_params(builder, id, action_schemas)) - removed_node_fields = _sanitize_builder_node_fields(builder, id) + removed_param_fields.extend(sanitize_builder_node_params(builder, id, action_schemas)) + removed_node_fields = sanitize_builder_node_fields(builder, id) _snapshot(f"update_condition_{id}") if removed_param_fields or removed_node_fields: result = _build_validation_error( @@ -2506,7 +2162,7 @@ def update_switch_node( if cases is not None: updates["cases"] = normalized_cases if params is not None: - cleaned_params, removed_param_fields = _filter_supported_params( + cleaned_params, removed_param_fields = filter_supported_params( node_type="switch", params=params or {}, action_schemas=action_schemas ) updates["params"] = cleaned_params @@ -2524,8 +2180,8 @@ def update_switch_node( return _return_tool_result("update_switch_node", ref_error) builder.update_node(id, **updates) _reset_workflow_check_state() - removed_param_fields.extend(_sanitize_builder_node_params(builder, id, action_schemas)) - removed_node_fields = _sanitize_builder_node_fields(builder, id) + removed_param_fields.extend(sanitize_builder_node_params(builder, id, action_schemas)) + removed_node_fields = sanitize_builder_node_fields(builder, id) _snapshot(f"update_switch_{id}") if removed_param_fields or removed_node_fields: result = _build_validation_error( @@ -2619,7 +2275,7 @@ def update_loop_node( if parent_node_id is not None: updates["parent_node_id"] = parent_node_id if params is not None: - cleaned_params, removed_param_fields = _filter_supported_params( + cleaned_params, removed_param_fields = filter_supported_params( node_type="loop", params=params or {}, action_schemas=action_schemas ) updates["params"] = cleaned_params @@ -2638,8 +2294,8 @@ def update_loop_node( builder.update_node(id, **updates) _reset_workflow_check_state() _attach_sub_graph_nodes(builder, id, normalized_nodes) - removed_param_fields.extend(_sanitize_builder_node_params(builder, id, action_schemas)) - removed_node_fields = _sanitize_builder_node_fields(builder, id) + removed_param_fields.extend(sanitize_builder_node_params(builder, id, action_schemas)) + removed_node_fields = sanitize_builder_node_fields(builder, id) _snapshot(f"update_loop_{id}") if removed_param_fields or removed_node_fields: result = _build_validation_error( @@ -3027,7 +2683,7 @@ def update_node_params(id: str, params: Dict[str, Any]) -> Mapping[str, Any]: if node.type == "reasoning": updates["out_params_schema"] = normalized_params.get("expected_output_format") if node.type == "data": - updates["out_params_schema"] = _build_data_node_output_schema( + updates["out_params_schema"] = build_data_node_output_schema( normalized_params.get("schema") ) builder.update_node(id, **updates)