diff --git a/cookbook/91_tools/workspace_tools/README.md b/cookbook/91_tools/workspace_tools/README.md new file mode 100644 index 0000000000..7314da8249 --- /dev/null +++ b/cookbook/91_tools/workspace_tools/README.md @@ -0,0 +1,83 @@ +# Workspace + +A polished local-machine toolkit. Read / write / edit / move / delete / search / +shell, scoped to a `root` directory (paths that resolve outside it are rejected). +Destructive operations require human confirmation by default — AgentOS renders +these as approval cards in the run timeline; in a plain console you drive the +loop yourself. + +This is a path-scoping boundary, not a process sandbox — the agent can still +read env vars, hit the network via shell, etc. For untrusted code, run the +agent inside a real sandbox (container, VM, Daytona). + +## Quick reference + +```python +from agno.tools.workspace import Workspace + +# Default: reads auto-pass, writes/edits/moves/deletes/shell require confirmation. +tools = [Workspace(".")] + +# Explicit partition for clarity (recommended for the homepage demo style): +tools = [ + Workspace( + ".", + allowed=["read", "list", "search"], + confirm=["write", "edit", "delete", "shell"], + ) +] + +# Read-only: +tools = [Workspace(".", allowed=["read", "list", "search"])] + +# Defensive: also block writes-to-files-the-agent-hasn't-read: +tools = [Workspace(".", require_read_before_write=True)] +``` + +## Permission model + +`allowed` and `confirm` are mutually exclusive partitions of short +aliases. An alias in `allowed` runs silently, an alias in `confirm` +requires approval, an alias in neither isn't registered, and an alias in both +raises `ValueError`. The full alias mapping: + +| Alias | Registered tool name | What it does | +| -------- | -------------------- | --------------------------------------- | +| `read` | `read_file` | Read a file (line-numbered, optional range) | +| `list` | `list_files` | List a directory (optional glob, optional recursive with `max_depth`) | +| `search` | `search_content` | Recursive content grep | +| `write` | `write_file` | Create or overwrite a file (atomic) | +| `edit` | `edit_file` | Replace a substring (with `replace_all`)| +| `move` | `move_file` | Move or rename a file | +| `delete` | `delete_file` | Delete a file | +| `shell` | `run_command` | Run a shell command in `root` | + +The aliases keep snippets compact; the registered tool names stay descriptive +so the LLM tool spec is self-explanatory. + +## Notable behaviors + +- **`read_file` returns line-numbered output** (`cat -n` style). Numbers reflect + actual file lines, so the agent can chain into `edit_file` precisely. +- **`list_files` returns rich entries**: each is `{path, type, size}`. Use + `recursive=True` (default `max_depth=3`) to walk the tree. +- **`edit_file` defaults to unique-or-fail**, with `replace_all=True` for renames. +- **`write_file` is atomic** — writes to `.tmp`, then `os.replace`. +- **`run_command` strips ANSI codes** and tails to the last 100 lines (configurable). +- **`require_read_before_write=True`** (opt-in) blocks `write_file` / `edit_file` / + `move_file` / `delete_file` on existing files until the agent has read them + this session. Catches the "agent hallucinated the file's contents" bug. + +## Examples in this folder + +- `basic_usage.py` — agent reads a tmp file and writes a summary, with + confirmations disabled so the demo runs end-to-end. +- `with_confirmation.py` — same agent with the default safety on; you + approve each write at the console. 
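+
+The console loop that example drives is small. A minimal sketch, using the same
+pause/approve API the example uses (adapt the prompt handling to taste):
+
+```python
+run = agent.run("Fix the typo in draft.md")
+while run.is_paused:
+    for req in run.active_requirements:
+        if req.needs_confirmation:
+            te = req.tool_execution
+            answer = input(f"Allow {te.tool_name}({te.tool_args})? [y/N] ")
+            if answer.strip().lower() == "y":
+                req.confirm()
+            else:
+                req.reject()
+    run = agent.continue_run(run_id=run.run_id, requirements=run.requirements)
+```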
+ +## Running + +```bash +.venvs/demo/bin/python cookbook/91_tools/workspace_tools/basic_usage.py +.venvs/demo/bin/python cookbook/91_tools/workspace_tools/with_confirmation.py +``` diff --git a/cookbook/91_tools/workspace_tools/TEST_LOG.md b/cookbook/91_tools/workspace_tools/TEST_LOG.md new file mode 100644 index 0000000000..44b358973b --- /dev/null +++ b/cookbook/91_tools/workspace_tools/TEST_LOG.md @@ -0,0 +1,41 @@ +# Test Log — workspace_tools + +Tracks ad-hoc runs of the cookbooks in this folder. Update after each test +session. See the parent `cookbook/91_tools/TEST_LOG.md` for the format +convention. + +--- + +### basic_usage.py + +**Status:** PASS + +**Date:** 2026-04-25 + +**Description:** Agent reads a tmp README.md, writes NOTES.md with a 2-line +summary, then lists files. Uses `confirm=[]` so all tool calls auto-pass. + +**Result:** Agent called `read_file`, `write_file`, and `list_files` in the +expected order. `write_file` returned `Wrote 95 chars to NOTES.md`. Final +message confirmed both files exist. Tool call rendering in the run timeline +showed readable args (e.g. `path=README.md, start_line=1, end_line=200`). + +--- + +### with_confirmation.py + +**Status:** PASS + +**Date:** 2026-04-25 + +**Description:** Agent reads a tmp draft.md and edits a typo. Uses default +partitions, so `read_file` auto-passes but `edit_file` pauses for approval. +Smoke-tested with `yes y | python …` to drain confirmation prompts. + +**Result:** `read_file` ran silently. `edit_file` paused, surfaced +`tool_name=edit_file` and the proposed `tool_args` (path, old_str, new_str) +through the `active_requirements` API. After confirm, the edit applied: +`taht` → `that`. `requirement.confirm()` and `agent.continue_run(...)` +flow worked as expected. + +--- diff --git a/cookbook/91_tools/workspace_tools/basic_usage.py b/cookbook/91_tools/workspace_tools/basic_usage.py new file mode 100644 index 0000000000..7e3dd00a57 --- /dev/null +++ b/cookbook/91_tools/workspace_tools/basic_usage.py @@ -0,0 +1,46 @@ +""" +Workspace — basic usage +======================= + +A polished local-machine toolkit: read/write/edit/delete/search/shell, scoped to +a local directory (path-scoped to a `root`). Destructive operations require +confirmation by default — +see ``with_confirmation.py`` for the pause/resume flow. + +This example uses ``confirm=[]`` to disable confirmation so the agent +runs end-to-end without prompts. For production, leave the defaults on. +""" + +import tempfile +from pathlib import Path + +from agno.agent import Agent +from agno.models.openai import OpenAIResponses +from agno.tools.workspace import Workspace + +# Use a clean tmp directory so the demo doesn't touch real files. +workspace = Path(tempfile.mkdtemp(prefix="workspace_demo_")) +(workspace / "README.md").write_text( + "# Demo workspace\n\n" + "This file lives in a tmp directory.\n" + "The agent below will read it and produce a summary file.\n" +) + +agent = Agent( + model=OpenAIResponses(id="gpt-5.4"), + tools=[ + Workspace( + str(workspace), + allowed=Workspace.ALL_TOOLS, + confirm=[], + ) + ], + markdown=True, +) + +if __name__ == "__main__": + agent.print_response( + "Read README.md, then write a 2-line summary to NOTES.md. " + "After that, list the files to confirm both exist." 
+ ) + print(f"\nWorkspace: {workspace}") diff --git a/cookbook/91_tools/workspace_tools/workspace_tools_with_confirmation.py b/cookbook/91_tools/workspace_tools/workspace_tools_with_confirmation.py new file mode 100644 index 0000000000..1aff6cfde2 --- /dev/null +++ b/cookbook/91_tools/workspace_tools/workspace_tools_with_confirmation.py @@ -0,0 +1,70 @@ +""" +Workspace — human-in-the-loop confirmation +========================================== + +This is the default safety story. Reads run silently; writes/edits/deletes/shell +pause the run and surface a confirmation request. AgentOS renders these as +approval cards in its run timeline. In a plain console, you handle the loop +yourself — that's what this example shows. + +Run this in a terminal so you can answer y/n at the prompts. +""" + +import tempfile +from pathlib import Path + +from agno.agent import Agent +from agno.db.sqlite import SqliteDb +from agno.models.openai import OpenAIResponses +from agno.tools.workspace import Workspace +from agno.utils import pprint +from rich.console import Console +from rich.prompt import Prompt + +console = Console() + +workspace = Path(tempfile.mkdtemp(prefix="workspace_hitl_")) +(workspace / "draft.md").write_text( + "# Draft\n\nThis draft has typos taht need fixing.\n" +) + +agent = Agent( + model=OpenAIResponses(id="gpt-5.4"), + tools=[ + Workspace( + str(workspace), + # Default partition: reads auto-pass, writes need approval. + ) + ], + db=SqliteDb(db_file="tmp/workspace_hitl.db"), + markdown=True, +) + + +def _drain_pauses(run_response): + """Approve every pending tool call until the run completes.""" + while run_response.is_paused: + for requirement in run_response.active_requirements: + if requirement.needs_confirmation: + te = requirement.tool_execution + console.print( + f"\n[yellow]Tool[/] [bold blue]{te.tool_name}[/] wants to run with args:\n {te.tool_args}" + ) + choice = Prompt.ask("Confirm?", choices=["y", "n"], default="y") + if choice.strip().lower() == "n": + requirement.reject() + else: + requirement.confirm() + run_response = agent.continue_run( + run_id=run_response.run_id, + requirements=run_response.requirements, + ) + return run_response + + +if __name__ == "__main__": + initial = agent.run("Read draft.md and fix the typo on the line about typos.") + final = _drain_pauses(initial) + pprint.pprint_run_response(final) + print(f"\nWorkspace: {workspace}") + print(f"draft.md after edit:\n{(workspace / 'draft.md').read_text()}") diff --git a/cookbook/99_docs/__init__.py b/cookbook/99_docs/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/cookbook/99_docs/first-agent/__init__.py b/cookbook/99_docs/first-agent/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/cookbook/99_docs/first-agent/workbench.py b/cookbook/99_docs/first-agent/workbench.py new file mode 100644 index 0000000000..2602221c64 --- /dev/null +++ b/cookbook/99_docs/first-agent/workbench.py @@ -0,0 +1,18 @@ +from agno.agent import Agent +from agno.db.sqlite import SqliteDb +from agno.os import AgentOS +from agno.tools.workspace import Workspace + +workbench = Agent( + name="Workbench", + model="openai:gpt-5.4", + db=SqliteDb(db_file="agno.db"), # session storage + tools=[Workspace(".")], # read/write/edit/shell in this directory + enable_agentic_memory=True, # remembers across sessions + add_history_to_context=True, # include past runs + num_history_runs=3, # last 3 conversations +) + +# Serve via AgentOS → streaming, auth, session isolation, API endpoints +agent_os = 
AgentOS(agents=[workbench], tracing=True) +app = agent_os.get_app() diff --git a/cookbook/99_docs/index/__init__.py b/cookbook/99_docs/index/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/cookbook/99_docs/index/agno_agent.py b/cookbook/99_docs/index/agno_agent.py new file mode 100644 index 0000000000..0db2049aed --- /dev/null +++ b/cookbook/99_docs/index/agno_agent.py @@ -0,0 +1,23 @@ +from agno.agent import Agent +from agno.db.sqlite import SqliteDb +from agno.os import AgentOS +from agno.tools.workspace import Workspace + +agent = Agent( + name="Agno Agent", + model="openai:gpt-5.4", + tools=[ + Workspace( + root=".", + allowed=["read", "list", "search"], + confirm=["write", "edit", "delete", "shell"], + ) + ], +) + +agent_os = AgentOS( + agents=[agent], + tracing=True, + db=SqliteDb(db_file="tmp/agentos.db"), +) +app = agent_os.get_app() diff --git a/cookbook/99_docs/index/claude_agent.py b/cookbook/99_docs/index/claude_agent.py new file mode 100644 index 0000000000..31e0bff4cc --- /dev/null +++ b/cookbook/99_docs/index/claude_agent.py @@ -0,0 +1,17 @@ +from agno.agents.claude import ClaudeAgent +from agno.db.sqlite import SqliteDb +from agno.os import AgentOS + +agent = ClaudeAgent( + name="Claude Agent", + model="claude-opus-4-7", + allowed_tools=["Read", "Bash"], + permission_mode="acceptEdits", +) + +agent_os = AgentOS( + agents=[agent], + tracing=True, + db=SqliteDb(db_file="tmp/agentos.db"), +) +app = agent_os.get_app() diff --git a/cookbook/99_docs/index/dspy_agent.py b/cookbook/99_docs/index/dspy_agent.py new file mode 100644 index 0000000000..9c1c5e2451 --- /dev/null +++ b/cookbook/99_docs/index/dspy_agent.py @@ -0,0 +1,18 @@ +import dspy +from agno.agents.dspy import DSPyAgent +from agno.db.sqlite import SqliteDb +from agno.os import AgentOS + +dspy.configure(lm=dspy.LM("openai/gpt-5.4")) + +agent = DSPyAgent( + name="DSPy Assistant", + program=dspy.ChainOfThought("question -> answer"), +) + +agent_os = AgentOS( + agents=[agent], + tracing=True, + db=SqliteDb(db_file="tmp/agentos.db"), +) +app = agent_os.get_app() diff --git a/cookbook/99_docs/index/langgraph_agent.py b/cookbook/99_docs/index/langgraph_agent.py new file mode 100644 index 0000000000..e461776b3c --- /dev/null +++ b/cookbook/99_docs/index/langgraph_agent.py @@ -0,0 +1,27 @@ +from agno.agents.langgraph import LangGraphAgent +from agno.db.sqlite import SqliteDb +from agno.os import AgentOS +from langchain_openai import ChatOpenAI +from langgraph.graph import MessagesState, StateGraph + + +def chatbot(state: MessagesState): + return {"messages": [ChatOpenAI(model="gpt-5.4").invoke(state["messages"])]} + + +graph = StateGraph(MessagesState) +graph.add_node("chatbot", chatbot) +graph.set_entry_point("chatbot") +compiled = graph.compile() + +agent = LangGraphAgent( + name="LangGraph Chatbot", + graph=compiled, +) + +agent_os = AgentOS( + agents=[agent], + tracing=True, + db=SqliteDb(db_file="tmp/agentos.db"), +) +app = agent_os.get_app() diff --git a/libs/agno/agno/tools/workspace.py b/libs/agno/agno/tools/workspace.py new file mode 100644 index 0000000000..1c9e7b66c7 --- /dev/null +++ b/libs/agno/agno/tools/workspace.py @@ -0,0 +1,941 @@ +"""Workspace — read, write, edit, search, and run shell commands in a local working directory. + +Destructive operations (write/edit/move/delete/shell) require human confirmation by +default, which AgentOS renders as approval prompts in the run timeline. 
+ +Quick start: + + from agno.agent import Agent + from agno.tools.workspace import Workspace + + agent = Agent( + model="openai:gpt-5.4", + tools=[ + Workspace( + ".", + allowed=["read", "list", "search"], + confirm=["write", "edit", "move", "delete", "shell"], + ) + ], + ) +""" + +import asyncio +import json +import os +import re +import subprocess +from fnmatch import fnmatch +from pathlib import Path +from typing import Dict, List, Optional, Set, Tuple, Union + +from agno.tools import Toolkit +from agno.utils.log import log_debug, log_error, log_info, log_warning + +TEXT_EXTENSIONS = { + # Markup and data + ".md", + ".txt", + ".csv", + ".json", + ".yaml", + ".yml", + ".xml", + ".html", + ".rst", + ".log", + # Config + ".toml", + ".cfg", + ".ini", + ".env", + ".editorconfig", + # Python + ".py", + ".pyi", + # JavaScript / TypeScript + ".js", + ".ts", + ".tsx", + ".jsx", + ".mjs", + ".cjs", + # Web + ".css", + ".scss", + ".less", + ".vue", + ".svelte", + # Systems languages + ".c", + ".h", + ".cpp", + ".hpp", + ".cc", + ".cxx", + ".rs", + ".go", + ".zig", + # JVM + ".java", + ".kt", + ".kts", + ".scala", + ".groovy", + ".gradle", + # .NET + ".cs", + ".fs", + ".csproj", + ".fsproj", + # Ruby / PHP / Perl + ".rb", + ".php", + ".pl", + ".pm", + # Functional / BEAM + ".ex", + ".exs", + ".erl", + ".hs", + ".ml", + ".mli", + # Other languages + ".swift", + ".m", + ".r", + ".R", + ".lua", + ".dart", + ".jl", + # Shell and scripting + ".sh", + ".bash", + ".zsh", + ".fish", + ".ps1", + # SQL and query + ".sql", + ".graphql", + ".gql", + # Infrastructure / IaC + ".tf", + ".hcl", + ".dockerfile", + # Serialization / schema + ".proto", + ".avsc", + ".thrift", + # Build / CI + ".makefile", + ".cmake", + ".bazel", + ".bzl", +} + +DEFAULT_EXCLUDE_PATTERNS = [ + # Environments and secrets + ".venv", + "venv", + ".env*", + "*.env", + # Version control + ".git", + ".hg", + ".svn", + # Python caches and build artifacts + "__pycache__", + ".mypy_cache", + ".ruff_cache", + ".pytest_cache", + ".tox", + ".nox", + ".ipynb_checkpoints", + "dist", + "build", + "*.egg-info", + # JavaScript and TypeScript + "node_modules", + ".next", + ".turbo", + ".nuxt", + ".svelte-kit", + ".docusaurus", + ".parcel-cache", + ".nyc_output", + "*.tsbuildinfo", + ".serverless", + # JVM (Java, Kotlin, Android, Gradle) + ".gradle", + ".kotlin", + "*.class", + # Dart and Flutter + ".dart_tool", + ".flutter-plugins", + ".flutter-plugins-dependencies", + # Swift and Xcode + ".build", + "xcuserdata", + "*.xcuserstate", + # Ruby + ".bundle", + "*.gem", + ".yardoc", + # Elixir + "_build", + ".elixir_ls", + # .NET / Visual Studio + ".vs", + # Infrastructure as Code + ".terraform", + "*.tfstate", + "*.tfstate.*", + ".terragrunt-cache", + # OS artifacts + ".DS_Store", +] + +# Strips ANSI CSI sequences (color codes, cursor moves) from terminal output. 
+_ANSI_RE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") + + +def _strip_ansi(text: str) -> str: + """Remove ANSI escape sequences (color codes, cursor moves) from text.""" + return _ANSI_RE.sub("", text) + + +def _format_size(size: float) -> str: + """Format a file size in bytes to a human-readable string.""" + for unit in ("B", "KB", "MB"): + if size < 1024: + return f"{int(size)}{unit}" if unit == "B" else f"{size:.1f}{unit}" + size /= 1024 + return f"{size:.1f}GB" + + +def _extract_snippet(content: str, query: str, context_chars: int = 200) -> str: + """Extract a snippet of content around the first case-insensitive match of query.""" + lower_content = content.lower() + lower_query = query.lower() + idx = lower_content.find(lower_query) + if idx == -1: + return "" + start = max(0, idx - context_chars) + end = min(len(content), idx + len(query) + context_chars) + snippet = content[start:end] + if start > 0: + snippet = "..." + snippet + if end < len(content): + snippet = snippet + "..." + return snippet + + +def _format_with_line_numbers(text: str, start_line: int = 1) -> str: + """Prefix each line with its 1-indexed number, ``cat -n`` style. + + The numbers reflect the actual line in the source file: when reading a chunk + starting at line 50, the first returned line is numbered 50. + """ + lines = text.split("\n") + # Drop the trailing empty element produced by a terminal newline. + if lines and lines[-1] == "": + lines = lines[:-1] + return "\n".join(f"{i + start_line:6d}\t{line}" for i, line in enumerate(lines)) + + +class Workspace(Toolkit): + """Local-machine toolkit for read/write/edit/search/shell access to a directory tree. + + All file operations are scoped to ``root`` — paths that resolve outside it are + rejected with an error. Shell commands run with ``cwd=root``. This is a path-scoping + boundary, not a process sandbox: the agent can still read environment variables, + make network calls via shell tools, and use whatever the host process can use. For + untrusted code execution, run the agent inside a real sandbox (container, VM, or + a service like Daytona). + + Permission model — ``allowed`` and ``confirm`` are mutually exclusive + partitions of short aliases: + + - An alias in ``allowed`` runs silently. + - An alias in ``confirm`` requires user approval (Agno's HITL pause/resume). + - An alias in **neither** list is not registered with the toolkit — the LLM doesn't see it. + - An alias in **both** lists raises ``ValueError``. + + Aliases (the strings you put in the lists) are short for the snippet; the actual method + names registered with the LLM are descriptive so the tool spec is self-explanatory: + + | Alias | Registered tool name | What it does | + | -------- | -------------------- | --------------------------------------- | + | ``read`` | ``read_file`` | Read a file (line-numbered) | + | ``list`` | ``list_files`` | List a directory (recursive option) | + | ``search`` | ``search_content`` | Recursive content grep | + | ``write`` | ``write_file`` | Create or overwrite a file (atomic) | + | ``edit`` | ``edit_file`` | Replace a substring (with ``replace_all``)| + | ``move`` | ``move_file`` | Move or rename a file | + | ``delete`` | ``delete_file`` | Delete a file | + | ``shell`` | ``run_command`` | Run a shell command in ``root`` | + + Defaults: + + - When both lists are ``None``: reads (``read``, ``list``, ``search``) auto-pass, + writes (``write``, ``edit``, ``move``, ``delete``, ``shell``) require confirmation. 
+ This is the safe-by-default surface meant for the homepage demo. + - When only one is set: the other defaults to ``[]`` — you've taken control, + and the surface is exactly what you specified. + + Listing results from ``list_files`` and ``search_content`` skip common noise directories + (``.venv``, ``.git``, ``__pycache__``, ``node_modules``, etc.) by default. Pass + ``exclude_patterns=[]`` to disable, or ``exclude_patterns=[...]`` to override. + + Optional ``require_read_before_write=True`` blocks ``write_file`` / ``edit_file`` / + ``move_file`` / ``delete_file`` on existing files until the agent has read them in + this session. Catches the "agent hallucinated the file's contents" bug class. + """ + + READ_TOOLS: List[str] = ["read", "list", "search"] + WRITE_TOOLS: List[str] = ["write", "edit", "move", "delete", "shell"] + ALL_TOOLS: List[str] = READ_TOOLS + WRITE_TOOLS + + # Alias → registered tool name (the descriptive name the LLM sees in the tool spec). + _ALIASES: Dict[str, str] = { + "read": "read_file", + "list": "list_files", + "search": "search_content", + "write": "write_file", + "edit": "edit_file", + "move": "move_file", + "delete": "delete_file", + "shell": "run_command", + } + + def __init__( + self, + root: Optional[Union[str, Path]] = None, + allowed: Optional[List[str]] = None, + confirm: Optional[List[str]] = None, + require_read_before_write: bool = False, + max_file_lines: int = 100_000, + max_file_length: int = 10_000_000, + exclude_patterns: Optional[List[str]] = None, + **kwargs, + ): + # Resolve root to an absolute path once — never re-read cwd later (reload-safe). + if root is None: + self.root: Path = Path.cwd().resolve() + else: + self.root = Path(root).resolve() + + self.max_file_lines = max_file_lines + self.max_file_length = max_file_length + self.require_read_before_write = require_read_before_write + self.exclude_patterns: List[str] = ( + exclude_patterns if exclude_patterns is not None else list(DEFAULT_EXCLUDE_PATTERNS) + ) + # Tracks which paths have been read this session — used by require_read_before_write. + # Resolved absolute paths so move/rename interactions are unambiguous. + self._read_paths: Set[Path] = set() + + resolved_allowed_aliases, resolved_confirm_aliases = self._resolve_partitions(allowed, confirm) + + # Translate aliases → method names. The LLM sees the descriptive names. + resolved_allowed_methods = [self._ALIASES[a] for a in resolved_allowed_aliases] + resolved_confirm_methods = [self._ALIASES[a] for a in resolved_confirm_aliases] + + registered = resolved_allowed_methods + resolved_confirm_methods + sync_tools = [getattr(self, name) for name in registered] + async_tools = [(getattr(self, "a" + name), name) for name in registered] + + super().__init__( + name="workspace", + tools=sync_tools, + async_tools=async_tools, + requires_confirmation_tools=resolved_confirm_methods, + instructions=( + "Always read_file before editing — the line-numbered output gives you " + "the exact substring to pass to edit_file's old_str parameter. " + "Do not guess file contents or pass line numbers to edit_file." + ), + add_instructions=True, + **kwargs, + ) + + # Surface-drift guard: every alias must resolve to both a sync method and async + # sibling on the class. Catches contributor bugs (added a method but forgot to + # add the alias, or vice versa). 
+ for alias, method_name in self._ALIASES.items(): + assert callable(getattr(self, method_name, None)), ( + f"Workspace missing sync method '{method_name}' for alias '{alias}'" + ) + assert callable(getattr(self, "a" + method_name, None)), ( + f"Workspace missing async method 'a{method_name}' for alias '{alias}'" + ) + + @classmethod + def _resolve_partitions( + cls, + allowed: Optional[List[str]], + confirm: Optional[List[str]], + ) -> Tuple[List[str], List[str]]: + """Resolve allowed / confirm alias lists into mutually-exclusive lists. + + See the class docstring for the resolution rules. Both lists hold *aliases* + (e.g. ``"read"``, not ``"read_file"``). + """ + # Reject obvious misuse early — these are short kwarg names so users may + # reach for confirm=True or confirm="write" by mistake. + for arg_name, arg_value in (("allowed", allowed), ("confirm", confirm)): + if arg_value is not None and not isinstance(arg_value, list): + raise TypeError( + f"`{arg_name}` must be a list of aliases, got {type(arg_value).__name__}: " + f"{arg_value!r}. Valid aliases: {cls.ALL_TOOLS}" + ) + + # Both None → safe defaults. + if allowed is None and confirm is None: + return list(cls.READ_TOOLS), list(cls.WRITE_TOOLS) + + # If one is set, the other defaults to [] — explicit user control means no + # surprise mixing. + if allowed is None: + allowed = [] + if confirm is None: + confirm = [] + + valid = set(cls.ALL_TOOLS) + unknown_allowed = set(allowed) - valid + if unknown_allowed: + raise ValueError( + f"Unknown alias(es) in `allowed`: {sorted(unknown_allowed)}. Valid aliases: {cls.ALL_TOOLS}" + ) + unknown_confirm = set(confirm) - valid + if unknown_confirm: + raise ValueError( + f"Unknown alias(es) in `confirm`: {sorted(unknown_confirm)}. Valid aliases: {cls.ALL_TOOLS}" + ) + overlap = set(allowed) & set(confirm) + if overlap: + raise ValueError( + f"Alias(es) appear in both `allowed` and `confirm`: {sorted(overlap)}. " + "They must be mutually exclusive — items in `allowed` auto-pass; " + "items in `confirm` require approval." + ) + return list(allowed), list(confirm) + + def _is_excluded(self, path: Path) -> bool: + """Return True if any component of ``path`` (relative to ``root``) matches an exclude pattern.""" + if not self.exclude_patterns: + return False + try: + rel = path.relative_to(self.root) + except ValueError: + return False + return any(fnmatch(part, pattern) for part in rel.parts for pattern in self.exclude_patterns) + + def _check_read_before_write(self, file_path: Path, op: str) -> Optional[str]: + """If require_read_before_write is on, verify the file was read this session. + + Returns an error string if the check fails, or ``None`` if it passes (or + the file is being newly created, which doesn't need a prior read). + """ + if not self.require_read_before_write: + return None + if not file_path.exists(): + # Creating a new file is fine without a prior read. + return None + if file_path in self._read_paths: + return None + return ( + f"Error: require_read_before_write is enabled and {file_path.name} hasn't " + f"been read this session. Call read_file first to confirm contents before " + f"the {op}." 
+ ) + + # ------------------------------------------------------------------ + # Read operations (auto-pass by default) + # ------------------------------------------------------------------ + + def read_file( + self, + path: str, + start_line: Optional[int] = None, + end_line: Optional[int] = None, + encoding: str = "utf-8", + ) -> str: + """Read a file from the workspace, returning ``cat -n`` style line-numbered output. + + Each line is prefixed with its 1-indexed line number followed by a tab. The + numbers reflect the actual line in the file — if you read lines 50-60, the + first returned line is numbered 50. Pass these line numbers back to ``edit_file`` + when you want to make a targeted change. + + :param path: File path relative to the workspace root. + :param start_line: Optional 1-indexed first line to return. If omitted with + end_line, returns the entire file (subject to size limits). + :param end_line: Optional 1-indexed last line to return (inclusive). + :param encoding: Text encoding (default utf-8). + :return: Line-numbered file contents (or selected range), or an error message + starting with "Error". + """ + try: + log_debug(f"read_file: {path}") + safe, file_path = self._check_path(path, self.root) + if not safe: + log_error(f"Path escapes workspace: {path}") + return "Error: path escapes workspace root" + if not file_path.is_file(): + return f"Error: file not found: {path}" + contents = file_path.read_text(encoding=encoding) + self._read_paths.add(file_path) + if start_line is None and end_line is None: + if len(contents) > self.max_file_length: + return ( + f"Error: file too long ({len(contents)} chars > {self.max_file_length}). " + "Use start_line/end_line to read a chunk, " + "or use search_content to find specific text first." + ) + line_count = contents.count("\n") + 1 + if line_count > self.max_file_lines: + return ( + f"Error: file too long ({line_count} lines > {self.max_file_lines}). " + "Use start_line/end_line to read a chunk, " + "or use search_content to find specific text first." + ) + return _format_with_line_numbers(contents, start_line=1) + lines = contents.split("\n") + start = start_line if start_line is not None else 1 + end = end_line if end_line is not None else len(lines) + start_idx = max(0, start - 1) + end_idx = min(len(lines), end) + chunk = "\n".join(lines[start_idx:end_idx]) + return _format_with_line_numbers(chunk, start_line=start) + except Exception as e: + log_error(f"read_file failed: {e}") + return f"Error reading file: {e}" + + def list_files( + self, + directory: str = ".", + pattern: Optional[str] = None, + recursive: bool = False, + max_depth: int = 3, + ) -> str: + """List entries in a workspace directory. + + Each entry is returned as ``{"path", "type", "size"}`` so you can decide which + files to read without a second call. ``type`` is ``"file"`` or ``"dir"``; + ``size`` is a human-readable string for files and ``null`` for directories. + + For tree-style exploration of a project use ``recursive=True`` (defaults to + depth 3). Default-excluded directories (``.venv``, ``.git``, ``node_modules``, + etc.) are always pruned. + + :param directory: Subdirectory relative to the workspace root (default "."). + :param pattern: Optional glob pattern to filter by (e.g. ``"*.py"``). When + ``recursive=False`` patterns can include ``**`` for cross-directory globs. + When ``recursive=True`` the pattern is matched against each entry name. + :param recursive: If True, walk the directory tree up to ``max_depth`` levels deep. 
+ :param max_depth: Depth limit when ``recursive=True`` (default 3). + :return: JSON string with keys ``directory``, ``pattern``, ``recursive``, and + ``files`` (list of entry objects). + """ + try: + safe, d = self._check_path(directory, self.root) + if not safe: + return "Error: directory escapes workspace root" + if not d.is_dir(): + return f"Error: not a directory: {directory}" + + entries: List[Path] = [] + if recursive: + # max_depth controls how many levels deep we return entries. + # At the boundary (rel_depth == max_depth) we still enumerate + # files and dirs at that level but stop recursing further. + base_depth = len(d.parts) + for dirpath, dirnames, filenames in os.walk(d): + rel_depth = len(Path(dirpath).parts) - base_depth + if rel_depth >= max_depth: + # Stop recursion but keep dir names for enumeration below. + visible_dirs = [name for name in dirnames if not self._is_excluded(Path(dirpath) / name)] + dirnames[:] = [] + else: + dirnames[:] = [name for name in dirnames if not self._is_excluded(Path(dirpath) / name)] + visible_dirs = list(dirnames) + for name in filenames + visible_dirs: + full = Path(dirpath) / name + if self._is_excluded(full): + continue + if pattern and not fnmatch(name, pattern): + continue + entries.append(full) + elif pattern: + entries = [p for p in d.glob(pattern) if not self._is_excluded(p)] + else: + entries = [p for p in d.iterdir() if not self._is_excluded(p)] + + files = [] + for p in sorted(entries): + try: + is_dir = p.is_dir() + size = None if is_dir else _format_size(p.stat().st_size) + except OSError: + # Broken symlink or vanished file — skip silently. + continue + files.append( + { + "path": str(p.relative_to(self.root)), + "type": "dir" if is_dir else "file", + "size": size, + } + ) + + return json.dumps( + { + "directory": directory, + "pattern": pattern, + "recursive": recursive, + "files": files, + }, + indent=2, + ) + except Exception as e: + log_error(f"list_files failed: {e}") + return f"Error listing files: {e}" + + def search_content(self, query: str, directory: str = ".", limit: int = 10) -> str: + """Recursive case-insensitive content grep across text files in the workspace. + + Only text files (by extension) under 500KB are searched. Returns the first ``limit`` + matching files with a snippet around the first match in each. + + :param query: Substring to search for (case-insensitive). + :param directory: Subdirectory to scope the search to (default "."). + :param limit: Maximum number of matching files to return (default 10). + :return: JSON string with keys ``query``, ``matches_found``, and ``files`` (a list of + ``{"file", "size", "snippet"}`` objects). 
+ """ + try: + if not query or not query.strip(): + return "Error: query cannot be empty" + safe, search_dir = self._check_path(directory, self.root) + if not safe: + return "Error: directory escapes workspace root" + if not search_dir.is_dir(): + return f"Error: not a directory: {directory}" + + lower_query = query.lower() + matches: List[dict] = [] + max_file_size = 500 * 1024 + walk_done = False + + for dirpath, dirnames, filenames in os.walk(search_dir): + if walk_done: + break + dirnames[:] = [name for name in dirnames if not self._is_excluded(Path(dirpath) / name)] + for filename in filenames: + if len(matches) >= limit: + walk_done = True + break + file_path = Path(dirpath) / filename + if self._is_excluded(file_path): + continue + if file_path.suffix.lower() not in TEXT_EXTENSIONS: + continue + try: + if file_path.stat().st_size > max_file_size: + continue + except OSError: + continue + try: + content = file_path.read_text(encoding="utf-8", errors="ignore") + except Exception: + continue + if lower_query in content.lower(): + rel_path = str(file_path.relative_to(self.root)) + matches.append( + { + "file": rel_path, + "size": _format_size(file_path.stat().st_size), + "snippet": _extract_snippet(content, query), + } + ) + return json.dumps({"query": query, "matches_found": len(matches), "files": matches}, indent=2) + except Exception as e: + log_error(f"search_content failed: {e}") + return f"Error searching content: {e}" + + # ------------------------------------------------------------------ + # Write operations (require confirmation by default) + # ------------------------------------------------------------------ + + def write_file(self, path: str, content: str, overwrite: bool = True, encoding: str = "utf-8") -> str: + """Write a file to the workspace, creating parent directories if needed. + + Writes are atomic: content is written to a sibling ``.tmp`` file and renamed + into place, so a crash mid-write can't leave a partially-written target. + + :param path: File path relative to the workspace root. + :param content: Text content to write. + :param overwrite: If False, fail when the file already exists (default True). + :param encoding: Text encoding (default utf-8). + :return: Success message including the path and byte count, or an error message. + """ + try: + safe, file_path = self._check_path(path, self.root) + if not safe: + log_error(f"Path escapes workspace: {path}") + return "Error: path escapes workspace root" + if file_path.exists() and not overwrite: + return f"Error: file exists and overwrite=False: {path}" + check_err = self._check_read_before_write(file_path, op="write") + if check_err: + return check_err + if not file_path.parent.exists(): + file_path.parent.mkdir(parents=True, exist_ok=True) + + tmp_path = file_path.with_name(file_path.name + ".tmp") + try: + tmp_path.write_text(content, encoding=encoding) + os.replace(tmp_path, file_path) + finally: + if tmp_path.exists(): + tmp_path.unlink() + # Treat write as a read for require_read_before_write — the agent now knows + # the contents, so subsequent edits to the same path are fair game. + self._read_paths.add(file_path) + return f"Wrote {len(content)} chars to {path}" + except Exception as e: + log_error(f"write_file failed: {e}") + return f"Error writing file: {e}" + + def edit_file( + self, + path: str, + old_str: str, + new_str: str, + replace_all: bool = False, + encoding: str = "utf-8", + ) -> str: + """Edit a file by replacing ``old_str`` with ``new_str``. 
+ + By default ``old_str`` must match exactly once — fails if it appears zero or + more than one times. Pass ``replace_all=True`` to replace every occurrence + (useful for renames across a file). + + :param path: File path relative to the workspace root. + :param old_str: Exact substring to replace. + :param new_str: Replacement substring. + :param replace_all: If True, replace every occurrence (default False). + :param encoding: Text encoding (default utf-8). + :return: Success message with the count, or an error if no matches (or, when + ``replace_all=False``, multiple matches). + """ + try: + if not old_str: + return "Error: old_str cannot be empty" + safe, file_path = self._check_path(path, self.root) + if not safe: + return "Error: path escapes workspace root" + if not file_path.is_file(): + return f"Error: file not found: {path}" + check_err = self._check_read_before_write(file_path, op="edit") + if check_err: + return check_err + contents = file_path.read_text(encoding=encoding) + count = contents.count(old_str) + if count == 0: + return f"Error: old_str not found in {path}" + if count > 1 and not replace_all: + return ( + f"Error: old_str matches {count} times in {path}; " + "provide a more unique snippet or pass replace_all=True" + ) + new_contents = contents.replace(old_str, new_str) if replace_all else contents.replace(old_str, new_str, 1) + tmp_path = file_path.with_name(file_path.name + ".tmp") + try: + tmp_path.write_text(new_contents, encoding=encoding) + os.replace(tmp_path, file_path) + finally: + if tmp_path.exists(): + tmp_path.unlink() + return f"Edited {path}: replaced {count if replace_all else 1} occurrence{'s' if (count if replace_all else 1) != 1 else ''}" + except Exception as e: + log_error(f"edit_file failed: {e}") + return f"Error editing file: {e}" + + def move_file(self, src: str, dst: str, overwrite: bool = False) -> str: + """Move or rename a file within the workspace. + + Both ``src`` and ``dst`` must resolve inside the workspace root. By default + refuses to clobber an existing destination — pass ``overwrite=True`` to + force. + + :param src: Source file path relative to the workspace root. + :param dst: Destination path relative to the workspace root. + :param overwrite: If True, replace ``dst`` if it already exists (default False). + :return: Success message, or an error if either path escapes, src is missing, + or dst exists and overwrite is False. + """ + try: + safe_src, src_path = self._check_path(src, self.root) + if not safe_src: + return "Error: src escapes workspace root" + safe_dst, dst_path = self._check_path(dst, self.root) + if not safe_dst: + return "Error: dst escapes workspace root" + if not src_path.exists(): + return f"Error: src not found: {src}" + if src_path.is_dir(): + return f"Error: src is a directory, not a file: {src}" + if dst_path.exists() and not overwrite: + return f"Error: dst exists and overwrite=False: {dst}" + check_err = self._check_read_before_write(src_path, op="move") + if check_err: + return check_err + if not dst_path.parent.exists(): + dst_path.parent.mkdir(parents=True, exist_ok=True) + os.replace(src_path, dst_path) if overwrite else src_path.rename(dst_path) + # Carry the "has been read" status with the file. 
+ if src_path in self._read_paths: + self._read_paths.discard(src_path) + self._read_paths.add(dst_path) + return f"Moved {src} -> {dst}" + except Exception as e: + log_error(f"move_file failed: {e}") + return f"Error moving file: {e}" + + def delete_file(self, path: str) -> str: + """Delete a file from the workspace. Refuses to delete directories. + + :param path: File path relative to the workspace root. + :return: Success message, or an error if the path doesn't exist or is a directory. + """ + try: + safe, file_path = self._check_path(path, self.root) + if not safe: + return "Error: path escapes workspace root" + if not file_path.exists(): + return f"Error: file not found: {path}" + if file_path.is_dir(): + return f"Error: path is a directory, not a file: {path}" + check_err = self._check_read_before_write(file_path, op="delete") + if check_err: + return check_err + file_path.unlink() + self._read_paths.discard(file_path) + return f"Deleted {path}" + except Exception as e: + log_error(f"delete_file failed: {e}") + return f"Error deleting file: {e}" + + def run_command(self, args: List[str], tail: int = 100, timeout: int = 120) -> str: + """Run a shell command in the workspace root and return its output. + + Args is a list of strings (e.g. ``["ls", "-la"]``) — the command is NOT + invoked through a shell, so quoting/expansion are not interpreted. To use + shell features, pass ``["bash", "-c", "your-command-here"]``. + + ANSI escape sequences (color codes, cursor moves) are stripped from output + before truncation, so terminal-formatted output doesn't waste tokens. + + :param args: Command and arguments as a list of strings. + :param tail: Maximum number of trailing lines of stdout (or stderr on error) to return. + :param timeout: Maximum seconds to wait before killing the process (default 120). + :return: Tailed stdout on success, or an error message including stderr on failure. 
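+
+        Example (illustrative only; assumes ``git`` and ``bash`` are on PATH)::
+
+            run_command(["git", "status"])
+            # Pipes, globs, and env expansion need an explicit shell:
+            run_command(["bash", "-c", "grep -rn TODO src | head"], tail=20)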
+ """ + try: + log_info(f"run_command: {args}") + result = subprocess.run( + args, + capture_output=True, + text=True, + cwd=str(self.root), + timeout=timeout, + ) + if result.returncode != 0: + err = "\n".join(_strip_ansi(result.stderr).splitlines()[-tail:]) + return f"Error (exit {result.returncode}): {err}" + return "\n".join(_strip_ansi(result.stdout).splitlines()[-tail:]) + except subprocess.TimeoutExpired: + log_warning(f"run_command timed out after {timeout}s: {args}") + return f"Error: command timed out after {timeout} seconds" + except Exception as e: + log_warning(f"run_command failed: {e}") + return f"Error running command: {e}" + + # ------------------------------------------------------------------ + # Async siblings + # ------------------------------------------------------------------ + + async def aread_file( + self, + path: str, + start_line: Optional[int] = None, + end_line: Optional[int] = None, + encoding: str = "utf-8", + ) -> str: + """Async variant of ``read_file``.""" + return await asyncio.to_thread(self.read_file, path, start_line, end_line, encoding) + + async def alist_files( + self, + directory: str = ".", + pattern: Optional[str] = None, + recursive: bool = False, + max_depth: int = 3, + ) -> str: + """Async variant of ``list_files``.""" + return await asyncio.to_thread(self.list_files, directory, pattern, recursive, max_depth) + + async def asearch_content(self, query: str, directory: str = ".", limit: int = 10) -> str: + """Async variant of ``search_content``.""" + return await asyncio.to_thread(self.search_content, query, directory, limit) + + async def awrite_file(self, path: str, content: str, overwrite: bool = True, encoding: str = "utf-8") -> str: + """Async variant of ``write_file``.""" + return await asyncio.to_thread(self.write_file, path, content, overwrite, encoding) + + async def aedit_file( + self, + path: str, + old_str: str, + new_str: str, + replace_all: bool = False, + encoding: str = "utf-8", + ) -> str: + """Async variant of ``edit_file``.""" + return await asyncio.to_thread(self.edit_file, path, old_str, new_str, replace_all, encoding) + + async def amove_file(self, src: str, dst: str, overwrite: bool = False) -> str: + """Async variant of ``move_file``.""" + return await asyncio.to_thread(self.move_file, src, dst, overwrite) + + async def adelete_file(self, path: str) -> str: + """Async variant of ``delete_file``.""" + return await asyncio.to_thread(self.delete_file, path) + + async def arun_command(self, args: List[str], tail: int = 100, timeout: int = 120) -> str: + """Async variant of ``run_command`` using ``asyncio.create_subprocess_exec``.""" + try: + log_info(f"arun_command: {args}") + proc = await asyncio.create_subprocess_exec( + *args, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd=str(self.root), + ) + try: + stdout_b, stderr_b = await asyncio.wait_for(proc.communicate(), timeout=timeout) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + log_warning(f"arun_command timed out after {timeout}s: {args}") + return f"Error: command timed out after {timeout} seconds" + stdout = stdout_b.decode("utf-8", errors="replace") if stdout_b else "" + stderr = stderr_b.decode("utf-8", errors="replace") if stderr_b else "" + if proc.returncode != 0: + err = "\n".join(_strip_ansi(stderr).splitlines()[-tail:]) + return f"Error (exit {proc.returncode}): {err}" + return "\n".join(_strip_ansi(stdout).splitlines()[-tail:]) + except Exception as e: + log_warning(f"arun_command failed: {e}") + return f"Error 
running command: {e}" diff --git a/libs/agno/pyproject.toml b/libs/agno/pyproject.toml index e296f87577..5980f2e5be 100644 --- a/libs/agno/pyproject.toml +++ b/libs/agno/pyproject.toml @@ -390,6 +390,10 @@ demo = [ "sqlalchemy", "yfinance", "youtube-transcript-api", + "claude-agent-sdk", + "langgraph", + "langchain-openai", + "dspy", ] [project.urls] diff --git a/libs/agno/tests/unit/tools/test_workspace.py b/libs/agno/tests/unit/tools/test_workspace.py new file mode 100644 index 0000000000..12862e1e4f --- /dev/null +++ b/libs/agno/tests/unit/tools/test_workspace.py @@ -0,0 +1,765 @@ +import asyncio +import json +import tempfile +from pathlib import Path + +import pytest + +from agno.tools.workspace import Workspace + +# All registered tool names (the descriptive names the LLM sees, after alias translation). +ALL_METHODS = [ + "read_file", + "list_files", + "search_content", + "write_file", + "edit_file", + "move_file", + "delete_file", + "run_command", +] +READ_METHODS = ["read_file", "list_files", "search_content"] +WRITE_METHODS = ["write_file", "edit_file", "move_file", "delete_file", "run_command"] + + +# ------------------------------------------------------------------ +# Constructor: partition resolution & validation +# ------------------------------------------------------------------ + + +def test_default_partitions_when_both_none(): + """Both None → reads in allowed (auto-pass), writes in confirm.""" + with tempfile.TemporaryDirectory() as tmp_dir: + ws = Workspace(tmp_dir) + sync_names = list(ws.functions.keys()) + async_names = list(ws.async_functions.keys()) + + # Every method registered under its descriptive name (sync + async). + assert sorted(sync_names) == sorted(ALL_METHODS) + assert sorted(async_names) == sorted(ALL_METHODS) + + for name in WRITE_METHODS: + assert ws.functions[name].requires_confirmation is True + assert ws.async_functions[name].requires_confirmation is True + for name in READ_METHODS: + assert ws.functions[name].requires_confirmation is False + assert ws.async_functions[name].requires_confirmation is False + + +def test_only_allowed_set_makes_confirm_default_empty(): + """allowed set, confirm=None → confirm defaults to [], not WRITE_TOOLS.""" + with tempfile.TemporaryDirectory() as tmp_dir: + ws = Workspace(tmp_dir, allowed=["read"]) + assert list(ws.functions.keys()) == ["read_file"] + assert ws.functions["read_file"].requires_confirmation is False + + +def test_only_confirm_set_makes_allowed_default_empty(): + """confirm set, allowed=None → allowed defaults to [], not READ_TOOLS.""" + with tempfile.TemporaryDirectory() as tmp_dir: + ws = Workspace(tmp_dir, confirm=["write"]) + assert list(ws.functions.keys()) == ["write_file"] + assert ws.functions["write_file"].requires_confirmation is True + + +def test_unknown_alias_in_allowed_raises(): + with pytest.raises(ValueError, match="Unknown alias"): + Workspace(".", allowed=["read", "not_a_tool"]) + + +def test_unknown_alias_in_confirm_raises(): + with pytest.raises(ValueError, match="Unknown alias"): + Workspace(".", confirm=["bogus"]) + + +def test_full_method_name_in_alias_list_raises(): + """Aliases are short; passing a full method name like 'read_file' should fail loud.""" + with pytest.raises(ValueError, match="Unknown alias"): + Workspace(".", allowed=["read_file"]) + + +def test_overlap_between_allowed_and_confirm_raises(): + with pytest.raises(ValueError, match="mutually exclusive"): + Workspace( + ".", + allowed=["read", "write"], + confirm=["write"], + ) + + +def 
test_empty_lists_in_both_registers_nothing(): + """Both empty lists → no methods registered (useful for tests).""" + with tempfile.TemporaryDirectory() as tmp_dir: + ws = Workspace(tmp_dir, allowed=[], confirm=[]) + assert list(ws.functions.keys()) == [] + assert list(ws.async_functions.keys()) == [] + + +def test_confirm_as_bool_raises_typeerror(): + """confirm=True is the natural typo — fail loud, not with a confusing alias error.""" + with pytest.raises(TypeError, match="`confirm` must be a list"): + Workspace(".", confirm=True) + + +def test_allowed_as_string_raises_typeerror(): + """allowed='read' (not a list) → TypeError, not 4 'unknown alias' errors for r, e, a, d.""" + with pytest.raises(TypeError, match="`allowed` must be a list"): + Workspace(".", allowed="read") + + +def test_custom_partition_works(): + """User-defined partition with both lists set.""" + with tempfile.TemporaryDirectory() as tmp_dir: + ws = Workspace( + tmp_dir, + allowed=["read"], + confirm=["delete"], + ) + assert sorted(ws.functions.keys()) == ["delete_file", "read_file"] + assert ws.functions["read_file"].requires_confirmation is False + assert ws.functions["delete_file"].requires_confirmation is True + + +def test_root_kwarg_is_optional_positional(): + """Workspace('.') and Workspace(root='.') both work.""" + with tempfile.TemporaryDirectory() as tmp_dir: + ws_pos = Workspace(tmp_dir) + ws_kw = Workspace(root=tmp_dir) + assert ws_pos.root == ws_kw.root == Path(tmp_dir).resolve() + + +def test_root_defaults_to_cwd(): + ws = Workspace() + assert ws.root == Path.cwd().resolve() + + +# ------------------------------------------------------------------ +# Path escape protection (paths must resolve under root) +# ------------------------------------------------------------------ + + +def test_path_escape_blocked_on_read(): + with tempfile.TemporaryDirectory() as tmp_dir: + ws = Workspace(tmp_dir) + result = ws.read_file("../../../etc/passwd") + assert result.startswith("Error") + + +def test_path_escape_blocked_on_write(): + with tempfile.TemporaryDirectory() as tmp_dir: + ws = Workspace(tmp_dir) + result = ws.write_file("../escaped.txt", "boom") + assert result.startswith("Error") + # File outside the workspace root should not have been created. + assert not (Path(tmp_dir).parent / "escaped.txt").exists() + + +def test_path_escape_blocked_on_delete(): + with tempfile.TemporaryDirectory() as tmp_dir: + ws = Workspace(tmp_dir) + # Create a sibling file outside root. 
+ outside = Path(tmp_dir).parent / "outside_test_file.txt" + outside.write_text("keep me") + try: + result = ws.delete_file("../outside_test_file.txt") + assert result.startswith("Error") + assert outside.exists() + finally: + if outside.exists(): + outside.unlink() + + +# ------------------------------------------------------------------ +# read_file (line-numbered output) +# ------------------------------------------------------------------ + + +def test_read_file_returns_line_numbered_output(): + """read_file output is cat -n style (`{6d}\\t{line}`).""" + with tempfile.TemporaryDirectory() as tmp_dir: + ws = Workspace(tmp_dir) + (Path(tmp_dir) / "hello.txt").write_text("alpha\nbeta\ngamma\n") + out = ws.read_file("hello.txt") + assert out == " 1\talpha\n 2\tbeta\n 3\tgamma" + + +def test_read_file_chunked_uses_actual_file_line_numbers(): + """Reading a chunk starting at line 2 should number it 2, not 1.""" + with tempfile.TemporaryDirectory() as tmp_dir: + ws = Workspace(tmp_dir) + (Path(tmp_dir) / "lines.txt").write_text("a\nb\nc\nd\ne\n") + out = ws.read_file("lines.txt", start_line=2, end_line=4) + assert out == " 2\tb\n 3\tc\n 4\td" + + +def test_read_file_missing(): + with tempfile.TemporaryDirectory() as tmp_dir: + ws = Workspace(tmp_dir) + result = ws.read_file("does_not_exist.txt") + assert result.startswith("Error: file not found") + + +def test_read_file_too_long_by_chars_hint_includes_search(): + with tempfile.TemporaryDirectory() as tmp_dir: + ws = Workspace(tmp_dir, max_file_length=10) + (Path(tmp_dir) / "big.txt").write_text("a" * 100) + result = ws.read_file("big.txt") + assert "too long" in result + assert "search_content" in result + # Chunked read still works (and is line-numbered). + out = ws.read_file("big.txt", start_line=1, end_line=1) + assert out == " 1\t" + "a" * 100 + + +def test_read_file_too_long_by_lines_hint_includes_search(): + with tempfile.TemporaryDirectory() as tmp_dir: + ws = Workspace(tmp_dir, max_file_lines=3) + (Path(tmp_dir) / "many.txt").write_text("\n".join(str(i) for i in range(10))) + result = ws.read_file("many.txt") + assert "too long" in result + assert "search_content" in result + + +# ------------------------------------------------------------------ +# list_files (richer entries + recursive) +# ------------------------------------------------------------------ + + +def test_list_files_returns_size_and_type(): + """Each entry is {path, type, size}; size is human-readable for files, null for dirs.""" + with tempfile.TemporaryDirectory() as tmp_dir: + ws = Workspace(tmp_dir) + (Path(tmp_dir) / "a.txt").write_text("hello") # 5 bytes + (Path(tmp_dir) / "subdir").mkdir() + result = json.loads(ws.list_files()) + by_path = {e["path"]: e for e in result["files"]} + assert by_path["a.txt"]["type"] == "file" + assert by_path["a.txt"]["size"] == "5B" + assert by_path["subdir"]["type"] == "dir" + assert by_path["subdir"]["size"] is None + + +def test_list_files_with_glob_pattern(): + with tempfile.TemporaryDirectory() as tmp_dir: + ws = Workspace(tmp_dir) + base = Path(tmp_dir) + (base / "a.py").write_text("a") + sub = base / "sub" + sub.mkdir() + (sub / "b.py").write_text("b") + (base / "c.txt").write_text("c") + + result = json.loads(ws.list_files(pattern="**/*.py")) + paths = sorted(e["path"] for e in result["files"]) + assert paths == ["a.py", "sub/b.py"] + + +def test_list_files_skips_default_excludes(): + with tempfile.TemporaryDirectory() as tmp_dir: + ws = Workspace(tmp_dir) + base = Path(tmp_dir) + (base / "keep.txt").write_text("keep") + 
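+        # Populate a default-excluded directory (.venv); it should be pruned from the listing.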
(base / ".venv").mkdir() + (base / ".venv" / "skip.txt").write_text("skip") + + result = json.loads(ws.list_files()) + paths = [e["path"] for e in result["files"]] + assert "keep.txt" in paths + assert ".venv" not in paths + + +def test_list_files_paths_are_relative(): + with tempfile.TemporaryDirectory() as tmp_dir: + ws = Workspace(tmp_dir) + (Path(tmp_dir) / "x.txt").write_text("x") + result = json.loads(ws.list_files()) + for e in result["files"]: + assert not e["path"].startswith("/") + assert not e["path"].startswith(tmp_dir) + + +def test_list_files_recursive_walks_tree(): + """recursive=True returns nested entries up to max_depth.""" + with tempfile.TemporaryDirectory() as tmp_dir: + ws = Workspace(tmp_dir) + base = Path(tmp_dir) + (base / "a.txt").write_text("a") + (base / "src").mkdir() + (base / "src" / "b.py").write_text("b") + (base / "src" / "lib").mkdir() + (base / "src" / "lib" / "c.py").write_text("c") + + result = json.loads(ws.list_files(recursive=True)) + paths = sorted(e["path"] for e in result["files"]) + assert "a.txt" in paths + assert "src/b.py" in paths + assert "src/lib/c.py" in paths + assert result["recursive"] is True + + +def test_list_files_recursive_respects_max_depth(): + """max_depth=1 returns root children plus entries one level inside them.""" + with tempfile.TemporaryDirectory() as tmp_dir: + ws = Workspace(tmp_dir) + base = Path(tmp_dir) + (base / "top.txt").write_text("a") + (base / "lvl1").mkdir() + (base / "lvl1" / "mid.txt").write_text("b") + (base / "lvl1" / "lvl2").mkdir() + (base / "lvl1" / "lvl2" / "deep.txt").write_text("c") + + result = json.loads(ws.list_files(recursive=True, max_depth=1)) + paths = sorted(e["path"] for e in result["files"]) + assert "top.txt" in paths + assert "lvl1" in paths + # Files at the boundary (depth 1) are shown. + assert "lvl1/mid.txt" in paths + assert "lvl1/lvl2" in paths + # Files beyond max_depth are not shown. 
+        assert "lvl1/lvl2/deep.txt" not in paths
+
+
+def test_list_files_recursive_max_depth_2_shows_two_levels():
+    """max_depth=2 shows entries up to depth 2 but not depth 3."""
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        base = Path(tmp_dir)
+        (base / "root.txt").write_text("a")
+        (base / "d1").mkdir()
+        (base / "d1" / "f1.txt").write_text("b")
+        (base / "d1" / "d2").mkdir()
+        (base / "d1" / "d2" / "f2.txt").write_text("c")
+        (base / "d1" / "d2" / "d3").mkdir()
+        (base / "d1" / "d2" / "d3" / "f3.txt").write_text("d")
+
+        result = json.loads(ws.list_files(recursive=True, max_depth=2))
+        paths = sorted(e["path"] for e in result["files"])
+        assert "root.txt" in paths
+        assert "d1" in paths
+        assert "d1/f1.txt" in paths
+        assert "d1/d2" in paths
+        assert "d1/d2/f2.txt" in paths
+        assert "d1/d2/d3" in paths
+        # depth 3 is beyond max_depth=2
+        assert "d1/d2/d3/f3.txt" not in paths
+
+
+# ------------------------------------------------------------------
+# search_content
+# ------------------------------------------------------------------
+
+
+def test_search_content_finds_matches():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        base = Path(tmp_dir)
+        (base / "hello.txt").write_text("Hello World, this is a test file")
+        (base / "other.py").write_text("def greet():\n print('hello')")
+        (base / "nope.txt").write_text("nothing relevant")
+
+        result = json.loads(ws.search_content(query="hello"))
+        assert result["matches_found"] == 2
+        names = [m["file"] for m in result["files"]]
+        assert "hello.txt" in names
+        assert "other.py" in names
+
+
+def test_search_content_directory_scoping():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        base = Path(tmp_dir)
+        (base / "root.txt").write_text("target")
+        sub = base / "sub"
+        sub.mkdir()
+        (sub / "nested.txt").write_text("target also")
+
+        result = json.loads(ws.search_content(query="target", directory="sub"))
+        assert result["matches_found"] == 1
+        assert result["files"][0]["file"] == "sub/nested.txt"
+
+
+def test_search_content_skips_excluded_dirs():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        base = Path(tmp_dir)
+        venv_pkg = base / ".venv" / "lib"
+        venv_pkg.mkdir(parents=True)
+        (venv_pkg / "hit.py").write_text("# TODO: vendor")
+        (base / "real.py").write_text("# TODO: real work")
+
+        result = json.loads(ws.search_content(query="TODO"))
+        names = [m["file"] for m in result["files"]]
+        assert result["matches_found"] == 1
+        assert "real.py" in names
+        assert not any(".venv" in f for f in names)
+
+
+def test_search_content_empty_query():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        assert ws.search_content(query="").startswith("Error")
+
+
+# ------------------------------------------------------------------
+# write_file (atomic)
+# ------------------------------------------------------------------
+
+
+def test_write_file_creates_parent_dirs():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        result = ws.write_file("nested/deep/file.txt", "hi")
+        assert "Wrote" in result
+        assert (Path(tmp_dir) / "nested" / "deep" / "file.txt").read_text() == "hi"
+
+
+def test_write_file_no_overwrite():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        ws.write_file("a.txt", "first")
+        result = ws.write_file("a.txt", "second", overwrite=False)
+        assert result.startswith("Error")
+        assert (Path(tmp_dir) / "a.txt").read_text() == "first"
+
+
+def test_write_file_atomic_no_tmp_leftover():
+    """A successful write should not leave a .tmp file behind."""
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        ws.write_file("a.txt", "content")
+        assert (Path(tmp_dir) / "a.txt").read_text() == "content"
+        assert not (Path(tmp_dir) / "a.txt.tmp").exists()
+
+
+# ------------------------------------------------------------------
+# edit_file (replace_all)
+# ------------------------------------------------------------------
+
+
+def test_edit_file_replaces_unique_match():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        (Path(tmp_dir) / "doc.md").write_text("Hello, alpha. Goodbye, beta.")
+        result = ws.edit_file("doc.md", old_str="alpha", new_str="ALPHA")
+        assert "replaced 1 occurrence" in result
+        assert (Path(tmp_dir) / "doc.md").read_text() == "Hello, ALPHA. Goodbye, beta."
+
+
+def test_edit_file_rejects_zero_matches():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        (Path(tmp_dir) / "doc.md").write_text("Hello, alpha.")
+        result = ws.edit_file("doc.md", old_str="missing", new_str="x")
+        assert "not found" in result
+
+
+def test_edit_file_rejects_multiple_matches_default():
+    """Without replace_all, multiple matches → error mentioning replace_all."""
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        (Path(tmp_dir) / "doc.md").write_text("foo foo foo")
+        result = ws.edit_file("doc.md", old_str="foo", new_str="bar")
+        assert "matches 3 times" in result
+        assert "replace_all" in result
+        # File untouched.
+        assert (Path(tmp_dir) / "doc.md").read_text() == "foo foo foo"
+
+
+def test_edit_file_replace_all_replaces_every_occurrence():
+    """replace_all=True replaces all occurrences and reports the count."""
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        (Path(tmp_dir) / "doc.md").write_text("foo bar foo baz foo")
+        result = ws.edit_file("doc.md", old_str="foo", new_str="QUX", replace_all=True)
+        assert "replaced 3 occurrences" in result
+        assert (Path(tmp_dir) / "doc.md").read_text() == "QUX bar QUX baz QUX"
+
+
+def test_edit_file_empty_old_str_rejected():
+    """Empty old_str must be rejected — str.replace('', x) corrupts the file."""
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        (Path(tmp_dir) / "doc.md").write_text("Hello")
+        result = ws.edit_file("doc.md", old_str="", new_str="X")
+        assert result.startswith("Error: old_str cannot be empty")
+        # File must be untouched.
+        assert (Path(tmp_dir) / "doc.md").read_text() == "Hello"
+
+
+def test_edit_file_empty_old_str_with_replace_all_rejected():
+    """Empty old_str with replace_all=True must also be rejected."""
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        (Path(tmp_dir) / "doc.md").write_text("Hello")
+        result = ws.edit_file("doc.md", old_str="", new_str="X", replace_all=True)
+        assert result.startswith("Error: old_str cannot be empty")
+        assert (Path(tmp_dir) / "doc.md").read_text() == "Hello"
+
+
+# ------------------------------------------------------------------
+# move_file
+# ------------------------------------------------------------------
+
+
+def test_move_file_renames_within_workspace():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        (Path(tmp_dir) / "old.txt").write_text("hi")
+        result = ws.move_file("old.txt", "new.txt")
+        assert "Moved old.txt -> new.txt" in result
+        assert not (Path(tmp_dir) / "old.txt").exists()
+        assert (Path(tmp_dir) / "new.txt").read_text() == "hi"
+
+
+def test_move_file_creates_dst_parent_dirs():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        (Path(tmp_dir) / "src.txt").write_text("hi")
+        result = ws.move_file("src.txt", "nested/deep/dst.txt")
+        assert "Moved" in result
+        assert (Path(tmp_dir) / "nested" / "deep" / "dst.txt").read_text() == "hi"
+
+
+def test_move_file_refuses_existing_dst_without_overwrite():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        (Path(tmp_dir) / "a.txt").write_text("a")
+        (Path(tmp_dir) / "b.txt").write_text("b")
+        result = ws.move_file("a.txt", "b.txt")
+        assert result.startswith("Error: dst exists")
+        # Both still present, untouched.
+        assert (Path(tmp_dir) / "a.txt").read_text() == "a"
+        assert (Path(tmp_dir) / "b.txt").read_text() == "b"
+
+
+def test_move_file_overwrite_true_replaces_dst():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        (Path(tmp_dir) / "a.txt").write_text("source")
+        (Path(tmp_dir) / "b.txt").write_text("target")
+        result = ws.move_file("a.txt", "b.txt", overwrite=True)
+        assert "Moved" in result
+        assert not (Path(tmp_dir) / "a.txt").exists()
+        assert (Path(tmp_dir) / "b.txt").read_text() == "source"
+
+
+def test_move_file_path_escape_blocked_on_src():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        result = ws.move_file("../outside.txt", "inside.txt")
+        assert result.startswith("Error: src escapes")
+
+
+def test_move_file_path_escape_blocked_on_dst():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        (Path(tmp_dir) / "a.txt").write_text("hi")
+        result = ws.move_file("a.txt", "../escape.txt")
+        assert result.startswith("Error: dst escapes")
+
+
+def test_move_file_missing_src():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        result = ws.move_file("does_not_exist.txt", "wherever.txt")
+        assert "Error: src not found" in result
+
+
+# ------------------------------------------------------------------
+# delete_file
+# ------------------------------------------------------------------
+
+
+def test_delete_file_removes_file():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        target = Path(tmp_dir) / "byebye.txt"
+        target.write_text("x")
+        result = ws.delete_file("byebye.txt")
+        assert "Deleted" in result
+        assert not target.exists()
+
+
+def test_delete_file_refuses_directory():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        sub = Path(tmp_dir) / "subdir"
+        sub.mkdir()
+        result = ws.delete_file("subdir")
+        assert result.startswith("Error")
+        assert sub.exists()
+
+
+# ------------------------------------------------------------------
+# require_read_before_write
+# ------------------------------------------------------------------
+
+
+def test_require_read_before_write_blocks_unread_write():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir, require_read_before_write=True)
+        (Path(tmp_dir) / "existing.txt").write_text("original")
+        result = ws.write_file("existing.txt", "tampered")
+        assert "require_read_before_write" in result
+        # File untouched.
+        assert (Path(tmp_dir) / "existing.txt").read_text() == "original"
+
+
+def test_require_read_before_write_allows_after_read():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir, require_read_before_write=True)
+        (Path(tmp_dir) / "existing.txt").write_text("original")
+        ws.read_file("existing.txt")
+        result = ws.write_file("existing.txt", "updated")
+        assert "Wrote" in result
+        assert (Path(tmp_dir) / "existing.txt").read_text() == "updated"
+
+
+def test_require_read_before_write_allows_new_file():
+    """Creating a new file doesn't require a prior read (nothing to hallucinate)."""
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir, require_read_before_write=True)
+        result = ws.write_file("brand_new.txt", "content")
+        assert "Wrote" in result
+        assert (Path(tmp_dir) / "brand_new.txt").read_text() == "content"
+
+
+def test_require_read_before_write_blocks_unread_edit():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir, require_read_before_write=True)
+        (Path(tmp_dir) / "doc.md").write_text("Hello, world.")
+        result = ws.edit_file("doc.md", old_str="world", new_str="Agno")
+        assert "require_read_before_write" in result
+        assert (Path(tmp_dir) / "doc.md").read_text() == "Hello, world."
+
+
+def test_require_read_before_write_blocks_unread_delete():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir, require_read_before_write=True)
+        (Path(tmp_dir) / "trash.txt").write_text("anything")
+        result = ws.delete_file("trash.txt")
+        assert "require_read_before_write" in result
+        assert (Path(tmp_dir) / "trash.txt").exists()
+
+
+# ------------------------------------------------------------------
+# run_command (ANSI strip)
+# ------------------------------------------------------------------
+
+
+def test_run_command_success():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        (Path(tmp_dir) / "file_a.txt").write_text("a")
+        (Path(tmp_dir) / "file_b.txt").write_text("b")
+        out = ws.run_command(["ls"])
+        assert "file_a.txt" in out
+        assert "file_b.txt" in out
+
+
+def test_run_command_runs_in_root():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        out = ws.run_command(["pwd"])
+        assert out.strip() == str(Path(tmp_dir).resolve())
+
+
+def test_run_command_returns_error_on_nonzero_exit():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        out = ws.run_command(["ls", "definitely-does-not-exist-xyz"])
+        assert out.startswith("Error")
+
+
+def test_run_command_strips_ansi_color_codes():
+    """Color codes from CLI output should be stripped before tailing."""
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        # printf interprets the \x1b escape and produces a literal red "RED" plus reset.
+        out = ws.run_command(["printf", "\x1b[31mRED\x1b[0m\n"])
+        assert out == "RED"
+        assert "\x1b" not in out
+
+
+def test_run_command_timeout_kills_long_running_process():
+    """A command exceeding the timeout should be killed and return an error."""
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        out = ws.run_command(["sleep", "30"], timeout=1)
+        assert "timed out" in out
+        assert "1 seconds" in out
+
+
+def test_run_command_timeout_default_allows_fast_commands():
+    """Fast commands should complete normally under the default timeout."""
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        out = ws.run_command(["echo", "hello"])
+        assert out.strip() == "hello"
+
+
+def test_async_run_command_timeout():
+    """Async variant should also respect timeout."""
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        out = asyncio.run(ws.arun_command(["sleep", "30"], timeout=1))
+        assert "timed out" in out
+        assert "1 seconds" in out
+
+
+# ------------------------------------------------------------------
+# Async siblings — spot-check parity with sync
+# ------------------------------------------------------------------
+
+
+def test_async_read_file_matches_sync():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        (Path(tmp_dir) / "a.txt").write_text("hi")
+        sync_result = ws.read_file("a.txt")
+        async_result = asyncio.run(ws.aread_file("a.txt"))
+        assert sync_result == async_result == " 1\thi"
+
+
+def test_async_write_then_read():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+
+        async def go():
+            await ws.awrite_file("a.txt", "async write")
+            return await ws.aread_file("a.txt")
+
+        assert asyncio.run(go()) == " 1\tasync write"
+
+
+def test_async_run_command():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        (Path(tmp_dir) / "marker.txt").write_text("x")
+        out = asyncio.run(ws.arun_command(["ls"]))
+        assert "marker.txt" in out
+
+
+def test_async_move_file():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir)
+        (Path(tmp_dir) / "src.txt").write_text("x")
+        out = asyncio.run(ws.amove_file("src.txt", "dst.txt"))
+        assert "Moved" in out
+        assert (Path(tmp_dir) / "dst.txt").read_text() == "x"
+
+
+# ------------------------------------------------------------------
+# Excludes config
+# ------------------------------------------------------------------
+
+
+def test_empty_exclude_patterns_opts_out():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        ws = Workspace(tmp_dir, exclude_patterns=[])
+        venv_pkg = Path(tmp_dir) / ".venv" / "lib"
+        venv_pkg.mkdir(parents=True)
+        (venv_pkg / "x.py").write_text("print('x')")
+        result = json.loads(ws.list_files(pattern="**/*.py"))
+        paths = [e["path"] for e in result["files"]]
+        assert any(".venv" in p for p in paths)