diff --git a/dynamiq/nodes/agents/agent.py b/dynamiq/nodes/agents/agent.py
index 3f363ebb7..b676c216e 100644
--- a/dynamiq/nodes/agents/agent.py
+++ b/dynamiq/nodes/agents/agent.py
@@ -25,6 +25,8 @@
 from dynamiq.nodes.agents.utils import SummarizationConfig, ToolCacheEntry, XMLParser
 from dynamiq.nodes.llms.gemini import Gemini
 from dynamiq.nodes.node import Node, NodeDependency
+from dynamiq.nodes.tools import ContextManagerTool
+from dynamiq.nodes.tools.context_manager import _apply_context_manager_tool_effect
 from dynamiq.nodes.types import Behavior, InferenceMode
 from dynamiq.prompts import Message, MessageRole, VisionMessage, VisionMessageTextContent
 from dynamiq.runnables import RunnableConfig
@@ -765,6 +767,19 @@ def validate_inference_mode(self):
         return self

+    @model_validator(mode="after")
+    def _ensure_context_manager_tool(self):
+        """Automatically add ContextManagerTool when summarization is enabled."""
+        try:
+            if self.summarization_config.enabled:
+                has_context_tool = any(isinstance(t, ContextManagerTool) for t in self.tools)
+                if not has_context_tool:
+                    # Add with a stable name for addressing from the agent
+                    self.tools.append(ContextManagerTool(llm=self.llm, name="context-manager"))
+        except Exception as e:
+            logger.error(f"Failed to ensure ContextManagerTool: {e}")
+        return self
+
     def _parse_thought(self, output: str) -> tuple[str | None, str | None]:
         """Extracts thought from the output string."""
         thought_match = re.search(
@@ -981,7 +996,6 @@ def is_token_limit_exceeded(self) -> bool:
     def summarize_history(
         self,
         input_message,
-        history_offset: int,
         summary_offset: int,
         config: RunnableConfig | None = None,
         **kwargs,
@@ -991,7 +1005,6 @@ def summarize_history(

         Args:
             input_message (Message | VisionMessage): User request message.
-            history_offset (int): Offset to the first message in the conversation history within the prompt.
             summary_offset (int): Offset to the position of the first message in prompt that was not summarized.
             config (RunnableConfig | None): Configuration for the agent run.
             **kwargs: Additional parameters for running the agent.
@@ -1003,7 +1016,7 @@ def summarize_history(
         messages_history = "\nHistory to extract information from: \n"

         summary_sections = []
-        offset = max(history_offset, summary_offset - self.summarization_config.context_history_length)
+        offset = max(self._history_offset, summary_offset - self.summarization_config.context_history_length)
         for index, message in enumerate(self._prompt.messages[offset:]):
             if message.role == MessageRole.USER:
                 if (index + offset >= summary_offset) and ("Observation:" in message.content):
@@ -1072,16 +1085,6 @@ def get_clone_attr_initializers(self) -> dict[str, Callable[[Node], Any]]:
         )
         return base

-    def update_system_message(self):
-        system_message = Message(
-            role=MessageRole.SYSTEM,
-            content=self.generate_prompt(
-                tools_name=self.tool_names, input_formats=self.generate_input_formats(self.tools)
-            ),
-            static=True,
-        )
-        self._prompt.messages = [system_message, *self._prompt.messages[1:]]
-
     def _run_agent(
         self,
         input_message: Message | VisionMessage,
@@ -1116,7 +1119,8 @@ def _run_agent(
         else:
             self._prompt.messages = [system_message, input_message]

-        summary_offset = history_offset = len(self._prompt.messages)
+        summary_offset = self._history_offset = len(self._prompt.messages)
+
         stop_sequences = []
         if self.inference_mode == InferenceMode.DEFAULT:
             stop_sequences.extend(["Observation: ", "\nObservation:"])
@@ -1525,6 +1529,9 @@ def _run_agent(
                     except RecoverableAgentException as e:
                         tool_result = f"{type(e).__name__}: {e}"

+                    if isinstance(tool, ContextManagerTool):
+                        _apply_context_manager_tool_effect(self._prompt, tool_result, self._history_offset)
+
                     observation = f"\nObservation: {tool_result}\n"
                     self._prompt.messages.append(Message(role=MessageRole.USER, content=observation, static=True))
@@ -1608,9 +1615,7 @@ def _run_agent(

         if self.summarization_config.enabled:
             if self.is_token_limit_exceeded():
-                summary_offset = self.summarize_history(
-                    input_message, history_offset, summary_offset, config=config, **kwargs
-                )
+                summary_offset = self.summarize_history(input_message, summary_offset, config=config, **kwargs)

         if self.behaviour_on_max_loops == Behavior.RAISE:
             error_message = (
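Note: with the validator above, enabling summarization should be enough to get a ContextManagerTool registered automatically. A minimal sketch of the expected behavior, not taken from this PR; the constructor keywords and `setup_llm` call are borrowed from the example script later in this diff, and other Agent arguments are assumed optional:

```python
from dynamiq.nodes.agents import Agent
from dynamiq.nodes.agents.utils import SummarizationConfig
from dynamiq.nodes.tools import ContextManagerTool
from examples.llm_setup import setup_llm

llm = setup_llm(model_provider="gpt", model_name="gpt-4o", temperature=0)

# No ContextManagerTool is passed explicitly; the model validator should append
# one (named "context-manager") because summarization is enabled.
agent = Agent(name="Agent", llm=llm, tools=[], summarization_config=SummarizationConfig(enabled=True))
assert any(isinstance(t, ContextManagerTool) for t in agent.tools)
```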
diff --git a/dynamiq/nodes/agents/base.py b/dynamiq/nodes/agents/base.py
index b563adeaf..79646dc54 100644
--- a/dynamiq/nodes/agents/base.py
+++ b/dynamiq/nodes/agents/base.py
@@ -23,6 +23,7 @@
 )
 from dynamiq.nodes.llms import BaseLLM
 from dynamiq.nodes.node import NodeDependency, ensure_config
+from dynamiq.nodes.tools import ContextManagerTool
 from dynamiq.nodes.tools.file_tools import FileListTool, FileReadTool, FileWriteTool
 from dynamiq.nodes.tools.mcp import MCPServer
 from dynamiq.nodes.tools.python import Python
@@ -385,6 +386,9 @@ class Agent(Node):
     _mcp_servers: list[MCPServer] = PrivateAttr(default_factory=list)
     _mcp_server_tool_ids: list[str] = PrivateAttr(default_factory=list)
     _tool_cache: dict[ToolCacheEntry, Any] = {}
+    _history_offset: int = PrivateAttr(
+        default=2,  # Offset of the first history message; the default of 2 skips the system and initial user messages.
+    )

     model_config = ConfigDict(arbitrary_types_allowed=True)
     input_schema: ClassVar[type[AgentInputSchema]] = AgentInputSchema
@@ -929,6 +933,9 @@ def _run_tool(
         """Runs a specific tool with the given input."""
         merged_input = tool_input.copy() if isinstance(tool_input, dict) else {"input": tool_input}

+        if isinstance(tool, ContextManagerTool):
+            merged_input["history"] = self._prompt.messages[self._history_offset :]
+
         raw_tool_params = kwargs.get("tool_params", ToolParams())
         tool_params = (
             ToolParams.model_validate(raw_tool_params) if isinstance(raw_tool_params, dict) else raw_tool_params
diff --git a/dynamiq/nodes/tools/__init__.py b/dynamiq/nodes/tools/__init__.py
index 8a0878f38..ccb0ebfe3 100644
--- a/dynamiq/nodes/tools/__init__.py
+++ b/dynamiq/nodes/tools/__init__.py
@@ -1,3 +1,4 @@
+from .context_manager import ContextManagerTool
 from .e2b_sandbox import E2BInterpreterTool
 from .exa_search import ExaTool
 from .file_tools import FileListTool, FileReadTool, FileWriteTool
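Note: a toy illustration (not from the PR) of the slicing in `_run_tool` above, with plain strings standing in for `Message` objects. With the default `_history_offset` of 2, the system message and the initial user message are excluded from the history handed to the tool:

```python
messages = ["system-prompt", "user-task", "thought-1", "observation-1", "thought-2"]
history_offset = 2  # default: skip the system and initial user messages

history = messages[history_offset:]
assert history == ["thought-1", "observation-1", "thought-2"]
```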
diff --git a/dynamiq/nodes/tools/context_manager.py b/dynamiq/nodes/tools/context_manager.py
new file mode 100644
index 000000000..bca51704d
--- /dev/null
+++ b/dynamiq/nodes/tools/context_manager.py
@@ -0,0 +1,202 @@
+from typing import Any, ClassVar, Literal
+
+from pydantic import BaseModel, ConfigDict, Field
+
+from dynamiq.connections.managers import ConnectionManager
+from dynamiq.nodes import ErrorHandling, Node, NodeGroup
+from dynamiq.nodes.llms import BaseLLM
+from dynamiq.nodes.node import NodeDependency, ensure_config
+from dynamiq.prompts import Message, Prompt
+from dynamiq.runnables import RunnableConfig, RunnableStatus
+from dynamiq.utils.logger import logger
+
+CONTEXT_MANAGER_PROMPT_TEMPLATE = """
+You are a context compression assistant for an AI agent.
+
+IMPORTANT: The agent will delete previous message history after this step. You MUST preserve all
+essential information needed to continue the task successfully.
+
+Task:
+- Produce a detailed summary that replaces the prior message history.
+- Keep only what is necessary to proceed: reasoning overview, current subtasks, saved information and files, next steps,
+  additional notes.
+- Omit chit-chat and non-essential details. Use clear, structured formatting.
+
+History to compress:
+{history}
+
+Output strictly in this structure:
+
+## Reasoning Overview (brief description of the reasoning flow so far)
+
+## Current Subtasks
+- [ordered bullets: subtask -> status]
+
+## Saved Information and Files
+- [filesystem state and any files that have been saved, if available]
+
+## Next Steps
+- [ordered bullets: next step -> status]
+
+## Additional Notes
+Any other information that is important to keep in mind and must not be lost.
+
+"""
+
+
+class ContextManagerInputSchema(BaseModel):
+    """Input for ContextManagerTool.
+
+    - history: The recent conversation messages to compress.
+    - is_history_preserved: Preserve the history as a summary. If False, the history is discarded
+      and only notes are preserved.
+    - notes: Verbatim content that must be preserved as-is (not processed by the LLM) and prepended to the result.
+    """
+
+    history: list[Message] | None = Field(
+        ..., description="Conversation history to be summarized and used to replace prior messages"
+    )
+
+    is_history_preserved: bool = Field(
+        default=True,
+        description="Preserve the history as a summary. If False, the history is discarded and"
+        " only notes are preserved.",
+    )
+
+    notes: str | None = Field(
+        default=None,
+        description=(
+            "Verbatim content to preserve as-is (e.g., IDs, filenames, critical details). "
+            "This will be prepended unchanged to the output and NOT sent to the LLM."
+        ),
+    )
+
+
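Note: a hypothetical direct construction of the input schema, just to show the field shapes; in practice the agent injects `history` itself in `_run_tool`, and every value below is invented:

```python
from dynamiq.nodes.tools.context_manager import ContextManagerInputSchema
from dynamiq.prompts import Message

payload = ContextManagerInputSchema(
    history=[
        Message(role="user", content="Observation: scraped 40 company profiles"),
        Message(role="assistant", content="Saving results to companies.csv next."),
    ],
    is_history_preserved=True,
    notes="companies.csv holds 40 rows; resume from page 5",  # kept verbatim, never sent to the LLM
)
```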
+class ContextManagerTool(Node):
+    """
+    A tool to prune previous message history and replace it with a concise summary.
+
+    IMPORTANT: Before calling this tool, ensure any necessary details are explicitly saved
+    (e.g., files, pinned notes, or artifacts). This tool is intended to remove previous messages
+    and keep only a structured summary to tighten context and focus on the active subtask.
+
+    Attributes:
+        group (Literal[NodeGroup.TOOLS]): The group this node belongs to.
+        name (str): The name of the tool.
+        description (str): Tool description with usage warning.
+        llm (BaseLLM): The LLM used to produce the compressed summary.
+        error_handling (ErrorHandling): Configuration for error handling.
+        prompt_template (str): Prompt template guiding the summarization.
+    """
+
+    group: Literal[NodeGroup.TOOLS] = NodeGroup.TOOLS
+    name: str = "Context Manager Tool"
+    description: str = (
+        "Cleans prior message history and replaces it with a concise, self-contained summary.\n\n"
+        "WARNING: Before calling this tool, the agent must save any necessary information (e.g., in FileStore),\n"
+        "because previous messages will be removed and replaced by the summary. "
+        "You can also provide notes to the tool to preserve important information without it being processed "
+        "by the LLM. Make sure to provide all necessary information for the agent to stay on track and"
+        " not lose any important details. "
+        "You can also disable history preservation, in which case only the notes are kept. "
+        "Disable history preservation when the history no longer matters and only the notes should be kept."
+    )
+
+    llm: BaseLLM = Field(..., description="LLM used to produce the compressed context summary")
+    error_handling: ErrorHandling = Field(default_factory=lambda: ErrorHandling(timeout_seconds=600))
+    prompt_template: str = Field(
+        default=CONTEXT_MANAGER_PROMPT_TEMPLATE, description="Prompt template for context compression"
+    )
+
+    model_config = ConfigDict(arbitrary_types_allowed=True)
+    input_schema: ClassVar[type[ContextManagerInputSchema]] = ContextManagerInputSchema
+
+    def init_components(self, connection_manager: ConnectionManager | None = None) -> None:
+        """Initialize components for the tool."""
+        connection_manager = connection_manager or ConnectionManager()
+        super().init_components(connection_manager)
+        if self.llm.is_postponed_component_init:
+            self.llm.init_components(connection_manager)
+
+    def reset_run_state(self):
+        """Reset the intermediate steps (run_depends) of the node."""
+        self._run_depends = []
+
+    @property
+    def to_dict_exclude_params(self) -> dict:
+        """Exclude the LLM object during serialization."""
+        return super().to_dict_exclude_params | {"llm": True}
+
+    def to_dict(self, **kwargs) -> dict:
+        data = super().to_dict(**kwargs)
+        data["llm"] = self.llm.to_dict(**kwargs)
+        return data
+
+    def _build_prompt(self, history: list[Message]) -> str:
+        formatted_history = "\n\n---\n\n".join([f"{m.role}: {str(m.content)}" for m in history])
+        return self.prompt_template.format(history=formatted_history)
+
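Note: a small sketch of what `_build_prompt` produces: role-prefixed entries joined by a `---` separator and substituted into the template's `{history}` slot. The messages are invented and the rendered role prefix is shown only roughly:

```python
from dynamiq.nodes.tools.context_manager import CONTEXT_MANAGER_PROMPT_TEMPLATE
from dynamiq.prompts import Message

history = [
    Message(role="user", content="Find EU AI reports"),
    Message(role="assistant", content="Found 3 sources; summarizing."),
]
formatted = "\n\n---\n\n".join([f"{m.role}: {str(m.content)}" for m in history])
# formatted is roughly:
# "user: Find EU AI reports\n\n---\n\nassistant: Found 3 sources; summarizing."
prompt_content = CONTEXT_MANAGER_PROMPT_TEMPLATE.format(history=formatted)
```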
+    def _summarize_history(self, history: list[Message], config: RunnableConfig, **kwargs) -> str:
+        prompt_content = self._build_prompt(history)
+
+        result = self.llm.run(
+            input_data={},
+            prompt=Prompt(messages=[Message(role="user", content=prompt_content, static=True)]),
+            config=config,
+            **(kwargs | {"parent_run_id": kwargs.get("run_id"), "run_depends": []}),
+        )
+
+        self._run_depends = [NodeDependency(node=self.llm).to_dict(for_tracing=True)]
+
+        if result.status != RunnableStatus.SUCCESS:
+            raise ValueError("LLM execution failed during context compression")
+
+        return result.output.get("content", "").strip()
+
+    def execute(
+        self, input_data: ContextManagerInputSchema, config: RunnableConfig | None = None, **kwargs
+    ) -> dict[str, Any]:
+        """
+        Summarize the provided history; the returned summary is what the agent uses to replace prior messages.
+
+        Returns:
+            dict[str, Any]:
+                - content: the compressed summary text, prefixed with any verbatim notes.
+                  If is_history_preserved is False, only the notes are returned.
+                  The actual history replacement is applied by the agent via
+                  _apply_context_manager_tool_effect after the tool call completes.
+        """
+        config = ensure_config(config)
+        self.reset_run_state()
+        self.run_on_node_execute_run(config.callbacks, **kwargs)
+
+        summary = ""
+
+        if input_data.is_history_preserved:
+            summary = self._summarize_history(input_data.history, config, **kwargs)
+            summary = f"\nContext compressed; Summary:\n {summary}"
+
+        if input_data.notes:
+            summary = f"Notes: {input_data.notes}\n\n{summary}"
+
+        logger.debug(f"Tool {self.name} - {self.id}: context compression completed, summary length: {len(summary)}")
+
+        return {"content": summary}
+
+
+def _apply_context_manager_tool_effect(prompt: Prompt, tool_result: Any, history_offset: int) -> None:
+    """Apply the context-cleaning effect after a ContextManagerTool call.
+
+    Keeps the prompt prefix (up to history_offset) and re-appends a copy of the most recent message;
+    the caller then appends an Observation message containing the tool_result summary.
+    """
+
+    try:
+        new_messages = prompt.messages[:history_offset]
+        if new_messages:
+            new_messages.append(prompt.messages[-1].copy())
+        prompt.messages = new_messages
+
+    except Exception as e:
+        logger.error(f"Error applying context manager tool effect: {e}")
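Note: a toy walk-through of the pruning effect, with strings standing in for `Message` objects. Only the prefix up to `history_offset` plus a copy of the most recent message survive; the agent then appends the Observation containing the summary, as seen in the `agent.py` hunk earlier in this diff:

```python
msgs = ["system", "task", "thought-1", "obs-1", "thought-2", "obs-2", "tool-call"]
history_offset = 2

pruned = msgs[:history_offset]  # keep the system and initial user messages
pruned.append(msgs[-1])         # re-append a copy of the most recent message
msgs = pruned
assert msgs == ["system", "task", "tool-call"]
# The agent then appends: "\nObservation: <compressed summary>\n"
```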
+ """ + + try: + new_messages = prompt.messages[:history_offset] + if new_messages: + new_messages.append(prompt.messages[-1].copy()) + prompt.messages = new_messages + + except Exception as e: + logger.error(f"Error applying context manager tool effect: {e}") diff --git a/examples/components/core/dag/agent_file_storage.yaml b/examples/components/core/dag/agent_file_storage.yaml index 5d9ea2422..b6a38c4fc 100644 --- a/examples/components/core/dag/agent_file_storage.yaml +++ b/examples/components/core/dag/agent_file_storage.yaml @@ -33,8 +33,10 @@ nodes: backend: type: dynamiq.storages.file.InMemoryFileStore agent_file_write_enabled: true - max_loops: 5 + max_loops: 15 inference_mode: XML + summarization_config: + enabled: true flows: memory-agent-flow: diff --git a/examples/use_cases/agents_use_cases/agent_deep_scraping.py b/examples/use_cases/agents_use_cases/agent_deep_scraping.py index df86b1997..453b7f57a 100644 --- a/examples/use_cases/agents_use_cases/agent_deep_scraping.py +++ b/examples/use_cases/agents_use_cases/agent_deep_scraping.py @@ -1,9 +1,11 @@ -from dynamiq.connections import Firecrawl +from dynamiq.connections.connections import E2B, ScaleSerp from dynamiq.nodes.agents import Agent from dynamiq.nodes.agents.utils import SummarizationConfig -from dynamiq.nodes.tools.firecrawl import FirecrawlTool +from dynamiq.nodes.tools.e2b_sandbox import E2BInterpreterTool +from dynamiq.nodes.tools.scale_serp import ScaleSerpTool from dynamiq.nodes.types import InferenceMode from dynamiq.storages.file import InMemoryFileStore +from dynamiq.storages.file.base import FileStoreConfig from dynamiq.utils.logger import logger from examples.llm_setup import setup_llm @@ -15,13 +17,16 @@ and generate csv like file with this structure Company Name,Rating,Reviews,Location,Minimum Project Size,Hourly Rate,Company Size,Services Focus.""" -PROMPT2 = """Create long research on state of AI in EU. Give report for each country.""" +PROMPT2 = """Create long research on state of AI in EU. Give report for each country. +Once you saved usefull information you can clean context""" if __name__ == "__main__": - connection_firecrawl = Firecrawl() + connection_scale_serp = ScaleSerp() - tool_scrape = FirecrawlTool(connection=connection_firecrawl) - llm = setup_llm(model_provider="claude", model_name="claude-3-7-sonnet-20250219", temperature=0) + tool_scrape = ScaleSerpTool(connection=connection_scale_serp) + e2b = E2B() + tool_code = E2BInterpreterTool(connection=e2b) + llm = setup_llm(model_provider="gpt", model_name="gpt-4o", temperature=0) storage = InMemoryFileStore() @@ -29,15 +34,15 @@ name="Agent", id="Agent", llm=llm, - tools=[tool_scrape], + tools=[tool_scrape, tool_code], role=AGENT_ROLE, max_loops=30, inference_mode=InferenceMode.XML, - file_store=storage, + file_store=FileStoreConfig(enabled=True, backend=storage, agent_file_write_enabled=True), summarization_config=SummarizationConfig(enabled=True, max_token_context_length=100000), ) - result = agent.run(input_data={"input": PROMPT1, "files": None}) + result = agent.run(input_data={"input": PROMPT2, "files": None}) output_content = result.output.get("content") logger.info("RESULT")