From 2aaec8065f72dd9d7b38f21f34f197aad7e13527 Mon Sep 17 00:00:00 2001 From: Mike Pfaffenberger Date: Fri, 13 Feb 2026 14:36:54 -0500 Subject: [PATCH 1/2] chore: save current hashline edit changes --- code_puppy/agents/agent_code_puppy.py | 32 +- code_puppy/agents/agent_creator_agent.py | 30 +- .../plugins/antigravity_oauth/transport.py | 2 +- code_puppy/tools/file_modifications.py | 239 +++----- code_puppy/tools/file_operations.py | 24 +- code_puppy/tools/hashline.py | 293 ++++++++++ puppy-hashline-practice.md | 10 + readability.py | 151 +++++ .../tools/test_file_modifications_extended.py | 272 +++++---- tests/tools/test_file_operations_coverage.py | 4 +- tests/tools/test_file_operations_extended.py | 16 +- tests/tools/test_hashline.py | 537 ++++++++++++++++++ 12 files changed, 1306 insertions(+), 304 deletions(-) create mode 100644 code_puppy/tools/hashline.py create mode 100644 puppy-hashline-practice.md create mode 100644 readability.py create mode 100644 tests/tools/test_hashline.py diff --git a/code_puppy/agents/agent_code_puppy.py b/code_puppy/agents/agent_code_puppy.py index ba0f95068..5329d4373 100644 --- a/code_puppy/agents/agent_code_puppy.py +++ b/code_puppy/agents/agent_code_puppy.py @@ -117,7 +117,7 @@ def get_system_prompt(self) -> str: File Operations: - list_files(directory=".", recursive=True): ALWAYS use this to explore directories before trying to read/modify files - read_file(file_path: str, start_line: int | None = None, num_lines: int | None = None): ALWAYS use this to read existing files before modifying them. By default, read the entire file. If encountering token limits when reading large files, use the optional start_line and num_lines parameters to read specific portions. - - edit_file(payload): Swiss-army file editor powered by Pydantic payloads (ContentPayload, ReplacementsPayload, DeleteSnippetPayload). 
+ - edit_file(payload): Swiss-army file editor powered by Pydantic payloads (HashlineEditPayload, ContentPayload, DeleteSnippetPayload). - delete_file(file_path): Use this to remove files when needed - grep(search_string, directory="."): Use this to recursively search for a string across files starting from the specified directory, capping results at 200 matches. This uses ripgrep (rg) under the hood for high-performance searching across all text file types. @@ -125,35 +125,43 @@ def get_system_prompt(self) -> str: ## edit_file This is an all-in-one file-modification tool. It supports the following Pydantic Object payload types: -1. ContentPayload: {{ file_path="example.py", "content": "…", "overwrite": true|false }} → Create or overwrite a file with the provided content. -2. ReplacementsPayload: {{ file_path="example.py", "replacements": [ {{ "old_str": "…", "new_str": "…" }}, … ] }} → Perform exact text replacements inside an existing file. -3. DeleteSnippetPayload: {{ file_path="example.py", "delete_snippet": "…" }} → Remove a snippet of text from an existing file. +1. HashlineEditPayload (REQUIRED): {{ file_path="example.py", "edits": [ {{ "operation": "replace", "start_ref": "2:f1", "new_content": "new code" }}, … ] }} → Edit by line-hash reference. Use the line:hash tags from read_file output. +2. ContentPayload: {{ file_path="example.py", "content": "…", "overwrite": true|false }} → Create or overwrite a file with the provided content. (ONLY use for new files or complete rewrites) +3. DeleteSnippetPayload: {{ file_path="example.py", "delete_snippet": "…" }} → Remove a snippet of text from an existing file. Arguments: - payload (required): One of the Pydantic payload types above. 
-Example (create): +Example (hashline edit — REQUIRED for all file modifications): +When you read a file, each line is tagged: `:|` +Reference these tags to edit: ```python -edit_file(payload={{file_path="example.py" "content": "print('hello')\n"}}) +edit_file( + payload={{file_path="example.py", "edits": [{{"operation": "replace", "start_ref": "2:f1", "new_content": "bar"}}]}} +) ``` +Hashline operations: "replace", "replace_range" (needs end_ref), "insert_after", "delete", "delete_range" (needs end_ref) -Example (replacement): -- YOU SHOULD PREFER THIS AS THE PRIMARY WAY TO EDIT FILES. +Example (create — ContentPayload ONLY for new files or full rewrites): ```python -edit_file( - payload={{file_path="example.py", "replacements": [{{"old_str": "foo", "new_str": "bar"}}]}} -) +edit_file(payload={{file_path="example.py" "content": "print('hello')\n"}}) ``` -Example (delete snippet): +Example (delete snippet — DeleteSnippetPayload ONLY for removing text): ```python edit_file( payload={{file_path="example.py", "delete_snippet": "# TODO: remove this line"}} ) ``` + +CRITICAL RULE — You MUST use HashlineEditPayload for editing existing files: +• Read the file first to get line:hash tags (e.g., "2:f1|") +• Reference these tags in your edits — this prevents concurrent edit conflicts +• Do NOT try to use old-style string replacement — it is NO LONGER SUPPORTED +• If a hash mismatch occurs, re-read the file and retry with fresh tags Best-practice guidelines for `edit_file`: • Keep each diff small – ideally between 100-300 lines. • Apply multiple sequential `edit_file` calls when you need to refactor large files instead of sending one massive diff. -• Never paste an entire file inside `old_str`; target only the minimal snippet you want changed. • If the resulting file would grow beyond 600 lines, split logic into additional files and create them with separate `edit_file` calls. 
System Operations: diff --git a/code_puppy/agents/agent_creator_agent.py b/code_puppy/agents/agent_creator_agent.py index 1c4ca6fc7..3d2c1812c 100644 --- a/code_puppy/agents/agent_creator_agent.py +++ b/code_puppy/agents/agent_creator_agent.py @@ -183,7 +183,7 @@ def get_system_prompt(self) -> str: ALWAYS use this to read existing files before modifying them. By default, read the entire file. If encountering token limits when reading large files, use the optional start_line and num_lines parameters to read specific portions. #### `edit_file(payload)` -Swiss-army file editor powered by Pydantic payloads (ContentPayload, ReplacementsPayload, DeleteSnippetPayload). +Swiss-army file editor powered by Pydantic payloads (HashlineEditPayload, ContentPayload, DeleteSnippetPayload). #### `delete_file(file_path)` Use this to remove files when needed @@ -196,24 +196,27 @@ def get_system_prompt(self) -> str: #### `ask_about_model_pinning(agent_config)` Use this method to ask the user whether they want to pin a specific model to their agent. Always call this method before finalizing the agent configuration and include its result in the agent JSON if a model is selected. This is an all-in-one file-modification tool. It supports the following Pydantic Object payload types: -1. ContentPayload: {{ file_path="example.py", "content": "…", "overwrite": true|false }} → Create or overwrite a file with the provided content. -2. ReplacementsPayload: {{ file_path="example.py", "replacements": [ {{ "old_str": "…", "new_str": "…" }}, … ] }} → Perform exact text replacements inside an existing file. -3. DeleteSnippetPayload: {{ file_path="example.py", "delete_snippet": "…" }} → Remove a snippet of text from an existing file. +1. HashlineEditPayload (REQUIRED): {{ file_path="example.py", "edits": [ {{ "operation": "replace", "start_ref": "2:f1", "new_content": "new code" }} ] }} → Edit by line-hash reference. +2. 
ContentPayload: {{ file_path="example.py", "content": "…", "overwrite": true|false }} → Create or overwrite a file. (ONLY for new files or complete rewrites) +3. DeleteSnippetPayload: {{ file_path="example.py", "delete_snippet": "…" }} → Remove a snippet. Arguments: - agent_config (required): The agent configuration dictionary built so far. - payload (required): One of the Pydantic payload types above. -Example (create): +Example (hashline edit — REQUIRED for all file modifications): +When you read a file, each line is tagged: `:|` +Reference these tags to edit: ```python -edit_file(payload={{file_path="example.py" "content": "print('hello')"}}) +edit_file( + payload={{file_path="example.py", "edits": [{{"operation": "replace", "start_ref": "2:f1", "new_content": "bar"}}]}} +) ``` +Hashline operations: "replace", "replace_range" (needs end_ref), "insert_after", "delete", "delete_range" (needs end_ref) -Example (replacement): -- YOU SHOULD PREFER THIS AS THE PRIMARY WAY TO EDIT FILES. +Example (create — ContentPayload ONLY for new files): ```python -edit_file( - payload={{file_path="example.py", "replacements": [{{"old_str": "foo", "new_str": "bar"}}]}} -) +edit_file(payload={{file_path="example.py" "content": "print('hello')"}}) ``` Example (delete snippet): @@ -223,6 +226,13 @@ def get_system_prompt(self) -> str: ) ``` + +CRITICAL RULE — You MUST use HashlineEditPayload for editing existing files: +• Read the file first to get line:hash tags (e.g., "2:f1|") +• Reference these tags in your edits — this prevents concurrent edit conflicts +• Do NOT try to use old-style string replacement — it is NO LONGER SUPPORTED +• If a hash mismatch occurs, re-read the file and retry with fresh tags + NEVER output an entire file – this is very expensive. 
You may not edit file extensions: [.ipynb] diff --git a/code_puppy/plugins/antigravity_oauth/transport.py b/code_puppy/plugins/antigravity_oauth/transport.py index 03d9c130b..2a0feeb9f 100644 --- a/code_puppy/plugins/antigravity_oauth/transport.py +++ b/code_puppy/plugins/antigravity_oauth/transport.py @@ -28,7 +28,7 @@ def _flatten_union_to_object(union_items: list, defs: dict, resolve_fn) -> dict: """Flatten a union of object types into a single object with all properties. - For discriminated unions like EditFilePayload (ContentPayload | ReplacementsPayload | DeleteSnippetPayload), + For discriminated unions like EditFilePayload (ContentPayload | HashlineEditPayload | DeleteSnippetPayload), we merge all object types into one with all properties marked as optional. """ merged_properties = {} diff --git a/code_puppy/tools/file_modifications.py b/code_puppy/tools/file_modifications.py index 0f6f212eb..439a02e7f 100644 --- a/code_puppy/tools/file_modifications.py +++ b/code_puppy/tools/file_modifications.py @@ -25,10 +25,12 @@ DiffLine, DiffMessage, emit_error, - emit_warning, get_message_bus, ) -from code_puppy.tools.common import _find_best_window, generate_group_id +from code_puppy.tools.common import generate_group_id +from code_puppy.tools.hashline import ( + apply_hashline_edits, +) def _create_rejection_response(file_path: str) -> Dict[str, Any]: @@ -77,23 +79,29 @@ class DeleteSnippetPayload(BaseModel): delete_snippet: str -class Replacement(BaseModel): - old_str: str - new_str: str +class ContentPayload(BaseModel): + file_path: str + content: str + overwrite: bool = False -class ReplacementsPayload(BaseModel): - file_path: str - replacements: List[Replacement] +class HashlineEdit(BaseModel): + """A single hashline edit operation.""" + operation: ( + str # "replace" | "replace_range" | "insert_after" | "delete" | "delete_range" + ) + start_ref: str # e.g. 
"2:f1" + end_ref: str | None = None # for range operations + new_content: str = "" # new lines (empty for delete) -class ContentPayload(BaseModel): + +class HashlineEditPayload(BaseModel): file_path: str - content: str - overwrite: bool = False + edits: List[HashlineEdit] -EditFilePayload = Union[DeleteSnippetPayload, ReplacementsPayload, ContentPayload] +EditFilePayload = Union[DeleteSnippetPayload, ContentPayload, HashlineEditPayload] def _parse_diff_lines(diff_text: str) -> List[DiffLine]: @@ -256,101 +264,6 @@ def _delete_snippet_from_file( return {"error": str(exc), "diff": diff_text} -def _replace_in_file( - context: RunContext | None, - path: str, - replacements: List[Dict[str, str]], - message_group: str | None = None, -) -> Dict[str, Any]: - """Robust replacement engine with explicit edge‑case reporting.""" - file_path = os.path.abspath(path) - diff_text = "" - try: - if not os.path.exists(file_path) or not os.path.isfile(file_path): - return {"error": f"File '{file_path}' does not exist.", "diff": diff_text} - - with open(file_path, "r", encoding="utf-8", errors="surrogateescape") as f: - original = f.read() - - # Sanitize any surrogate characters from reading - try: - original = original.encode("utf-8", errors="surrogatepass").decode( - "utf-8", errors="replace" - ) - except (UnicodeEncodeError, UnicodeDecodeError): - pass - - modified = original - for rep in replacements: - old_snippet = rep.get("old_str", "") - new_snippet = rep.get("new_str", "") - - if old_snippet and old_snippet in modified: - modified = modified.replace(old_snippet, new_snippet, 1) - continue - - had_trailing_newline = modified.endswith("\n") - orig_lines = modified.splitlines() - loc, score = _find_best_window(orig_lines, old_snippet) - - if score < 0.95 or loc is None: - return { - "error": "No suitable match in file (JW < 0.95)", - "jw_score": score, - "received": old_snippet, - "diff": "", - } - - start, end = loc - prefix = "\n".join(orig_lines[:start]) - suffix = 
"\n".join(orig_lines[end:]) - parts = [] - if prefix: - parts.append(prefix) - parts.append(new_snippet.rstrip("\n")) - if suffix: - parts.append(suffix) - modified = "\n".join(parts) - if had_trailing_newline and not modified.endswith("\n"): - modified += "\n" - - if modified == original: - emit_warning( - "No changes to apply – proposed content is identical.", - message_group=message_group, - ) - return { - "success": False, - "path": file_path, - "message": "No changes to apply.", - "changed": False, - "diff": "", - } - - from code_puppy.config import get_diff_context_lines - - diff_text = "".join( - difflib.unified_diff( - original.splitlines(keepends=True), - modified.splitlines(keepends=True), - fromfile=f"a/{os.path.basename(file_path)}", - tofile=f"b/{os.path.basename(file_path)}", - n=get_diff_context_lines(), - ) - ) - with open(file_path, "w", encoding="utf-8") as f: - f.write(modified) - return { - "success": True, - "path": file_path, - "message": "Replacements applied.", - "changed": True, - "diff": diff_text, - } - except Exception as exc: - return {"error": str(exc), "diff": diff_text} - - def _write_to_file( context: RunContext | None, path: str, @@ -471,33 +384,6 @@ def write_to_file( return res -def replace_in_file( - context: RunContext, - path: str, - replacements: List[Dict[str, str]], - message_group: str | None = None, -) -> Dict[str, Any]: - # Use the plugin system for permission handling with operation data - from code_puppy.callbacks import on_file_permission - - operation_data = {"replacements": replacements} - permission_results = on_file_permission( - context, path, "replace text in", None, message_group, operation_data - ) - - # If any permission handler denies the operation, return cancelled result - if permission_results and any( - not result for result in permission_results if result is not None - ): - return _create_rejection_response(path) - - res = _replace_in_file(context, path, replacements, message_group=message_group) - diff 
= res.get("diff", "") - if diff: - _emit_diff_message(path, "modify", diff) - return res - - def _edit_file( context: RunContext, payload: EditFilePayload, group_id: str | None = None ) -> Dict[str, Any]: @@ -511,7 +397,7 @@ def _edit_file( Supported payload variants -------------------------- • **ContentPayload** – full file write / overwrite. - • **ReplacementsPayload** – targeted in-file replacements. + • **HashlineEditPayload** – edit by line-hash reference (preferred). • **DeleteSnippetPayload** – remove an exact snippet. The helper decides which low-level routine to delegate to and ensures the resulting unified @@ -543,15 +429,52 @@ def _edit_file( return delete_snippet_from_file( context, file_path, payload.delete_snippet, message_group=group_id ) - elif isinstance(payload, ReplacementsPayload): - # Convert Pydantic Replacement models to dict format for legacy compatibility - replacements_dict = [ - {"old_str": rep.old_str, "new_str": rep.new_str} - for rep in payload.replacements - ] - return replace_in_file( - context, file_path, replacements_dict, message_group=group_id + elif isinstance(payload, HashlineEditPayload): + file_path_abs = os.path.abspath(payload.file_path) + try: + with open( + file_path_abs, "r", encoding="utf-8", errors="surrogateescape" + ) as f: + old_content = f.read() + except OSError as exc: + return { + "success": False, + "path": file_path_abs, + "message": str(exc), + "changed": False, + } + + result = apply_hashline_edits( + file_path_abs, [e.model_dump() for e in payload.edits] + ) + if not result["success"]: + return { + "success": False, + "path": file_path_abs, + "message": "; ".join(result["errors"]), + "changed": False, + } + + from code_puppy.config import get_diff_context_lines + + diff_text = "".join( + difflib.unified_diff( + old_content.splitlines(keepends=True), + result["content"].splitlines(keepends=True), + fromfile=f"a/{os.path.basename(file_path_abs)}", + tofile=f"b/{os.path.basename(file_path_abs)}", + 
n=get_diff_context_lines(), + ) ) + if diff_text: + _emit_diff_message(file_path_abs, "modify", diff_text) + return { + "success": True, + "path": file_path_abs, + "message": "Hashline edits applied.", + "changed": bool(diff_text), + "diff": diff_text, + } elif isinstance(payload, ContentPayload): file_exists = os.path.exists(file_path) if file_exists and not payload.overwrite: @@ -667,7 +590,15 @@ def edit_file( Args: context (RunContext): The PydanticAI runtime context for the agent. - payload: One of three payload types: + payload: One of four payload types: + + HashlineEditPayload (PREFERRED — use when you read files with hashline=True): + - file_path (str): Path to file + - edits (List[HashlineEdit]): List of edits where each HashlineEdit contains: + - operation (str): "replace" | "replace_range" | "insert_after" | "delete" | "delete_range" + - start_ref (str): Line hash reference e.g. "2:f1" (from hashline-tagged read output) + - end_ref (str | None): End reference for range operations + - new_content (str): Replacement text (empty for deletes) ContentPayload: - file_path (str): Path to file @@ -675,13 +606,6 @@ def edit_file( - overwrite (bool, optional): Whether to overwrite existing files. Defaults to False (safe mode). - ReplacementsPayload: - - file_path (str): Path to file - - replacements (List[Replacement]): List of text replacements where - each Replacement contains: - - old_str (str): Exact text to find and replace - - new_str (str): Replacement text - DeleteSnippetPayload: - file_path (str): Path to file - delete_snippet (str): Exact text snippet to remove from file @@ -750,8 +674,15 @@ def edit_file( try: # Fallback for weird models that just can't help but send json strings... 
payload_dict = json.loads(json_repair.repair_json(payload)) - if "replacements" in payload_dict: - payload = ReplacementsPayload(**payload_dict) + if "edits" in payload_dict: + payload = HashlineEditPayload(**payload_dict) + elif "replacements" in payload_dict: + return { + "success": False, + "path": payload_dict.get("file_path", "Unknown"), + "message": "'replacements' is no longer supported. Use 'edits' with HashlineEditPayload instead.", + "changed": False, + } elif "delete_snippet" in payload_dict: payload = DeleteSnippetPayload(**payload_dict) elif "content" in payload_dict: @@ -763,7 +694,7 @@ def edit_file( return { "success": False, "path": file_path, - "message": f"One of 'content', 'replacements', or 'delete_snippet' must be provided in payload. Refer to the following examples: {parse_error_message}", + "message": f"One of 'edits', 'content', or 'delete_snippet' must be provided in payload. Refer to the following examples: {parse_error_message}", "changed": False, } except Exception as e: diff --git a/code_puppy/tools/file_operations.py b/code_puppy/tools/file_operations.py index 15d1303b6..0a912cc8e 100644 --- a/code_puppy/tools/file_operations.py +++ b/code_puppy/tools/file_operations.py @@ -452,6 +452,7 @@ def _read_file( file_path: str, start_line: int | None = None, num_lines: int | None = None, + hashline: bool = True, ) -> ReadFileOutput: file_path = os.path.abspath(os.path.expanduser(file_path)) @@ -501,6 +502,19 @@ def _read_file( for char in content ) + # If hashline mode requested, format content and cache hashes + if hashline: + from code_puppy.tools.hashline import ( + cache_file_hashes, + compute_file_hashes, + format_hashlines, + ) + + cache_file_hashes(file_path, compute_file_hashes(content)) + # Pass start_line so partial reads get correct line numbers + offset = start_line if start_line is not None else 1 + content = format_hashlines(content, start_line=offset) + # Simple approximation: ~4 characters per token num_tokens = len(content) 
// 4 if num_tokens > 10000: @@ -809,6 +823,7 @@ def read_file( file_path: str = "", start_line: int | None = None, num_lines: int | None = None, + hashline: bool = True, ) -> ReadFileOutput: """Read file contents with optional line-range selection and token safety. @@ -849,13 +864,20 @@ def read_file( >>> if result.error: ... print(f"Error: {result.error}") + Hashline Mode (default: enabled): + When hashline=True, file content is returned with line-hash tags: + 1:a3|function hello() { + 2:f1| return "world"; + Use these tags with HashlineEditPayload to edit by reference. + Set hashline=False to get raw content without tags. + Best Practices: - Always check for errors before using content - Use line ranges for large files to avoid token limits - Monitor num_tokens to stay within context limits - Combine with list_files to find files first """ - return _read_file(context, file_path, start_line, num_lines) + return _read_file(context, file_path, start_line, num_lines, hashline=hashline) def register_grep(agent): diff --git a/code_puppy/tools/hashline.py b/code_puppy/tools/hashline.py new file mode 100644 index 000000000..2b5369139 --- /dev/null +++ b/code_puppy/tools/hashline.py @@ -0,0 +1,293 @@ +"""Hashline engine for file editing. + +Each line gets tagged with a 2-char content hash so models can reference +lines by hash instead of reproducing exact text. This eliminates the +fragile "find exact string" pattern and makes edits robust to whitespace +or minor content drift. 
+""" + +import hashlib +from collections import OrderedDict + + +class HashlineMismatchError(Exception): + """Raised when a hashline reference doesn't match current file content.""" + + def __init__( + self, line: int, expected_hash: str, actual_hash: str, actual_content: str + ): + self.line = line + self.expected_hash = expected_hash + self.actual_hash = actual_hash + self.actual_content = actual_content + super().__init__( + f"Line {line}: expected hash '{expected_hash}', " + f"got '{actual_hash}' for content: {actual_content!r}" + ) + + +# --------------------------------------------------------------------------- +# Core hashing +# --------------------------------------------------------------------------- + + +def line_hash(content: str) -> str: + """Return a 2-char hex hash of *content* (SHA-256, first byte).""" + return hashlib.sha256(content.encode("utf-8")).hexdigest()[:2] + + +def compute_file_hashes(content: str) -> dict[int, str]: + """Return ``{line_number: hash}`` for every line (1-based).""" + return {i: line_hash(line) for i, line in enumerate(content.splitlines(), start=1)} + + +# --------------------------------------------------------------------------- +# Formatting & parsing +# --------------------------------------------------------------------------- + + +def format_hashlines(content: str, start_line: int = 1) -> str: + """Convert file content to hashline display format. + + Args: + content: Raw file content. + start_line: Line number offset (1-based). Use this when formatting + a partial read so line numbers match the actual file. + + Example output:: + + 1:a3|function hello() { + 2:f1| return "world"; + """ + lines = content.splitlines() + parts: list[str] = [] + for i, raw in enumerate(lines, start=start_line): + h = line_hash(raw) + parts.append(f"{i}:{h}|{raw}") + return "\n".join(parts) + + +def parse_hashline_ref(ref: str) -> tuple[int, str]: + """Parse ``"2:f1"`` → ``(2, "f1")``. 
Raises *ValueError* on bad format.""" + if ":" not in ref: + raise ValueError(f"Invalid hashline ref (missing ':'): {ref!r}") + line_str, hash_str = ref.split(":", maxsplit=1) + try: + line_num = int(line_str) + except ValueError: + raise ValueError(f"Invalid line number in ref: {ref!r}") from None + if line_num < 1: + raise ValueError(f"Line number must be >= 1, got {line_num} in ref: {ref!r}") + if len(hash_str) != 2: + raise ValueError( + f"Hash must be exactly 2 hex chars, got {hash_str!r} in ref: {ref!r}" + ) + return line_num, hash_str + + +# --------------------------------------------------------------------------- +# Validation +# --------------------------------------------------------------------------- + + +def validate_hashes( + refs: list[tuple[int, str]], + current_content: str, +) -> list[str]: + """Validate each ``(line, hash)`` pair against *current_content*. + + Returns a list of human-readable error messages (empty == all valid). + """ + file_hashes = compute_file_hashes(current_content) + total_lines = len(file_hashes) + errors: list[str] = [] + + for line_num, expected in refs: + if line_num > total_lines: + errors.append( + f"Line {line_num} out of range (file has {total_lines} lines)" + ) + continue + actual = file_hashes[line_num] + if actual != expected: + errors.append( + f"Line {line_num}: expected hash '{expected}', got '{actual}'" + ) + return errors + + +# --------------------------------------------------------------------------- +# LRU cache (stdlib-only, no functools.lru_cache – we cache per file path) +# --------------------------------------------------------------------------- + +_CACHE_MAX = 100 +_hashline_cache: OrderedDict[str, dict[int, str]] = OrderedDict() + + +def cache_file_hashes(file_path: str, hashes: dict[int, str]) -> None: + """Store *hashes* for *file_path*, evicting oldest if over capacity.""" + if file_path in _hashline_cache: + _hashline_cache.move_to_end(file_path) + _hashline_cache[file_path] = hashes + while 
len(_hashline_cache) > _CACHE_MAX: + _hashline_cache.popitem(last=False) + + +def get_cached_hashes(file_path: str) -> dict[int, str] | None: + """Return cached hashes for *file_path*, or ``None`` if missing.""" + if file_path in _hashline_cache: + _hashline_cache.move_to_end(file_path) + return _hashline_cache[file_path] + return None + + +def invalidate_cache(file_path: str) -> None: + """Remove *file_path* from the cache.""" + _hashline_cache.pop(file_path, None) + + +# --------------------------------------------------------------------------- +# Edit application +# --------------------------------------------------------------------------- + + +def _resolve_edit_range(edit: dict) -> tuple[int, int, str, str]: + """Return ``(start_line, end_line, start_hash, end_hash)`` for an edit.""" + start_line, start_hash = parse_hashline_ref(edit["start_ref"]) + operation = edit["operation"] + + if operation in ("replace_range", "delete_range"): + if not edit.get("end_ref"): + raise ValueError(f"'{operation}' requires 'end_ref'") + end_line, end_hash = parse_hashline_ref(edit["end_ref"]) + if end_line < start_line: + raise ValueError( + f"end_ref line ({end_line}) < start_ref line ({start_line})" + ) + return start_line, end_line, start_hash, end_hash + + return start_line, start_line, start_hash, start_hash + + +def _check_overlaps(ranges: list[tuple[int, int, int]]) -> list[str]: + """Detect overlapping edit ranges. *ranges* = [(start, end, index), …].""" + sorted_ranges = sorted(ranges, key=lambda r: (r[0], r[1])) + errors: list[str] = [] + for i in range(len(sorted_ranges) - 1): + _, end_a, idx_a = sorted_ranges[i] + start_b, _, idx_b = sorted_ranges[i + 1] + if end_a >= start_b: + errors.append( + f"Edit {idx_a} (ending line {end_a}) overlaps with " + f"edit {idx_b} (starting line {start_b})" + ) + return errors + + +def apply_hashline_edits( + file_path: str, + edits: list[dict], +) -> dict: + """Apply a batch of hashline-referenced edits to *file_path*. 
+ + Each *edit* dict must contain: + + - ``operation``: ``"replace"`` | ``"replace_range"`` | ``"insert_after"`` + | ``"delete"`` | ``"delete_range"`` + - ``start_ref``: e.g. ``"2:f1"`` + - ``end_ref``: required for range operations, else ``None`` + - ``new_content``: replacement text (empty string for deletes) + + Returns ``{"success": bool, "content": str, "errors": list[str]}``. + """ + # 1. Read current file + try: + with open(file_path, "r", encoding="utf-8") as fh: + current_content = fh.read() + except OSError as exc: + return {"success": False, "content": "", "errors": [str(exc)]} + + lines = current_content.splitlines() + errors: list[str] = [] + + # 2. Parse & collect all refs for batch validation + parsed: list[tuple[int, int, dict]] = [] # (start, end, edit) + all_refs: list[tuple[int, str]] = [] + + for i, edit in enumerate(edits): + valid_ops = ( + "replace", + "replace_range", + "insert_after", + "delete", + "delete_range", + ) + op = edit.get("operation", "") + if op not in valid_ops: + errors.append(f"Edit {i}: unknown operation '{op}'") + continue + try: + start, end, s_hash, e_hash = _resolve_edit_range(edit) + except ValueError as exc: + errors.append(f"Edit {i}: {exc}") + continue + + all_refs.append((start, s_hash)) + if end != start: + all_refs.append((end, e_hash)) + parsed.append((start, end, edit)) + + if errors: + return {"success": False, "content": current_content, "errors": errors} + + # Validate ALL hashes up-front – reject entire batch on any mismatch + hash_errors = validate_hashes(all_refs, current_content) + if hash_errors: + return {"success": False, "content": current_content, "errors": hash_errors} + + # 3. 
Check for overlapping edits + ranges_for_overlap = [] + for i, (start, end, edit) in enumerate(parsed): + if edit["operation"] == "insert_after": + # Inserts don't occupy a range; they go *after* the line + continue + ranges_for_overlap.append((start, end, i)) + + overlap_errors = _check_overlaps(ranges_for_overlap) + if overlap_errors: + return {"success": False, "content": current_content, "errors": overlap_errors} + + # 4. Apply edits in reverse line order so indices stay stable + sorted_edits = sorted(parsed, key=lambda p: p[0], reverse=True) + + for start, end, edit in sorted_edits: + op = edit["operation"] + new_lines = ( + edit.get("new_content", "").splitlines() if edit.get("new_content") else [] + ) + start_idx = start - 1 # 0-based + end_idx = end # exclusive upper bound for slice replacement + + if op == "replace": + lines[start_idx : start_idx + 1] = new_lines + elif op == "replace_range": + lines[start_idx:end_idx] = new_lines + elif op == "insert_after": + lines[start_idx + 1 : start_idx + 1] = new_lines + elif op == "delete": + del lines[start_idx] + elif op == "delete_range": + del lines[start_idx:end_idx] + + new_content = "\n".join(lines) + # Preserve trailing newline if original had one + if current_content.endswith("\n"): + new_content += "\n" + + # 5. Invalidate cache & write + invalidate_cache(file_path) + with open(file_path, "w", encoding="utf-8") as fh: + fh.write(new_content) + cache_file_hashes(file_path, compute_file_hashes(new_content)) + + return {"success": True, "content": new_content, "errors": []} diff --git a/puppy-hashline-practice.md b/puppy-hashline-practice.md new file mode 100644 index 000000000..03dd629a5 --- /dev/null +++ b/puppy-hashline-practice.md @@ -0,0 +1,10 @@ +# Puppy HashLine Practice + +This file is for HashLine editing drills, now with maximum puppy chaos. +Initial line got upgraded. 
#!/usr/bin/env python3
"""Minimal CLI to extract readable text from a web page.

Usage: python readability.py <url>
"""

import re
import sys
from html.parser import HTMLParser

SKIP_TAGS = {
    "script",
    "style",
    "nav",
    "iframe",
    "noscript",
    "svg",
    "form",
    "button",
    "input",
    "select",
    "textarea",
}
# Void (self-closing) HTML elements have no matching end tag, so they must
# never be pushed onto the skip stack — they would never be popped and the
# parser would silently discard everything after them.
VOID_TAGS = {
    "area",
    "base",
    "br",
    "col",
    "embed",
    "hr",
    "img",
    "input",
    "link",
    "meta",
    "param",
    "source",
    "track",
    "wbr",
}
BLOCK_TAGS = {
    "p",
    "div",
    "article",
    "section",
    "h1",
    "h2",
    "h3",
    "h4",
    "h5",
    "h6",
    "li",
    "tr",
    "blockquote",
    "pre",
    "br",
    "hr",
    "dt",
    "dd",
    "aside",
    "header",
    "footer",
    "main",
    "figure",
}
SKIP_PAT = re.compile(
    r"\b(sidebar|comment|social|share|related|advert|promo|widget|popup)\b", re.I
)
KEEP_PAT = re.compile(r"\b(article|post|entry|content|main|body|text|story)\b", re.I)


class Parser(HTMLParser):
    """HTML-to-text extractor.

    Skips boilerplate (scripts, nav, ad-like class/id names) and inserts
    newlines at block boundaries; captures the <title> text separately.
    """

    def __init__(self):
        super().__init__()
        self._skip_stack = []  # stack of tag names currently being skipped
        self._chunks = []  # text fragments, joined by get_text()
        self.title = ""
        self._in_title = False

    @property
    def _skipping(self):
        # True while inside at least one skipped element.
        return len(self._skip_stack) > 0

    def _push_skip(self, tag):
        # Void elements can't contain content and have no end tag to pop
        # them, so they are ignored rather than pushed (bug fix: a bare
        # <input> previously swallowed the rest of the document).
        if tag not in VOID_TAGS:
            self._skip_stack.append(tag)

    def handle_starttag(self, tag, attrs):
        tag = tag.lower()
        if tag == "title":
            self._in_title = True
            return
        if self._skipping:
            # Track nested skip tags so the matching end tag pops correctly.
            if tag in SKIP_TAGS:
                self._push_skip(tag)
            return
        if tag in SKIP_TAGS:
            self._push_skip(tag)
            return
        # Check class/id for skip patterns (ad-like names that aren't
        # rescued by a content-like name).
        a = dict(attrs)
        ci = a.get("class", "") + " " + a.get("id", "")
        if ci.strip() and SKIP_PAT.search(ci) and not KEEP_PAT.search(ci):
            self._push_skip(tag)
            return
        if tag in BLOCK_TAGS:
            self._chunks.append("\n")

    def handle_endtag(self, tag):
        tag = tag.lower()
        if tag == "title":
            self._in_title = False
            return
        if self._skipping:
            if self._skip_stack and self._skip_stack[-1] == tag:
                self._skip_stack.pop()
            return
        if tag in BLOCK_TAGS:
            self._chunks.append("\n")

    def handle_data(self, data):
        if self._in_title:
            self.title = data.strip()
            return
        if self._skipping:
            return
        self._chunks.append(data)

    # NOTE: with HTMLParser's default convert_charrefs=True, entity and
    # char refs arrive pre-decoded via handle_data; these handlers are
    # kept as a safety net for subclasses that disable conversion.
    def handle_entityref(self, name):
        from html import unescape

        if self._skipping:
            return
        self._chunks.append(unescape(f"&{name};"))

    def handle_charref(self, name):
        from html import unescape

        if self._skipping:
            return
        self._chunks.append(unescape(f"&#{name};"))

    def get_text(self):
        """Return the extracted text: whitespace-normalized lines, with
        runs of 3+ blank lines collapsed to one blank line."""
        raw = "".join(self._chunks)
        lines = [" ".join(line.split()) for line in raw.split("\n")]
        text = "\n".join(line for line in lines if line)
        return re.sub(r"\n{3,}", "\n\n", text).strip()


def main():
    """Fetch the URL given on the command line and print its readable text."""
    # Lazy import so the module (and Parser) is usable without httpx installed.
    import httpx

    if len(sys.argv) < 2:
        print("Usage: python readability.py <url>", file=sys.stderr)
        sys.exit(1)
    url = sys.argv[1]
    r = httpx.get(
        url,
        follow_redirects=True,
        timeout=30,
        headers={"User-Agent": "Mozilla/5.0 (compatible; ReadabilityBot/1.0)"},
    )
    r.raise_for_status()
    p = Parser()
    p.feed(r.text)
    if p.title:
        print(f"# {p.title}\n")
    print(p.get_text())


if __name__ == "__main__":
    main()
test_apply_simple_modification(self, tmp_path): """Test basic file modification with content replacement.""" - # Create test file test_file = tmp_path / "test.py" test_file.write_text("print('hello world')") - # Apply modification payload = ContentPayload( file_path=str(test_file), content="print('hello modified')", overwrite=True ) @@ -37,21 +42,24 @@ def test_apply_simple_modification(self, tmp_path): assert "diff" in result def test_apply_replacements_modification(self, tmp_path): - """Test targeted text replacements.""" + """Test targeted text replacements via hashline edits.""" test_file = tmp_path / "config.py" - test_file.write_text( - """ -debug = False -version = "1.0.0" -author = "test" - """.strip() - ) + content = 'debug = False\nversion = "1.0.0"\nauthor = "test"' + test_file.write_text(content) - payload = ReplacementsPayload( + payload = HashlineEditPayload( file_path=str(test_file), - replacements=[ - Replacement(old_str="debug = False", new_str="debug = True"), - Replacement(old_str='version = "1.0.0"', new_str='version = "2.0.0"'), + edits=[ + HashlineEdit( + operation="replace", + start_ref=_make_ref(content, 1), + new_content="debug = True", + ), + HashlineEdit( + operation="replace", + start_ref=_make_ref(content, 2), + new_content='version = "2.0.0"', + ), ], ) @@ -95,17 +103,21 @@ def test_invalid_patch_nonexistent_file(self, tmp_path): """Test error handling for non-existent files.""" nonexistent_file = tmp_path / "doesnotexist.py" - payload = ReplacementsPayload( + payload = HashlineEditPayload( file_path=str(nonexistent_file), - replacements=[Replacement(old_str="old", new_str="new")], + edits=[ + HashlineEdit( + operation="replace", + start_ref="1:aa", + new_content="new", + ) + ], ) mock_context = Mock() result = _edit_file(mock_context, payload) - # Error responses may have different structures assert "success" not in result or result["success"] is False - # The error may be in the 'error' or 'message' field error_text = 
(result.get("error", "") + result.get("message", "")).lower() assert "does not exist" in error_text or "no such file" in error_text @@ -121,29 +133,30 @@ def test_invalid_patch_snippet_not_found(self, tmp_path): mock_context = Mock() result = _edit_file(mock_context, payload) - # Error responses may have different structures assert "success" not in result or result["success"] is False assert "snippet not found" in result.get("error", "").lower() def test_invalid_patch_replacement_not_found(self, tmp_path): - """Test error handling when replacement text is not found.""" + """Test error handling when hashline ref doesn't match.""" test_file = tmp_path / "test.py" test_file.write_text("print('existing code')") - payload = ReplacementsPayload( + # Use a bogus hash that won't match + payload = HashlineEditPayload( file_path=str(test_file), - replacements=[Replacement(old_str="nonexistent text", new_str="new text")], + edits=[ + HashlineEdit( + operation="replace", + start_ref="1:zz", + new_content="new text", + ) + ], ) mock_context = Mock() result = _edit_file(mock_context, payload) - # Error responses may have different structures assert "success" not in result or result["success"] is False - assert ( - "no suitable match" in result.get("error", "").lower() - or "jw < 0.95" in result.get("error", "").lower() - ) def test_overwrite_protection(self, tmp_path): """Test that existing files are protected without overwrite flag.""" @@ -153,7 +166,7 @@ def test_overwrite_protection(self, tmp_path): payload = ContentPayload( file_path=str(test_file), content="new content", - overwrite=False, # Should not overwrite + overwrite=False, ) mock_context = Mock() @@ -161,53 +174,48 @@ def test_overwrite_protection(self, tmp_path): assert result["success"] is False assert "exists" in result.get("message", "").lower() - assert test_file.read_text() == "original content" # Unchanged + assert test_file.read_text() == "original content" def test_no_changes_scenario(self, tmp_path): 
"""Test handling when no changes would be made.""" test_file = tmp_path / "test.py" - original_content = "print('hello')" - test_file.write_text(original_content) + content = "print('hello')" + test_file.write_text(content) - payload = ReplacementsPayload( + # Replace line with identical content — should result in no changes + payload = HashlineEditPayload( file_path=str(test_file), - replacements=[ - Replacement( - old_str="print('hello')", new_str="print('hello')" - ) # Same content + edits=[ + HashlineEdit( + operation="replace", + start_ref=_make_ref(content, 1), + new_content="print('hello')", + ) ], ) mock_context = Mock() result = _edit_file(mock_context, payload) - assert result["success"] is False assert result["changed"] is False - assert "no changes" in result.get("message", "").lower() def test_line_number_handling_multiline_replacement(self, tmp_path): """Test line number handling with multiline replacements.""" test_file = tmp_path / "multiline.py" - test_file.write_text( - """ -def func1(): - return 1 - -def func2(): - return 2 - -def func3(): - return 3 - """.strip() - ) - - # Replace the entire func2 block - old_func = "def func2():\n return 2" - new_func = "def func2():\n # Enhanced version\n return 2 + 1" + content = "def func1():\n return 1\n\ndef func2():\n return 2\n\ndef func3():\n return 3" + test_file.write_text(content) - payload = ReplacementsPayload( + # Replace func2 block (lines 4-5) with a range replace + payload = HashlineEditPayload( file_path=str(test_file), - replacements=[Replacement(old_str=old_func, new_str=new_func)], + edits=[ + HashlineEdit( + operation="replace_range", + start_ref=_make_ref(content, 4), + end_ref=_make_ref(content, 5), + new_content="def func2():\n # Enhanced version\n return 2 + 1", + ) + ], ) mock_context = Mock() @@ -218,15 +226,14 @@ def func3(): content = test_file.read_text() assert "# Enhanced version" in content assert "return 2 + 1" in content - assert "def func1():" in content # Should remain - 
assert "def func3():" in content # Should remain + assert "def func1():" in content + assert "def func3():" in content def test_error_recovery_file_permissions(self, tmp_path): """Test error recovery when file permissions prevent modification.""" test_file = tmp_path / "readonly.py" test_file.write_text("original content") - # Make file read-only os.chmod(test_file, 0o444) try: @@ -237,28 +244,29 @@ def test_error_recovery_file_permissions(self, tmp_path): mock_context = Mock() result = _edit_file(mock_context, payload) - # Should handle the permission error gracefully - # Error responses may have different structures assert ( "success" not in result or result["success"] is False or "error" in result ) finally: - # Restore permissions for cleanup os.chmod(test_file, 0o644) def test_multiple_replacements_order(self, tmp_path): - """Test that multiple replacements are applied in order.""" + """Test that multiple sequential hashline edits are applied.""" test_file = tmp_path / "order_test.py" - test_file.write_text("var_a = 1") + content = "var_a = 1" + test_file.write_text(content) - payload = ReplacementsPayload( + # Single edit replacing the one line to final value + payload = HashlineEditPayload( file_path=str(test_file), - replacements=[ - Replacement(old_str="var_a = 1", new_str="var_a = 2"), - Replacement(old_str="var_a = 2", new_str="var_a = 3"), - Replacement(old_str="var_a = 3", new_str="var_a = final"), + edits=[ + HashlineEdit( + operation="replace", + start_ref=_make_ref(content, 1), + new_content="var_a = final", + ), ], ) @@ -269,16 +277,19 @@ def test_multiple_replacements_order(self, tmp_path): assert test_file.read_text() == "var_a = final" def test_special_characters_handling(self, tmp_path): - """Test handling of special characters in replacements.""" + """Test handling of special characters in hashline edits.""" test_file = tmp_path / "special.py" - test_file.write_text('text = "Hello "World"!\nNew line"') + content = 'text = "Hello "World"!\nNew 
line"' + test_file.write_text(content) - payload = ReplacementsPayload( + payload = HashlineEditPayload( file_path=str(test_file), - replacements=[ - Replacement( - old_str='"Hello "World"!\nNew line"', - new_str="\"Hello 'Python'!\n\tTabbed\"", + edits=[ + HashlineEdit( + operation="replace_range", + start_ref=_make_ref(content, 1), + end_ref=_make_ref(content, 2), + new_content="text = \"Hello 'Python'!\n\tTabbed\"", ) ], ) @@ -295,15 +306,19 @@ def test_large_file_handling(self, tmp_path): """Test handling of larger files.""" test_file = tmp_path / "large.py" - # Create a moderately large file lines = [f"line_{i} = {i}" for i in range(100)] - test_file.write_text("\n".join(lines)) + content = "\n".join(lines) + test_file.write_text(content) - # Replace a line in the middle - payload = ReplacementsPayload( + # Replace line 51 (0-indexed line_50, 1-indexed line 51) + payload = HashlineEditPayload( file_path=str(test_file), - replacements=[ - Replacement(old_str="line_50 = 50", new_str="line_50 = MODIFIED") + edits=[ + HashlineEdit( + operation="replace", + start_ref=_make_ref(content, 51), + new_content="line_50 = MODIFIED", + ) ], ) @@ -313,19 +328,23 @@ def test_large_file_handling(self, tmp_path): assert result["success"] is True content = test_file.read_text() assert "line_50 = MODIFIED" in content - assert "line_49 = 49" in content # Should remain - assert "line_51 = 51" in content # Should remain + assert "line_49 = 49" in content + assert "line_51 = 51" in content def test_unicode_content_handling(self, tmp_path): """Test handling of Unicode characters in file content.""" test_file = tmp_path / "unicode.py" - unicode_content = "# 测试文件\nprint('Hello 世界! 🌍')\nemoji = 🐕" - test_file.write_text(unicode_content, encoding="utf-8") + content = "# 测试文件\nprint('Hello 世界! 
🌍')\nemoji = 🐕" + test_file.write_text(content, encoding="utf-8") - payload = ReplacementsPayload( + payload = HashlineEditPayload( file_path=str(test_file), - replacements=[ - Replacement(old_str="Hello 世界! 🌍", new_str="Hello Python! 🐍") + edits=[ + HashlineEdit( + operation="replace", + start_ref=_make_ref(content, 2), + new_content="print('Hello Python! 🐍')", + ) ], ) @@ -335,8 +354,8 @@ def test_unicode_content_handling(self, tmp_path): assert result["success"] is True content = test_file.read_text(encoding="utf-8") assert "Hello Python! 🐍" in content - assert "# 测试文件" in content # Should remain - assert "emoji = 🐕" in content # Should remain + assert "# 测试文件" in content + assert "emoji = 🐕" in content def test_empty_file_handling(self, tmp_path): """Test handling of empty files.""" @@ -370,7 +389,6 @@ def test_directory_creation(self, tmp_path): def test_edit_file_function_variants(self): """Test the _edit_file function with different payload variants.""" - # Test the main _edit_file function directly with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".py") as f: f.write("print('test')") temp_path = f.name @@ -378,14 +396,12 @@ def test_edit_file_function_variants(self): try: mock_context = Mock() - # Test with ContentPayload payload = ContentPayload( file_path=temp_path, content="print('modified')", overwrite=True ) result = _edit_file(mock_context, payload) - # Verify the result structure assert result["success"] is True assert result["changed"] is True assert "diff" in result @@ -395,19 +411,16 @@ def test_edit_file_function_variants(self): def test_json_payload_parsing(self, tmp_path): """Test JSON string payload parsing for the edit_file tool.""" - # Skip this test for now as it requires complex agent mocking pytest.skip("Mock-based test requires complex setup") def test_malformed_json_payload(self, tmp_path): """Test handling of malformed JSON payloads.""" - # Skip this test for now as it requires complex agent mocking 
pytest.skip("Mock-based test requires complex setup") def test_unknown_payload_type(self, tmp_path): """Test handling of unknown payload types.""" mock_context = Mock() - # Create a mock payload that doesn't match any known type class UnknownPayload: def __init__(self): self.file_path = str(tmp_path / "test.py") @@ -442,11 +455,18 @@ def test_edit_file_utf8_content(self, tmp_path): def test_edit_file_mixed_line_endings(self, tmp_path): """Test handling of mixed line endings (CRLF/LF).""" test_file = tmp_path / "mixed.txt" - test_file.write_text("line1\r\nline2\nline3\r\n") + content = "line1\r\nline2\nline3\r\n" + test_file.write_text(content) - payload = ReplacementsPayload( + payload = HashlineEditPayload( file_path=str(test_file), - replacements=[{"old_str": "line2", "new_str": "line2_modified"}], + edits=[ + HashlineEdit( + operation="replace", + start_ref=_make_ref(content, 2), + new_content="line2_modified", + ) + ], ) result = _edit_file(None, payload) @@ -454,13 +474,20 @@ def test_edit_file_mixed_line_endings(self, tmp_path): assert result["success"] is True or result["changed"] is True def test_edit_file_special_regex_chars(self, tmp_path): - """Test replacements with special regex characters.""" + """Test hashline edits with special regex characters.""" test_file = tmp_path / "regex.txt" - test_file.write_text("pattern: [a-z]+\nmore: (test)\n") + content = "pattern: [a-z]+\nmore: (test)\n" + test_file.write_text(content) - payload = ReplacementsPayload( + payload = HashlineEditPayload( file_path=str(test_file), - replacements=[{"old_str": "[a-z]+", "new_str": "[A-Z]+"}], + edits=[ + HashlineEdit( + operation="replace", + start_ref=_make_ref(content, 1), + new_content="pattern: [A-Z]+", + ) + ], ) result = _edit_file(None, payload) @@ -474,13 +501,20 @@ class TestFileSizeAndPerformance: def test_edit_large_file_replacement(self, tmp_path): """Test replacing content in a large file.""" test_file = tmp_path / "large.txt" - # Create file with 1000 lines 
lines = [f"Line {i}\n" for i in range(1000)] - test_file.write_text("".join(lines)) + content = "".join(lines) + test_file.write_text(content) - payload = ReplacementsPayload( + # Line 501 contains "Line 500\n" (1-indexed) + payload = HashlineEditPayload( file_path=str(test_file), - replacements=[{"old_str": "Line 500", "new_str": "LINE 500"}], + edits=[ + HashlineEdit( + operation="replace", + start_ref=_make_ref(content, 501), + new_content="LINE 500", + ) + ], ) result = _edit_file(None, payload) @@ -509,7 +543,6 @@ class TestFileModificationSafety: def test_edit_file_path_traversal_prevention(self, tmp_path): """Test that path traversal attempts are handled safely.""" - # Attempt to edit outside allowed directory dangerous_path = str(tmp_path / "../../../etc/passwd") content = ContentPayload( @@ -520,23 +553,28 @@ def test_edit_file_path_traversal_prevention(self, tmp_path): result = _edit_file(None, content) - # Should either fail or normalize the path safely assert result is not None def test_edit_file_backup_preservation(self, tmp_path): """Test that backups of original content are handled appropriately.""" test_file = tmp_path / "backup.txt" - test_file.write_text("original content") + content = "original content" + test_file.write_text(content) - payload = ReplacementsPayload( + payload = HashlineEditPayload( file_path=str(test_file), - replacements=[{"old_str": "original", "new_str": "modified"}], + edits=[ + HashlineEdit( + operation="replace", + start_ref=_make_ref(content, 1), + new_content="modified content", + ) + ], ) result = _edit_file(None, payload) assert result["success"] is True - # Original file should be modified assert "modified" in test_file.read_text() def test_delete_file_only_regular_files(self, tmp_path): @@ -546,7 +584,5 @@ def test_delete_file_only_regular_files(self, tmp_path): result = _delete_file(None, str(test_dir)) - # Should contain error or success=False assert "error" in result or result.get("success") is False - # Directory 
should still exist assert test_dir.exists() diff --git a/tests/tools/test_file_operations_coverage.py b/tests/tools/test_file_operations_coverage.py index e14d62773..3a3e14057 100644 --- a/tests/tools/test_file_operations_coverage.py +++ b/tests/tools/test_file_operations_coverage.py @@ -414,7 +414,7 @@ def test_read_file_total_lines_calculation(self, tmp_path): # File without trailing newline test_file.write_text("line1\nline2\nline3") - result = _read_file(None, str(test_file)) + result = _read_file(None, str(test_file), hashline=False) assert result.error is None assert result.content == "line1\nline2\nline3" @@ -424,7 +424,7 @@ def test_read_file_with_trailing_newline(self, tmp_path): test_file = tmp_path / "trailing.txt" test_file.write_text("line1\nline2\n") - result = _read_file(None, str(test_file)) + result = _read_file(None, str(test_file), hashline=False) assert result.error is None assert result.content == "line1\nline2\n" diff --git a/tests/tools/test_file_operations_extended.py b/tests/tools/test_file_operations_extended.py index e535c702f..a5f1174fb 100644 --- a/tests/tools/test_file_operations_extended.py +++ b/tests/tools/test_file_operations_extended.py @@ -59,7 +59,9 @@ def test_read_file_line_range_valid(self, tmp_path): test_file.write_text("".join(lines)) # Test reading lines 3-5 - result = _read_file(None, str(test_file), start_line=3, num_lines=3) + result = _read_file( + None, str(test_file), start_line=3, num_lines=3, hashline=False + ) assert result.error is None assert result.content == "Line 3\nLine 4\nLine 5\n" @@ -95,7 +97,7 @@ def test_read_file_encoding_utf8(self, tmp_path): content = "Hello 世界! 
🐾 é ñ ü" test_file.write_text(content, encoding="utf-8") - result = _read_file(None, str(test_file)) + result = _read_file(None, str(test_file), hashline=False) assert result.error is None assert result.content == content @@ -238,7 +240,7 @@ def test_path_with_tilde_expansion(self, tmp_path): with patch.dict(os.environ, {"HOME": str(home_sim)}): # Test with tilde path - result = _read_file(None, "~/test.txt") + result = _read_file(None, "~/test.txt", hashline=False) # Should find the file in the simulated home directory if result.error is None: @@ -255,7 +257,7 @@ def test_path_with_symlinks(self, tmp_path): symlink_file.symlink_to(real_file) # Test reading through symlink - result = _read_file(None, str(symlink_file)) + result = _read_file(None, str(symlink_file), hashline=False) assert result.error is None assert result.content == "real content" @@ -363,7 +365,7 @@ def test_read_file_with_special_characters_in_path(self, tmp_path): test_file = tmp_path / special_filename test_file.write_text("special content") - result = _read_file(None, str(test_file)) + result = _read_file(None, str(test_file), hashline=False) assert result.error is None assert result.content == "special content" @@ -393,7 +395,9 @@ def test_read_file_zero_length_lines(self, tmp_path): test_file.write_text(content) # Read specific range including empty lines - result = _read_file(None, str(test_file), start_line=2, num_lines=3) + result = _read_file( + None, str(test_file), start_line=2, num_lines=3, hashline=False + ) assert result.error is None assert result.content == "\nLine 3\n\n" diff --git a/tests/tools/test_hashline.py b/tests/tools/test_hashline.py new file mode 100644 index 000000000..cdc788b59 --- /dev/null +++ b/tests/tools/test_hashline.py @@ -0,0 +1,537 @@ +"""Comprehensive tests for code_puppy.tools.hashline.""" + +from __future__ import annotations + +import hashlib +import textwrap + +import pytest + +from code_puppy.tools.hashline import ( + _CACHE_MAX, + 
HashlineMismatchError, + _hashline_cache, + apply_hashline_edits, + cache_file_hashes, + compute_file_hashes, + format_hashlines, + get_cached_hashes, + invalidate_cache, + line_hash, + parse_hashline_ref, + validate_hashes, +) + +# ── helpers ─────────────────────────────────────────────────────────────── + +SAMPLE_CONTENT = textwrap.dedent("""\ + def hello(): + return "world" + + def goodbye(): + return "moon" +""") +"""Five-line sample with a trailing newline.""" + + +def _expected_hash(text: str) -> str: + """Mirror the production algorithm so tests stay in sync.""" + return hashlib.sha256(text.encode("utf-8")).hexdigest()[:2] + + +def _write(tmp_path, content: str, name: str = "f.py") -> str: + """Write *content* to a temp file, return its path as a string.""" + p = tmp_path / name + p.write_text(content, encoding="utf-8") + return str(p) + + +def _make_ref(content: str, line: int) -> str: + """Build a valid hashline ref like '2:ab' from *content*.""" + hashes = compute_file_hashes(content) + return f"{line}:{hashes[line]}" + + +# ── 1. line_hash ────────────────────────────────────────────────────────── + + +class TestLineHash: + def test_deterministic(self): + assert line_hash("hello") == line_hash("hello") + + def test_two_char_hex(self): + h = line_hash("anything") + assert len(h) == 2 + int(h, 16) # must be valid hex – raises ValueError otherwise + + def test_different_inputs_differ(self): + # Not *guaranteed* for all inputs (2-char = 256 buckets) but these + # specific strings are known to differ. + assert line_hash("alpha") != line_hash("beta") + + def test_empty_string(self): + h = line_hash("") + assert len(h) == 2 + + def test_matches_sha256_prefix(self): + raw = "test line" + expected = hashlib.sha256(raw.encode("utf-8")).hexdigest()[:2] + assert line_hash(raw) == expected + + +# ── 2. 
# ── 2. compute_file_hashes ──────────────────────────────────────────────


class TestComputeFileHashes:
    """compute_file_hashes: maps 1-based line numbers to 2-char hashes."""

    def test_one_based_keys(self):
        table = compute_file_hashes("a\nb\nc")
        assert sorted(table.keys()) == [1, 2, 3]

    def test_correct_hashes(self):
        table = compute_file_hashes("a\nb")
        for num, text in ((1, "a"), (2, "b")):
            assert table[num] == _expected_hash(text)

    def test_single_line_no_newline(self):
        assert compute_file_hashes("only") == {1: _expected_hash("only")}

    def test_empty_content(self):
        assert compute_file_hashes("") == {}

    def test_trailing_newline_not_extra_line(self):
        # "a\n" splits to ["a"] – only one hashed line, not two.
        assert len(compute_file_hashes("a\n")) == 1


# ── 3. format_hashlines ─────────────────────────────────────────────────


class TestFormatHashlines:
    """format_hashlines renders each line as 'N:hh|content'."""

    def test_basic_format(self):
        rows = format_hashlines("foo\nbar").splitlines()
        assert len(rows) == 2
        first, second = rows
        assert first.startswith("1:") and "|foo" in first
        assert second.startswith("2:") and "|bar" in second

    def test_empty_line_in_middle(self):
        rows = format_hashlines("a\n\nb").splitlines()
        assert len(rows) == 3
        # The blank middle line still carries its hash prefix.
        middle = rows[1]
        assert middle.startswith("2:")
        assert middle.endswith("|")

    def test_single_line(self):
        rendered = format_hashlines("only")
        assert rendered.startswith("1:")
        assert "|only" in rendered
        assert "\n" not in rendered

    def test_roundtrip_hash_matches(self):
        """Hash embedded in formatted output must match compute_file_hashes."""
        text = "x\ny\nz"
        expected = compute_file_hashes(text)
        for row in format_hashlines(text).splitlines():
            prefix = row.partition("|")[0]
            num, digest = prefix.split(":")
            assert expected[int(num)] == digest


# ── 4. parse_hashline_ref ───────────────────────────────────────────────


class TestParseHashlineRef:
    """parse_hashline_ref: 'N:hh' → (N, 'hh'); malformed refs raise."""

    def test_valid(self):
        assert parse_hashline_ref("2:f1") == (2, "f1")
        assert parse_hashline_ref("100:ab") == (100, "ab")

    def test_missing_colon(self):
        with pytest.raises(ValueError, match="missing ':'"):
            parse_hashline_ref("2f1")

    def test_bad_line_number(self):
        with pytest.raises(ValueError, match="Invalid line number"):
            parse_hashline_ref("abc:f1")

    def test_zero_line_number(self):
        with pytest.raises(ValueError, match=">= 1"):
            parse_hashline_ref("0:ab")

    def test_negative_line_number(self):
        with pytest.raises(ValueError, match=">= 1"):
            parse_hashline_ref("-1:ab")

    def test_wrong_hash_length_short(self):
        with pytest.raises(ValueError, match="exactly 2 hex chars"):
            parse_hashline_ref("1:a")

    def test_wrong_hash_length_long(self):
        with pytest.raises(ValueError, match="exactly 2 hex chars"):
            parse_hashline_ref("1:abc")


# ── 5. validate_hashes ──────────────────────────────────────────────────


class TestValidateHashes:
    """validate_hashes returns one error string per bad (line, hash) ref."""

    def test_all_valid(self):
        text = "one\ntwo\nthree"
        refs = list(compute_file_hashes(text).items())
        assert validate_hashes(refs, text) == []

    def test_mismatch_detected(self):
        problems = validate_hashes([(1, "zz")], "one\ntwo")
        assert len(problems) == 1
        assert "expected hash 'zz'" in problems[0]

    def test_out_of_range(self):
        problems = validate_hashes([(99, "ab")], "one\ntwo")
        assert len(problems) == 1
        assert "out of range" in problems[0]

    def test_mixed_valid_and_invalid(self):
        text = "a\nb"
        good = compute_file_hashes(text)[1]
        problems = validate_hashes([(1, good), (2, "zz")], text)
        assert len(problems) == 1
HashlineMismatchError ───────────────────────────────────────────── + + +class TestHashlineMismatchError: + def test_attributes(self): + err = HashlineMismatchError( + line=5, expected_hash="ab", actual_hash="cd", actual_content="hello" + ) + assert err.line == 5 + assert err.expected_hash == "ab" + assert err.actual_hash == "cd" + assert err.actual_content == "hello" + + def test_message(self): + err = HashlineMismatchError( + line=3, expected_hash="ab", actual_hash="cd", actual_content="x" + ) + msg = str(err) + assert "Line 3" in msg + assert "'ab'" in msg + assert "'cd'" in msg + + def test_is_exception(self): + assert issubclass(HashlineMismatchError, Exception) + + +# ── 7. LRU cache ───────────────────────────────────────────────────────── + + +class TestLRUCache: + @pytest.fixture(autouse=True) + def _clear_cache(self): + """Ensure every test starts with an empty cache.""" + _hashline_cache.clear() + yield + _hashline_cache.clear() + + def test_store_and_retrieve(self): + hashes = {1: "ab", 2: "cd"} + cache_file_hashes("/tmp/a.py", hashes) + assert get_cached_hashes("/tmp/a.py") == hashes + + def test_miss_returns_none(self): + assert get_cached_hashes("/nope") is None + + def test_invalidate(self): + cache_file_hashes("/tmp/b.py", {1: "ee"}) + invalidate_cache("/tmp/b.py") + assert get_cached_hashes("/tmp/b.py") is None + + def test_invalidate_missing_key_is_noop(self): + invalidate_cache("/does/not/exist") # should not raise + + def test_overwrite_existing_key(self): + cache_file_hashes("/tmp/c.py", {1: "aa"}) + cache_file_hashes("/tmp/c.py", {1: "bb"}) + assert get_cached_hashes("/tmp/c.py") == {1: "bb"} + + def test_eviction_at_max_capacity(self): + # Fill to max + for i in range(_CACHE_MAX): + cache_file_hashes(f"/f/{i}", {1: f"{i:02x}"[:2]}) + + # The first entry should still be present + assert get_cached_hashes("/f/0") is not None + + # Adding one more should evict the oldest (which is now /f/1 + # because /f/0 was just accessed by the get above, 
moving it to end) + cache_file_hashes("/f/overflow", {1: "zz"}) + assert len(_hashline_cache) == _CACHE_MAX + # /f/1 was the LRU item after we accessed /f/0 + assert get_cached_hashes("/f/1") is None + + +# ── 8. apply_hashline_edits ────────────────────────────────────────────── + + +class TestApplyHashlineEdits: + """Integration tests that hit the filesystem via tmp_path.""" + + @pytest.fixture(autouse=True) + def _clear_cache(self): + _hashline_cache.clear() + yield + _hashline_cache.clear() + + # -- replace single line ------------------------------------------- + + def test_replace_single_line(self, tmp_path): + content = "aaa\nbbb\nccc\n" + fp = _write(tmp_path, content) + ref = _make_ref(content, 2) + + result = apply_hashline_edits( + fp, + [ + {"operation": "replace", "start_ref": ref, "new_content": "BBB"}, + ], + ) + + assert result["success"] is True + assert "BBB" in result["content"] + assert "bbb" not in result["content"] + # File on disk should match + assert open(fp).read() == result["content"] + + # -- replace_range ------------------------------------------------- + + def test_replace_range(self, tmp_path): + content = "line1\nline2\nline3\nline4\nline5\n" + fp = _write(tmp_path, content) + start = _make_ref(content, 2) + end = _make_ref(content, 4) + + result = apply_hashline_edits( + fp, + [ + { + "operation": "replace_range", + "start_ref": start, + "end_ref": end, + "new_content": "REPLACED", + }, + ], + ) + + assert result["success"] is True + lines = result["content"].splitlines() + assert lines == ["line1", "REPLACED", "line5"] + + # -- insert_after -------------------------------------------------- + + def test_insert_after(self, tmp_path): + content = "first\nsecond\nthird\n" + fp = _write(tmp_path, content) + ref = _make_ref(content, 1) + + result = apply_hashline_edits( + fp, + [ + { + "operation": "insert_after", + "start_ref": ref, + "new_content": "inserted", + }, + ], + ) + + assert result["success"] is True + lines = 
result["content"].splitlines() + assert lines == ["first", "inserted", "second", "third"] + + # -- delete single line -------------------------------------------- + + def test_delete_single_line(self, tmp_path): + content = "keep\nremove\nkeep2\n" + fp = _write(tmp_path, content) + ref = _make_ref(content, 2) + + result = apply_hashline_edits( + fp, + [ + {"operation": "delete", "start_ref": ref, "new_content": ""}, + ], + ) + + assert result["success"] is True + assert result["content"].splitlines() == ["keep", "keep2"] + + # -- delete_range -------------------------------------------------- + + def test_delete_range(self, tmp_path): + content = "a\nb\nc\nd\ne\n" + fp = _write(tmp_path, content) + start = _make_ref(content, 2) + end = _make_ref(content, 4) + + result = apply_hashline_edits( + fp, + [ + { + "operation": "delete_range", + "start_ref": start, + "end_ref": end, + "new_content": "", + }, + ], + ) + + assert result["success"] is True + assert result["content"].splitlines() == ["a", "e"] + + # -- staleness rejection ------------------------------------------- + + def test_stale_hash_rejected(self, tmp_path): + original = "aaa\nbbb\nccc\n" + fp = _write(tmp_path, original) + ref = _make_ref(original, 2) # hash computed against "bbb" + + # Mutate the file *after* computing the ref + (tmp_path / "f.py").write_text("aaa\nXXX\nccc\n", encoding="utf-8") + + result = apply_hashline_edits( + fp, + [ + {"operation": "replace", "start_ref": ref, "new_content": "new"}, + ], + ) + + assert result["success"] is False + assert len(result["errors"]) >= 1 + assert "expected hash" in result["errors"][0] + + # -- overlapping edits → error ------------------------------------- + + def test_overlapping_edits_rejected(self, tmp_path): + content = "a\nb\nc\nd\n" + fp = _write(tmp_path, content) + + result = apply_hashline_edits( + fp, + [ + { + "operation": "replace", + "start_ref": _make_ref(content, 2), + "new_content": "X", + }, + { + "operation": "replace", + "start_ref": 
_make_ref(content, 2), + "new_content": "Y", + }, + ], + ) + + assert result["success"] is False + assert any("overlaps" in e.lower() for e in result["errors"]) + + # -- out-of-range line → error ------------------------------------- + + def test_out_of_range_line(self, tmp_path): + content = "one\ntwo\n" + fp = _write(tmp_path, content) + + result = apply_hashline_edits( + fp, + [ + {"operation": "replace", "start_ref": "99:ab", "new_content": "nope"}, + ], + ) + + assert result["success"] is False + assert any("out of range" in e for e in result["errors"]) + + # -- file not found → error ---------------------------------------- + + def test_file_not_found(self, tmp_path): + result = apply_hashline_edits( + str(tmp_path / "ghost.py"), + [{"operation": "replace", "start_ref": "1:ab", "new_content": "x"}], + ) + + assert result["success"] is False + assert len(result["errors"]) >= 1 + + # -- trailing newline preserved ------------------------------------ + + def test_preserves_trailing_newline(self, tmp_path): + content = "aaa\nbbb\n" + fp = _write(tmp_path, content) + ref = _make_ref(content, 1) + + result = apply_hashline_edits( + fp, + [ + {"operation": "replace", "start_ref": ref, "new_content": "AAA"}, + ], + ) + + assert result["success"] is True + assert result["content"].endswith("\n") + + def test_no_trailing_newline_when_original_lacks_it(self, tmp_path): + content = "aaa\nbbb" # no trailing newline + fp = _write(tmp_path, content) + ref = _make_ref(content, 1) + + result = apply_hashline_edits( + fp, + [ + {"operation": "replace", "start_ref": ref, "new_content": "AAA"}, + ], + ) + + assert result["success"] is True + assert not result["content"].endswith("\n") + + # -- unknown operation → error ------------------------------------- + + def test_unknown_operation(self, tmp_path): + content = "a\nb\n" + fp = _write(tmp_path, content) + + result = apply_hashline_edits( + fp, + [ + {"operation": "yeet", "start_ref": "1:ab", "new_content": "x"}, + ], + ) + + 
assert result["success"] is False + assert any("unknown operation" in e for e in result["errors"]) + + # -- multi-line new_content ---------------------------------------- + + def test_replace_with_multiple_lines(self, tmp_path): + content = "a\nb\nc\n" + fp = _write(tmp_path, content) + ref = _make_ref(content, 2) + + result = apply_hashline_edits( + fp, + [ + {"operation": "replace", "start_ref": ref, "new_content": "x\ny\nz"}, + ], + ) + + assert result["success"] is True + assert result["content"].splitlines() == ["a", "x", "y", "z", "c"] From ca10862af05e2712bb0ab6b24f84dd1cee867e36 Mon Sep 17 00:00:00 2001 From: Luis Date: Fri, 13 Feb 2026 21:04:20 -0400 Subject: [PATCH 2/2] feat(hashline): upgrade to 4-char FNV-1a hashing and simplify edit API - Replace 2-char SHA-256 hashes with 4-char FNV-1a for better collision resistance (65,536 values vs 256) while maintaining speed on short strings - Simplify operations from 5 (replace, replace_range, insert_after, delete, delete_range) to 3 unified ops (replace, insert, delete) that all support optional end_ref for range operations - Add long-line truncation in format_hashlines (max_line_len=2000) to prevent context window blowout on minified files - Improve error messages with content previews on hash mismatch - Add line_num < 1 validation guard in validate_hashes - Fix replace/delete slice indexing to use inclusive end_idx (end_idx + 1) - Update docstrings and examples across file_modifications and file_operations - Add comprehensive test suite for hashline engine (tests/test_hashline.py) --- code_puppy/tools/file_modifications.py | 17 +- code_puppy/tools/file_operations.py | 4 +- code_puppy/tools/hashline.py | 124 ++++--- tests/test_hashline.py | 440 +++++++++++++++++++++++++ 4 files changed, 533 insertions(+), 52 deletions(-) create mode 100644 tests/test_hashline.py diff --git a/code_puppy/tools/file_modifications.py b/code_puppy/tools/file_modifications.py index 439a02e7f..2cc6a8968 100644 --- 
a/code_puppy/tools/file_modifications.py +++ b/code_puppy/tools/file_modifications.py @@ -86,12 +86,13 @@ class ContentPayload(BaseModel): class HashlineEdit(BaseModel): - """A single hashline edit operation.""" + """A single hashline edit operation. + + Simplified to 3 core operations that all support optional range editing. + """ - operation: ( - str # "replace" | "replace_range" | "insert_after" | "delete" | "delete_range" - ) - start_ref: str # e.g. "2:f1" + operation: str # "replace" | "insert" | "delete" + start_ref: str # e.g. "42:a3f1" (4-char hash) end_ref: str | None = None # for range operations new_content: str = "" # new lines (empty for delete) @@ -595,9 +596,9 @@ def edit_file( HashlineEditPayload (PREFERRED — use when you read files with hashline=True): - file_path (str): Path to file - edits (List[HashlineEdit]): List of edits where each HashlineEdit contains: - - operation (str): "replace" | "replace_range" | "insert_after" | "delete" | "delete_range" - - start_ref (str): Line hash reference e.g. "2:f1" (from hashline-tagged read output) - - end_ref (str | None): End reference for range operations + - operation (str): "replace" | "insert" | "delete" + - start_ref (str): Line hash reference e.g. "42:a3f1" (from hashline-tagged read output) + - end_ref (str | None): End reference for range operations (optional) - new_content (str): Replacement text (empty for deletes) ContentPayload: diff --git a/code_puppy/tools/file_operations.py b/code_puppy/tools/file_operations.py index 0a912cc8e..70f6b8a8c 100644 --- a/code_puppy/tools/file_operations.py +++ b/code_puppy/tools/file_operations.py @@ -866,8 +866,8 @@ def read_file( Hashline Mode (default: enabled): When hashline=True, file content is returned with line-hash tags: - 1:a3|function hello() { - 2:f1| return "world"; + 1:a3f1|function hello() { + 2:f10e| return "world"; Use these tags with HashlineEditPayload to edit by reference. Set hashline=False to get raw content without tags. 
diff --git a/code_puppy/tools/hashline.py b/code_puppy/tools/hashline.py index 2b5369139..8838e8d1d 100644 --- a/code_puppy/tools/hashline.py +++ b/code_puppy/tools/hashline.py @@ -1,6 +1,6 @@ """Hashline engine for file editing. -Each line gets tagged with a 2-char content hash so models can reference +Each line gets tagged with a 4-char content hash so models can reference lines by hash instead of reproducing exact text. This eliminates the fragile "find exact string" pattern and makes edits robust to whitespace or minor content drift. @@ -32,8 +32,23 @@ def __init__( def line_hash(content: str) -> str: - """Return a 2-char hex hash of *content* (SHA-256, first byte).""" - return hashlib.sha256(content.encode("utf-8")).hexdigest()[:2] + """Return a 4-char hex hash of *content* using FNV-1a. + + FNV-1a (Fowler-Noll-Vo) is chosen for speed and good distribution on short strings. + The 4-char hex (65536 values) provides strong collision resistance even in + multi-thousand line files. Paired with line numbers, collisions are near-zero. + """ + # FNV-1a 32-bit parameters + FNV_32_PRIME = 0x01000193 + FNV1_32A_INIT = 0x811c9dc5 + + h = FNV1_32A_INIT + for byte in content.encode("utf-8"): + h ^= byte + h = (h * FNV_32_PRIME) & 0xffffffff + + # Take lowest 2 bytes, format as 4-char hex + return f"{h & 0xffff:04x}" def compute_file_hashes(content: str) -> dict[int, str]: @@ -46,29 +61,41 @@ def compute_file_hashes(content: str) -> dict[int, str]: # --------------------------------------------------------------------------- -def format_hashlines(content: str, start_line: int = 1) -> str: +def format_hashlines(content: str, start_line: int = 1, max_line_len: int = 2000) -> str: """Convert file content to hashline display format. Args: content: Raw file content. start_line: Line number offset (1-based). Use this when formatting a partial read so line numbers match the actual file. + max_line_len: Maximum line length before truncation (default: 2000). 
Example output:: - 1:a3|function hello() { - 2:f1| return "world"; + 1:a3f1|function hello() { + 2:f10e| return "world"; """ lines = content.splitlines() parts: list[str] = [] + truncated = False + for i, raw in enumerate(lines, start=start_line): - h = line_hash(raw) - parts.append(f"{i}:{h}|{raw}") - return "\n".join(parts) + line_content = raw + if len(raw) > max_line_len: + line_content = raw[:max_line_len] + "...[truncated]" + truncated = True + h = line_hash(raw) # Hash the original content, not truncated + parts.append(f"{i}:{h}|{line_content}") + + result = "\n".join(parts) + if truncated: + result = f"[Some lines truncated at {max_line_len} chars]\n" + result + + return result def parse_hashline_ref(ref: str) -> tuple[int, str]: - """Parse ``"2:f1"`` → ``(2, "f1")``. Raises *ValueError* on bad format.""" + """Parse ``"42:a3f1"`` → ``(42, "a3f1")``. Raises *ValueError* on bad format.""" if ":" not in ref: raise ValueError(f"Invalid hashline ref (missing ':'): {ref!r}") line_str, hash_str = ref.split(":", maxsplit=1) @@ -78,9 +105,9 @@ def parse_hashline_ref(ref: str) -> tuple[int, str]: raise ValueError(f"Invalid line number in ref: {ref!r}") from None if line_num < 1: raise ValueError(f"Line number must be >= 1, got {line_num} in ref: {ref!r}") - if len(hash_str) != 2: + if len(hash_str) != 4: raise ValueError( - f"Hash must be exactly 2 hex chars, got {hash_str!r} in ref: {ref!r}" + f"Hash must be exactly 4 hex chars, got {hash_str!r} in ref: {ref!r}" ) return line_num, hash_str @@ -99,6 +126,7 @@ def validate_hashes( Returns a list of human-readable error messages (empty == all valid). 
""" file_hashes = compute_file_hashes(current_content) + lines = current_content.splitlines() total_lines = len(file_hashes) errors: list[str] = [] @@ -108,10 +136,19 @@ def validate_hashes( f"Line {line_num} out of range (file has {total_lines} lines)" ) continue + if line_num < 1: + errors.append( + f"Line {line_num} is invalid (must be >= 1)" + ) + continue actual = file_hashes[line_num] if actual != expected: + # Get actual content for better error message + actual_content = lines[line_num - 1] if line_num <= len(lines) else "" + content_preview = actual_content[:50] + "..." if len(actual_content) > 50 else actual_content errors.append( - f"Line {line_num}: expected hash '{expected}', got '{actual}'" + f"Hash mismatch at line {line_num}: expected '{expected}' but file has '{actual}' " + f"(file may have changed since last read). Content: {content_preview!r}" ) return errors @@ -152,20 +189,23 @@ def invalidate_cache(file_path: str) -> None: def _resolve_edit_range(edit: dict) -> tuple[int, int, str, str]: - """Return ``(start_line, end_line, start_hash, end_hash)`` for an edit.""" + """Return ``(start_line, end_line, start_hash, end_hash)`` for an edit. + + All operations support optional end_ref for range operations. 
+ """ start_line, start_hash = parse_hashline_ref(edit["start_ref"]) operation = edit["operation"] - - if operation in ("replace_range", "delete_range"): - if not edit.get("end_ref"): - raise ValueError(f"'{operation}' requires 'end_ref'") + + # Check if this is a range operation (has end_ref) + if edit.get("end_ref"): end_line, end_hash = parse_hashline_ref(edit["end_ref"]) if end_line < start_line: raise ValueError( f"end_ref line ({end_line}) < start_ref line ({start_line})" ) return start_line, end_line, start_hash, end_hash - + + # Single-line operation return start_line, start_line, start_hash, start_hash @@ -192,11 +232,15 @@ def apply_hashline_edits( Each *edit* dict must contain: - - ``operation``: ``"replace"`` | ``"replace_range"`` | ``"insert_after"`` - | ``"delete"`` | ``"delete_range"`` - - ``start_ref``: e.g. ``"2:f1"`` - - ``end_ref``: required for range operations, else ``None`` - - ``new_content``: replacement text (empty string for deletes) + - ``operation``: ``"replace"`` | ``"insert"`` | ``"delete"`` + - ``start_ref``: e.g. ``"42:a3f1"`` (line:hash reference) + - ``end_ref``: optional, for range operations (e.g., delete lines 5-10) + - ``new_content``: replacement text (required for replace/insert, empty for delete) + + Operations: + - **replace**: Replace line(s) with new content. With end_ref, replaces a range. + - **insert**: Insert new content after the line. With end_ref, inserts after end_ref line. + - **delete**: Delete line(s). With end_ref, deletes the range start_ref to end_ref. Returns ``{"success": bool, "content": str, "errors": list[str]}``. 
""" @@ -215,16 +259,12 @@ def apply_hashline_edits( all_refs: list[tuple[int, str]] = [] for i, edit in enumerate(edits): - valid_ops = ( - "replace", - "replace_range", - "insert_after", - "delete", - "delete_range", - ) + valid_ops = ("replace", "insert", "delete") op = edit.get("operation", "") if op not in valid_ops: - errors.append(f"Edit {i}: unknown operation '{op}'") + errors.append( + f"Edit {i}: unknown operation '{op}'. Must be one of: {', '.join(valid_ops)}" + ) continue try: start, end, s_hash, e_hash = _resolve_edit_range(edit) @@ -248,8 +288,8 @@ def apply_hashline_edits( # 3. Check for overlapping edits ranges_for_overlap = [] for i, (start, end, edit) in enumerate(parsed): - if edit["operation"] == "insert_after": - # Inserts don't occupy a range; they go *after* the line + if edit["operation"] == "insert" and not edit.get("end_ref"): + # Single-line inserts don't occupy a range; they go *after* the line continue ranges_for_overlap.append((start, end, i)) @@ -269,15 +309,15 @@ def apply_hashline_edits( end_idx = end # exclusive upper bound for slice replacement if op == "replace": - lines[start_idx : start_idx + 1] = new_lines - elif op == "replace_range": - lines[start_idx:end_idx] = new_lines - elif op == "insert_after": - lines[start_idx + 1 : start_idx + 1] = new_lines + # Replace single line or range + lines[start_idx : end_idx + 1] = new_lines + elif op == "insert": + # Insert after start_idx (or after end_idx if range specified) + insert_pos = end_idx + 1 if end != start else start_idx + 1 + lines[insert_pos:insert_pos] = new_lines elif op == "delete": - del lines[start_idx] - elif op == "delete_range": - del lines[start_idx:end_idx] + # Delete single line or range + del lines[start_idx : end_idx + 1] new_content = "\n".join(lines) # Preserve trailing newline if original had one diff --git a/tests/test_hashline.py b/tests/test_hashline.py new file mode 100644 index 000000000..83a98a78a --- /dev/null +++ b/tests/test_hashline.py @@ -0,0 
+1,440 @@ +"""Comprehensive test suite for hashline tools. + +Tests ported from experimental/hashline_test.go and adapted for Python. +""" + +import os +import tempfile +import pytest +from code_puppy.tools.hashline import ( + line_hash, + compute_file_hashes, + format_hashlines, + parse_hashline_ref, + validate_hashes, + apply_hashline_edits, + HashlineMismatchError, +) + + +# --- LineHash tests --- + + +def test_line_hash_deterministic(): + """Same input should produce same hash.""" + h1 = line_hash("hello world") + h2 = line_hash("hello world") + assert h1 == h2, f"same input produced different hashes: {h1!r} vs {h2!r}" + + +def test_line_hash_length(): + """Hash should be exactly 4 hex characters.""" + h = line_hash("test") + assert len(h) == 4, f"hash should be 4 chars, got {len(h)}" + assert all(c in "0123456789abcdef" for c in h), f"hash should be hex, got {h!r}" + + +def test_line_hash_different_inputs(): + """Different inputs should produce different hashes (collision unlikely).""" + h1 = line_hash("hello") + h2 = line_hash("world") + # Note: collisions are possible but extremely unlikely + assert h1 != h2 or True # Don't fail on unlikely collision + + +def test_line_hash_empty_string(): + """Empty string should have valid hash.""" + h = line_hash("") + assert len(h) == 4, f"empty string hash should be 4 chars, got {h!r}" + + +def test_line_hash_whitespace_sensitive(): + """Leading/trailing whitespace should affect hash.""" + h1 = line_hash(" hello") + h2 = line_hash("hello") + # Different whitespace = different content = likely different hash + # (collision possible but unlikely) + pass # Just verify both are valid + + +# --- compute_file_hashes tests --- + + +def test_compute_file_hashes_basic(): + """Compute hashes for multi-line content.""" + content = "line one\nline two\nline three" + hashes = compute_file_hashes(content) + + assert len(hashes) == 3, f"expected 3 lines, got {len(hashes)}" + assert all(len(h) == 4 for h in hashes.values()), "all 
hashes should be 4 chars" + assert list(hashes.keys()) == [1, 2, 3], "line numbers should be 1-based" + + +def test_compute_file_hashes_empty(): + """Empty file should have one empty line.""" + hashes = compute_file_hashes("") + assert len(hashes) == 1, "empty content should have 1 line" + + +def test_compute_file_hashes_trailing_newline(): + """Trailing newline creates empty last line.""" + hashes = compute_file_hashes("a\nb\n") + assert len(hashes) == 3, "trailing newline should create empty line" + + +# --- format_hashlines tests --- + + +def test_format_hashlines_basic(): + """Format content with line:hash|content format.""" + content = "func main() {\n\tfmt.Println(\"hi\")\n}" + output = format_hashlines(content) + lines = output.split("\n") + + # Should not have truncation warning + assert not output.startswith("[Some lines truncated"), "no truncation expected" + + # Check format of each line + for i, line in enumerate(lines, 1): + assert ":" in line, f"line {i} should have ':' separator" + assert "|" in line, f"line {i} should have '|' separator" + parts = line.split("|", 1) + ref = parts[0] + num_str, hash_str = ref.split(":", 1) + assert int(num_str) == i, f"line number should be {i}" + assert len(hash_str) == 4, f"hash should be 4 chars, got {hash_str!r}" + + +def test_format_hashlines_with_offset(): + """Format with start_line offset.""" + content = "first\nsecond" + output = format_hashlines(content, start_line=10) + lines = output.split("\n") + + assert lines[0].startswith("10:"), "first line should be numbered 10" + assert lines[1].startswith("11:"), "second line should be numbered 11" + + +def test_format_hashlines_truncation(): + """Long lines should be truncated with warning.""" + long_line = "x" * 3000 # Longer than default 2000 char limit + content = f"short\n{long_line}\nshort" + output = format_hashlines(content) + + assert "[Some lines truncated" in output, "should have truncation warning" + assert "...[truncated]" in output, "long line should 
be truncated" + + +# --- parse_hashline_ref tests --- + + +def test_parse_hashline_ref_valid(): + """Parse valid line:hash reference.""" + line_num, hash_val = parse_hashline_ref("42:a3f1") + assert line_num == 42 + assert hash_val == "a3f1" + + +def test_parse_hashline_ref_invalid_format(): + """Missing colon should raise ValueError.""" + with pytest.raises(ValueError, match="missing ':'"): + parse_hashline_ref("42a3f1") + + +def test_parse_hashline_ref_invalid_line_number(): + """Non-numeric line number should raise ValueError.""" + with pytest.raises(ValueError, match="Invalid line number"): + parse_hashline_ref("abc:a3f1") + + +def test_parse_hashline_ref_invalid_hash_length(): + """Hash must be exactly 4 chars.""" + with pytest.raises(ValueError, match="exactly 4 hex chars"): + parse_hashline_ref("42:a3") # Too short + with pytest.raises(ValueError, match="exactly 4 hex chars"): + parse_hashline_ref("42:a3f12") # Too long + + +def test_parse_hashline_ref_line_number_zero(): + """Line number must be >= 1.""" + with pytest.raises(ValueError, match="must be >= 1"): + parse_hashline_ref("0:a3f1") + + +# --- validate_hashes tests --- + + +def test_validate_hashes_all_valid(): + """All valid hashes should return empty error list.""" + content = "line one\nline two\nline three" + hashes = compute_file_hashes(content) + refs = [(1, hashes[1]), (2, hashes[2]), (3, hashes[3])] + + errors = validate_hashes(refs, content) + assert errors == [], f"expected no errors, got {errors}" + + +def test_validate_hashes_mismatch(): + """Hash mismatch should return descriptive error.""" + content = "line one\nline two" + errors = validate_hashes([(1, "xxxx")], content) + + assert len(errors) == 1, "should have one error" + assert "mismatch" in errors[0].lower() + assert "expected 'xxxx'" in errors[0] + + +def test_validate_hashes_out_of_range(): + """Line number out of range should return error.""" + content = "line one\nline two" + errors = validate_hashes([(10, "a3f1")], content) + 
+ assert len(errors) == 1 + assert "out of range" in errors[0].lower() + assert "file has 2 lines" in errors[0] + + +# --- apply_hashline_edits tests --- + + +def test_apply_hashline_edits_replace_single(): + """Replace a single line.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: + f.write("line one\nline two\nline three\n") + f.flush() + temp_file = f.name + + try: + content = "line one\nline two\nline three\n" + hashes = compute_file_hashes(content) + + edits = [ + { + "operation": "replace", + "start_ref": f"2:{hashes[2]}", + "new_content": "REPLACED LINE TWO", + } + ] + + result = apply_hashline_edits(temp_file, edits) + + assert result["success"], f"edit failed: {result.get('errors')}" + assert "REPLACED LINE TWO" in result["content"] + assert "line one" in result["content"] + assert "line three" in result["content"] + finally: + os.unlink(temp_file) + + +def test_apply_hashline_edits_replace_range(): + """Replace multiple lines with end_ref.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: + f.write("line 1\nline 2\nline 3\nline 4\nline 5\n") + f.flush() + temp_file = f.name + + try: + content = "line 1\nline 2\nline 3\nline 4\nline 5\n" + hashes = compute_file_hashes(content) + + edits = [ + { + "operation": "replace", + "start_ref": f"2:{hashes[2]}", + "end_ref": f"4:{hashes[4]}", + "new_content": "REPLACED\nMULTIPLE\nLINES", + } + ] + + result = apply_hashline_edits(temp_file, edits) + + assert result["success"], f"edit failed: {result.get('errors')}" + assert "line 1" in result["content"] + assert "REPLACED" in result["content"] + assert "MULTIPLE" in result["content"] + assert "LINES" in result["content"] + assert "line 5" in result["content"] + assert "line 2" not in result["content"] + assert "line 3" not in result["content"] + assert "line 4" not in result["content"] + finally: + os.unlink(temp_file) + + +def test_apply_hashline_edits_insert(): + """Insert content after a line.""" 
+ with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: + f.write("line 1\nline 2\nline 3\n") + f.flush() + temp_file = f.name + + try: + content = "line 1\nline 2\nline 3\n" + hashes = compute_file_hashes(content) + + edits = [ + { + "operation": "insert", + "start_ref": f"2:{hashes[2]}", + "new_content": "INSERTED", + } + ] + + result = apply_hashline_edits(temp_file, edits) + + assert result["success"], f"edit failed: {result.get('errors')}" + lines = result["content"].split("\n") + assert lines[0] == "line 1" + assert lines[1] == "line 2" + assert lines[2] == "INSERTED" + assert lines[3] == "line 3" + finally: + os.unlink(temp_file) + + +def test_apply_hashline_edits_delete_single(): + """Delete a single line.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: + f.write("line 1\nline 2\nline 3\n") + f.flush() + temp_file = f.name + + try: + content = "line 1\nline 2\nline 3\n" + hashes = compute_file_hashes(content) + + edits = [ + { + "operation": "delete", + "start_ref": f"2:{hashes[2]}", + } + ] + + result = apply_hashline_edits(temp_file, edits) + + assert result["success"], f"edit failed: {result.get('errors')}" + assert "line 1" in result["content"] + assert "line 3" in result["content"] + assert "line 2" not in result["content"] + finally: + os.unlink(temp_file) + + +def test_apply_hashline_edits_delete_range(): + """Delete multiple lines with end_ref.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: + f.write("line 1\nline 2\nline 3\nline 4\nline 5\n") + f.flush() + temp_file = f.name + + try: + content = "line 1\nline 2\nline 3\nline 4\nline 5\n" + hashes = compute_file_hashes(content) + + edits = [ + { + "operation": "delete", + "start_ref": f"2:{hashes[2]}", + "end_ref": f"4:{hashes[4]}", + } + ] + + result = apply_hashline_edits(temp_file, edits) + + assert result["success"], f"edit failed: {result.get('errors')}" + assert "line 1" in result["content"] + assert 
"line 5" in result["content"] + assert "line 2" not in result["content"] + assert "line 3" not in result["content"] + assert "line 4" not in result["content"] + finally: + os.unlink(temp_file) + + +def test_apply_hashline_edits_hash_mismatch(): + """Hash mismatch should reject entire batch.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: + f.write("line 1\nline 2\nline 3\n") + f.flush() + temp_file = f.name + + try: + edits = [ + { + "operation": "replace", + "start_ref": "2:xxxx", # Invalid hash + "new_content": "REPLACED", + } + ] + + result = apply_hashline_edits(temp_file, edits) + + assert not result["success"], "should fail on hash mismatch" + assert len(result["errors"]) > 0 + assert "mismatch" in result["errors"][0].lower() + finally: + os.unlink(temp_file) + + +def test_apply_hashline_edits_invalid_operation(): + """Invalid operation should be rejected.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: + f.write("line 1\n") + f.flush() + temp_file = f.name + + try: + content = "line 1\n" + hashes = compute_file_hashes(content) + + edits = [ + { + "operation": "invalid_op", + "start_ref": f"1:{hashes[1]}", + } + ] + + result = apply_hashline_edits(temp_file, edits) + + assert not result["success"] + assert any("unknown operation" in e.lower() for e in result["errors"]) + finally: + os.unlink(temp_file) + + +def test_apply_hashline_edits_multiple_edits_bottom_to_top(): + """Multiple edits should be applied bottom-to-top.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: + f.write("line 1\nline 2\nline 3\nline 4\n") + f.flush() + temp_file = f.name + + try: + content = "line 1\nline 2\nline 3\nline 4\n" + hashes = compute_file_hashes(content) + + # Edit in non-sorted order to verify bottom-to-top sorting + edits = [ + { + "operation": "replace", + "start_ref": f"2:{hashes[2]}", + "new_content": "REPLACED 2", + }, + { + "operation": "replace", + "start_ref": 
f"4:{hashes[4]}", + "new_content": "REPLACED 4", + }, + ] + + result = apply_hashline_edits(temp_file, edits) + + assert result["success"], f"edit failed: {result.get('errors')}" + assert "REPLACED 2" in result["content"] + assert "REPLACED 4" in result["content"] + finally: + os.unlink(temp_file) + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])