diff --git a/examples/search_news_summarize/workflow_dag.jpg b/examples/search_news_summarize/workflow_dag.jpg deleted file mode 100644 index 80f14110..00000000 Binary files a/examples/search_news_summarize/workflow_dag.jpg and /dev/null differ diff --git a/examples/search_news_summarize/workflow_output.json b/examples/search_news_summarize/workflow_output.json deleted file mode 100644 index 4e884770..00000000 --- a/examples/search_news_summarize/workflow_output.json +++ /dev/null @@ -1,230 +0,0 @@ -{ - "workflow_name": "Nvidia和Google新闻搜索总结及发送", - "description": "自动搜索Nvidia和Google相关新闻,逐篇总结,汇总整合所有总结,并将结果通过邮件发送给用户。", - "nodes": [ - { - "id": "search_nvidia_news", - "type": "action", - "action_id": "common.search_news.v1", - "display_name": "搜索Nvidia相关新闻", - "params": { - "query": "Nvidia" - }, - "out_params_schema": { - "type": "object", - "properties": { - "results": { - "type": "array", - "items": { - "type": "object", - "properties": { - "title": { - "type": "string" - }, - "url": { - "type": "string" - }, - "snippet": { - "type": "string", - "description": "Short summary of the news result" - } - }, - "required": [ - "title", - "url", - "snippet" - ] - } - } - }, - "required": [ - "results" - ] - } - }, - { - "id": "search_google_news", - "type": "action", - "action_id": "common.search_news.v1", - "display_name": "搜索Google相关新闻", - "params": { - "query": "Google" - }, - "out_params_schema": { - "type": "object", - "properties": { - "results": { - "type": "array", - "items": { - "type": "object", - "properties": { - "title": { - "type": "string" - }, - "url": { - "type": "string" - }, - "snippet": { - "type": "string", - "description": "Short summary of the news result" - } - }, - "required": [ - "title", - "url", - "snippet" - ] - } - } - }, - "required": [ - "results" - ] - } - }, - { - "id": "summarize_nvidia_0", - "type": "action", - "action_id": "common.summarize.v1", - "display_name": "总结Nvidia新闻0", - "params": { - "text": "{{search_nvidia_news.results[0].snippet}}", - "max_sentences": 3 - }, - "out_params_schema": { - "type": "object", - "properties": { - "summary": { - "type": "string" - }, - "sentence_count": { - "type": "integer" - } - }, - "required": [ - "summary" - ] - } - }, - { - "id": "summarize_nvidia_1", - "type": "action", - "action_id": "common.summarize.v1", - "display_name": "总结Nvidia新闻1", - "params": { - "text": "{{search_nvidia_news.results[1].snippet}}", - "max_sentences": 3 - }, - "out_params_schema": { - "type": "object", - "properties": { - "summary": { - "type": "string" - }, - "sentence_count": { - "type": "integer" - } - }, - "required": [ - "summary" - ] - } - }, - { - "id": "summarize_google_0", - "type": "action", - "action_id": "common.summarize.v1", - "display_name": "总结Google新闻0", - "params": { - "text": "{{search_google_news.results[0].snippet}}", - "max_sentences": 3 - }, - "out_params_schema": { - "type": "object", - "properties": { - "summary": { - "type": "string" - }, - "sentence_count": { - "type": "integer" - } - }, - "required": [ - "summary" - ] - } - }, - { - "id": "summarize_google_1", - "type": "action", - "action_id": "common.summarize.v1", - "display_name": "总结Google新闻1", - "params": { - "text": "{{search_google_news.results[1].snippet}}", - "max_sentences": 3 - }, - "out_params_schema": { - "type": "object", - "properties": { - "summary": { - "type": "string" - }, - "sentence_count": { - "type": "integer" - } - }, - "required": [ - "summary" - ] - } - }, - { - "id": "combine_summaries", - "type": "action", - "action_id": "common.summarize.v1", - "display_name": "汇总整合所有总结", - "params": { - "text": "Nvidia新闻总结1: {{summarize_nvidia_0.summary}}\nNvidia新闻总结2: {{summarize_nvidia_1.summary}}\nGoogle新闻总结1: {{summarize_google_0.summary}}\nGoogle新闻总结2: {{summarize_google_1.summary}}", - "max_sentences": 5 - }, - "out_params_schema": { - "type": "object", - "properties": { - "summary": { - "type": "string" - }, - "sentence_count": { - "type": "integer" - } - }, - "required": [ - "summary" - ] - } - }, - { - "id": "send_summary_email", - "type": "action", - "action_id": "productivity.compose_outlook_email.v1", - "display_name": "发送总结邮件", - "params": { - "email_content": "以下是关于Nvidia和Google的新闻总结:\n{{combine_summaries.summary}}", - "emailTo": "user@example.com" - }, - "out_params_schema": { - "type": "object", - "properties": { - "status": { - "type": "string" - }, - "message": { - "type": "string" - } - }, - "required": [ - "status" - ] - } - } - ] -} \ No newline at end of file diff --git a/tools/builtin.py b/tools/builtin.py index fa98cc8b..09e98546 100644 --- a/tools/builtin.py +++ b/tools/builtin.py @@ -20,6 +20,7 @@ from openai import OpenAI from velvetflow.config import OPENAI_MODEL +from velvetflow.logging_utils import log_info, log_json from tools.base import Tool from tools.business import ( @@ -311,7 +312,7 @@ def ask_ai( tools_spec.append(schema) tool_lookup[tool_name] = item - for _ in range(max_rounds): + for round_idx in range(1, max_rounds + 1): response = client.chat.completions.create( model=chat_model, messages=messages, @@ -329,6 +330,16 @@ def ask_ai( "tool_calls": message.tool_calls, } messages.append(assistant_msg) + log_info(f"[ask_ai] round={round_idx} assistant response received") + if assistant_msg.get("content"): + log_json("[ask_ai] assistant content", assistant_msg.get("content")) + else: + reminder = ( + "Your previous response had no content. " + "Please provide a concise response that satisfies the user's requirements." + ) + messages.append({"role": "user", "content": reminder}) + log_info("[ask_ai] assistant content empty; prompting for a complete response") tool_calls = getattr(message, "tool_calls", None) or [] if tool_calls: @@ -359,6 +370,9 @@ def ask_ai( "content": json.dumps(tool_result, ensure_ascii=False), } ) + log_info(f"[ask_ai] round={round_idx} tool_call={func_name}") + log_json("[ask_ai] tool_args", parsed_args) + log_json("[ask_ai] tool_result", tool_result) continue final_content = (message.content or "").strip() @@ -371,11 +385,12 @@ def ask_ai( except json.JSONDecodeError: results = {"answer": final_content} - return {"status": "ok", "results": results} + return {"status": "ok", "results": results, "messages": messages} return { "status": "error", "results": {"message": "ask_ai could not produce an answer within the allowed rounds"}, + "messages": messages, } diff --git a/tools/business_actions/common.json b/tools/business_actions/common.json index dc0657cb..7b61478a 100644 --- a/tools/business_actions/common.json +++ b/tools/business_actions/common.json @@ -1,231 +1,109 @@ [ { - "action_id": "common.ask_ai.v1", - "name": "AskAI LLM orchestrator", + "action_id": "common.web_scraper.llm_crawl.v1", + "name": "LLM-guided web scraper (multi-URL)", "domain": "common", - "description": "LLM fallback that reasons over upstream tool context, optionally calls provided business tools, and returns structured JSON answers when no direct tool fits.", - "tool_name": "ask_ai", + "description": "从一个或多个起始链接出发,使用 LLM 逐步抓取站内页面并回答业务需求。", + "tool_name": "web_scraper.llm_crawl", "tags": [ - "fallback", + "web-scraping", + "crawler", "llm", - "reasoning", - "tool-orchestration", - "context" + "research" ], "arg_schema": { "type": "object", "properties": { - "system_prompt": { - "type": "string", - "description": "Role or instruction prompt describing the assistant persona for this node." - }, - "prompt": { - "type": "string", - "description": "The full user prompt with contextual details for the current node." - }, - "question": { - "type": "string", - "description": "Optional short-form question when a full prompt is not supplied." - }, - "context": { - "type": "object", - "description": "Collected context from upstream nodes, including tools, inputs, and outputs.", - "additionalProperties": true - }, - "expected_format": { - "oneOf": [ - { - "type": "string" - }, - { - "type": "object", - "additionalProperties": true - } - ], - "description": "Required guidance for the JSON result schema (string or object)." - }, - "tool": { + "urls": { "type": "array", - "description": "Subset of business tools available for this node (e.g., recent search_business_actions results).", "items": { - "type": "object", - "additionalProperties": true - } + "type": "string" + }, + "minItems": 1, + "description": "起始 URL 列表,工具会在这些站点内逐步爬取。" }, - "model": { + "goal": { "type": "string", - "description": "Override model name for chat completion." + "description": "用户需要解决的自然语言需求或研究目标。" }, - "max_tokens": { + "max_pages": { "type": "integer", - "default": 256, - "description": "Maximum tokens to generate per response." + "default": 40, + "description": "最多抓取的页面数量上限。" + }, + "same_domain_only": { + "type": "boolean", + "default": true, + "description": "是否只在起始站点域名内继续爬取。" }, - "max_rounds": { + "concurrency": { "type": "integer", - "default": 3, - "description": "Maximum chat-tool rounds before returning an error." - } - }, - "required": [ - "expected_format" - ], - "oneOf": [ - { - "required": [ - "expected_format", - "prompt" - ] - }, - { - "required": [ - "expected_format", - "question" - ] - } - ] - }, - "output_schema": { - "type": "object", - "properties": { - "status": { - "type": "string", - "description": "Execution status returned by AskAI, such as ok or error." + "default": 2, + "description": "并发抓取页面数量。" }, - "results": { - "type": "object", - "description": "Structured JSON answer synthesized by the LLM (or tool call results).", - "additionalProperties": true - } - }, - "required": [ - "status", - "results" - ] - }, - "enabled": true - }, - { - "action_id": "common.classify_text.v1", - "name": "Classify text with LLM labels", - "domain": "common", - "description": "Assign one or more labels to input text using the configured LLM model for routing, triage, or content tagging.", - "tool_name": "classify_text", - "tags": [ - "classification", - "llm", - "labels", - "text-analysis" - ], - "arg_schema": { - "type": "object", - "properties": { - "text": { - "type": "string", - "description": "The content to classify" + "politeness_delay": { + "type": "number", + "default": 0.8, + "description": "相邻请求之间的延迟(秒)。" }, - "labels": { - "type": "array", - "items": { - "type": "string" - }, - "description": "Candidate labels" + "llm_model": { + "type": "string", + "default": "gpt-4o-mini", + "description": "用于决策与总结的 LLM 模型。" }, - "instruction": { + "openai_api_key": { "type": "string", - "description": "Optional guidance to steer the classification" + "description": "可选的 OpenAI API Key(未提供时读取环境变量)。" }, - "allow_multiple": { - "type": "boolean", - "default": false + "page_md_chars": { + "type": "integer", + "default": 9000, + "description": "单页提取 Markdown 的最大字符数。" }, - "model": { - "type": "string" + "goal_check_interval": { + "type": "integer", + "default": 10, + "description": "每多少页让 LLM 判断是否已满足需求。" + }, + "min_enqueue_score": { + "type": "integer", + "default": 55, + "description": "队列入选链接的最小打分阈值。" }, - "max_tokens": { + "timeout_ms": { "type": "integer", - "default": 128 + "default": 20000, + "description": "单页抓取超时时间(毫秒)。" } }, "required": [ - "text", - "labels" + "urls", + "goal" ] }, "output_schema": { "type": "object", "properties": { - "labels": { - "type": "array", - "items": { - "type": "string" - } - }, - "reason": { - "type": "string" + "satisfied": { + "type": "boolean", + "description": "LLM 是否判定需求已满足。" }, - "model": { - "type": "string" + "answer": { + "type": "string", + "description": "基于已抓取页面生成的 Markdown 答案。" }, - "finish_reason": { - "type": "string" - } - }, - "required": [ - "labels" - ] - }, - "enabled": true - }, - { - "action_id": "common.web_scraper.llm_crawl.v1", - "name": "LLM-guided web scraper (multi-URL)", - "domain": "common", - "description": "从一个或多个起始链接出发,使用 LLM 逐步抓取站内页面并回答业务需求。", - "tool_name": "web_scraper.llm_crawl", - "tags": [ - "web-scraping", - "crawler", - "llm", - "research" - ], - "arg_schema": { - "type": "object", - "properties": { - "urls": { - "type": "array", - "items": {"type": "string"}, - "minItems": 1, - "description": "起始 URL 列表,工具会在这些站点内逐步爬取。" + "pages_crawled": { + "type": "integer", + "description": "实际抓取的页面数量。" }, - "goal": { + "state_path": { "type": "string", - "description": "用户需要解决的自然语言需求或研究目标。" + "description": "持久化的抓取摘要状态文件路径。" }, - "max_pages": {"type": "integer", "default": 40, "description": "最多抓取的页面数量上限。"}, - "same_domain_only": {"type": "boolean", "default": true, "description": "是否只在起始站点域名内继续爬取。"}, - "concurrency": {"type": "integer", "default": 2, "description": "并发抓取页面数量。"}, - "politeness_delay": {"type": "number", "default": 0.8, "description": "相邻请求之间的延迟(秒)。"}, - "llm_model": {"type": "string", "default": "gpt-4o-mini", "description": "用于决策与总结的 LLM 模型。"}, - "openai_api_key": {"type": "string", "description": "可选的 OpenAI API Key(未提供时读取环境变量)。"}, - "page_md_chars": {"type": "integer", "default": 9000, "description": "单页提取 Markdown 的最大字符数。"}, - "goal_check_interval": {"type": "integer", "default": 10, "description": "每多少页让 LLM 判断是否已满足需求。"}, - "min_enqueue_score": {"type": "integer", "default": 55, "description": "队列入选链接的最小打分阈值。"}, - "timeout_ms": {"type": "integer", "default": 20000, "description": "单页抓取超时时间(毫秒)。"} - }, - "required": [ - "urls", - "goal" - ] - }, - "output_schema": { - "type": "object", - "properties": { - "satisfied": {"type": "boolean", "description": "LLM 是否判定需求已满足。"}, - "answer": {"type": "string", "description": "基于已抓取页面生成的 Markdown 答案。"}, - "pages_crawled": {"type": "integer", "description": "实际抓取的页面数量。"}, - "state_path": {"type": "string", "description": "持久化的抓取摘要状态文件路径。"}, - "pages_path": {"type": "string", "description": "已抓取页面内容的 JSONL 文件路径。"} + "pages_path": { + "type": "string", + "description": "已抓取页面内容的 JSONL 文件路径。" + } }, "required": [ "satisfied", @@ -259,16 +137,55 @@ "default": 5, "description": "从搜索结果中提取的最大 URL 数量。" }, - "max_pages": {"type": "integer", "default": 40, "description": "LLM 爬虫最多抓取的页面数量。"}, - "same_domain_only": {"type": "boolean", "default": true, "description": "是否限制只抓取同域名链接。"}, - "concurrency": {"type": "integer", "default": 2, "description": "爬虫的并发抓取数。"}, - "politeness_delay": {"type": "number", "default": 0.8, "description": "相邻抓取请求之间的延迟(秒)。"}, - "llm_model": {"type": "string", "default": "gpt-4o-mini", "description": "用于决策与总结的 LLM 模型。"}, - "openai_api_key": {"type": "string", "description": "可选的 OpenAI API Key,缺省时读取环境变量。"}, - "page_md_chars": {"type": "integer", "default": 9000, "description": "单页提取 Markdown 的最大字符数。"}, - "goal_check_interval": {"type": "integer", "default": 10, "description": "每抓取多少页让 LLM 判断是否满足目标。"}, - "min_enqueue_score": {"type": "integer", "default": 55, "description": "入队下一步抓取链接的最低得分阈值。"}, - "timeout_ms": {"type": "integer", "default": 20000, "description": "单页抓取的超时时间(毫秒)。"} + "max_pages": { + "type": "integer", + "default": 40, + "description": "LLM 爬虫最多抓取的页面数量。" + }, + "same_domain_only": { + "type": "boolean", + "default": true, + "description": "是否限制只抓取同域名链接。" + }, + "concurrency": { + "type": "integer", + "default": 2, + "description": "爬虫的并发抓取数。" + }, + "politeness_delay": { + "type": "number", + "default": 0.8, + "description": "相邻抓取请求之间的延迟(秒)。" + }, + "llm_model": { + "type": "string", + "default": "gpt-4o-mini", + "description": "用于决策与总结的 LLM 模型。" + }, + "openai_api_key": { + "type": "string", + "description": "可选的 OpenAI API Key,缺省时读取环境变量。" + }, + "page_md_chars": { + "type": "integer", + "default": 9000, + "description": "单页提取 Markdown 的最大字符数。" + }, + "goal_check_interval": { + "type": "integer", + "default": 10, + "description": "每抓取多少页让 LLM 判断是否满足目标。" + }, + "min_enqueue_score": { + "type": "integer", + "default": 55, + "description": "入队下一步抓取链接的最低得分阈值。" + }, + "timeout_ms": { + "type": "integer", + "default": 20000, + "description": "单页抓取的超时时间(毫秒)。" + } }, "required": [ "goal" @@ -277,10 +194,15 @@ "output_schema": { "type": "object", "properties": { - "query": {"type": "string", "description": "规范化后的用户需求。"}, + "query": { + "type": "string", + "description": "规范化后的用户需求。" + }, "seed_urls": { "type": "array", - "items": {"type": "string"}, + "items": { + "type": "string" + }, "description": "从搜索结果抽取的起始 URL 列表。" }, "search_results": { @@ -288,9 +210,16 @@ "items": { "type": "object", "properties": { - "title": {"type": "string"}, - "url": {"type": "string"}, - "snippet": {"type": "string", "description": "搜索结果的摘要。"} + "title": { + "type": "string" + }, + "url": { + "type": "string" + }, + "snippet": { + "type": "string", + "description": "搜索结果的摘要。" + } }, "required": [ "title", @@ -313,116 +242,6 @@ }, "enabled": true }, - { - "action_id": "common.join_list.v1", - "name": "Join list of strings", - "domain": "common", - "description": "Concatenate a list of strings using a separator to produce a single formatted string for prompts or output.", - "tool_name": "join_list", - "tags": [ - "string", - "list", - "join", - "formatting" - ], - "arg_schema": { - "type": "object", - "properties": { - "items": { - "type": "array", - "items": { - "type": "string" - } - }, - "separator": { - "type": "string", - "default": "," - } - }, - "required": [ - "items" - ] - }, - "output_schema": { - "type": "object", - "properties": { - "result": { - "type": "string" - } - }, - "required": [ - "result" - ] - }, - "enabled": true - }, - { - "action_id": "common.list_files.v1", - "name": "List directory entries with metadata", - "domain": "common", - "description": "Enumerate files and folders under a directory (optionally including hidden items) with path and type metadata for workflow navigation.", - "tool_name": "list_files", - "tags": [ - "filesystem", - "discovery", - "listing", - "metadata" - ], - "arg_schema": { - "type": "object", - "properties": { - "directory": { - "type": "string", - "default": "." - }, - "include_hidden": { - "type": "boolean", - "default": false - }, - "max_entries": { - "type": "integer", - "default": 200 - } - } - }, - "output_schema": { - "type": "object", - "properties": { - "entries": { - "type": "array", - "items": { - "type": "object", - "properties": { - "name": { - "type": "string" - }, - "path": { - "type": "string" - }, - "is_dir": { - "type": "boolean" - }, - "size": { - "type": "integer" - }, - "modified": { - "type": "number" - } - }, - "required": [ - "name", - "path", - "is_dir" - ] - } - } - }, - "required": [ - "entries" - ] - }, - "enabled": true - }, { "action_id": "common.read_file.v1", "name": "Read text file with safety limits", @@ -603,92 +422,5 @@ ] }, "enabled": true - }, - { - "action_id": "common.split_list.v1", - "name": "Split string into list", - "domain": "common", - "description": "Split an input string by a separator into a list of values for downstream iteration or validation.", - "tool_name": "split_list", - "tags": [ - "string", - "list", - "split", - "parsing" - ], - "arg_schema": { - "type": "object", - "properties": { - "text": { - "type": "string" - }, - "separator": { - "type": "string", - "default": "," - } - }, - "required": [ - "text" - ] - }, - "output_schema": { - "type": "object", - "properties": { - "items": { - "type": "array", - "items": { - "type": "string" - } - } - }, - "required": [ - "items" - ] - }, - "enabled": true - }, - { - "action_id": "common.summarize.v1", - "name": "Summarize text with LLM", - "domain": "common", - "description": "Generate a concise summary of the provided text using the LLM, optionally constrained by sentence count.", - "tool_name": "summarize", - "tags": [ - "text", - "summarize", - "nlp", - "abstraction", - "llm" - ], - "arg_schema": { - "type": "object", - "properties": { - "text": { - "type": "string" - }, - "max_sentences": { - "type": "integer", - "default": 3 - } - }, - "required": [ - "text" - ] - }, - "output_schema": { - "type": "object", - "properties": { - "summary": { - "type": "string" - }, - "sentence_count": { - "type": "integer" - } - }, - "required": [ - "summary" - ] - }, - "enabled": true } -] \ No newline at end of file +] diff --git a/velvetflow/bindings.py b/velvetflow/bindings.py index 333f0da3..3cb57480 100644 --- a/velvetflow/bindings.py +++ b/velvetflow/bindings.py @@ -852,6 +852,10 @@ def _append_token(token: Any) -> None: continue if p == "count": + if isinstance(cur, Mapping) and "count" in cur: + cur = cur["count"] + _append_token(p) + continue try: cur = len(cur) except Exception: diff --git a/velvetflow/executor/dynamic_executor.py b/velvetflow/executor/dynamic_executor.py index 74631e07..a45a6376 100644 --- a/velvetflow/executor/dynamic_executor.py +++ b/velvetflow/executor/dynamic_executor.py @@ -1,9 +1,10 @@ """Dynamic workflow executor implementation.""" from __future__ import annotations +import json from typing import Any, Dict, List, Mapping, Optional, Set, Union -from velvetflow.action_registry import get_action_by_id +from velvetflow.action_registry import BUSINESS_ACTIONS, get_action_by_id from velvetflow.bindings import BindingContext, eval_node_params from velvetflow.logging_utils import ( TraceContext, @@ -19,6 +20,8 @@ ) from velvetflow.models import Node, Workflow, infer_depends_on_from_edges +from tools import ask_ai + from .async_runtime import ( ExecutionCheckpoint, GLOBAL_ASYNC_RESULT_STORE, @@ -98,6 +101,59 @@ def _validate_registered_actions(self) -> None: "workflow 中存在未注册或缺失的 action_id,请在构建阶段修复: " + details ) + def _build_reasoning_prompt( + self, + *, + task_prompt: str | None, + context: Any, + expected_output_format: Any, + ) -> str: + prompt_parts: List[str] = [] + if task_prompt: + prompt_parts.append(f"Task:\n{task_prompt}") + if context is not None: + if isinstance(context, Mapping) or isinstance(context, list): + context_text = json.dumps(context, ensure_ascii=False, indent=2) + else: + context_text = str(context) + prompt_parts.append(f"Context:\n{context_text}") + if expected_output_format is not None: + if isinstance(expected_output_format, Mapping) or isinstance(expected_output_format, list): + expected_text = json.dumps(expected_output_format, ensure_ascii=False, indent=2) + else: + expected_text = str(expected_output_format) + prompt_parts.append(f"Expected Output Format:\n{expected_text}") + return "\n\n".join(prompt_parts).strip() + + def _resolve_reasoning_toolset(self, toolset: Any) -> List[Dict[str, Any]]: + if not isinstance(toolset, list): + return [] + + actions: List[Dict[str, Any]] = [] + for item in toolset: + if isinstance(item, Mapping): + actions.append(dict(item)) + continue + if not isinstance(item, str): + continue + action_def = get_action_by_id(item) + if action_def: + actions.append(action_def) + continue + matched = next( + ( + action + for action in BUSINESS_ACTIONS + if action.get("tool_name") == item + ), + None, + ) + if matched: + actions.append(matched) + else: + log_warn(f"[reasoning] toolset item '{item}' not found in action registry") + return actions + def _build_checkpoint( self, workflow_dict: Dict[str, Any], @@ -300,6 +356,66 @@ def _execute_graph( ) continue + if ntype == "reasoning": + resolved_params = eval_node_params(node_model, binding_ctx) + log_json("resolved params", resolved_params) + + system_prompt = ( + resolved_params.get("system_prompt") + if isinstance(resolved_params.get("system_prompt"), str) + else None + ) + task_prompt = ( + resolved_params.get("task_prompt") + if isinstance(resolved_params.get("task_prompt"), str) + else None + ) + context = resolved_params.get("context") + expected_output_format = resolved_params.get("expected_output_format") + toolset = resolved_params.get("toolset") + + prompt_text = self._build_reasoning_prompt( + task_prompt=task_prompt, + context=context, + expected_output_format=expected_output_format, + ) + if not prompt_text and task_prompt: + prompt_text = task_prompt + if not prompt_text: + prompt_text = "Follow the expected output format and return the result." + + tools_for_reasoning = self._resolve_reasoning_toolset(toolset) + result = ask_ai( + system_prompt=system_prompt, + prompt=prompt_text, + tool=tools_for_reasoning, + ) + + result_payload = result.get("results") if isinstance(result, Mapping) else None + if isinstance(result_payload, Mapping): + results[nid] = dict(result_payload) + else: + results[nid] = {"output": result_payload} + + self._record_node_metrics(result) + next_ids = self._next_nodes(edges, nid, nodes_data=nodes_data) + for nxt in next_ids: + if nxt not in visited: + reachable.add(nxt) + log_event( + "node_end", + { + "node_id": nid, + "type": ntype, + "resolved_params": resolved_params, + "result": result, + "next_nodes": next_ids, + }, + node_id=nid, + action_id=action_id, + ) + continue + if ntype == "condition": cond_eval = self._eval_condition(node, binding_ctx, include_debug=True) if isinstance(cond_eval, tuple) and len(cond_eval) == 2: diff --git a/velvetflow/models.py b/velvetflow/models.py index 9e91265c..da353f8b 100644 --- a/velvetflow/models.py +++ b/velvetflow/models.py @@ -369,7 +369,7 @@ def model_validate(cls, data: Any) -> "Node": if not isinstance(node_type, str): errors.append({"loc": ("type",), "msg": "节点类型必须是字符串"}) else: - allowed = {"action", "condition", "switch", "loop", "parallel"} + allowed = {"action", "condition", "switch", "loop", "parallel", "reasoning"} if node_type not in allowed: errors.append({ "loc": ("type",), @@ -422,6 +422,26 @@ def model_validate(cls, data: Any) -> "Node": depends_on=depends_on, ) + if node_type == "reasoning": + out_params_schema = data.get("out_params_schema") + if out_params_schema is not None and not isinstance(out_params_schema, (Mapping, str)): + errors.append( + { + "loc": ("out_params_schema",), + "msg": "out_params_schema 必须是对象或字符串", + } + ) + if errors: + raise PydanticValidationError(errors) + + return ReasoningNode( + id=node_id, + display_name=data.get("display_name"), + params=dict(params), + out_params_schema=out_params_schema, + depends_on=depends_on, + ) + if node_type == "condition": true_to_node = data.get("true_to_node") false_to_node = data.get("false_to_node") @@ -539,6 +559,19 @@ def model_dump(self, *, by_alias: bool = False) -> Dict[str, Any]: return data +@dataclass +class ReasoningNode(Node): + """LLM reasoning node.""" + + type: Literal["reasoning"] = "reasoning" + out_params_schema: Optional[Any] = None + + def model_dump(self, *, by_alias: bool = False) -> Dict[str, Any]: + data = super().model_dump(by_alias=by_alias) + data["out_params_schema"] = self.out_params_schema + return data + + @dataclass class Edge: """Directed edge between workflow nodes.""" diff --git a/velvetflow/planner/params_tools.py b/velvetflow/planner/params_tools.py index d77b4849..e2c5a6f2 100644 --- a/velvetflow/planner/params_tools.py +++ b/velvetflow/planner/params_tools.py @@ -99,6 +99,39 @@ def _loop_params_schema() -> Dict[str, Any]: } +def _reasoning_params_schema() -> Dict[str, Any]: + return { + "type": "object", + "properties": { + "system_prompt": { + "type": "string", + "description": "用于设置 LLM 行为与语气的系统提示词。", + }, + "task_prompt": { + "type": "string", + "description": "需要 LLM 完成的具体任务描述。", + }, + "context": { + "description": "可选上下文信息,建议使用对象结构或 Jinja 模板引用。", + "type": ["object", "string", "array"], + "additionalProperties": True, + }, + "expected_output_format": { + "description": "期望输出格式说明,可为 JSON Schema 或文本说明。", + "type": ["object", "string"], + "additionalProperties": True, + }, + "toolset": { + "type": "array", + "description": "允许 LLM 调用的工具列表(action_id 或 tool_name)。", + "items": {"type": "string"}, + }, + }, + "required": ["task_prompt"], + "additionalProperties": True, + } + + def _params_schema_for_node( node: Node, action_schemas: Dict[str, Mapping[str, Any]] ) -> Dict[str, Any]: @@ -112,6 +145,9 @@ def _params_schema_for_node( if node.type == "loop": return _loop_params_schema() + if node.type == "reasoning": + return _reasoning_params_schema() + return {"type": "object", "additionalProperties": True} diff --git a/velvetflow/planner/structure.py b/velvetflow/planner/structure.py index d20ed59a..7323e50c 100644 --- a/velvetflow/planner/structure.py +++ b/velvetflow/planner/structure.py @@ -41,6 +41,14 @@ CONDITION_PARAM_FIELDS = {"expression"} +REASONING_PARAM_FIELDS = { + "system_prompt", + "task_prompt", + "context", + "expected_output_format", + "toolset", +} + SWITCH_PARAM_FIELDS = { "source", "field", @@ -66,6 +74,16 @@ "depends_on", } +REASONING_NODE_FIELDS = { + "id", + "type", + "display_name", + "params", + "out_params_schema", + "parent_node_id", + "depends_on", +} + CONDITION_NODE_FIELDS = { "id", "type", @@ -377,6 +395,8 @@ def _filter_supported_params( allowed_fields: Optional[set[str]] = None if node_type == "condition": allowed_fields = set(CONDITION_PARAM_FIELDS) + elif node_type == "reasoning": + allowed_fields = set(REASONING_PARAM_FIELDS) elif node_type == "switch": allowed_fields = set(SWITCH_PARAM_FIELDS) elif node_type == "loop": @@ -426,6 +446,8 @@ def _sanitize_builder_node_fields(builder: WorkflowBuilder, node_id: str) -> Lis allowed_fields: Optional[set[str]] = None if node_type == "action": allowed_fields = set(ACTION_NODE_FIELDS) + elif node_type == "reasoning": + allowed_fields = set(REASONING_NODE_FIELDS) elif node_type == "condition": allowed_fields = set(CONDITION_NODE_FIELDS) elif node_type == "switch": @@ -603,8 +625,9 @@ def _build_combined_prompt() -> str: "[Workflow DSL syntax and semantics (must follow)]\n" "- workflow = {workflow_name, description, nodes: []}; only return valid JSON (edges will be automatically inferred by the system based on node bindings, no need to generate them).\n" "- Node basic structure: {id, type, display_name, params, depends_on, action_id?, out_params_schema?, loop/subgraph/branches?}.\n" - " type allows action/condition/switch/loop/parallel.\n" + " type allows action/condition/switch/loop/parallel/reasoning.\n" " Action nodes must specify action_id (from the action library) and params; only action nodes allow out_params_schema.\n" + " Reasoning nodes accept params including system_prompt/task_prompt/context/expected_output_format/toolset to drive LLM reasoning, and mirror expected_output_format into out_params_schema.\n" " Condition node params can only include expression (a single Jinja expression returning a boolean); true_to_node/false_to_node must be top-level fields (string or null), not inside params.\n" " Loop nodes contain loop_kind/iter/source/body_subgraph/exports. Values in exports must reference fields of nodes inside body_subgraph (e.g., {{ result_of.node.field }}); outside the loop you may only reference exports.. body_subgraph only needs the nodes array—no entry/exit/edges.\n" " Branches of a parallel node are a non-empty array, each element containing id/entry_node/sub_graph_nodes.\n" @@ -617,12 +640,13 @@ def _build_combined_prompt() -> str: "2) When business actions are needed, you must first call search_business_actions to query candidates; add_action_node.action_id must come from the most recent candidates.id.\n" "3) Before adding a new node, check whether a similar node already exists; if so, do not add a duplicate node.\n" "4) When condition/switch/loop nodes are needed, you must first create them with add_condition_node/add_switch_node/add_loop_node; expressions and references in params must strictly follow Jinja template syntax.\n" - "5) Call update_node_params to complete and validate params for created nodes.\n" - "6) If an existing node needs to be modified (adding display_name/params/branch targets/parent nodes, etc.), call update_action_node or update_condition_node with the fields to overwrite; after calling, be sure to check whether related upstream/downstream nodes also need updates to stay consistent.\n" - "7) Condition nodes must explicitly provide true_to_node and false_to_node. Values can be a node id (continue execution) or null (indicating that branch ends); express dependencies through input/output references in node params—no need to draw edges explicitly.\n" - "8) Maintain depends_on (array of strings) for each node, listing its direct upstream dependencies; when a node is targeted by condition.true_to_node/false_to_node, you must add that condition node to the target node's depends_on.\n" - "9) After the structure is complete, continue to fill params for all nodes and ensure update_node_params validation passes.\n\n" - "10) When you believe the workflow is complete, call check_workflow to validate upstream references; only after check_workflow succeeds should you call finalize_workflow to finish.\n\n" + "5) When LLM reasoning tasks are needed, create them with add_reasoning_node and fill system_prompt/task_prompt/context/expected_output_format/toolset in params.\n" + "6) Call update_node_params to complete and validate params for created nodes.\n" + "7) If an existing node needs to be modified (adding display_name/params/branch targets/parent nodes, etc.), call update_action_node/update_reasoning_node or update_condition_node with the fields to overwrite; after calling, be sure to check whether related upstream/downstream nodes also need updates to stay consistent.\n" + "8) Condition nodes must explicitly provide true_to_node and false_to_node. Values can be a node id (continue execution) or null (indicating that branch ends); express dependencies through input/output references in node params—no need to draw edges explicitly.\n" + "9) Maintain depends_on (array of strings) for each node, listing its direct upstream dependencies; when a node is targeted by condition.true_to_node/false_to_node, you must add that condition node to the target node's depends_on.\n" + "10) After the structure is complete, continue to fill params for all nodes and ensure update_node_params validation passes.\n\n" + "11) When you believe the workflow is complete, call check_workflow to validate upstream references; only after check_workflow succeeds should you call finalize_workflow to finish.\n\n" "Important note: Only action nodes need out_params_schema; condition nodes do not have this property. The format of out_params_schema should be {\"param_name\": \"type\"}; list only the names and types of output parameters of the business action, without extra descriptions or examples.\n\n" "[Very important principles]\n" "1. You must design the workflow strictly around the natural language requirement in the current conversation:\n" @@ -1368,6 +1392,86 @@ def add_action_node( result = {"status": "ok", "type": "node_added", "node_id": id} return _return_tool_result("add_action_node", result) + @function_tool(strict_mode=False) + def add_reasoning_node( + id: str, + display_name: Optional[str] = None, + params: Optional[Dict[str, Any]] = None, + depends_on: Optional[List[str]] = None, + parent_node_id: Optional[str] = None, + ) -> Mapping[str, Any]: + """Add a reasoning node for LLM-driven tasks. + + Use case: Create a node that calls an LLM to perform reasoning tasks such as + summarization, translation, drafting, or analysis. + + Args: + id: Unique node identifier. + display_name: Optional node display name. + params: Optional reasoning parameter dictionary. + depends_on: Optional list of upstream node IDs. + parent_node_id: Optional parent node ID for subgraphs or loop scenarios. + + Returns: + A result dictionary containing the status, type, and node ID; returns an error + message on failure. + """ + _log_tool_call( + "add_reasoning_node", + {"id": id, "parent_node_id": parent_node_id}, + ) + duplicate_error = _reject_duplicate_node_id(id) + if duplicate_error: + return _return_tool_result("add_reasoning_node", duplicate_error) + if parent_node_id is not None and not isinstance(parent_node_id, str): + result = _build_validation_error("parent_node_id 需要是字符串或 null。") + return _return_tool_result("add_reasoning_node", result) + if params is not None and not isinstance(params, Mapping): + result = _build_validation_error("reasoning 节点的 params 需要是对象。") + return _return_tool_result("add_reasoning_node", result) + + cleaned_params, removed_fields = _filter_supported_params( + node_type="reasoning", + params=params or {}, + action_schemas=action_schemas, + ) + out_params_schema = cleaned_params.get("expected_output_format") + ref_error = _validate_existing_references( + node_id=id, params=cleaned_params, depends_on=depends_on or [] + ) + if ref_error: + return _return_tool_result("add_reasoning_node", ref_error) + reference_issues, available_nodes = _collect_param_reference_issues(cleaned_params) + if reference_issues: + result = _build_validation_error( + "reasoning params 引用了不存在的节点字段,请修正后重新调用 add_reasoning_node。", + invalid_references=reference_issues, + available_nodes=available_nodes, + ) + return _return_tool_result("add_reasoning_node", result) + builder.add_node( + node_id=id, + node_type="reasoning", + display_name=display_name, + params=cleaned_params, + out_params_schema=out_params_schema, + parent_node_id=parent_node_id if isinstance(parent_node_id, str) else None, + depends_on=depends_on or [], + ) + _reset_workflow_check_state() + removed_node_fields = _sanitize_builder_node_fields(builder, id) + _snapshot(f"add_reasoning_{id}") + if removed_fields or removed_node_fields: + result = _build_validation_error( + "reasoning 节点仅支持 id/type/display_name/params/out_params_schema 字段,params 仅支持 system_prompt/task_prompt/context/expected_output_format/toolset。", + removed_param_fields=removed_fields, + removed_node_fields=removed_node_fields, + node_id=id, + ) + return _return_tool_result("add_reasoning_node", result) + result = {"status": "ok", "type": "node_added", "node_id": id} + return _return_tool_result("add_reasoning_node", result) + @function_tool(strict_mode=False) def add_loop_node( id: str, @@ -1801,6 +1905,92 @@ def update_action_node( result = {"status": "ok", "type": "node_updated", "node_id": id} return _return_tool_result("update_action_node", result) + @function_tool(strict_mode=False) + def update_reasoning_node( + id: str, + display_name: Optional[str] = None, + params: Optional[Dict[str, Any]] = None, + depends_on: Optional[List[str]] = None, + parent_node_id: Optional[str] = None, + ) -> Mapping[str, Any]: + """Update an existing reasoning node. + + Use case: Adjust prompts, expected outputs, or toolset for LLM reasoning tasks. + + Args: + id: Unique node identifier. + display_name: Optional node display name. + params: Optional reasoning parameter dictionary. + depends_on: Optional list of upstream node IDs. + parent_node_id: Optional parent node ID. + + Returns: + A result dictionary containing the status, type, and node ID; returns an error + message on failure. + """ + _log_tool_call( + "update_reasoning_node", + {"id": id, "parent_node_id": parent_node_id}, + ) + precheck = _update_node_common(id, "reasoning") + if precheck: + return _return_tool_result("update_reasoning_node", precheck) + if parent_node_id is not None and not isinstance(parent_node_id, str): + result = _build_validation_error("parent_node_id 需要是字符串或 null。") + return _return_tool_result("update_reasoning_node", result) + if params is not None and not isinstance(params, Mapping): + result = _build_validation_error("reasoning 节点的 params 需要是对象。") + return _return_tool_result("update_reasoning_node", result) + + updates: Dict[str, Any] = {} + if display_name is not None: + updates["display_name"] = display_name + if parent_node_id is not None: + updates["parent_node_id"] = parent_node_id + if params is not None: + cleaned_params, removed_param_fields = _filter_supported_params( + node_type="reasoning", + params=params or {}, + action_schemas=action_schemas, + ) + updates["params"] = cleaned_params + updates["out_params_schema"] = cleaned_params.get("expected_output_format") + reference_issues, available_nodes = _collect_param_reference_issues(cleaned_params) + if reference_issues: + result = _build_validation_error( + "reasoning params 引用了不存在的节点字段,请修正后重新调用 update_reasoning_node。", + invalid_references=reference_issues, + available_nodes=available_nodes, + ) + return _return_tool_result("update_reasoning_node", result) + else: + removed_param_fields = [] + if depends_on is not None: + updates["depends_on"] = depends_on + + ref_error = _validate_existing_references( + node_id=id, + params=cleaned_params if params is not None else None, + depends_on=depends_on or [], + ) + if ref_error: + return _return_tool_result("update_reasoning_node", ref_error) + builder.update_node(id, **updates) + _reset_workflow_check_state() + removed_param_fields.extend(_sanitize_builder_node_params(builder, id, action_schemas)) + removed_node_fields = _sanitize_builder_node_fields(builder, id) + _snapshot(f"update_reasoning_{id}") + if removed_param_fields or removed_node_fields: + result = _build_validation_error( + "reasoning 节点仅支持 id/type/display_name/params/out_params_schema 字段,params 仅支持 system_prompt/task_prompt/context/expected_output_format/toolset。", + removed_fields=removed_param_fields, + removed_node_fields=removed_node_fields, + node_id=id, + ) + return _return_tool_result("update_reasoning_node", result) + result = {"status": "ok", "type": "node_updated", "node_id": id} + return _return_tool_result("update_reasoning_node", result) + @function_tool(strict_mode=False) def update_condition_node( id: str, @@ -2504,7 +2694,10 @@ def update_node_params(id: str, params: Dict[str, Any]) -> Mapping[str, Any]: filled_params[id] = dict(normalized_params) if id not in validated_node_ids: validated_node_ids.append(id) - builder.update_node(id, params=dict(normalized_params)) + updates: Dict[str, Any] = {"params": dict(normalized_params)} + if node.type == "reasoning": + updates["out_params_schema"] = normalized_params.get("expected_output_format") + builder.update_node(id, **updates) _reset_workflow_check_state() _snapshot(f"validate_params_{id}") result = {"status": "ok", "params": normalized_params} @@ -2518,10 +2711,12 @@ def update_node_params(id: str, params: Dict[str, Any]) -> Mapping[str, Any]: list_retrieved_business_action, set_workflow_meta, add_action_node, + add_reasoning_node, add_loop_node, add_condition_node, add_switch_node, update_action_node, + update_reasoning_node, update_condition_node, update_switch_node, update_loop_node, diff --git a/velvetflow/planner/workflow_builder.py b/velvetflow/planner/workflow_builder.py index b0bec3aa..b1d86b2a 100644 --- a/velvetflow/planner/workflow_builder.py +++ b/velvetflow/planner/workflow_builder.py @@ -159,6 +159,16 @@ def add_node( "depends_on": depends_on or [], } ) + elif node_type == "reasoning": + node.update( + { + "display_name": display_name, + "params": params or {}, + "out_params_schema": out_params_schema, + "parent_node_id": parent_node_id, + "depends_on": depends_on or [], + } + ) else: node.update( { diff --git a/velvetflow/verification/binding_checks.py b/velvetflow/verification/binding_checks.py index 0144f711..f4f881ec 100644 --- a/velvetflow/verification/binding_checks.py +++ b/velvetflow/verification/binding_checks.py @@ -54,6 +54,17 @@ def _get_node_output_schema( if isinstance(node_schema, Mapping): return node_schema + if node.get("type") == "reasoning": + params = node.get("params") if isinstance(node.get("params"), Mapping) else {} + expected_format = params.get("expected_output_format") if isinstance(params, Mapping) else None + schema = ( + _schema_from_out_params_schema(expected_format) + if isinstance(expected_format, Mapping) + else None + ) + if isinstance(schema, Mapping): + return schema + action_id = node.get("action_id") if isinstance(action_id, str): action_def = actions_by_id.get(action_id) @@ -197,6 +208,12 @@ def _check_output_path_against_schema( return None return _schema_path_error(schema, list(rest_path)) + if target_type == "reasoning": + schema = _get_node_output_schema(target, actions_by_id) or {} + if not rest_path: + return None + return _schema_path_error(schema, list(rest_path)) + if rest_path: pretty_path = ".".join(str(p) for p in rest_path) return ( diff --git a/velvetflow/verification/node_rules.py b/velvetflow/verification/node_rules.py index 21e204b9..120491fd 100644 --- a/velvetflow/verification/node_rules.py +++ b/velvetflow/verification/node_rules.py @@ -121,6 +121,14 @@ def _collect_collection_attribute_refs(expr: str) -> List[Dict[str, str]]: CONDITION_PARAM_FIELDS = {"expression"} +REASONING_PARAM_FIELDS = { + "system_prompt", + "task_prompt", + "context", + "expected_output_format", + "toolset", +} + LOOP_PARAM_FIELDS = { "loop_kind", "source", @@ -145,6 +153,8 @@ def _filter_params_by_supported_fields( if node_type == "condition": allowed_fields = set(CONDITION_PARAM_FIELDS) + elif node_type == "reasoning": + allowed_fields = set(REASONING_PARAM_FIELDS) elif node_type == "loop": allowed_fields = set(LOOP_PARAM_FIELDS) elif node_type == "action": diff --git a/velvetflow/verification/workflow_validation.py b/velvetflow/verification/workflow_validation.py index 07f3780c..09ca4db5 100644 --- a/velvetflow/verification/workflow_validation.py +++ b/velvetflow/verification/workflow_validation.py @@ -201,20 +201,6 @@ def validate_completed_workflow( schema_to_set = schema - if ( - node.get("action_id") == "common.ask_ai.v1" - and isinstance(schema, Mapping) - and isinstance(params, Mapping) - ): - expected_schema = _parse_expected_format_schema(params.get("expected_format")) - if expected_schema: - schema_to_set = copy.deepcopy(schema) - properties = schema_to_set.setdefault("properties", {}) - properties["results"] = expected_schema - required = schema_to_set.setdefault("required", []) - if "results" not in required: - required.append("results") - if isinstance(schema_to_set, Mapping) and node.get("out_params_schema") != schema_to_set: node["out_params_schema"] = schema_to_set diff --git a/velvetflow/visualization.py b/velvetflow/visualization.py index 89a7afe7..44033125 100644 --- a/velvetflow/visualization.py +++ b/velvetflow/visualization.py @@ -23,6 +23,7 @@ "switch": (121, 85, 72), "loop": (96, 125, 139), "parallel": (0, 150, 136), + "reasoning": (156, 39, 176), } BACKGROUND: RGB = (245, 247, 250)