From c20256a11b9a2f480a78752399abf60bab5d832b Mon Sep 17 00:00:00 2001
From: maksDev123
Date: Wed, 22 Oct 2025 11:59:53 +0300
Subject: [PATCH 1/5] feat: agent automatic context management

---
 dynamiq/nodes/agents/agent.py                 | 47 +++--
 dynamiq/nodes/tools/__init__.py               |  1 +
 dynamiq/nodes/tools/context_manager.py        | 172 ++++++++++++++++++
 .../agents_use_cases/agent_deep_scraping.py   | 18 +-
 4 files changed, 220 insertions(+), 18 deletions(-)
 create mode 100644 dynamiq/nodes/tools/context_manager.py

diff --git a/dynamiq/nodes/agents/agent.py b/dynamiq/nodes/agents/agent.py
index ec9760d78..628195e5f 100644
--- a/dynamiq/nodes/agents/agent.py
+++ b/dynamiq/nodes/agents/agent.py
@@ -25,6 +25,7 @@
 from dynamiq.nodes.agents.utils import SummarizationConfig, ToolCacheEntry, XMLParser
 from dynamiq.nodes.llms.gemini import Gemini
 from dynamiq.nodes.node import Node, NodeDependency
+from dynamiq.nodes.tools import ContextManagerTool
 from dynamiq.nodes.types import Behavior, InferenceMode
 from dynamiq.prompts import Message, MessageRole, VisionMessage, VisionMessageTextContent
 from dynamiq.runnables import RunnableConfig
@@ -765,6 +766,19 @@ def validate_inference_mode(self):
         return self
 
+    @model_validator(mode="after")
+    def _ensure_context_manager_tool(self):
+        """Automatically add ContextManagerTool when summarization is enabled."""
+        try:
+            if self.summarization_config.enabled:
+                has_context_tool = any(isinstance(t, ContextManagerTool) for t in self.tools)
+                if not has_context_tool:
+                    # Add with a stable name for addressing from the agent
+                    self.tools.append(ContextManagerTool(llm=self.llm, name="context-manager"))
+        except Exception as e:
+            logger.error(f"Failed to ensure ContextManagerTool: {e}")
+        return self
+
     def _parse_thought(self, output: str) -> tuple[str | None, str | None]:
         """Extracts thought from the output string."""
         thought_match = re.search(
@@ -1072,15 +1086,23 @@ def get_clone_attr_initializers(self) -> dict[str, Callable[[Node], Any]]:
         )
         return base
 
-    def update_system_message(self):
-        system_message = Message(
-            role=MessageRole.SYSTEM,
-            content=self.generate_prompt(
-                tools_name=self.tool_names, input_formats=self.generate_input_formats(self.tools)
-            ),
-            static=True,
-        )
-        self._prompt.messages = [system_message, *self._prompt.messages[1:]]
+    def _apply_context_manager_tool_effect(self, tool_result: Any, history_offset: int) -> None:
+        """Apply context cleaning effect after ContextManagerTool call.
+
+        Keeps default prefix (up to history_offset), replaces the rest with a copy of the last prefix message,
+        and appends an observation with the tool_result summary.
+        """
+        try:
+            new_messages = self._prompt.messages[:history_offset]
+            if new_messages:
+                new_messages.append(new_messages[-1].copy())
+            self._prompt.messages = new_messages
+
+            observation = f"\nObservation: {tool_result}\n"
+            self._prompt.messages.append(Message(role=MessageRole.USER, content=observation, static=True))
+        except Exception:
+            observation = f"\nObservation: {tool_result}\n"
+            self._prompt.messages.append(Message(role=MessageRole.USER, content=observation, static=True))
 
     def _run_agent(
         self,
@@ -1518,8 +1540,11 @@ def _run_agent(
             except RecoverableAgentException as e:
                 tool_result = f"{type(e).__name__}: {e}"
 
-            observation = f"\nObservation: {tool_result}\n"
-            self._prompt.messages.append(Message(role=MessageRole.USER, content=observation, static=True))
+            if isinstance(tool, ContextManagerTool):
+                self._apply_context_manager_tool_effect(tool_result, history_offset)
+            else:
+                observation = f"\nObservation: {tool_result}\n"
+                self._prompt.messages.append(Message(role=MessageRole.USER, content=observation, static=True))
 
             if self.streaming.enabled and self.streaming.mode == StreamingMode.ALL:
                 if tool is not None:
diff --git a/dynamiq/nodes/tools/__init__.py b/dynamiq/nodes/tools/__init__.py
index 8a0878f38..ccb0ebfe3 100644
--- a/dynamiq/nodes/tools/__init__.py
+++ b/dynamiq/nodes/tools/__init__.py
@@ -1,3 +1,4 @@
+from .context_manager import ContextManagerTool
 from .e2b_sandbox import E2BInterpreterTool
 from .exa_search import ExaTool
 from .file_tools import FileListTool, FileReadTool, FileWriteTool
diff --git a/dynamiq/nodes/tools/context_manager.py b/dynamiq/nodes/tools/context_manager.py
new file mode 100644
index 000000000..1354a9f8f
--- /dev/null
+++ b/dynamiq/nodes/tools/context_manager.py
@@ -0,0 +1,172 @@
+from typing import Any, ClassVar, Literal
+
+from pydantic import BaseModel, ConfigDict, Field
+
+from dynamiq.connections.managers import ConnectionManager
+from dynamiq.nodes import ErrorHandling, Node, NodeGroup
+from dynamiq.nodes.llms import BaseLLM
+from dynamiq.nodes.node import NodeDependency, ensure_config
+from dynamiq.prompts import Message, Prompt
+from dynamiq.runnables import RunnableConfig, RunnableStatus
+from dynamiq.utils.logger import logger
+
+CONTEXT_MANAGER_PROMPT_TEMPLATE = """
+You are a context compression assistant for an AI agent.
+
+IMPORTANT: The agent will delete previous message history after this step. You MUST preserve all
+essential information needed to continue the task successfully.
+
+Task:
+- Produce a detailed summary that replaces the prior message history.
+- Keep only what is necessary to proceed: reasoning overview, current subtasks, saved information and files, next steps,
+  additional notes.
+- Omit chit-chat and non-essential details. Use clear, structured formatting.
+
+Output strictly in this structure:
+
+## Overview of the reasoning flow so far
+
+## Current Subtasks
+- [ordered bullets: subtask -> status]
+
+## Saved information and files
+- Describe the filesystem state and any files that have been saved (if available)
+
+## Next Steps
+- [ordered bullets: next step -> status]
+
+## Additional Notes
+Any other information that is important to keep in mind and must not be lost.
+
+"""
+
+
+class ContextManagerInputSchema(BaseModel):
+    """Input for ContextManagerTool.
+
+    - history: The recent conversation/messages to compress. Can be a single string or list of strings.
+    - notes: Verbatim content that must be preserved as-is (not processed by LLM) and prepended to the result.
+    """
+
+    history: list[str] | str = Field(
+        ..., description="Conversation history to be summarized and used to replace prior messages"
+    )
+    notes: str | None = Field(
+        default=None,
+        description=(
+            "Verbatim content to preserve as-is (e.g., IDs, filenames, critical details). "
+            "This will be prepended unchanged to the output and NOT sent to the LLM."
+        ),
+    )
+
+
+class ContextManagerTool(Node):
+    """
+    A tool to prune previous message history and replace it with a concise summary.
+
+    IMPORTANT: Before calling this tool, ensure any necessary details are explicitly saved
+    (e.g., files, pinned notes, or artifacts). This tool is intended to remove previous messages
+    and keep only a structured summary to tighten context and focus on the active subtask.
+
+    Attributes:
+        group (Literal[NodeGroup.TOOLS]): The group this node belongs to.
+        name (str): The name of the tool.
+        description (str): Tool description with usage warning.
+        llm (BaseLLM): The LLM used to produce the compressed summary.
+        error_handling (ErrorHandling): Configuration for error handling.
+        prompt_template (str): Prompt template guiding the summarization.
+    """
+
+    group: Literal[NodeGroup.TOOLS] = NodeGroup.TOOLS
+    name: str = "Context Manager Tool"
+    description: str = (
+        "Cleans prior message history and replaces it with a concise, self-contained summary.\n\n"
+        "WARNING: Before calling this tool, the agent must save any necessary information (e.g., in FileStore),\n"
+        "because previous messages will be removed and replaced by the summary."
+        "You can also provide notes to the tool to preserve important information without being processed by the LLM."
+        "Make sure to provide all necessary information for the agent to stay on track and"
+        " not lose any important details."
+    )
+
+    llm: BaseLLM = Field(..., description="LLM used to produce the compressed context summary")
+    error_handling: ErrorHandling = Field(default_factory=lambda: ErrorHandling(timeout_seconds=600))
+    prompt_template: str = Field(
+        default=CONTEXT_MANAGER_PROMPT_TEMPLATE, description="Prompt template for context compression"
+    )
+
+    model_config = ConfigDict(arbitrary_types_allowed=True)
+    input_schema: ClassVar[type[ContextManagerInputSchema]] = ContextManagerInputSchema
+
+    def init_components(self, connection_manager: ConnectionManager | None = None) -> None:
+        """Initialize components for the tool."""
+        connection_manager = connection_manager or ConnectionManager()
+        super().init_components(connection_manager)
+        if self.llm.is_postponed_component_init:
+            self.llm.init_components(connection_manager)
+
+    def reset_run_state(self):
+        """Reset the intermediate steps (run_depends) of the node."""
+        self._run_depends = []
+
+    @property
+    def to_dict_exclude_params(self) -> dict:
+        """Exclude LLM object during serialization."""
+        return super().to_dict_exclude_params | {"llm": True}
+
+    def to_dict(self, **kwargs) -> dict:
+        data = super().to_dict(**kwargs)
+        data["llm"] = self.llm.to_dict(**kwargs)
+        return data
+
+    def _build_prompt(self, history: str) -> str:
+        return self.prompt_template.format(history=history)
+
+    def _normalize_history(self, history: list[str] | str) -> str:
+        if isinstance(history, list):
+            return "\n\n---\n\n".join(history)
+        return str(history)
+
+    def execute(
+        self, input_data: ContextManagerInputSchema, config: RunnableConfig | None = None, **kwargs
+    ) -> dict[str, Any]:
+        """
+        Summarize the provided history and emit an instruction to replace prior messages with the summary.
+
+        Returns:
+            dict[str, Any]:
+                - content: human-readable status message containing the compressed
+                  summary text, with any verbatim notes prepended
+                - keep_last_n: advisory hint for the UI/agent about how many trailing
+                  messages to keep; the agent itself applies the history replacement
+                  when it processes this tool's observation
+        """
+        config = ensure_config(config)
+        self.reset_run_state()
+        self.run_on_node_execute_run(config.callbacks, **kwargs)
+
+        history_text = self._normalize_history(input_data.history)
+        prompt_content = self._build_prompt(history_text)
+
+        logger.debug(f"Tool {self.name} - {self.id}: starting context compression")
+
+        result = self.llm.run(
+            input_data={},
+            prompt=Prompt(messages=[Message(role="user", content=prompt_content, static=True)]),
+            config=config,
+            **(kwargs | {"parent_run_id": kwargs.get("run_id")}),
+        )
+
+        self._run_depends = [NodeDependency(node=self.llm).to_dict(for_tracing=True)]
+
+        if result.status != RunnableStatus.SUCCESS:
+            raise ValueError("LLM execution failed during context compression")
+
+        summary = result.output.get("content", "").strip()
+        summary = f"\nContext compressed; Summary:\n {summary}"
+
+        if input_data.notes:
+            summary = f"Notes: {input_data.notes}\n\n{summary}"
+
+        logger.debug(f"Tool {self.name} - {self.id}: context compression completed, summary length: {len(summary)}")
+
+        return {"content": summary, "keep_last_n": input_data.keep_last_n}
diff --git a/examples/use_cases/agents_use_cases/agent_deep_scraping.py b/examples/use_cases/agents_use_cases/agent_deep_scraping.py
index df86b1997..3c0ea1fb3 100644
--- a/examples/use_cases/agents_use_cases/agent_deep_scraping.py
+++ b/examples/use_cases/agents_use_cases/agent_deep_scraping.py
@@ -1,9 +1,11 @@
-from dynamiq.connections import Firecrawl
+from dynamiq.connections.connections import E2B, ScaleSerp
 from dynamiq.nodes.agents import Agent
 from dynamiq.nodes.agents.utils import SummarizationConfig
-from dynamiq.nodes.tools.firecrawl import FirecrawlTool
+from dynamiq.nodes.tools.e2b_sandbox import E2BInterpreterTool
+from dynamiq.nodes.tools.scale_serp import ScaleSerpTool
 from dynamiq.nodes.types import InferenceMode
 from dynamiq.storages.file import InMemoryFileStore
+from dynamiq.storages.file.base import FileStoreConfig
 from dynamiq.utils.logger import logger
 from examples.llm_setup import setup_llm
 
@@ -18,10 +20,12 @@ PROMPT2 = """Create long research on state of AI in EU.
Give report for each country.""" if __name__ == "__main__": - connection_firecrawl = Firecrawl() + connection_scale_serp = ScaleSerp() - tool_scrape = FirecrawlTool(connection=connection_firecrawl) - llm = setup_llm(model_provider="claude", model_name="claude-3-7-sonnet-20250219", temperature=0) + tool_scrape = ScaleSerpTool(connection=connection_scale_serp) + e2b = E2B() + tool_code = E2BInterpreterTool(connection=e2b) + llm = setup_llm(model_provider="gpt", model_name="gpt-5", temperature=0) storage = InMemoryFileStore() @@ -29,11 +33,11 @@ name="Agent", id="Agent", llm=llm, - tools=[tool_scrape], + tools=[tool_scrape, tool_code], role=AGENT_ROLE, max_loops=30, inference_mode=InferenceMode.XML, - file_store=storage, + file_store=FileStoreConfig(enabled=True, backend=storage, agent_file_write_enabled=True), summarization_config=SummarizationConfig(enabled=True, max_token_context_length=100000), ) From 9ad923dbcf8a44198be5ffe8e48b4c037e5ad04d Mon Sep 17 00:00:00 2001 From: maksDev123 Date: Mon, 27 Oct 2025 16:30:21 +0200 Subject: [PATCH 2/5] feat: add agentic context management --- dynamiq/nodes/agents/agent.py | 31 ++------ dynamiq/nodes/agents/base.py | 5 ++ dynamiq/nodes/tools/context_manager.py | 77 +++++++++++++------ .../core/dag/agent_file_storage.yaml | 4 +- .../agents_use_cases/agent_deep_scraping.py | 7 +- 5 files changed, 72 insertions(+), 52 deletions(-) diff --git a/dynamiq/nodes/agents/agent.py b/dynamiq/nodes/agents/agent.py index 628195e5f..c496283c2 100644 --- a/dynamiq/nodes/agents/agent.py +++ b/dynamiq/nodes/agents/agent.py @@ -26,6 +26,7 @@ from dynamiq.nodes.llms.gemini import Gemini from dynamiq.nodes.node import Node, NodeDependency from dynamiq.nodes.tools import ContextManagerTool +from dynamiq.nodes.tools.context_manager import _apply_context_manager_tool_effect from dynamiq.nodes.types import Behavior, InferenceMode from dynamiq.prompts import Message, MessageRole, VisionMessage, VisionMessageTextContent from dynamiq.runnables import RunnableConfig @@ -1086,24 +1087,6 @@ def get_clone_attr_initializers(self) -> dict[str, Callable[[Node], Any]]: ) return base - def _apply_context_manager_tool_effect(self, tool_result: Any, history_offset: int) -> None: - """Apply context cleaning effect after ContextManagerTool call. - - Keeps default prefix (up to history_offset), replaces the rest with a copy of the last prefix message, - and appends an observation with the tool_result summary. 
-        """
-        try:
-            new_messages = self._prompt.messages[:history_offset]
-            if new_messages:
-                new_messages.append(new_messages[-1].copy())
-            self._prompt.messages = new_messages
-
-            observation = f"\nObservation: {tool_result}\n"
-            self._prompt.messages.append(Message(role=MessageRole.USER, content=observation, static=True))
-        except Exception:
-            observation = f"\nObservation: {tool_result}\n"
-            self._prompt.messages.append(Message(role=MessageRole.USER, content=observation, static=True))
-
     def _run_agent(
         self,
         input_message: Message | VisionMessage,
@@ -1502,7 +1485,9 @@ def _run_agent(
                 **kwargs,
             )
 
-            tool_result, tool_files = self._run_tool(tool, action_input, config, **kwargs)
+            tool_result, tool_files = self._run_tool(
+                tool, action_input, config, history_offset=history_offset, **kwargs
+            )
 
         except RecoverableAgentException as e:
             tool_result = f"{type(e).__name__}: {e}"
@@ -1541,10 +1526,10 @@ def _run_agent(
             tool_result = f"{type(e).__name__}: {e}"
 
             if isinstance(tool, ContextManagerTool):
-                self._apply_context_manager_tool_effect(tool_result, history_offset)
-            else:
-                observation = f"\nObservation: {tool_result}\n"
-                self._prompt.messages.append(Message(role=MessageRole.USER, content=observation, static=True))
+                _apply_context_manager_tool_effect(self._prompt, tool_result, history_offset)
+
+            observation = f"\nObservation: {tool_result}\n"
+            self._prompt.messages.append(Message(role=MessageRole.USER, content=observation, static=True))
diff --git a/dynamiq/nodes/agents/base.py b/dynamiq/nodes/agents/base.py
index b563adeaf..220286ae2 100644
--- a/dynamiq/nodes/agents/base.py
+++ b/dynamiq/nodes/agents/base.py
@@ -23,6 +23,7 @@
 )
 from dynamiq.nodes.llms import BaseLLM
 from dynamiq.nodes.node import NodeDependency, ensure_config
+from dynamiq.nodes.tools import ContextManagerTool
 from dynamiq.nodes.tools.file_tools import FileListTool, FileReadTool, FileWriteTool
 from dynamiq.nodes.tools.mcp import MCPServer
 from dynamiq.nodes.tools.python import Python
@@ -923,12 +924,16 @@ def _run_tool(
         tool_input: dict,
         config,
         update_run_depends: bool = True,
+        history_offset: int | None = None,
         collect_dependency: bool = False,
         **kwargs,
     ) -> Any:
         """Runs a specific tool with the given input."""
         merged_input = tool_input.copy() if isinstance(tool_input, dict) else {"input": tool_input}
 
+        if isinstance(tool, ContextManagerTool):
+            merged_input["history"] = self._prompt.messages[history_offset:]
+
         raw_tool_params = kwargs.get("tool_params", ToolParams())
         tool_params = (
             ToolParams.model_validate(raw_tool_params) if isinstance(raw_tool_params, dict) else raw_tool_params
diff --git a/dynamiq/nodes/tools/context_manager.py b/dynamiq/nodes/tools/context_manager.py
index 1354a9f8f..33c8637a7 100644
--- a/dynamiq/nodes/tools/context_manager.py
+++ b/dynamiq/nodes/tools/context_manager.py
@@ -45,12 +45,21 @@ class ContextManagerInputSchema(BaseModel):
     """Input for ContextManagerTool.
 
     - history: The recent conversation/messages to compress. Can be a single string or list of strings.
+    - is_history_preserved: Whether to preserve the history as a summary. If False, the history is discarded
+      and only the notes are kept.
     - notes: Verbatim content that must be preserved as-is (not processed by LLM) and prepended to the result.
""" - history: list[str] | str = Field( + history: list[Message] | None = Field( ..., description="Conversation history to be summarized and used to replace prior messages" ) + + is_history_preserved: bool = Field( + default=True, + description="Preserve the history with summarization. If False, the history will not be preserved," + " only notes will.", + ) + notes: str | None = Field( default=None, description=( @@ -86,6 +95,8 @@ class ContextManagerTool(Node): "You can also provide notes to the tool to preserve important information without being processed by the LLM." "Make sure to provide all necessary information for the agent to stay on track and" " not lose any important details." + "You can also disable history preservation, only notes will be preserved." + "Disable history when you don't care about the history and only want to preserve notes." ) llm: BaseLLM = Field(..., description="LLM used to produce the compressed context summary") @@ -118,13 +129,26 @@ def to_dict(self, **kwargs) -> dict: data["llm"] = self.llm.to_dict(**kwargs) return data - def _build_prompt(self, history: str) -> str: - return self.prompt_template.format(history=history) + def _build_prompt(self, history: list[Message]) -> str: + formatted_history = "\n\n---\n\n".join([f"{m.role}: {str(m.content)}" for m in history]) + return self.prompt_template.format(history=formatted_history) + + def _summarize_history(self, history: list[Message], config: RunnableConfig, **kwargs) -> str: + prompt_content = self._build_prompt(history) + + result = self.llm.run( + input_data={}, + prompt=Prompt(messages=[Message(role="user", content=prompt_content, static=True)]), + config=config, + **(kwargs | {"parent_run_id": kwargs.get("run_id"), "run_depends": []}), + ) + + self._run_depends = [NodeDependency(node=self.llm).to_dict(for_tracing=True)] + + if result.status != RunnableStatus.SUCCESS: + raise ValueError("LLM execution failed during context compression") - def _normalize_history(self, history: list[str] | str) -> str: - if isinstance(history, list): - return "\n\n---\n\n".join(history) - return str(history) + return result.output.get("content", "").strip() def execute( self, input_data: ContextManagerInputSchema, config: RunnableConfig | None = None, **kwargs @@ -144,29 +168,32 @@ def execute( self.reset_run_state() self.run_on_node_execute_run(config.callbacks, **kwargs) - history_text = self._normalize_history(input_data.history) - prompt_content = self._build_prompt(history_text) + summary = "" - logger.debug(f"Tool {self.name} - {self.id}: starting context compression") + if input_data.is_history_preserved: + summary = self._summarize_history(input_data.history, config, **kwargs) + summary = f"\nContext compressed; Summary:\n {summary}" - result = self.llm.run( - input_data={}, - prompt=Prompt(messages=[Message(role="user", content=prompt_content, static=True)]), - config=config, - **(kwargs | {"parent_run_id": kwargs.get("run_id")}), - ) + if input_data.notes: + summary = f"Notes: {input_data.notes}\n\n{summary}" - self._run_depends = [NodeDependency(node=self.llm).to_dict(for_tracing=True)] + logger.debug(f"Tool {self.name} - {self.id}: context compression completed, summary length: {len(summary)}") - if result.status != RunnableStatus.SUCCESS: - raise ValueError("LLM execution failed during context compression") + return {"content": summary} - summary = result.output.get("content", "").strip() - summary = f"\nContext compressed; Summary:\n {summary}" - if input_data.notes: - summary = f"Notes: 
{input_data.notes}\n\n{summary}"
 
+def _apply_context_manager_tool_effect(prompt: Prompt, tool_result: Any, history_offset: int) -> None:
+    """Apply context cleaning effect after ContextManagerTool call.
 
-        logger.debug(f"Tool {self.name} - {self.id}: context compression completed, summary length: {len(summary)}")
+    Keeps the default prefix (up to history_offset) and appends a copy of the last message;
+    the caller then appends an observation with the tool_result summary.
+    """
+
+    try:
+        new_messages = prompt.messages[:history_offset]
+        if new_messages:
+            new_messages.append(prompt.messages[-1].copy())
+        prompt.messages = new_messages
 
-    return {"content": summary, "keep_last_n": input_data.keep_last_n}
+    except Exception as e:
+        logger.error(f"Error applying context manager tool effect: {e}")
diff --git a/examples/components/core/dag/agent_file_storage.yaml b/examples/components/core/dag/agent_file_storage.yaml
index 5d9ea2422..b6a38c4fc 100644
--- a/examples/components/core/dag/agent_file_storage.yaml
+++ b/examples/components/core/dag/agent_file_storage.yaml
@@ -33,8 +33,10 @@ nodes:
       backend:
         type: dynamiq.storages.file.InMemoryFileStore
       agent_file_write_enabled: true
-    max_loops: 5
+    max_loops: 15
     inference_mode: XML
+    summarization_config:
+      enabled: true
 
 flows:
   memory-agent-flow:
diff --git a/examples/use_cases/agents_use_cases/agent_deep_scraping.py b/examples/use_cases/agents_use_cases/agent_deep_scraping.py
index 3c0ea1fb3..453b7f57a 100644
--- a/examples/use_cases/agents_use_cases/agent_deep_scraping.py
+++ b/examples/use_cases/agents_use_cases/agent_deep_scraping.py
@@ -17,7 +17,8 @@
 and generate csv like file with this structure
 Company Name,Rating,Reviews,Location,Minimum Project Size,Hourly Rate,Company Size,Services Focus."""
 
-PROMPT2 = """Create long research on state of AI in EU. Give report for each country."""
+PROMPT2 = """Create long research on state of AI in EU. Give report for each country.
+Once you have saved useful information you can clean the context"""
 
 if __name__ == "__main__":
     connection_scale_serp = ScaleSerp()
@@ -25,7 +26,7 @@
     tool_scrape = ScaleSerpTool(connection=connection_scale_serp)
     e2b = E2B()
     tool_code = E2BInterpreterTool(connection=e2b)
-    llm = setup_llm(model_provider="gpt", model_name="gpt-5", temperature=0)
+    llm = setup_llm(model_provider="gpt", model_name="gpt-4o", temperature=0)
 
     storage = InMemoryFileStore()
 
@@ -41,7 +42,7 @@
         summarization_config=SummarizationConfig(enabled=True, max_token_context_length=100000),
     )
 
-    result = agent.run(input_data={"input": PROMPT1, "files": None})
+    result = agent.run(input_data={"input": PROMPT2, "files": None})
 
     output_content = result.output.get("content")
     logger.info("RESULT")

From 33504dc9e87adf002a7c527abc106787e73995ec Mon Sep 17 00:00:00 2001
From: maksDev123
Date: Mon, 27 Oct 2025 16:37:30 +0200
Subject: [PATCH 3/5] fix: template

---
 dynamiq/nodes/tools/context_manager.py | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/dynamiq/nodes/tools/context_manager.py b/dynamiq/nodes/tools/context_manager.py
index 33c8637a7..bca51704d 100644
--- a/dynamiq/nodes/tools/context_manager.py
+++ b/dynamiq/nodes/tools/context_manager.py
@@ -22,6 +22,9 @@
   additional notes.
 - Omit chit-chat and non-essential details. Use clear, structured formatting.
+
+History to compress:
+{history}
 
 Output strictly in this structure:
 
@@ -91,11 +94,11 @@ class ContextManagerTool(Node):
     name: str = "Context Manager Tool"
     description: str = (
         "Cleans prior message history and replaces it with a concise, self-contained summary.\n\n"
         "WARNING: Before calling this tool, the agent must save any necessary information (e.g., in FileStore),\n"
-        "because previous messages will be removed and replaced by the summary."
-        "You can also provide notes to the tool to preserve important information without being processed by the LLM."
+        "because previous messages will be removed and replaced by the summary. "
+        "You can also provide notes to the tool to preserve important information without being processed by the LLM. "
         "Make sure to provide all necessary information for the agent to stay on track and"
-        " not lose any important details."
-        "You can also disable history preservation, in which case only the notes will be preserved."
+        " not lose any important details. "
+        "You can also disable history preservation, in which case only the notes will be preserved. "
         "Disable history preservation when only the notes need to be kept."
     )

From a5ea31027ae9637869720b4fd7552bf430cd2d95 Mon Sep 17 00:00:00 2001
From: maksDev123
Date: Mon, 27 Oct 2025 20:15:19 +0200
Subject: [PATCH 4/5] fix: history offset

---
 dynamiq/nodes/agents/agent.py | 17 ++++++-----------
 dynamiq/nodes/agents/base.py  |  8 ++++++--
 2 files changed, 12 insertions(+), 13 deletions(-)

diff --git a/dynamiq/nodes/agents/agent.py b/dynamiq/nodes/agents/agent.py
index c05282833..b676c216e 100644
--- a/dynamiq/nodes/agents/agent.py
+++ b/dynamiq/nodes/agents/agent.py
@@ -996,7 +996,6 @@ def is_token_limit_exceeded(self) -> bool:
     def summarize_history(
         self,
         input_message,
-        history_offset: int,
         summary_offset: int,
         config: RunnableConfig | None = None,
         **kwargs,
@@ -1006,7 +1005,6 @@ def summarize_history(
 
         Args:
             input_message (Message | VisionMessage): User request message.
-            history_offset (int): Offset to the first message in the conversation history within the prompt.
             summary_offset (int): Offset to the position of the first message in prompt that was not summarized.
             config (RunnableConfig | None): Configuration for the agent run.
             **kwargs: Additional parameters for running the agent.
@@ -1018,7 +1016,7 @@ def summarize_history( messages_history = "\nHistory to extract information from: \n" summary_sections = [] - offset = max(history_offset, summary_offset - self.summarization_config.context_history_length) + offset = max(self._history_offset, summary_offset - self.summarization_config.context_history_length) for index, message in enumerate(self._prompt.messages[offset:]): if message.role == MessageRole.USER: if (index + offset >= summary_offset) and ("Observation:" in message.content): @@ -1121,7 +1119,8 @@ def _run_agent( else: self._prompt.messages = [system_message, input_message] - summary_offset = history_offset = len(self._prompt.messages) + summary_offset = self._history_offset = len(self._prompt.messages) + stop_sequences = [] if self.inference_mode == InferenceMode.DEFAULT: stop_sequences.extend(["Observation: ", "\nObservation:"]) @@ -1492,9 +1491,7 @@ def _run_agent( **kwargs, ) - tool_result, tool_files = self._run_tool( - tool, action_input, config, history_offset=history_offset, **kwargs - ) + tool_result, tool_files = self._run_tool(tool, action_input, config, **kwargs) except RecoverableAgentException as e: tool_result = f"{type(e).__name__}: {e}" @@ -1533,7 +1530,7 @@ def _run_agent( tool_result = f"{type(e).__name__}: {e}" if isinstance(tool, ContextManagerTool): - _apply_context_manager_tool_effect(self._prompt, tool_result, history_offset) + _apply_context_manager_tool_effect(self._prompt, tool_result, self._history_offset) observation = f"\nObservation: {tool_result}\n" self._prompt.messages.append(Message(role=MessageRole.USER, content=observation, static=True)) @@ -1618,9 +1615,7 @@ def _run_agent( if self.summarization_config.enabled: if self.is_token_limit_exceeded(): - summary_offset = self.summarize_history( - input_message, history_offset, summary_offset, config=config, **kwargs - ) + summary_offset = self.summarize_history(input_message, summary_offset, config=config, **kwargs) if self.behaviour_on_max_loops == Behavior.RAISE: error_message = ( diff --git a/dynamiq/nodes/agents/base.py b/dynamiq/nodes/agents/base.py index 220286ae2..45870f238 100644 --- a/dynamiq/nodes/agents/base.py +++ b/dynamiq/nodes/agents/base.py @@ -386,6 +386,11 @@ class Agent(Node): _mcp_servers: list[MCPServer] = PrivateAttr(default_factory=list) _mcp_server_tool_ids: list[str] = PrivateAttr(default_factory=list) _tool_cache: dict[ToolCacheEntry, Any] = {} + _history_offset: int = PrivateAttr( + default=2, + description="Offset to the first message in the conversation history within the prompt. " + "By default, it is 2, which is the offset to the system message and the first user message. 
", + ) model_config = ConfigDict(arbitrary_types_allowed=True) input_schema: ClassVar[type[AgentInputSchema]] = AgentInputSchema @@ -924,7 +929,6 @@ def _run_tool( tool_input: dict, config, update_run_depends: bool = True, - history_offset: int = None, collect_dependency: bool = False, **kwargs, ) -> Any: @@ -932,7 +936,7 @@ def _run_tool( merged_input = tool_input.copy() if isinstance(tool_input, dict) else {"input": tool_input} if isinstance(tool, ContextManagerTool): - merged_input["history"] = self._prompt.messages[history_offset:] + merged_input["history"] = self._prompt.messages[self._history_offset :] raw_tool_params = kwargs.get("tool_params", ToolParams()) tool_params = ( From e5e5a190af521507aaf6cd6c45e29e28ec8c8393 Mon Sep 17 00:00:00 2001 From: maksDev123 Date: Mon, 27 Oct 2025 21:39:20 +0200 Subject: [PATCH 5/5] fix: attribute description --- dynamiq/nodes/agents/base.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dynamiq/nodes/agents/base.py b/dynamiq/nodes/agents/base.py index 45870f238..79646dc54 100644 --- a/dynamiq/nodes/agents/base.py +++ b/dynamiq/nodes/agents/base.py @@ -387,9 +387,7 @@ class Agent(Node): _mcp_server_tool_ids: list[str] = PrivateAttr(default_factory=list) _tool_cache: dict[ToolCacheEntry, Any] = {} _history_offset: int = PrivateAttr( - default=2, - description="Offset to the first message in the conversation history within the prompt. " - "By default, it is 2, which is the offset to the system message and the first user message. ", + default=2, # Offset to the first message (default: 2 — system and initial user messages). ) model_config = ConfigDict(arbitrary_types_allowed=True)