Merged
39 changes: 22 additions & 17 deletions dynamiq/nodes/agents/agent.py
@@ -25,6 +25,8 @@
from dynamiq.nodes.agents.utils import SummarizationConfig, ToolCacheEntry, XMLParser
from dynamiq.nodes.llms.gemini import Gemini
from dynamiq.nodes.node import Node, NodeDependency
from dynamiq.nodes.tools import ContextManagerTool
from dynamiq.nodes.tools.context_manager import _apply_context_manager_tool_effect
from dynamiq.nodes.types import Behavior, InferenceMode
from dynamiq.prompts import Message, MessageRole, VisionMessage, VisionMessageTextContent
from dynamiq.runnables import RunnableConfig
@@ -765,6 +767,19 @@ def validate_inference_mode(self):

return self

@model_validator(mode="after")
def _ensure_context_manager_tool(self):
"""Automatically add ContextManagerTool when summarization is enabled."""
try:
if self.summarization_config.enabled:
has_context_tool = any(isinstance(t, ContextManagerTool) for t in self.tools)
if not has_context_tool:
# Add with a stable name for addressing from the agent
self.tools.append(ContextManagerTool(llm=self.llm, name="context-manager"))
except Exception as e:
logger.error(f"Failed to ensure ContextManagerTool: {e}")
return self

def _parse_thought(self, output: str) -> tuple[str | None, str | None]:
"""Extracts thought from the output string."""
thought_match = re.search(
@@ -981,7 +996,6 @@ def is_token_limit_exceeded(self) -> bool:
def summarize_history(
self,
input_message,
history_offset: int,
summary_offset: int,
config: RunnableConfig | None = None,
**kwargs,
@@ -991,7 +1005,6 @@ def summarize_history(

Args:
input_message (Message | VisionMessage): User request message.
history_offset (int): Offset to the first message in the conversation history within the prompt.
summary_offset (int): Offset to the position of the first message in prompt that was not summarized.
config (RunnableConfig | None): Configuration for the agent run.
**kwargs: Additional parameters for running the agent.
@@ -1003,7 +1016,7 @@ def summarize_history(
messages_history = "\nHistory to extract information from: \n"
summary_sections = []

offset = max(history_offset, summary_offset - self.summarization_config.context_history_length)
offset = max(self._history_offset, summary_offset - self.summarization_config.context_history_length)
for index, message in enumerate(self._prompt.messages[offset:]):
if message.role == MessageRole.USER:
if (index + offset >= summary_offset) and ("Observation:" in message.content):
@@ -1072,16 +1085,6 @@ def get_clone_attr_initializers(self) -> dict[str, Callable[[Node], Any]]:
)
return base

def update_system_message(self):
system_message = Message(
role=MessageRole.SYSTEM,
content=self.generate_prompt(
tools_name=self.tool_names, input_formats=self.generate_input_formats(self.tools)
),
static=True,
)
self._prompt.messages = [system_message, *self._prompt.messages[1:]]

def _run_agent(
self,
input_message: Message | VisionMessage,
@@ -1116,7 +1119,8 @@ def _run_agent(
else:
self._prompt.messages = [system_message, input_message]

summary_offset = history_offset = len(self._prompt.messages)
summary_offset = self._history_offset = len(self._prompt.messages)

stop_sequences = []
if self.inference_mode == InferenceMode.DEFAULT:
stop_sequences.extend(["Observation: ", "\nObservation:"])
@@ -1525,6 +1529,9 @@ def _run_agent(
except RecoverableAgentException as e:
tool_result = f"{type(e).__name__}: {e}"

if isinstance(tool, ContextManagerTool):
_apply_context_manager_tool_effect(self._prompt, tool_result, self._history_offset)

observation = f"\nObservation: {tool_result}\n"
self._prompt.messages.append(Message(role=MessageRole.USER, content=observation, static=True))

@@ -1608,9 +1615,7 @@ def _run_agent(

if self.summarization_config.enabled:
if self.is_token_limit_exceeded():
summary_offset = self.summarize_history(
input_message, history_offset, summary_offset, config=config, **kwargs
)
summary_offset = self.summarize_history(input_message, summary_offset, config=config, **kwargs)

if self.behaviour_on_max_loops == Behavior.RAISE:
error_message = (
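With these changes, enabling summarization implicitly equips the agent with a context-manager tool; no explicit tool registration is needed. A minimal sketch of the resulting behavior, assuming `Agent` is the concrete agent class defined in this module and `llm` is any configured LLM node (constructor arguments beyond those visible in this diff are illustrative):

```python
from dynamiq.nodes.agents.agent import Agent  # assumed import path for this module's agent class
from dynamiq.nodes.agents.utils import SummarizationConfig
from dynamiq.nodes.tools import ContextManagerTool

agent = Agent(
    name="my-agent",
    llm=llm,  # assumed: any configured BaseLLM instance
    tools=[],  # no ContextManagerTool supplied explicitly
    summarization_config=SummarizationConfig(enabled=True),
)

# The _ensure_context_manager_tool validator has already run, so the agent
# now carries a ContextManagerTool registered under the stable name
# "context-manager".
assert any(isinstance(t, ContextManagerTool) for t in agent.tools)
```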
7 changes: 7 additions & 0 deletions dynamiq/nodes/agents/base.py
@@ -23,6 +23,7 @@
)
from dynamiq.nodes.llms import BaseLLM
from dynamiq.nodes.node import NodeDependency, ensure_config
from dynamiq.nodes.tools import ContextManagerTool
from dynamiq.nodes.tools.file_tools import FileListTool, FileReadTool, FileWriteTool
from dynamiq.nodes.tools.mcp import MCPServer
from dynamiq.nodes.tools.python import Python
@@ -385,6 +386,9 @@ class Agent(Node):
_mcp_servers: list[MCPServer] = PrivateAttr(default_factory=list)
_mcp_server_tool_ids: list[str] = PrivateAttr(default_factory=list)
_tool_cache: dict[ToolCacheEntry, Any] = {}
_history_offset: int = PrivateAttr(
default=2, # Offset to the first message (default: 2 — system and initial user messages).
)

model_config = ConfigDict(arbitrary_types_allowed=True)
input_schema: ClassVar[type[AgentInputSchema]] = AgentInputSchema
@@ -929,6 +933,9 @@ def _run_tool(
"""Runs a specific tool with the given input."""
merged_input = tool_input.copy() if isinstance(tool_input, dict) else {"input": tool_input}

if isinstance(tool, ContextManagerTool):
merged_input["history"] = self._prompt.messages[self._history_offset :]

raw_tool_params = kwargs.get("tool_params", ToolParams())
tool_params = (
ToolParams.model_validate(raw_tool_params) if isinstance(raw_tool_params, dict) else raw_tool_params
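The new `_history_offset` private attribute marks where the mutable history begins: by default the first two prompt messages (the system prompt and the initial user request) form a fixed prefix. The `_run_tool` hunk then passes everything after that prefix to the ContextManagerTool. A toy illustration of the slice, with placeholder contents (assuming the standard `MessageRole.ASSISTANT` member):

```python
from dynamiq.prompts import Message, MessageRole

messages = [
    Message(role=MessageRole.SYSTEM, content="<system prompt>", static=True),
    Message(role=MessageRole.USER, content="<original task>", static=True),
    Message(role=MessageRole.ASSISTANT, content="Thought: inspect the files"),
    Message(role=MessageRole.USER, content="\nObservation: 3 files found\n", static=True),
]

history_offset = 2  # the default _history_offset: system + initial user message

# The slice that _run_tool injects as merged_input["history"]:
history = messages[history_offset:]
assert len(history) == 2  # only the Thought/Observation turns
```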
1 change: 1 addition & 0 deletions dynamiq/nodes/tools/__init__.py
@@ -1,3 +1,4 @@
from .context_manager import ContextManagerTool
from .e2b_sandbox import E2BInterpreterTool
from .exa_search import ExaTool
from .file_tools import FileListTool, FileReadTool, FileWriteTool
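With this re-export in place, callers can import the tool from the package root instead of the submodule, matching the import added in agent.py above:

```python
from dynamiq.nodes.tools import ContextManagerTool
```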
202 changes: 202 additions & 0 deletions dynamiq/nodes/tools/context_manager.py
@@ -0,0 +1,202 @@
from typing import Any, ClassVar, Literal

from pydantic import BaseModel, ConfigDict, Field

from dynamiq.connections.managers import ConnectionManager
from dynamiq.nodes import ErrorHandling, Node, NodeGroup
from dynamiq.nodes.llms import BaseLLM
from dynamiq.nodes.node import NodeDependency, ensure_config
from dynamiq.prompts import Message, Prompt
from dynamiq.runnables import RunnableConfig, RunnableStatus
from dynamiq.utils.logger import logger

CONTEXT_MANAGER_PROMPT_TEMPLATE = """
You are a context compression assistant for an AI agent.

IMPORTANT: The agent will delete previous message history after this step. You MUST preserve all
essential information needed to continue the task successfully.

Task:
- Produce a detailed summary that replaces the prior message history.
- Keep only what is necessary to proceed: reasoning overview, current subtasks, saved information and files, next steps,
additional notes.
- Omit chit-chat and non-essential details. Use clear, structured formatting.

History to compress:
{history}

Output strictly in this structure:

## Reasoning Overview
- [brief summary of the reasoning flow so far]

## Current Subtasks
- [ordered bullets: subtask -> status]

## Saved Information and Files
- [filesystem state and saved files, if available]

## Next Steps
- [ordered bullets: next step -> status]

## Additional Notes
- [any other important information that must not be lost]

"""


class ContextManagerInputSchema(BaseModel):
"""Input for ContextManagerTool.

    - history: The recent conversation messages to compress.
    - is_history_preserved: Whether to summarize and preserve the history. If False, the history is discarded
      and only notes are kept.
    - notes: Verbatim content that must be preserved as-is (not processed by the LLM) and prepended to the result.
"""

history: list[Message] | None = Field(
..., description="Conversation history to be summarized and used to replace prior messages"
)

is_history_preserved: bool = Field(
default=True,
description="Preserve the history with summarization. If False, the history will not be preserved,"
" only notes will.",
)

notes: str | None = Field(
default=None,
description=(
"Verbatim content to preserve as-is (e.g., IDs, filenames, critical details). "
"This will be prepended unchanged to the output and NOT sent to the LLM."
),
)


class ContextManagerTool(Node):
"""
A tool to prune previous message history and replace it with a concise summary.

IMPORTANT: Before calling this tool, ensure any necessary details are explicitly saved
(e.g., files, pinned notes, or artifacts). This tool is intended to remove previous messages
and keep only a structured summary to tighten context and focus on the active subtask.

Attributes:
group (Literal[NodeGroup.TOOLS]): The group this node belongs to.
name (str): The name of the tool.
description (str): Tool description with usage warning.
llm (BaseLLM): The LLM used to produce the compressed summary.
error_handling (ErrorHandling): Configuration for error handling.
prompt_template (str): Prompt template guiding the summarization.
"""

group: Literal[NodeGroup.TOOLS] = NodeGroup.TOOLS
name: str = "Context Manager Tool"
description: str = (
"Cleans prior message history and replaces it with a concise, self-contained summary.\n\n"
"WARNING: Before calling this tool, the agent must save any necessary information (f.e in FileStore),\n"
"because previous messages will be removed and replaced by the summary. "
"You can also provide notes to the tool to preserve important information without being processed by the LLM. "
"Make sure to provide all necessary information for the agent to stay on track and"
" not lose any important details. "
"You can also disable history preservation, only notes will be preserved. "
"Disable history when you don't care about the history and only want to preserve notes."
)

llm: BaseLLM = Field(..., description="LLM used to produce the compressed context summary")
error_handling: ErrorHandling = Field(default_factory=lambda: ErrorHandling(timeout_seconds=600))
prompt_template: str = Field(
default=CONTEXT_MANAGER_PROMPT_TEMPLATE, description="Prompt template for context compression"
)

model_config = ConfigDict(arbitrary_types_allowed=True)
input_schema: ClassVar[type[ContextManagerInputSchema]] = ContextManagerInputSchema

def init_components(self, connection_manager: ConnectionManager | None = None) -> None:
"""Initialize components for the tool."""
connection_manager = connection_manager or ConnectionManager()
super().init_components(connection_manager)
if self.llm.is_postponed_component_init:
self.llm.init_components(connection_manager)

def reset_run_state(self):
"""Reset the intermediate steps (run_depends) of the node."""
self._run_depends = []

@property
def to_dict_exclude_params(self) -> dict:
"""Exclude LLM object during serialization."""
return super().to_dict_exclude_params | {"llm": True}

def to_dict(self, **kwargs) -> dict:
data = super().to_dict(**kwargs)
data["llm"] = self.llm.to_dict(**kwargs)
return data

def _build_prompt(self, history: list[Message]) -> str:
formatted_history = "\n\n---\n\n".join([f"{m.role}: {str(m.content)}" for m in history])
return self.prompt_template.format(history=formatted_history)

def _summarize_history(self, history: list[Message], config: RunnableConfig, **kwargs) -> str:
prompt_content = self._build_prompt(history)

result = self.llm.run(
input_data={},
prompt=Prompt(messages=[Message(role="user", content=prompt_content, static=True)]),
config=config,
**(kwargs | {"parent_run_id": kwargs.get("run_id"), "run_depends": []}),
)

self._run_depends = [NodeDependency(node=self.llm).to_dict(for_tracing=True)]

if result.status != RunnableStatus.SUCCESS:
raise ValueError("LLM execution failed during context compression")

return result.output.get("content", "").strip()

def execute(
self, input_data: ContextManagerInputSchema, config: RunnableConfig | None = None, **kwargs
) -> dict[str, Any]:
"""
        Summarize the provided history and return a compressed summary that the agent uses to
        replace prior messages.

        Returns:
            dict[str, Any]:
                - content: the compressed summary text, with any notes prepended verbatim
"""
config = ensure_config(config)
self.reset_run_state()
self.run_on_node_execute_run(config.callbacks, **kwargs)

summary = ""

        if input_data.is_history_preserved and input_data.history:
summary = self._summarize_history(input_data.history, config, **kwargs)
summary = f"\nContext compressed; Summary:\n {summary}"

if input_data.notes:
summary = f"Notes: {input_data.notes}\n\n{summary}"

logger.debug(f"Tool {self.name} - {self.id}: context compression completed, summary length: {len(summary)}")

return {"content": summary}


def _apply_context_manager_tool_effect(prompt: Prompt, tool_result: Any, history_offset: int) -> None:
"""Apply context cleaning effect after ContextManagerTool call.

Keeps default prefix (up to history_offset), replaces the rest with a copy of the last prefix message,
and appends an observation with the tool_result summary.
"""

try:
new_messages = prompt.messages[:history_offset]
if new_messages:
new_messages.append(prompt.messages[-1].copy())
prompt.messages = new_messages

except Exception as e:
logger.error(f"Error applying context manager tool effect: {e}")
4 changes: 3 additions & 1 deletion examples/components/core/dag/agent_file_storage.yaml
@@ -33,8 +33,10 @@ nodes:
backend:
type: dynamiq.storages.file.InMemoryFileStore
agent_file_write_enabled: true
max_loops: 5
max_loops: 15
inference_mode: XML
summarization_config:
enabled: true

flows:
memory-agent-flow:
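Note that the YAML does not declare a context-manager tool explicitly: with `summarization_config.enabled: true`, the `_ensure_context_manager_tool` validator adds one automatically, as shown in the agent.py changes above.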