Skip to content

Commit ba9333c

Browse files
feat: agent automatic context management
1 parent 5fc2cf5 commit ba9333c

File tree

6 files changed

+249
-27
lines changed

6 files changed

+249
-27
lines changed

dynamiq/nodes/agents/agent.py

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
from dynamiq.nodes.agents.utils import SummarizationConfig, ToolCacheEntry, XMLParser
2626
from dynamiq.nodes.llms.gemini import Gemini
2727
from dynamiq.nodes.node import Node, NodeDependency
28+
from dynamiq.nodes.tools import ContextManagerTool
29+
from dynamiq.nodes.tools.context_manager import _apply_context_manager_tool_effect
2830
from dynamiq.nodes.types import Behavior, InferenceMode
2931
from dynamiq.prompts import Message, MessageRole, VisionMessage, VisionMessageTextContent
3032
from dynamiq.runnables import RunnableConfig
@@ -765,6 +767,19 @@ def validate_inference_mode(self):
765767

766768
return self
767769

770+
@model_validator(mode="after")
771+
def _ensure_context_manager_tool(self):
772+
"""Automatically add ContextManagerTool when summarization is enabled."""
773+
try:
774+
if self.summarization_config.enabled:
775+
has_context_tool = any(isinstance(t, ContextManagerTool) for t in self.tools)
776+
if not has_context_tool:
777+
# Add with a stable name for addressing from the agent
778+
self.tools.append(ContextManagerTool(llm=self.llm, name="context-manager"))
779+
except Exception as e:
780+
logger.error(f"Failed to ensure ContextManagerTool: {e}")
781+
return self
782+
768783
def _parse_thought(self, output: str) -> tuple[str | None, str | None]:
769784
"""Extracts thought from the output string."""
770785
thought_match = re.search(
@@ -981,7 +996,6 @@ def is_token_limit_exceeded(self) -> bool:
981996
def summarize_history(
982997
self,
983998
input_message,
984-
history_offset: int,
985999
summary_offset: int,
9861000
config: RunnableConfig | None = None,
9871001
**kwargs,
@@ -991,7 +1005,6 @@ def summarize_history(
9911005
9921006
Args:
9931007
input_message (Message | VisionMessage): User request message.
994-
history_offset (int): Offset to the first message in the conversation history within the prompt.
9951008
summary_offset (int): Offset to the position of the first message in prompt that was not summarized.
9961009
config (RunnableConfig | None): Configuration for the agent run.
9971010
**kwargs: Additional parameters for running the agent.
@@ -1003,7 +1016,7 @@ def summarize_history(
10031016
messages_history = "\nHistory to extract information from: \n"
10041017
summary_sections = []
10051018

1006-
offset = max(history_offset, summary_offset - self.summarization_config.context_history_length)
1019+
offset = max(self._history_offset, summary_offset - self.summarization_config.context_history_length)
10071020
for index, message in enumerate(self._prompt.messages[offset:]):
10081021
if message.role == MessageRole.USER:
10091022
if (index + offset >= summary_offset) and ("Observation:" in message.content):
@@ -1072,16 +1085,6 @@ def get_clone_attr_initializers(self) -> dict[str, Callable[[Node], Any]]:
10721085
)
10731086
return base
10741087

1075-
def update_system_message(self):
1076-
system_message = Message(
1077-
role=MessageRole.SYSTEM,
1078-
content=self.generate_prompt(
1079-
tools_name=self.tool_names, input_formats=self.generate_input_formats(self.tools)
1080-
),
1081-
static=True,
1082-
)
1083-
self._prompt.messages = [system_message, *self._prompt.messages[1:]]
1084-
10851088
def _run_agent(
10861089
self,
10871090
input_message: Message | VisionMessage,
@@ -1116,7 +1119,8 @@ def _run_agent(
11161119
else:
11171120
self._prompt.messages = [system_message, input_message]
11181121

1119-
summary_offset = history_offset = len(self._prompt.messages)
1122+
summary_offset = self._history_offset = len(self._prompt.messages)
1123+
11201124
stop_sequences = []
11211125
if self.inference_mode == InferenceMode.DEFAULT:
11221126
stop_sequences.extend(["Observation: ", "\nObservation:"])
@@ -1525,6 +1529,9 @@ def _run_agent(
15251529
except RecoverableAgentException as e:
15261530
tool_result = f"{type(e).__name__}: {e}"
15271531

1532+
if isinstance(tool, ContextManagerTool):
1533+
_apply_context_manager_tool_effect(self._prompt, tool_result, self._history_offset)
1534+
15281535
observation = f"\nObservation: {tool_result}\n"
15291536
self._prompt.messages.append(Message(role=MessageRole.USER, content=observation, static=True))
15301537

@@ -1608,9 +1615,7 @@ def _run_agent(
16081615

16091616
if self.summarization_config.enabled:
16101617
if self.is_token_limit_exceeded():
1611-
summary_offset = self.summarize_history(
1612-
input_message, history_offset, summary_offset, config=config, **kwargs
1613-
)
1618+
summary_offset = self.summarize_history(input_message, summary_offset, config=config, **kwargs)
16141619

16151620
if self.behaviour_on_max_loops == Behavior.RAISE:
16161621
error_message = (

dynamiq/nodes/agents/base.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
)
2424
from dynamiq.nodes.llms import BaseLLM
2525
from dynamiq.nodes.node import NodeDependency, ensure_config
26+
from dynamiq.nodes.tools import ContextManagerTool
2627
from dynamiq.nodes.tools.file_tools import FileListTool, FileReadTool, FileWriteTool
2728
from dynamiq.nodes.tools.mcp import MCPServer
2829
from dynamiq.nodes.tools.python import Python
@@ -385,6 +386,9 @@ class Agent(Node):
385386
_mcp_servers: list[MCPServer] = PrivateAttr(default_factory=list)
386387
_mcp_server_tool_ids: list[str] = PrivateAttr(default_factory=list)
387388
_tool_cache: dict[ToolCacheEntry, Any] = {}
389+
_history_offset: int = PrivateAttr(
390+
default=2, # Offset to the first message (default: 2 — system and initial user messages).
391+
)
388392

389393
model_config = ConfigDict(arbitrary_types_allowed=True)
390394
input_schema: ClassVar[type[AgentInputSchema]] = AgentInputSchema
@@ -929,6 +933,9 @@ def _run_tool(
929933
"""Runs a specific tool with the given input."""
930934
merged_input = tool_input.copy() if isinstance(tool_input, dict) else {"input": tool_input}
931935

936+
if isinstance(tool, ContextManagerTool):
937+
merged_input["history"] = self._prompt.messages[self._history_offset :]
938+
932939
raw_tool_params = kwargs.get("tool_params", ToolParams())
933940
tool_params = (
934941
ToolParams.model_validate(raw_tool_params) if isinstance(raw_tool_params, dict) else raw_tool_params

dynamiq/nodes/tools/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from .context_manager import ContextManagerTool
12
from .e2b_sandbox import E2BInterpreterTool
23
from .exa_search import ExaTool
34
from .file_tools import FileListTool, FileReadTool, FileWriteTool
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
from typing import Any, ClassVar, Literal
2+
3+
from pydantic import BaseModel, ConfigDict, Field
4+
5+
from dynamiq.connections.managers import ConnectionManager
6+
from dynamiq.nodes import ErrorHandling, Node, NodeGroup
7+
from dynamiq.nodes.llms import BaseLLM
8+
from dynamiq.nodes.node import NodeDependency, ensure_config
9+
from dynamiq.prompts import Message, Prompt
10+
from dynamiq.runnables import RunnableConfig, RunnableStatus
11+
from dynamiq.utils.logger import logger
12+
13+
CONTEXT_MANAGER_PROMPT_TEMPLATE = """
14+
You are a context compression assistant for an AI agent.
15+
16+
IMPORTANT: The agent will delete previous message history after this step. You MUST preserve all
17+
essential information needed to continue the task successfully.
18+
19+
Task:
20+
- Produce a detailed summary that replaces the prior message history.
21+
- Keep only what is necessary to proceed: reasoning overview, current subtasks, saved information and files, next steps,
22+
additional notes.
23+
- Omit chit-chat and non-essential details. Use clear, structured formatting.
24+
25+
History to compress:
26+
{history}
27+
28+
Output strictly in this structure:
29+
30+
## Reasoning overview of what is reasoning flow
31+
32+
## Current Subtasks
33+
- [ordered bullets: subtask -> status]
34+
35+
## Saved information and files
36+
- Inform about filesystem state and files that are saved (if available)
37+
38+
## Next Steps
39+
- [ordered bullets: next step -> status]
40+
41+
## Additional Notes:
42+
Any other information that is important to keep in mind and not lost.
43+
44+
"""
45+
46+
47+
class ContextManagerInputSchema(BaseModel):
    """Input for ContextManagerTool.

    Attributes:
        history: Conversation history (list of Message) to compress. Declared required via
            ``Field(...)`` even though the annotation admits ``None``; the agent injects it
            automatically in ``_run_tool`` — presumably never ``None`` in practice, TODO confirm.
        is_history_preserved: Preserve the history with summarization. If False, the history
            will not be preserved, only notes will.
        notes: Verbatim content that must be preserved as-is (not processed by LLM) and
            prepended to the result.
    """

    # Required despite the `| None` annotation: `Field(...)` makes the field mandatory.
    history: list[Message] | None = Field(
        ..., description="Conversation history to be summarized and used to replace prior messages"
    )

    is_history_preserved: bool = Field(
        default=True,
        description="Preserve the history with summarization. If False, the history will not be preserved,"
        " only notes will.",
    )

    notes: str | None = Field(
        default=None,
        description=(
            "Verbatim content to preserve as-is (e.g., IDs, filenames, critical details). "
            "This will be prepended unchanged to the output and NOT sent to the LLM."
        ),
    )
73+
74+
75+
class ContextManagerTool(Node):
    """
    A tool to prune previous message history and replace it with a concise summary.

    IMPORTANT: Before calling this tool, ensure any necessary details are explicitly saved
    (e.g., files, pinned notes, or artifacts). This tool is intended to remove previous messages
    and keep only a structured summary to tighten context and focus on the active subtask.

    Attributes:
        group (Literal[NodeGroup.TOOLS]): The group this node belongs to.
        name (str): The name of the tool.
        description (str): Tool description with usage warning.
        llm (BaseLLM): The LLM used to produce the compressed summary.
        error_handling (ErrorHandling): Configuration for error handling.
        prompt_template (str): Prompt template guiding the summarization.
    """

    group: Literal[NodeGroup.TOOLS] = NodeGroup.TOOLS
    name: str = "Context Manager Tool"
    description: str = (
        "Cleans prior message history and replaces it with a concise, self-contained summary.\n\n"
        "WARNING: Before calling this tool, the agent must save any necessary information (f.e in FileStore),\n"
        "because previous messages will be removed and replaced by the summary. "
        "You can also provide notes to the tool to preserve important information without being processed by the LLM. "
        "Make sure to provide all necessary information for the agent to stay on track and"
        " not lose any important details. "
        "You can also disable history preservation, only notes will be preserved. "
        "Disable history when you don't care about the history and only want to preserve notes."
    )

    llm: BaseLLM = Field(..., description="LLM used to produce the compressed context summary")
    error_handling: ErrorHandling = Field(default_factory=lambda: ErrorHandling(timeout_seconds=600))
    prompt_template: str = Field(
        default=CONTEXT_MANAGER_PROMPT_TEMPLATE, description="Prompt template for context compression"
    )

    model_config = ConfigDict(arbitrary_types_allowed=True)
    input_schema: ClassVar[type[ContextManagerInputSchema]] = ContextManagerInputSchema

    def init_components(self, connection_manager: ConnectionManager | None = None) -> None:
        """Initialize components for the tool, including the postponed LLM if needed."""
        connection_manager = connection_manager or ConnectionManager()
        super().init_components(connection_manager)
        if self.llm.is_postponed_component_init:
            self.llm.init_components(connection_manager)

    def reset_run_state(self):
        """Reset the intermediate steps (run_depends) of the node."""
        self._run_depends = []

    @property
    def to_dict_exclude_params(self) -> dict:
        """Exclude LLM object during serialization."""
        return super().to_dict_exclude_params | {"llm": True}

    def to_dict(self, **kwargs) -> dict:
        """Serialize the node, re-attaching the LLM as its own serialized dict."""
        data = super().to_dict(**kwargs)
        data["llm"] = self.llm.to_dict(**kwargs)
        return data

    def _build_prompt(self, history: list[Message]) -> str:
        """Render the compression prompt, joining messages as `role: content` sections."""
        formatted_history = "\n\n---\n\n".join([f"{m.role}: {str(m.content)}" for m in history])
        return self.prompt_template.format(history=formatted_history)

    def _summarize_history(self, history: list[Message], config: RunnableConfig, **kwargs) -> str:
        """Run the LLM over the history and return the stripped summary text.

        Raises:
            ValueError: If the LLM run does not finish with SUCCESS status.
        """
        prompt_content = self._build_prompt(history)

        result = self.llm.run(
            input_data={},
            prompt=Prompt(messages=[Message(role="user", content=prompt_content, static=True)]),
            config=config,
            **(kwargs | {"parent_run_id": kwargs.get("run_id"), "run_depends": []}),
        )

        self._run_depends = [NodeDependency(node=self.llm).to_dict(for_tracing=True)]

        if result.status != RunnableStatus.SUCCESS:
            raise ValueError("LLM execution failed during context compression")

        return result.output.get("content", "").strip()

    def execute(
        self, input_data: ContextManagerInputSchema, config: RunnableConfig | None = None, **kwargs
    ) -> dict[str, Any]:
        """
        Summarize the provided history into a compressed context message.

        The caller (the agent) is responsible for actually replacing prior messages;
        this method only produces the replacement text.

        Args:
            input_data: History to compress, preservation flag, and optional verbatim notes.
            config: Configuration for the run.
            **kwargs: Additional run parameters (callbacks, run ids, etc.).

        Returns:
            dict[str, Any]: A single key ``content`` holding the notes (verbatim, if any)
            followed by the LLM-produced summary (if history preservation is enabled).
        """
        config = ensure_config(config)
        self.reset_run_state()
        self.run_on_node_execute_run(config.callbacks, **kwargs)

        summary = ""

        # Guard against a missing/empty history: the schema annotation allows
        # `history=None`, and iterating None would raise; an empty history would
        # be a pointless LLM call producing no useful summary.
        if input_data.is_history_preserved and input_data.history:
            summary = self._summarize_history(input_data.history, config, **kwargs)
            summary = f"\nContext compressed; Summary:\n {summary}"

        if input_data.notes:
            # Notes are prepended verbatim — never routed through the LLM.
            summary = f"Notes: {input_data.notes}\n\n{summary}"

        logger.debug(f"Tool {self.name} - {self.id}: context compression completed, summary length: {len(summary)}")

        return {"content": summary}
186+
187+
188+
def _apply_context_manager_tool_effect(prompt: Prompt, tool_result: Any, history_offset: int) -> None:
    """Apply context cleaning effect after a ContextManagerTool call.

    Truncates ``prompt.messages`` to the prefix (up to ``history_offset``) and appends a
    copy of the overall last message (the turn that invoked the tool) so the conversation
    stays coherent.

    NOTE(review): ``tool_result`` is accepted but currently unused — the summary reaches
    the prompt via the "Observation: ..." message the agent appends right after this call.

    Best-effort: any failure is logged and the prompt is left unchanged.
    """

    try:
        new_messages = prompt.messages[:history_offset]
        if new_messages:
            # Keep the tool-invoking message so the appended observation has its pair.
            new_messages.append(prompt.messages[-1].copy())
        prompt.messages = new_messages

    except Exception as e:
        logger.error(f"Error applying context manager tool effect: {e}")

examples/components/core/dag/agent_file_storage.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@ nodes:
3333
backend:
3434
type: dynamiq.storages.file.InMemoryFileStore
3535
agent_file_write_enabled: true
36-
max_loops: 5
36+
max_loops: 15
3737
inference_mode: XML
38+
summarization_config:
39+
enabled: true
3840

3941
flows:
4042
memory-agent-flow:

0 commit comments

Comments
 (0)