Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 66 additions & 39 deletions velvetflow/planner/structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import copy
import json
import os
from typing import Any, Callable, Dict, List, Mapping, Optional
from typing import Any, Callable, Dict, List, Mapping, Optional, Union

from velvetflow.config import OPENAI_MODEL
from velvetflow.logging_utils import (
Expand All @@ -30,6 +30,9 @@
from velvetflow.models import infer_edges_from_bindings


ToolResponse = Dict[str, object]
ParamsMapping = Mapping[str, object]

CONDITION_PARAM_FIELDS = {"expression"}

SWITCH_PARAM_FIELDS = {
Expand Down Expand Up @@ -97,6 +100,28 @@ def _attach_inferred_edges(workflow: Dict[str, Any]) -> Dict[str, Any]:
return attach_condition_branches(copied)


def _strip_additional_properties(value: Any) -> Any:
"""Recursively drop additionalProperties keys from a JSON schema-like mapping."""

if isinstance(value, Mapping):
return {
key: _strip_additional_properties(val)
for key, val in value.items()
if key != "additionalProperties"
}
if isinstance(value, list):
return [_strip_additional_properties(item) for item in value]
return value


def _clean_tool_schema(tool: Callable[..., Any]) -> Callable[..., Any]:
schema = getattr(tool, "__openai_schema__", None)
if isinstance(schema, Mapping):
cleaned = _strip_additional_properties(schema)
setattr(tool, "__openai_schema__", cleaned)
return tool


def _normalize_sub_graph_nodes(
raw: Any, *, builder: WorkflowBuilder
) -> tuple[List[str], Optional[Dict[str, Any]]]:
Expand Down Expand Up @@ -582,7 +607,7 @@ def _build_validation_error(message: str, **extra: Any) -> Dict[str, Any]:
return payload

@function_tool
def search_business_actions(query: str, top_k: int = 5) -> Mapping[str, Any]:
def search_business_actions(query: str, top_k: int = 5) -> ToolResponse:
actions_raw = search_service.search(query=query, top_k=int(top_k))
candidates = [
{
Expand All @@ -598,7 +623,7 @@ def search_business_actions(query: str, top_k: int = 5) -> Mapping[str, Any]:
return {"status": "ok", "query": query, "actions": actions_raw, "candidates": candidates}

@function_tool
def set_workflow_meta(workflow_name: str, description: Optional[str] = None) -> Mapping[str, Any]:
def set_workflow_meta(workflow_name: str, description: Optional[str] = None) -> ToolResponse:
builder.set_meta(workflow_name, description)
_snapshot("meta_updated")
return {"status": "ok", "type": "meta_set"}
Expand All @@ -609,10 +634,10 @@ def add_action_node(
action_id: str,
display_name: Optional[str] = None,
out_params_schema: Optional[Dict[str, str]] = None,
params: Optional[Dict[str, Any]] = None,
params: Optional[ParamsMapping] = None,
depends_on: Optional[List[str]] = None,
parent_node_id: Optional[str] = None,
) -> Mapping[str, Any]:
) -> ToolResponse:
if not last_action_candidates:
return _build_validation_error("action 节点必须在调用 search_business_actions 之后创建。")
if action_id not in last_action_candidates:
Expand Down Expand Up @@ -654,14 +679,14 @@ def add_action_node(
def add_loop_node(
id: str,
loop_kind: str,
source: Any,
source: Union[str, ParamsMapping],
item_alias: str,
display_name: Optional[str] = None,
params: Optional[Dict[str, Any]] = None,
params: Optional[ParamsMapping] = None,
sub_graph_nodes: Optional[List[str]] = None,
depends_on: Optional[List[str]] = None,
parent_node_id: Optional[str] = None,
) -> Mapping[str, Any]:
) -> ToolResponse:
if parent_node_id is not None and not isinstance(parent_node_id, str):
return _build_validation_error("parent_node_id 需要是字符串或 null。")
invalid_fields: List[str] = []
Expand Down Expand Up @@ -714,10 +739,10 @@ def add_condition_node(
true_to_node: Optional[str],
false_to_node: Optional[str],
display_name: Optional[str] = None,
params: Optional[Dict[str, Any]] = None,
params: Optional[ParamsMapping] = None,
depends_on: Optional[List[str]] = None,
parent_node_id: Optional[str] = None,
) -> Mapping[str, Any]:
) -> ToolResponse:
if parent_node_id is not None and not isinstance(parent_node_id, str):
return _build_validation_error("parent_node_id 需要是字符串或 null。")
if true_to_node is not None and not isinstance(true_to_node, str):
Expand Down Expand Up @@ -756,13 +781,13 @@ def add_condition_node(
@function_tool
def add_switch_node(
id: str,
cases: List[Dict[str, Any]],
cases: List[ParamsMapping],
display_name: Optional[str] = None,
params: Optional[Dict[str, Any]] = None,
params: Optional[ParamsMapping] = None,
default_to_node: Optional[str] = None,
depends_on: Optional[List[str]] = None,
parent_node_id: Optional[str] = None,
) -> Mapping[str, Any]:
) -> ToolResponse:
if parent_node_id is not None and not isinstance(parent_node_id, str):
return _build_validation_error("parent_node_id 需要是字符串或 null。")
if not isinstance(cases, list):
Expand Down Expand Up @@ -822,12 +847,12 @@ def _update_node_common(node_id: str, expected_type: str) -> Optional[Dict[str,
def update_action_node(
id: str,
display_name: Optional[str] = None,
params: Optional[Dict[str, Any]] = None,
out_params_schema: Optional[Dict[str, Any]] = None,
params: Optional[ParamsMapping] = None,
out_params_schema: Optional[Mapping[str, object]] = None,
action_id: Optional[str] = None,
depends_on: Optional[List[str]] = None,
parent_node_id: Optional[str] = None,
) -> Mapping[str, Any]:
) -> ToolResponse:
precheck = _update_node_common(id, "action")
if precheck:
return precheck
Expand Down Expand Up @@ -884,12 +909,12 @@ def update_action_node(
def update_condition_node(
id: str,
display_name: Optional[str] = None,
params: Optional[Dict[str, Any]] = None,
params: Optional[ParamsMapping] = None,
true_to_node: Optional[str] = None,
false_to_node: Optional[str] = None,
depends_on: Optional[List[str]] = None,
parent_node_id: Optional[str] = None,
) -> Mapping[str, Any]:
) -> ToolResponse:
precheck = _update_node_common(id, "condition")
if precheck:
return precheck
Expand Down Expand Up @@ -945,12 +970,12 @@ def update_condition_node(
def update_switch_node(
id: str,
display_name: Optional[str] = None,
params: Optional[Dict[str, Any]] = None,
cases: Optional[List[Dict[str, Any]]] = None,
params: Optional[ParamsMapping] = None,
cases: Optional[List[ParamsMapping]] = None,
default_to_node: Optional[str] = None,
depends_on: Optional[List[str]] = None,
parent_node_id: Optional[str] = None,
) -> Mapping[str, Any]:
) -> ToolResponse:
precheck = _update_node_common(id, "switch")
if precheck:
return precheck
Expand Down Expand Up @@ -1014,11 +1039,11 @@ def update_switch_node(
def update_loop_node(
id: str,
display_name: Optional[str] = None,
params: Optional[Dict[str, Any]] = None,
params: Optional[ParamsMapping] = None,
sub_graph_nodes: Optional[List[str]] = None,
depends_on: Optional[List[str]] = None,
parent_node_id: Optional[str] = None,
) -> Mapping[str, Any]:
) -> ToolResponse:
precheck = _update_node_common(id, "loop")
if precheck:
return precheck
Expand Down Expand Up @@ -1061,7 +1086,7 @@ def update_loop_node(
return {"status": "ok", "type": "node_updated", "node_id": id}

@function_tool
def finalize_workflow(ready: bool = True, notes: Optional[str] = None) -> Mapping[str, Any]:
def finalize_workflow(ready: bool = True, notes: Optional[str] = None) -> ToolResponse:
nonlocal latest_skeleton, latest_coverage
skeleton, coverage = _run_coverage_check(
nl_requirement=nl_requirement,
Expand Down Expand Up @@ -1100,7 +1125,7 @@ def finalize_workflow(ready: bool = True, notes: Optional[str] = None) -> Mappin
}

@function_tool
def dump_model() -> Mapping[str, Any]:
def dump_model() -> ToolResponse:
snapshot = _snapshot("dump_model")
return {
"status": "ok",
Expand All @@ -1112,23 +1137,25 @@ def dump_model() -> Mapping[str, Any]:
"workflow": snapshot,
}

tools = [
search_business_actions,
set_workflow_meta,
add_action_node,
add_loop_node,
add_condition_node,
add_switch_node,
update_action_node,
update_condition_node,
update_switch_node,
update_loop_node,
finalize_workflow,
dump_model,
]

agent = Agent(
name="WorkflowStructurePlanner",
instructions=system_prompt,
tools=[
search_business_actions,
set_workflow_meta,
add_action_node,
add_loop_node,
add_condition_node,
add_switch_node,
update_action_node,
update_condition_node,
update_switch_node,
update_loop_node,
finalize_workflow,
dump_model,
],
tools=[_clean_tool_schema(tool) for tool in tools],
model=OPENAI_MODEL,
)

Expand Down
Loading