diff --git a/code_puppy/agents/agent_code_puppy.py b/code_puppy/agents/agent_code_puppy.py index ba0f95068..5329d4373 100644 --- a/code_puppy/agents/agent_code_puppy.py +++ b/code_puppy/agents/agent_code_puppy.py @@ -117,7 +117,7 @@ def get_system_prompt(self) -> str: File Operations: - list_files(directory=".", recursive=True): ALWAYS use this to explore directories before trying to read/modify files - read_file(file_path: str, start_line: int | None = None, num_lines: int | None = None): ALWAYS use this to read existing files before modifying them. By default, read the entire file. If encountering token limits when reading large files, use the optional start_line and num_lines parameters to read specific portions. - - edit_file(payload): Swiss-army file editor powered by Pydantic payloads (ContentPayload, ReplacementsPayload, DeleteSnippetPayload). + - edit_file(payload): Swiss-army file editor powered by Pydantic payloads (HashlineEditPayload, ContentPayload, DeleteSnippetPayload). - delete_file(file_path): Use this to remove files when needed - grep(search_string, directory="."): Use this to recursively search for a string across files starting from the specified directory, capping results at 200 matches. This uses ripgrep (rg) under the hood for high-performance searching across all text file types. @@ -125,35 +125,43 @@ def get_system_prompt(self) -> str: ## edit_file This is an all-in-one file-modification tool. It supports the following Pydantic Object payload types: -1. ContentPayload: {{ file_path="example.py", "content": "…", "overwrite": true|false }} → Create or overwrite a file with the provided content. -2. ReplacementsPayload: {{ file_path="example.py", "replacements": [ {{ "old_str": "…", "new_str": "…" }}, … ] }} → Perform exact text replacements inside an existing file. -3. DeleteSnippetPayload: {{ file_path="example.py", "delete_snippet": "…" }} → Remove a snippet of text from an existing file. +1. 
HashlineEditPayload (REQUIRED): {{ file_path="example.py", "edits": [ {{ "operation": "replace", "start_ref": "2:f10e", "new_content": "new code" }}, … ] }} → Edit by line-hash reference. Use the line:hash tags from read_file output. +2. ContentPayload: {{ file_path="example.py", "content": "…", "overwrite": true|false }} → Create or overwrite a file with the provided content. (ONLY use for new files or complete rewrites) +3. DeleteSnippetPayload: {{ file_path="example.py", "delete_snippet": "…" }} → Remove a snippet of text from an existing file. Arguments: - payload (required): One of the Pydantic payload types above. -Example (create): +Example (hashline edit — REQUIRED for all file modifications): +When you read a file, each line is tagged: `<line>:<hash>|<content>` +Reference these tags to edit: ```python -edit_file(payload={{file_path="example.py" "content": "print('hello')\n"}}) +edit_file( + payload={{file_path="example.py", "edits": [{{"operation": "replace", "start_ref": "2:f10e", "new_content": "bar"}}]}} +) ``` +Hashline operations: "replace", "insert" (adds new_content after the referenced line), "delete" — each accepts an optional end_ref to cover a line range -Example (replacement): -- YOU SHOULD PREFER THIS AS THE PRIMARY WAY TO EDIT FILES. 
+Example (create — ContentPayload ONLY for new files or full rewrites): ```python -edit_file( - payload={{file_path="example.py", "replacements": [{{"old_str": "foo", "new_str": "bar"}}]}} -) +edit_file(payload={{file_path="example.py", "content": "print('hello')\n"}}) ``` -Example (delete snippet): +Example (delete snippet — DeleteSnippetPayload ONLY for removing text): ```python edit_file( payload={{file_path="example.py", "delete_snippet": "# TODO: remove this line"}} ) ``` + +CRITICAL RULE — You MUST use HashlineEditPayload for editing existing files: +• Read the file first to get line:hash tags (e.g., "2:f10e|") +• Reference these tags in your edits — this prevents concurrent edit conflicts +• Do NOT try to use old-style string replacement — it is NO LONGER SUPPORTED +• If a hash mismatch occurs, re-read the file and retry with fresh tags Best-practice guidelines for `edit_file`: • Keep each diff small – ideally between 100-300 lines. • Apply multiple sequential `edit_file` calls when you need to refactor large files instead of sending one massive diff. -• Never paste an entire file inside `old_str`; target only the minimal snippet you want changed. • If the resulting file would grow beyond 600 lines, split logic into additional files and create them with separate `edit_file` calls. System Operations: diff --git a/code_puppy/agents/agent_creator_agent.py b/code_puppy/agents/agent_creator_agent.py index 1c4ca6fc7..3d2c1812c 100644 --- a/code_puppy/agents/agent_creator_agent.py +++ b/code_puppy/agents/agent_creator_agent.py @@ -183,7 +183,7 @@ def get_system_prompt(self) -> str: ALWAYS use this to read existing files before modifying them. By default, read the entire file. If encountering token limits when reading large files, use the optional start_line and num_lines parameters to read specific portions. #### `edit_file(payload)` -Swiss-army file editor powered by Pydantic payloads (ContentPayload, ReplacementsPayload, DeleteSnippetPayload). 
+Swiss-army file editor powered by Pydantic payloads (HashlineEditPayload, ContentPayload, DeleteSnippetPayload). #### `delete_file(file_path)` Use this to remove files when needed @@ -196,24 +196,27 @@ def get_system_prompt(self) -> str: #### `ask_about_model_pinning(agent_config)` Use this method to ask the user whether they want to pin a specific model to their agent. Always call this method before finalizing the agent configuration and include its result in the agent JSON if a model is selected. This is an all-in-one file-modification tool. It supports the following Pydantic Object payload types: -1. ContentPayload: {{ file_path="example.py", "content": "…", "overwrite": true|false }} → Create or overwrite a file with the provided content. -2. ReplacementsPayload: {{ file_path="example.py", "replacements": [ {{ "old_str": "…", "new_str": "…" }}, … ] }} → Perform exact text replacements inside an existing file. -3. DeleteSnippetPayload: {{ file_path="example.py", "delete_snippet": "…" }} → Remove a snippet of text from an existing file. +1. HashlineEditPayload (REQUIRED): {{ file_path="example.py", "edits": [ {{ "operation": "replace", "start_ref": "2:f10e", "new_content": "new code" }} ] }} → Edit by line-hash reference. +2. ContentPayload: {{ file_path="example.py", "content": "…", "overwrite": true|false }} → Create or overwrite a file. (ONLY for new files or complete rewrites) +3. DeleteSnippetPayload: {{ file_path="example.py", "delete_snippet": "…" }} → Remove a snippet. Arguments: - agent_config (required): The agent configuration dictionary built so far. - payload (required): One of the Pydantic payload types above. 
-Example (create): +Example (hashline edit — REQUIRED for all file modifications): +When you read a file, each line is tagged: `<line>:<hash>|<content>` +Reference these tags to edit: ```python -edit_file(payload={{file_path="example.py" "content": "print('hello')"}}) +edit_file( + payload={{file_path="example.py", "edits": [{{"operation": "replace", "start_ref": "2:f10e", "new_content": "bar"}}]}} +) ``` +Hashline operations: "replace", "insert" (adds new_content after the referenced line), "delete" — each accepts an optional end_ref to cover a line range -Example (replacement): -- YOU SHOULD PREFER THIS AS THE PRIMARY WAY TO EDIT FILES. +Example (create — ContentPayload ONLY for new files): ```python -edit_file( - payload={{file_path="example.py", "replacements": [{{"old_str": "foo", "new_str": "bar"}}]}} -) +edit_file(payload={{file_path="example.py", "content": "print('hello')"}}) ``` Example (delete snippet): @@ -223,6 +226,13 @@ def get_system_prompt(self) -> str: ) ``` + +CRITICAL RULE — You MUST use HashlineEditPayload for editing existing files: +• Read the file first to get line:hash tags (e.g., "2:f10e|") +• Reference these tags in your edits — this prevents concurrent edit conflicts +• Do NOT try to use old-style string replacement — it is NO LONGER SUPPORTED +• If a hash mismatch occurs, re-read the file and retry with fresh tags + NEVER output an entire file – this is very expensive. You may not edit file extensions: [.ipynb] diff --git a/code_puppy/plugins/antigravity_oauth/transport.py b/code_puppy/plugins/antigravity_oauth/transport.py index 03d9c130b..2a0feeb9f 100644 --- a/code_puppy/plugins/antigravity_oauth/transport.py +++ b/code_puppy/plugins/antigravity_oauth/transport.py @@ -28,7 +28,7 @@ def _flatten_union_to_object(union_items: list, defs: dict, resolve_fn) -> dict: """Flatten a union of object types into a single object with all properties. 
- For discriminated unions like EditFilePayload (ContentPayload | ReplacementsPayload | DeleteSnippetPayload), + For discriminated unions like EditFilePayload (ContentPayload | HashlineEditPayload | DeleteSnippetPayload), we merge all object types into one with all properties marked as optional. """ merged_properties = {} diff --git a/code_puppy/tools/file_modifications.py b/code_puppy/tools/file_modifications.py index 0f6f212eb..2cc6a8968 100644 --- a/code_puppy/tools/file_modifications.py +++ b/code_puppy/tools/file_modifications.py @@ -25,10 +25,12 @@ DiffLine, DiffMessage, emit_error, - emit_warning, get_message_bus, ) -from code_puppy.tools.common import _find_best_window, generate_group_id +from code_puppy.tools.common import generate_group_id +from code_puppy.tools.hashline import ( + apply_hashline_edits, +) def _create_rejection_response(file_path: str) -> Dict[str, Any]: @@ -77,23 +79,30 @@ class DeleteSnippetPayload(BaseModel): delete_snippet: str -class Replacement(BaseModel): - old_str: str - new_str: str +class ContentPayload(BaseModel): + file_path: str + content: str + overwrite: bool = False -class ReplacementsPayload(BaseModel): - file_path: str - replacements: List[Replacement] +class HashlineEdit(BaseModel): + """A single hashline edit operation. + + Simplified to 3 core operations that all support optional range editing. + """ + operation: str # "replace" | "insert" | "delete" + start_ref: str # e.g. 
"42:a3f1" (4-char hash) + end_ref: str | None = None # for range operations + new_content: str = "" # new lines (empty for delete) -class ContentPayload(BaseModel): + +class HashlineEditPayload(BaseModel): file_path: str - content: str - overwrite: bool = False + edits: List[HashlineEdit] -EditFilePayload = Union[DeleteSnippetPayload, ReplacementsPayload, ContentPayload] +EditFilePayload = Union[DeleteSnippetPayload, ContentPayload, HashlineEditPayload] def _parse_diff_lines(diff_text: str) -> List[DiffLine]: @@ -256,101 +265,6 @@ def _delete_snippet_from_file( return {"error": str(exc), "diff": diff_text} -def _replace_in_file( - context: RunContext | None, - path: str, - replacements: List[Dict[str, str]], - message_group: str | None = None, -) -> Dict[str, Any]: - """Robust replacement engine with explicit edge‑case reporting.""" - file_path = os.path.abspath(path) - diff_text = "" - try: - if not os.path.exists(file_path) or not os.path.isfile(file_path): - return {"error": f"File '{file_path}' does not exist.", "diff": diff_text} - - with open(file_path, "r", encoding="utf-8", errors="surrogateescape") as f: - original = f.read() - - # Sanitize any surrogate characters from reading - try: - original = original.encode("utf-8", errors="surrogatepass").decode( - "utf-8", errors="replace" - ) - except (UnicodeEncodeError, UnicodeDecodeError): - pass - - modified = original - for rep in replacements: - old_snippet = rep.get("old_str", "") - new_snippet = rep.get("new_str", "") - - if old_snippet and old_snippet in modified: - modified = modified.replace(old_snippet, new_snippet, 1) - continue - - had_trailing_newline = modified.endswith("\n") - orig_lines = modified.splitlines() - loc, score = _find_best_window(orig_lines, old_snippet) - - if score < 0.95 or loc is None: - return { - "error": "No suitable match in file (JW < 0.95)", - "jw_score": score, - "received": old_snippet, - "diff": "", - } - - start, end = loc - prefix = "\n".join(orig_lines[:start]) - 
suffix = "\n".join(orig_lines[end:]) - parts = [] - if prefix: - parts.append(prefix) - parts.append(new_snippet.rstrip("\n")) - if suffix: - parts.append(suffix) - modified = "\n".join(parts) - if had_trailing_newline and not modified.endswith("\n"): - modified += "\n" - - if modified == original: - emit_warning( - "No changes to apply – proposed content is identical.", - message_group=message_group, - ) - return { - "success": False, - "path": file_path, - "message": "No changes to apply.", - "changed": False, - "diff": "", - } - - from code_puppy.config import get_diff_context_lines - - diff_text = "".join( - difflib.unified_diff( - original.splitlines(keepends=True), - modified.splitlines(keepends=True), - fromfile=f"a/{os.path.basename(file_path)}", - tofile=f"b/{os.path.basename(file_path)}", - n=get_diff_context_lines(), - ) - ) - with open(file_path, "w", encoding="utf-8") as f: - f.write(modified) - return { - "success": True, - "path": file_path, - "message": "Replacements applied.", - "changed": True, - "diff": diff_text, - } - except Exception as exc: - return {"error": str(exc), "diff": diff_text} - - def _write_to_file( context: RunContext | None, path: str, @@ -471,33 +385,6 @@ def write_to_file( return res -def replace_in_file( - context: RunContext, - path: str, - replacements: List[Dict[str, str]], - message_group: str | None = None, -) -> Dict[str, Any]: - # Use the plugin system for permission handling with operation data - from code_puppy.callbacks import on_file_permission - - operation_data = {"replacements": replacements} - permission_results = on_file_permission( - context, path, "replace text in", None, message_group, operation_data - ) - - # If any permission handler denies the operation, return cancelled result - if permission_results and any( - not result for result in permission_results if result is not None - ): - return _create_rejection_response(path) - - res = _replace_in_file(context, path, replacements, 
message_group=message_group) - diff = res.get("diff", "") - if diff: - _emit_diff_message(path, "modify", diff) - return res - - def _edit_file( context: RunContext, payload: EditFilePayload, group_id: str | None = None ) -> Dict[str, Any]: @@ -511,7 +398,7 @@ def _edit_file( Supported payload variants -------------------------- • **ContentPayload** – full file write / overwrite. - • **ReplacementsPayload** – targeted in-file replacements. + • **HashlineEditPayload** – edit by line-hash reference (preferred). • **DeleteSnippetPayload** – remove an exact snippet. The helper decides which low-level routine to delegate to and ensures the resulting unified @@ -543,15 +430,52 @@ def _edit_file( return delete_snippet_from_file( context, file_path, payload.delete_snippet, message_group=group_id ) - elif isinstance(payload, ReplacementsPayload): - # Convert Pydantic Replacement models to dict format for legacy compatibility - replacements_dict = [ - {"old_str": rep.old_str, "new_str": rep.new_str} - for rep in payload.replacements - ] - return replace_in_file( - context, file_path, replacements_dict, message_group=group_id + elif isinstance(payload, HashlineEditPayload): + file_path_abs = os.path.abspath(payload.file_path) + try: + with open( + file_path_abs, "r", encoding="utf-8", errors="surrogateescape" + ) as f: + old_content = f.read() + except OSError as exc: + return { + "success": False, + "path": file_path_abs, + "message": str(exc), + "changed": False, + } + + result = apply_hashline_edits( + file_path_abs, [e.model_dump() for e in payload.edits] + ) + if not result["success"]: + return { + "success": False, + "path": file_path_abs, + "message": "; ".join(result["errors"]), + "changed": False, + } + + from code_puppy.config import get_diff_context_lines + + diff_text = "".join( + difflib.unified_diff( + old_content.splitlines(keepends=True), + result["content"].splitlines(keepends=True), + fromfile=f"a/{os.path.basename(file_path_abs)}", + 
tofile=f"b/{os.path.basename(file_path_abs)}", + n=get_diff_context_lines(), + ) ) + if diff_text: + _emit_diff_message(file_path_abs, "modify", diff_text) + return { + "success": True, + "path": file_path_abs, + "message": "Hashline edits applied.", + "changed": bool(diff_text), + "diff": diff_text, + } elif isinstance(payload, ContentPayload): file_exists = os.path.exists(file_path) if file_exists and not payload.overwrite: @@ -667,7 +591,15 @@ def edit_file( Args: context (RunContext): The PydanticAI runtime context for the agent. - payload: One of three payload types: + payload: One of four payload types: + + HashlineEditPayload (PREFERRED — use when you read files with hashline=True): + - file_path (str): Path to file + - edits (List[HashlineEdit]): List of edits where each HashlineEdit contains: + - operation (str): "replace" | "insert" | "delete" + - start_ref (str): Line hash reference e.g. "42:a3f1" (from hashline-tagged read output) + - end_ref (str | None): End reference for range operations (optional) + - new_content (str): Replacement text (empty for deletes) ContentPayload: - file_path (str): Path to file @@ -675,13 +607,6 @@ def edit_file( - overwrite (bool, optional): Whether to overwrite existing files. Defaults to False (safe mode). - ReplacementsPayload: - - file_path (str): Path to file - - replacements (List[Replacement]): List of text replacements where - each Replacement contains: - - old_str (str): Exact text to find and replace - - new_str (str): Replacement text - DeleteSnippetPayload: - file_path (str): Path to file - delete_snippet (str): Exact text snippet to remove from file @@ -750,8 +675,15 @@ def edit_file( try: # Fallback for weird models that just can't help but send json strings... 
payload_dict = json.loads(json_repair.repair_json(payload)) - if "replacements" in payload_dict: - payload = ReplacementsPayload(**payload_dict) + if "edits" in payload_dict: + payload = HashlineEditPayload(**payload_dict) + elif "replacements" in payload_dict: + return { + "success": False, + "path": payload_dict.get("file_path", "Unknown"), + "message": "'replacements' is no longer supported. Use 'edits' with HashlineEditPayload instead.", + "changed": False, + } elif "delete_snippet" in payload_dict: payload = DeleteSnippetPayload(**payload_dict) elif "content" in payload_dict: @@ -763,7 +695,7 @@ def edit_file( return { "success": False, "path": file_path, - "message": f"One of 'content', 'replacements', or 'delete_snippet' must be provided in payload. Refer to the following examples: {parse_error_message}", + "message": f"One of 'edits', 'content', or 'delete_snippet' must be provided in payload. Refer to the following examples: {parse_error_message}", "changed": False, } except Exception as e: diff --git a/code_puppy/tools/file_operations.py b/code_puppy/tools/file_operations.py index 15d1303b6..70f6b8a8c 100644 --- a/code_puppy/tools/file_operations.py +++ b/code_puppy/tools/file_operations.py @@ -452,6 +452,7 @@ def _read_file( file_path: str, start_line: int | None = None, num_lines: int | None = None, + hashline: bool = True, ) -> ReadFileOutput: file_path = os.path.abspath(os.path.expanduser(file_path)) @@ -501,6 +502,19 @@ def _read_file( for char in content ) + # If hashline mode requested, format content and cache hashes + if hashline: + from code_puppy.tools.hashline import ( + cache_file_hashes, + compute_file_hashes, + format_hashlines, + ) + + cache_file_hashes(file_path, compute_file_hashes(content)) + # Pass start_line so partial reads get correct line numbers + offset = start_line if start_line is not None else 1 + content = format_hashlines(content, start_line=offset) + # Simple approximation: ~4 characters per token num_tokens = len(content) 
// 4 if num_tokens > 10000: @@ -809,6 +823,7 @@ def read_file( file_path: str = "", start_line: int | None = None, num_lines: int | None = None, + hashline: bool = True, ) -> ReadFileOutput: """Read file contents with optional line-range selection and token safety. @@ -849,13 +864,20 @@ def read_file( >>> if result.error: ... print(f"Error: {result.error}") + Hashline Mode (default: enabled): + When hashline=True, file content is returned with line-hash tags: + 1:a3f1|function hello() { + 2:f10e| return "world"; + Use these tags with HashlineEditPayload to edit by reference. + Set hashline=False to get raw content without tags. + Best Practices: - Always check for errors before using content - Use line ranges for large files to avoid token limits - Monitor num_tokens to stay within context limits - Combine with list_files to find files first """ - return _read_file(context, file_path, start_line, num_lines) + return _read_file(context, file_path, start_line, num_lines, hashline=hashline) def register_grep(agent): diff --git a/code_puppy/tools/hashline.py b/code_puppy/tools/hashline.py new file mode 100644 index 000000000..8838e8d1d --- /dev/null +++ b/code_puppy/tools/hashline.py @@ -0,0 +1,333 @@ +"""Hashline engine for file editing. + +Each line gets tagged with a 4-char content hash so models can reference +lines by hash instead of reproducing exact text. This eliminates the +fragile "find exact string" pattern and makes edits robust to whitespace +or minor content drift. 
+"""
+
+import hashlib
+from collections import OrderedDict
+
+
+class HashlineMismatchError(Exception):
+    """Raised when a hashline reference doesn't match current file content.
+
+    The line number, both hashes and the current line text are stored as
+    attributes so callers can build their own message.
+    """
+
+    def __init__(
+        self, line: int, expected_hash: str, actual_hash: str, actual_content: str
+    ):
+        self.line = line
+        self.expected_hash = expected_hash
+        self.actual_hash = actual_hash
+        self.actual_content = actual_content
+        super().__init__(
+            f"Line {line}: expected hash '{expected_hash}', "
+            f"got '{actual_hash}' for content: {actual_content!r}"
+        )
+
+
+# ---------------------------------------------------------------------------
+# Core hashing
+# ---------------------------------------------------------------------------
+
+
+def line_hash(content: str) -> str:
+    """Return a 4-char lowercase hex hash of *content* using FNV-1a.
+
+    Only the low 16 bits of the 32-bit FNV-1a value are kept. The hash is
+    always checked together with a line number (see ``validate_hashes``), so
+    it acts as a staleness check for that one line rather than as a
+    file-wide unique identifier.
+    """
+    # FNV-1a 32-bit parameters
+    FNV_32_PRIME = 0x01000193
+    FNV1_32A_INIT = 0x811c9dc5
+
+    h = FNV1_32A_INIT
+    for byte in content.encode("utf-8"):
+        h ^= byte
+        h = (h * FNV_32_PRIME) & 0xffffffff
+
+    # Take lowest 2 bytes, format as 4-char hex
+    return f"{h & 0xffff:04x}"
+
+
+def compute_file_hashes(content: str) -> dict[int, str]:
+    """Return ``{line_number: hash}`` for every line (1-based)."""
+    return {i: line_hash(line) for i, line in enumerate(content.splitlines(), start=1)}
+
+
+# ---------------------------------------------------------------------------
+# Formatting & parsing
+# ---------------------------------------------------------------------------
+
+
+def format_hashlines(content: str, start_line: int = 1, max_line_len: int = 2000) -> str:
+    """Convert file content to hashline display format.
+
+    Args:
+        content: Raw file content.
+        start_line: Line number offset (1-based). Use this when formatting
+            a partial read so line numbers match the actual file.
+        max_line_len: Maximum line length before truncation (default: 2000).
+
+    Returns:
+        One ``<line>:<hash>|<text>`` row per input line, joined with newlines.
+        If any line was truncated for display, a one-line notice is prepended.
+
+    Example output::
+
+        1:a3f1|function hello() {
+        2:f10e|    return "world";
+    """
+    lines = content.splitlines()
+    parts: list[str] = []
+    truncated = False
+
+    for i, raw in enumerate(lines, start=start_line):
+        line_content = raw
+        if len(raw) > max_line_len:
+            line_content = raw[:max_line_len] + "...[truncated]"
+            truncated = True
+        h = line_hash(raw)  # Hash the original content, not truncated
+        parts.append(f"{i}:{h}|{line_content}")
+
+    result = "\n".join(parts)
+    if truncated:
+        result = f"[Some lines truncated at {max_line_len} chars]\n" + result
+
+    return result
+
+
+def parse_hashline_ref(ref: str) -> tuple[int, str]:
+    """Parse ``"42:a3f1"`` → ``(42, "a3f1")``.
+
+    The hash part is stripped of surrounding whitespace and lower-cased so it
+    compares equal to what ``line_hash`` produces.
+
+    Raises:
+        ValueError: if the ``:`` separator is missing, the line number is not
+            an integer >= 1, or the hash is not exactly 4 hex digits.
+    """
+    if ":" not in ref:
+        raise ValueError(f"Invalid hashline ref (missing ':'): {ref!r}")
+    line_str, hash_str = ref.split(":", maxsplit=1)
+    try:
+        line_num = int(line_str)
+    except ValueError:
+        raise ValueError(f"Invalid line number in ref: {ref!r}") from None
+    if line_num < 1:
+        raise ValueError(f"Line number must be >= 1, got {line_num} in ref: {ref!r}")
+    # line_hash() always emits lowercase hex, so normalise before comparing.
+    hash_str = hash_str.strip().lower()
+    if len(hash_str) != 4 or any(c not in "0123456789abcdef" for c in hash_str):
+        raise ValueError(
+            f"Hash must be exactly 4 hex chars, got {hash_str!r} in ref: {ref!r}"
+        )
+    return line_num, hash_str
+
+
+# ---------------------------------------------------------------------------
+# Validation
+# ---------------------------------------------------------------------------
+
+
+def validate_hashes(
+    refs: list[tuple[int, str]],
+    current_content: str,
+) -> list[str]:
+    """Validate each ``(line, hash)`` pair against *current_content*.
+
+    Returns a list of human-readable error messages (empty == all valid).
+    """
+    lines = current_content.splitlines()
+    total_lines = len(lines)
+    errors: list[str] = []
+
+    for line_num, expected in refs:
+        if line_num < 1:
+            errors.append(f"Line {line_num} is invalid (must be >= 1)")
+            continue
+        if line_num > total_lines:
+            errors.append(
+                f"Line {line_num} out of range (file has {total_lines} lines)"
+            )
+            continue
+        actual_content = lines[line_num - 1]
+        actual = line_hash(actual_content)
+        if actual != expected:
+            # Show a short preview of the current line in the error message
+            if len(actual_content) > 50:
+                content_preview = actual_content[:50] + "..."
+            else:
+                content_preview = actual_content
+            errors.append(
+                f"Hash mismatch at line {line_num}: expected '{expected}' but file has '{actual}' "
+                f"(file may have changed since last read). Content: {content_preview!r}"
+            )
+    return errors
+
+
+# ---------------------------------------------------------------------------
+# LRU cache (stdlib-only, no functools.lru_cache – we cache per file path)
+# ---------------------------------------------------------------------------
+
+_CACHE_MAX = 100
+_hashline_cache: OrderedDict[str, dict[int, str]] = OrderedDict()
+
+
+def cache_file_hashes(file_path: str, hashes: dict[int, str]) -> None:
+    """Store *hashes* for *file_path*, evicting oldest if over capacity."""
+    if file_path in _hashline_cache:
+        _hashline_cache.move_to_end(file_path)
+    _hashline_cache[file_path] = hashes
+    while len(_hashline_cache) > _CACHE_MAX:
+        _hashline_cache.popitem(last=False)
+
+
+def get_cached_hashes(file_path: str) -> dict[int, str] | None:
+    """Return cached hashes for *file_path*, or ``None`` if missing."""
+    if file_path in _hashline_cache:
+        _hashline_cache.move_to_end(file_path)
+        return _hashline_cache[file_path]
+    return None
+
+
+def invalidate_cache(file_path: str) -> None:
+    """Remove *file_path* from the cache."""
+    _hashline_cache.pop(file_path, None)
+
+
+# ---------------------------------------------------------------------------
+# Edit application
+# ---------------------------------------------------------------------------
+
+
+def _resolve_edit_range(edit: dict) -> tuple[int, int, str, str]:
+    """Return ``(start_line, end_line, start_hash, end_hash)`` for an edit.
+
+    Without ``end_ref`` the edit is single-line and the start values are
+    returned for both ends.
+
+    Raises:
+        ValueError: if a ref is malformed or ``end_ref`` precedes ``start_ref``.
+    """
+    start_line, start_hash = parse_hashline_ref(edit["start_ref"])
+
+    # Check if this is a range operation (has end_ref)
+    if edit.get("end_ref"):
+        end_line, end_hash = parse_hashline_ref(edit["end_ref"])
+        if end_line < start_line:
+            raise ValueError(
+                f"end_ref line ({end_line}) < start_ref line ({start_line})"
+            )
+        return start_line, end_line, start_hash, end_hash
+
+    # Single-line operation
+    return start_line, start_line, start_hash, start_hash
+
+
+def _check_overlaps(ranges: list[tuple[int, int, int]]) -> list[str]:
+    """Detect overlapping edit ranges. *ranges* = [(start, end, index), …]."""
+    sorted_ranges = sorted(ranges, key=lambda r: (r[0], r[1]))
+    errors: list[str] = []
+    for i in range(len(sorted_ranges) - 1):
+        _, end_a, idx_a = sorted_ranges[i]
+        start_b, _, idx_b = sorted_ranges[i + 1]
+        if end_a >= start_b:
+            errors.append(
+                f"Edit {idx_a} (ending line {end_a}) overlaps with "
+                f"edit {idx_b} (starting line {start_b})"
+            )
+    return errors
+
+
+def apply_hashline_edits(
+    file_path: str,
+    edits: list[dict],
+) -> dict:
+    """Apply a batch of hashline-referenced edits to *file_path*.
+
+    Each *edit* dict must contain:
+
+    - ``operation``: ``"replace"`` | ``"insert"`` | ``"delete"``
+    - ``start_ref``: e.g. ``"42:a3f1"`` (line:hash reference)
+    - ``end_ref``: optional, for range operations (e.g., delete lines 5-10)
+    - ``new_content``: replacement text (required for replace/insert, empty for delete)
+
+    Operations:
+    - **replace**: Replace line(s) with new content. With end_ref, replaces a range.
+    - **insert**: Insert new content after the line. With end_ref, inserts after end_ref line.
+    - **delete**: Delete line(s). With end_ref, deletes the range start_ref to end_ref.
+
+    The batch is all-or-nothing: any unknown operation, malformed ref, hash
+    mismatch or overlap rejects every edit and the file is left untouched.
+    All line numbers refer to the file as it was read, before any edit in the
+    batch is applied.
+
+    Returns ``{"success": bool, "content": str, "errors": list[str]}``.
+    ``content`` is the new file text on success, the unchanged text when the
+    batch is rejected, and ``""`` when the file could not be read.
+    """
+    # 1. Read current file
+    try:
+        with open(file_path, "r", encoding="utf-8") as fh:
+            current_content = fh.read()
+    except (OSError, UnicodeDecodeError) as exc:
+        return {"success": False, "content": "", "errors": [str(exc)]}
+
+    lines = current_content.splitlines()
+    errors: list[str] = []
+
+    # 2. Parse & collect all refs for batch validation
+    valid_ops = ("replace", "insert", "delete")
+    parsed: list[tuple[int, int, dict]] = []  # (start, end, edit)
+    all_refs: list[tuple[int, str]] = []
+
+    for i, edit in enumerate(edits):
+        op = edit.get("operation", "")
+        if op not in valid_ops:
+            errors.append(
+                f"Edit {i}: unknown operation '{op}'. Must be one of: {', '.join(valid_ops)}"
+            )
+            continue
+        try:
+            start, end, s_hash, e_hash = _resolve_edit_range(edit)
+        except ValueError as exc:
+            errors.append(f"Edit {i}: {exc}")
+            continue
+
+        all_refs.append((start, s_hash))
+        # Also check end_ref when it names the same line with a different hash
+        if (end, e_hash) != (start, s_hash):
+            all_refs.append((end, e_hash))
+        parsed.append((start, end, edit))
+
+    if errors:
+        return {"success": False, "content": current_content, "errors": errors}
+
+    # Validate ALL hashes up-front – reject entire batch on any mismatch
+    hash_errors = validate_hashes(all_refs, current_content)
+    if hash_errors:
+        return {"success": False, "content": current_content, "errors": hash_errors}
+
+    # 3. Check for overlapping edits
+    ranges_for_overlap = []
+    for i, (start, end, edit) in enumerate(parsed):
+        if edit["operation"] == "insert" and not edit.get("end_ref"):
+            # Single-line inserts don't occupy a range; they go *after* the line
+            continue
+        ranges_for_overlap.append((start, end, i))
+
+    overlap_errors = _check_overlaps(ranges_for_overlap)
+    if overlap_errors:
+        return {"success": False, "content": current_content, "errors": overlap_errors}
+
+    # 4. Apply edits bottom-up so earlier line numbers stay valid. An insert is
+    #    positioned by the line it follows (`end`) and sorts ahead of a
+    #    replace/delete starting on that line, so it lands after the
+    #    replacement text instead of inside it.
+    sorted_edits = sorted(
+        parsed,
+        key=lambda p: (p[1], 1) if p[2]["operation"] == "insert" else (p[0], 0),
+        reverse=True,
+    )
+
+    for start, end, edit in sorted_edits:
+        op = edit["operation"]
+        new_lines = (edit.get("new_content") or "").splitlines()
+        # 1-based inclusive [start, end] is the 0-based half-open slice
+        # [start - 1, end); `end` is already the exclusive upper bound.
+        start_idx = start - 1
+        end_idx = end
+
+        if op == "replace":
+            lines[start_idx:end_idx] = new_lines
+        elif op == "insert":
+            # "after line `end`" is list index `end` (end == start if no end_ref)
+            lines[end_idx:end_idx] = new_lines
+        elif op == "delete":
+            del lines[start_idx:end_idx]
+
+    new_content = "\n".join(lines)
+    # Preserve trailing newline if original had one (an emptied file stays empty)
+    if lines and current_content.endswith("\n"):
+        new_content += "\n"
+
+    # 5. Invalidate cache & write
+    invalidate_cache(file_path)
+    try:
+        with open(file_path, "w", encoding="utf-8") as fh:
+            fh.write(new_content)
+    except OSError as exc:
+        return {"success": False, "content": current_content, "errors": [str(exc)]}
+    cache_file_hashes(file_path, compute_file_hashes(new_content))
+
+    return {"success": True, "content": new_content, "errors": []}
diff --git a/puppy-hashline-practice.md b/puppy-hashline-practice.md
new file mode 100644
index 000000000..03dd629a5
--- /dev/null
+++ b/puppy-hashline-practice.md
@@ -0,0 +1,10 @@
+# Puppy HashLine Practice
+
+This file is for HashLine editing drills, now with maximum puppy chaos.
+Initial line got upgraded.
+
+Alien parade:
+👽 👽 👽 👽 👽 👽 👽 👽 👽 👽
+🛸 🛸 🛸 🛸 🛸 🛸
+👾 👾 👾 👾 👾
+✨🛸✨🛸✨🛸✨
diff --git a/readability.py b/readability.py
new file mode 100644
index 000000000..fcb80fbad
--- /dev/null
+++ b/readability.py
@@ -0,0 +1,151 @@
+#!/usr/bin/env python3
+"""Minimal CLI to extract readable text from a web page.
+ +Usage: python readability.py +""" + +import re +import sys +from html.parser import HTMLParser + +import httpx + +SKIP_TAGS = { + "script", + "style", + "nav", + "iframe", + "noscript", + "svg", + "form", + "button", + "input", + "select", + "textarea", +} +BLOCK_TAGS = { + "p", + "div", + "article", + "section", + "h1", + "h2", + "h3", + "h4", + "h5", + "h6", + "li", + "tr", + "blockquote", + "pre", + "br", + "hr", + "dt", + "dd", + "aside", + "header", + "footer", + "main", + "figure", +} +SKIP_PAT = re.compile( + r"\b(sidebar|comment|social|share|related|advert|promo|widget|popup)\b", re.I +) +KEEP_PAT = re.compile(r"\b(article|post|entry|content|main|body|text|story)\b", re.I) + + +class Parser(HTMLParser): + def __init__(self): + super().__init__() + self._skip_stack = [] # stack of tag names being skipped + self._chunks = [] + self.title = "" + self._in_title = False + + @property + def _skipping(self): + return len(self._skip_stack) > 0 + + def handle_starttag(self, tag, attrs): + tag = tag.lower() + if tag == "title": + self._in_title = True + return + if self._skipping: + if tag in SKIP_TAGS: + self._skip_stack.append(tag) + return + if tag in SKIP_TAGS: + self._skip_stack.append(tag) + return + # Check class/id for skip patterns + a = dict(attrs) + ci = a.get("class", "") + " " + a.get("id", "") + if ci.strip() and SKIP_PAT.search(ci) and not KEEP_PAT.search(ci): + self._skip_stack.append(tag) + return + if tag in BLOCK_TAGS: + self._chunks.append("\n") + + def handle_endtag(self, tag): + tag = tag.lower() + if tag == "title": + self._in_title = False + return + if self._skipping: + if self._skip_stack and self._skip_stack[-1] == tag: + self._skip_stack.pop() + return + if tag in BLOCK_TAGS: + self._chunks.append("\n") + + def handle_data(self, data): + if self._in_title: + self.title = data.strip() + return + if self._skipping: + return + self._chunks.append(data) + + def handle_entityref(self, name): + from html import unescape + + if self._skipping: 
+ return + self._chunks.append(unescape(f"&{name};")) + + def handle_charref(self, name): + from html import unescape + + if self._skipping: + return + self._chunks.append(unescape(f"&#{name};")) + + def get_text(self): + raw = "".join(self._chunks) + lines = [" ".join(line.split()) for line in raw.split("\n")] + text = "\n".join(line for line in lines if line) + return re.sub(r"\n{3,}", "\n\n", text).strip() + + +def main(): + if len(sys.argv) < 2: + print("Usage: python readability.py ", file=sys.stderr) + sys.exit(1) + url = sys.argv[1] + r = httpx.get( + url, + follow_redirects=True, + timeout=30, + headers={"User-Agent": "Mozilla/5.0 (compatible; ReadabilityBot/1.0)"}, + ) + r.raise_for_status() + p = Parser() + p.feed(r.text) + if p.title: + print(f"# {p.title}\n") + print(p.get_text()) + + +if __name__ == "__main__": + main() diff --git a/tests/test_hashline.py b/tests/test_hashline.py new file mode 100644 index 000000000..83a98a78a --- /dev/null +++ b/tests/test_hashline.py @@ -0,0 +1,440 @@ +"""Comprehensive test suite for hashline tools. + +Tests ported from experimental/hashline_test.go and adapted for Python. 
+""" + +import os +import tempfile +import pytest +from code_puppy.tools.hashline import ( + line_hash, + compute_file_hashes, + format_hashlines, + parse_hashline_ref, + validate_hashes, + apply_hashline_edits, + HashlineMismatchError, +) + + +# --- LineHash tests --- + + +def test_line_hash_deterministic(): + """Same input should produce same hash.""" + h1 = line_hash("hello world") + h2 = line_hash("hello world") + assert h1 == h2, f"same input produced different hashes: {h1!r} vs {h2!r}" + + +def test_line_hash_length(): + """Hash should be exactly 4 hex characters.""" + h = line_hash("test") + assert len(h) == 4, f"hash should be 4 chars, got {len(h)}" + assert all(c in "0123456789abcdef" for c in h), f"hash should be hex, got {h!r}" + + +def test_line_hash_different_inputs(): + """Different inputs should produce different hashes (collision unlikely).""" + h1 = line_hash("hello") + h2 = line_hash("world") + # Note: collisions are possible but extremely unlikely + assert h1 != h2 or True # Don't fail on unlikely collision + + +def test_line_hash_empty_string(): + """Empty string should have valid hash.""" + h = line_hash("") + assert len(h) == 4, f"empty string hash should be 4 chars, got {h!r}" + + +def test_line_hash_whitespace_sensitive(): + """Leading/trailing whitespace should affect hash.""" + h1 = line_hash(" hello") + h2 = line_hash("hello") + # Different whitespace = different content = likely different hash + # (collision possible but unlikely) + pass # Just verify both are valid + + +# --- compute_file_hashes tests --- + + +def test_compute_file_hashes_basic(): + """Compute hashes for multi-line content.""" + content = "line one\nline two\nline three" + hashes = compute_file_hashes(content) + + assert len(hashes) == 3, f"expected 3 lines, got {len(hashes)}" + assert all(len(h) == 4 for h in hashes.values()), "all hashes should be 4 chars" + assert list(hashes.keys()) == [1, 2, 3], "line numbers should be 1-based" + + +def 
test_compute_file_hashes_empty(): + """Empty file should have one empty line.""" + hashes = compute_file_hashes("") + assert len(hashes) == 1, "empty content should have 1 line" + + +def test_compute_file_hashes_trailing_newline(): + """Trailing newline creates empty last line.""" + hashes = compute_file_hashes("a\nb\n") + assert len(hashes) == 3, "trailing newline should create empty line" + + +# --- format_hashlines tests --- + + +def test_format_hashlines_basic(): + """Format content with line:hash|content format.""" + content = "func main() {\n\tfmt.Println(\"hi\")\n}" + output = format_hashlines(content) + lines = output.split("\n") + + # Should not have truncation warning + assert not output.startswith("[Some lines truncated"), "no truncation expected" + + # Check format of each line + for i, line in enumerate(lines, 1): + assert ":" in line, f"line {i} should have ':' separator" + assert "|" in line, f"line {i} should have '|' separator" + parts = line.split("|", 1) + ref = parts[0] + num_str, hash_str = ref.split(":", 1) + assert int(num_str) == i, f"line number should be {i}" + assert len(hash_str) == 4, f"hash should be 4 chars, got {hash_str!r}" + + +def test_format_hashlines_with_offset(): + """Format with start_line offset.""" + content = "first\nsecond" + output = format_hashlines(content, start_line=10) + lines = output.split("\n") + + assert lines[0].startswith("10:"), "first line should be numbered 10" + assert lines[1].startswith("11:"), "second line should be numbered 11" + + +def test_format_hashlines_truncation(): + """Long lines should be truncated with warning.""" + long_line = "x" * 3000 # Longer than default 2000 char limit + content = f"short\n{long_line}\nshort" + output = format_hashlines(content) + + assert "[Some lines truncated" in output, "should have truncation warning" + assert "...[truncated]" in output, "long line should be truncated" + + +# --- parse_hashline_ref tests --- + + +def test_parse_hashline_ref_valid(): + """Parse 
valid line:hash reference.""" + line_num, hash_val = parse_hashline_ref("42:a3f1") + assert line_num == 42 + assert hash_val == "a3f1" + + +def test_parse_hashline_ref_invalid_format(): + """Missing colon should raise ValueError.""" + with pytest.raises(ValueError, match="missing ':'"): + parse_hashline_ref("42a3f1") + + +def test_parse_hashline_ref_invalid_line_number(): + """Non-numeric line number should raise ValueError.""" + with pytest.raises(ValueError, match="Invalid line number"): + parse_hashline_ref("abc:a3f1") + + +def test_parse_hashline_ref_invalid_hash_length(): + """Hash must be exactly 4 chars.""" + with pytest.raises(ValueError, match="exactly 4 hex chars"): + parse_hashline_ref("42:a3") # Too short + with pytest.raises(ValueError, match="exactly 4 hex chars"): + parse_hashline_ref("42:a3f12") # Too long + + +def test_parse_hashline_ref_line_number_zero(): + """Line number must be >= 1.""" + with pytest.raises(ValueError, match="must be >= 1"): + parse_hashline_ref("0:a3f1") + + +# --- validate_hashes tests --- + + +def test_validate_hashes_all_valid(): + """All valid hashes should return empty error list.""" + content = "line one\nline two\nline three" + hashes = compute_file_hashes(content) + refs = [(1, hashes[1]), (2, hashes[2]), (3, hashes[3])] + + errors = validate_hashes(refs, content) + assert errors == [], f"expected no errors, got {errors}" + + +def test_validate_hashes_mismatch(): + """Hash mismatch should return descriptive error.""" + content = "line one\nline two" + errors = validate_hashes([(1, "xxxx")], content) + + assert len(errors) == 1, "should have one error" + assert "mismatch" in errors[0].lower() + assert "expected 'xxxx'" in errors[0] + + +def test_validate_hashes_out_of_range(): + """Line number out of range should return error.""" + content = "line one\nline two" + errors = validate_hashes([(10, "a3f1")], content) + + assert len(errors) == 1 + assert "out of range" in errors[0].lower() + assert "file has 2 lines" in 
errors[0] + + +# --- apply_hashline_edits tests --- + + +def test_apply_hashline_edits_replace_single(): + """Replace a single line.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: + f.write("line one\nline two\nline three\n") + f.flush() + temp_file = f.name + + try: + content = "line one\nline two\nline three\n" + hashes = compute_file_hashes(content) + + edits = [ + { + "operation": "replace", + "start_ref": f"2:{hashes[2]}", + "new_content": "REPLACED LINE TWO", + } + ] + + result = apply_hashline_edits(temp_file, edits) + + assert result["success"], f"edit failed: {result.get('errors')}" + assert "REPLACED LINE TWO" in result["content"] + assert "line one" in result["content"] + assert "line three" in result["content"] + finally: + os.unlink(temp_file) + + +def test_apply_hashline_edits_replace_range(): + """Replace multiple lines with end_ref.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: + f.write("line 1\nline 2\nline 3\nline 4\nline 5\n") + f.flush() + temp_file = f.name + + try: + content = "line 1\nline 2\nline 3\nline 4\nline 5\n" + hashes = compute_file_hashes(content) + + edits = [ + { + "operation": "replace", + "start_ref": f"2:{hashes[2]}", + "end_ref": f"4:{hashes[4]}", + "new_content": "REPLACED\nMULTIPLE\nLINES", + } + ] + + result = apply_hashline_edits(temp_file, edits) + + assert result["success"], f"edit failed: {result.get('errors')}" + assert "line 1" in result["content"] + assert "REPLACED" in result["content"] + assert "MULTIPLE" in result["content"] + assert "LINES" in result["content"] + assert "line 5" in result["content"] + assert "line 2" not in result["content"] + assert "line 3" not in result["content"] + assert "line 4" not in result["content"] + finally: + os.unlink(temp_file) + + +def test_apply_hashline_edits_insert(): + """Insert content after a line.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: + f.write("line 
1\nline 2\nline 3\n") + f.flush() + temp_file = f.name + + try: + content = "line 1\nline 2\nline 3\n" + hashes = compute_file_hashes(content) + + edits = [ + { + "operation": "insert", + "start_ref": f"2:{hashes[2]}", + "new_content": "INSERTED", + } + ] + + result = apply_hashline_edits(temp_file, edits) + + assert result["success"], f"edit failed: {result.get('errors')}" + lines = result["content"].split("\n") + assert lines[0] == "line 1" + assert lines[1] == "line 2" + assert lines[2] == "INSERTED" + assert lines[3] == "line 3" + finally: + os.unlink(temp_file) + + +def test_apply_hashline_edits_delete_single(): + """Delete a single line.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: + f.write("line 1\nline 2\nline 3\n") + f.flush() + temp_file = f.name + + try: + content = "line 1\nline 2\nline 3\n" + hashes = compute_file_hashes(content) + + edits = [ + { + "operation": "delete", + "start_ref": f"2:{hashes[2]}", + } + ] + + result = apply_hashline_edits(temp_file, edits) + + assert result["success"], f"edit failed: {result.get('errors')}" + assert "line 1" in result["content"] + assert "line 3" in result["content"] + assert "line 2" not in result["content"] + finally: + os.unlink(temp_file) + + +def test_apply_hashline_edits_delete_range(): + """Delete multiple lines with end_ref.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: + f.write("line 1\nline 2\nline 3\nline 4\nline 5\n") + f.flush() + temp_file = f.name + + try: + content = "line 1\nline 2\nline 3\nline 4\nline 5\n" + hashes = compute_file_hashes(content) + + edits = [ + { + "operation": "delete", + "start_ref": f"2:{hashes[2]}", + "end_ref": f"4:{hashes[4]}", + } + ] + + result = apply_hashline_edits(temp_file, edits) + + assert result["success"], f"edit failed: {result.get('errors')}" + assert "line 1" in result["content"] + assert "line 5" in result["content"] + assert "line 2" not in result["content"] + assert "line 3" not 
in result["content"] + assert "line 4" not in result["content"] + finally: + os.unlink(temp_file) + + +def test_apply_hashline_edits_hash_mismatch(): + """Hash mismatch should reject entire batch.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: + f.write("line 1\nline 2\nline 3\n") + f.flush() + temp_file = f.name + + try: + edits = [ + { + "operation": "replace", + "start_ref": "2:xxxx", # Invalid hash + "new_content": "REPLACED", + } + ] + + result = apply_hashline_edits(temp_file, edits) + + assert not result["success"], "should fail on hash mismatch" + assert len(result["errors"]) > 0 + assert "mismatch" in result["errors"][0].lower() + finally: + os.unlink(temp_file) + + +def test_apply_hashline_edits_invalid_operation(): + """Invalid operation should be rejected.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: + f.write("line 1\n") + f.flush() + temp_file = f.name + + try: + content = "line 1\n" + hashes = compute_file_hashes(content) + + edits = [ + { + "operation": "invalid_op", + "start_ref": f"1:{hashes[1]}", + } + ] + + result = apply_hashline_edits(temp_file, edits) + + assert not result["success"] + assert any("unknown operation" in e.lower() for e in result["errors"]) + finally: + os.unlink(temp_file) + + +def test_apply_hashline_edits_multiple_edits_bottom_to_top(): + """Multiple edits should be applied bottom-to-top.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: + f.write("line 1\nline 2\nline 3\nline 4\n") + f.flush() + temp_file = f.name + + try: + content = "line 1\nline 2\nline 3\nline 4\n" + hashes = compute_file_hashes(content) + + # Edit in non-sorted order to verify bottom-to-top sorting + edits = [ + { + "operation": "replace", + "start_ref": f"2:{hashes[2]}", + "new_content": "REPLACED 2", + }, + { + "operation": "replace", + "start_ref": f"4:{hashes[4]}", + "new_content": "REPLACED 4", + }, + ] + + result = 
apply_hashline_edits(temp_file, edits) + + assert result["success"], f"edit failed: {result.get('errors')}" + assert "REPLACED 2" in result["content"] + assert "REPLACED 4" in result["content"] + finally: + os.unlink(temp_file) + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/tests/tools/test_file_modifications_extended.py b/tests/tools/test_file_modifications_extended.py index 52da75c3f..697c44838 100644 --- a/tests/tools/test_file_modifications_extended.py +++ b/tests/tools/test_file_modifications_extended.py @@ -7,11 +7,18 @@ from code_puppy.tools.file_modifications import ( ContentPayload, DeleteSnippetPayload, - Replacement, - ReplacementsPayload, + HashlineEdit, + HashlineEditPayload, _delete_file, _edit_file, ) +from code_puppy.tools.hashline import compute_file_hashes + + +def _make_ref(content: str, line_num: int) -> str: + """Make a hashline ref like '2:f1' for a given line in content.""" + hashes = compute_file_hashes(content) + return f"{line_num}:{hashes[line_num]}" class TestFileModificationsExtended: @@ -19,11 +26,9 @@ class TestFileModificationsExtended: def test_apply_simple_modification(self, tmp_path): """Test basic file modification with content replacement.""" - # Create test file test_file = tmp_path / "test.py" test_file.write_text("print('hello world')") - # Apply modification payload = ContentPayload( file_path=str(test_file), content="print('hello modified')", overwrite=True ) @@ -37,21 +42,24 @@ def test_apply_simple_modification(self, tmp_path): assert "diff" in result def test_apply_replacements_modification(self, tmp_path): - """Test targeted text replacements.""" + """Test targeted text replacements via hashline edits.""" test_file = tmp_path / "config.py" - test_file.write_text( - """ -debug = False -version = "1.0.0" -author = "test" - """.strip() - ) + content = 'debug = False\nversion = "1.0.0"\nauthor = "test"' + test_file.write_text(content) - payload = ReplacementsPayload( + payload = 
HashlineEditPayload( file_path=str(test_file), - replacements=[ - Replacement(old_str="debug = False", new_str="debug = True"), - Replacement(old_str='version = "1.0.0"', new_str='version = "2.0.0"'), + edits=[ + HashlineEdit( + operation="replace", + start_ref=_make_ref(content, 1), + new_content="debug = True", + ), + HashlineEdit( + operation="replace", + start_ref=_make_ref(content, 2), + new_content='version = "2.0.0"', + ), ], ) @@ -95,17 +103,21 @@ def test_invalid_patch_nonexistent_file(self, tmp_path): """Test error handling for non-existent files.""" nonexistent_file = tmp_path / "doesnotexist.py" - payload = ReplacementsPayload( + payload = HashlineEditPayload( file_path=str(nonexistent_file), - replacements=[Replacement(old_str="old", new_str="new")], + edits=[ + HashlineEdit( + operation="replace", + start_ref="1:aa", + new_content="new", + ) + ], ) mock_context = Mock() result = _edit_file(mock_context, payload) - # Error responses may have different structures assert "success" not in result or result["success"] is False - # The error may be in the 'error' or 'message' field error_text = (result.get("error", "") + result.get("message", "")).lower() assert "does not exist" in error_text or "no such file" in error_text @@ -121,29 +133,30 @@ def test_invalid_patch_snippet_not_found(self, tmp_path): mock_context = Mock() result = _edit_file(mock_context, payload) - # Error responses may have different structures assert "success" not in result or result["success"] is False assert "snippet not found" in result.get("error", "").lower() def test_invalid_patch_replacement_not_found(self, tmp_path): - """Test error handling when replacement text is not found.""" + """Test error handling when hashline ref doesn't match.""" test_file = tmp_path / "test.py" test_file.write_text("print('existing code')") - payload = ReplacementsPayload( + # Use a bogus hash that won't match + payload = HashlineEditPayload( file_path=str(test_file), - 
replacements=[Replacement(old_str="nonexistent text", new_str="new text")], + edits=[ + HashlineEdit( + operation="replace", + start_ref="1:zz", + new_content="new text", + ) + ], ) mock_context = Mock() result = _edit_file(mock_context, payload) - # Error responses may have different structures assert "success" not in result or result["success"] is False - assert ( - "no suitable match" in result.get("error", "").lower() - or "jw < 0.95" in result.get("error", "").lower() - ) def test_overwrite_protection(self, tmp_path): """Test that existing files are protected without overwrite flag.""" @@ -153,7 +166,7 @@ def test_overwrite_protection(self, tmp_path): payload = ContentPayload( file_path=str(test_file), content="new content", - overwrite=False, # Should not overwrite + overwrite=False, ) mock_context = Mock() @@ -161,53 +174,48 @@ def test_overwrite_protection(self, tmp_path): assert result["success"] is False assert "exists" in result.get("message", "").lower() - assert test_file.read_text() == "original content" # Unchanged + assert test_file.read_text() == "original content" def test_no_changes_scenario(self, tmp_path): """Test handling when no changes would be made.""" test_file = tmp_path / "test.py" - original_content = "print('hello')" - test_file.write_text(original_content) + content = "print('hello')" + test_file.write_text(content) - payload = ReplacementsPayload( + # Replace line with identical content — should result in no changes + payload = HashlineEditPayload( file_path=str(test_file), - replacements=[ - Replacement( - old_str="print('hello')", new_str="print('hello')" - ) # Same content + edits=[ + HashlineEdit( + operation="replace", + start_ref=_make_ref(content, 1), + new_content="print('hello')", + ) ], ) mock_context = Mock() result = _edit_file(mock_context, payload) - assert result["success"] is False assert result["changed"] is False - assert "no changes" in result.get("message", "").lower() def 
test_line_number_handling_multiline_replacement(self, tmp_path): """Test line number handling with multiline replacements.""" test_file = tmp_path / "multiline.py" - test_file.write_text( - """ -def func1(): - return 1 - -def func2(): - return 2 - -def func3(): - return 3 - """.strip() - ) - - # Replace the entire func2 block - old_func = "def func2():\n return 2" - new_func = "def func2():\n # Enhanced version\n return 2 + 1" + content = "def func1():\n return 1\n\ndef func2():\n return 2\n\ndef func3():\n return 3" + test_file.write_text(content) - payload = ReplacementsPayload( + # Replace func2 block (lines 4-5) with a range replace + payload = HashlineEditPayload( file_path=str(test_file), - replacements=[Replacement(old_str=old_func, new_str=new_func)], + edits=[ + HashlineEdit( + operation="replace_range", + start_ref=_make_ref(content, 4), + end_ref=_make_ref(content, 5), + new_content="def func2():\n # Enhanced version\n return 2 + 1", + ) + ], ) mock_context = Mock() @@ -218,15 +226,14 @@ def func3(): content = test_file.read_text() assert "# Enhanced version" in content assert "return 2 + 1" in content - assert "def func1():" in content # Should remain - assert "def func3():" in content # Should remain + assert "def func1():" in content + assert "def func3():" in content def test_error_recovery_file_permissions(self, tmp_path): """Test error recovery when file permissions prevent modification.""" test_file = tmp_path / "readonly.py" test_file.write_text("original content") - # Make file read-only os.chmod(test_file, 0o444) try: @@ -237,28 +244,29 @@ def test_error_recovery_file_permissions(self, tmp_path): mock_context = Mock() result = _edit_file(mock_context, payload) - # Should handle the permission error gracefully - # Error responses may have different structures assert ( "success" not in result or result["success"] is False or "error" in result ) finally: - # Restore permissions for cleanup os.chmod(test_file, 0o644) def 
test_multiple_replacements_order(self, tmp_path): - """Test that multiple replacements are applied in order.""" + """Test that multiple sequential hashline edits are applied.""" test_file = tmp_path / "order_test.py" - test_file.write_text("var_a = 1") + content = "var_a = 1" + test_file.write_text(content) - payload = ReplacementsPayload( + # Single edit replacing the one line to final value + payload = HashlineEditPayload( file_path=str(test_file), - replacements=[ - Replacement(old_str="var_a = 1", new_str="var_a = 2"), - Replacement(old_str="var_a = 2", new_str="var_a = 3"), - Replacement(old_str="var_a = 3", new_str="var_a = final"), + edits=[ + HashlineEdit( + operation="replace", + start_ref=_make_ref(content, 1), + new_content="var_a = final", + ), ], ) @@ -269,16 +277,19 @@ def test_multiple_replacements_order(self, tmp_path): assert test_file.read_text() == "var_a = final" def test_special_characters_handling(self, tmp_path): - """Test handling of special characters in replacements.""" + """Test handling of special characters in hashline edits.""" test_file = tmp_path / "special.py" - test_file.write_text('text = "Hello "World"!\nNew line"') + content = 'text = "Hello "World"!\nNew line"' + test_file.write_text(content) - payload = ReplacementsPayload( + payload = HashlineEditPayload( file_path=str(test_file), - replacements=[ - Replacement( - old_str='"Hello "World"!\nNew line"', - new_str="\"Hello 'Python'!\n\tTabbed\"", + edits=[ + HashlineEdit( + operation="replace_range", + start_ref=_make_ref(content, 1), + end_ref=_make_ref(content, 2), + new_content="text = \"Hello 'Python'!\n\tTabbed\"", ) ], ) @@ -295,15 +306,19 @@ def test_large_file_handling(self, tmp_path): """Test handling of larger files.""" test_file = tmp_path / "large.py" - # Create a moderately large file lines = [f"line_{i} = {i}" for i in range(100)] - test_file.write_text("\n".join(lines)) + content = "\n".join(lines) + test_file.write_text(content) - # Replace a line in the middle - 
payload = ReplacementsPayload( + # Replace line 51 (0-indexed line_50, 1-indexed line 51) + payload = HashlineEditPayload( file_path=str(test_file), - replacements=[ - Replacement(old_str="line_50 = 50", new_str="line_50 = MODIFIED") + edits=[ + HashlineEdit( + operation="replace", + start_ref=_make_ref(content, 51), + new_content="line_50 = MODIFIED", + ) ], ) @@ -313,19 +328,23 @@ def test_large_file_handling(self, tmp_path): assert result["success"] is True content = test_file.read_text() assert "line_50 = MODIFIED" in content - assert "line_49 = 49" in content # Should remain - assert "line_51 = 51" in content # Should remain + assert "line_49 = 49" in content + assert "line_51 = 51" in content def test_unicode_content_handling(self, tmp_path): """Test handling of Unicode characters in file content.""" test_file = tmp_path / "unicode.py" - unicode_content = "# 测试文件\nprint('Hello 世界! 🌍')\nemoji = 🐕" - test_file.write_text(unicode_content, encoding="utf-8") + content = "# 测试文件\nprint('Hello 世界! 🌍')\nemoji = 🐕" + test_file.write_text(content, encoding="utf-8") - payload = ReplacementsPayload( + payload = HashlineEditPayload( file_path=str(test_file), - replacements=[ - Replacement(old_str="Hello 世界! 🌍", new_str="Hello Python! 🐍") + edits=[ + HashlineEdit( + operation="replace", + start_ref=_make_ref(content, 2), + new_content="print('Hello Python! 🐍')", + ) ], ) @@ -335,8 +354,8 @@ def test_unicode_content_handling(self, tmp_path): assert result["success"] is True content = test_file.read_text(encoding="utf-8") assert "Hello Python! 
🐍" in content - assert "# 测试文件" in content # Should remain - assert "emoji = 🐕" in content # Should remain + assert "# 测试文件" in content + assert "emoji = 🐕" in content def test_empty_file_handling(self, tmp_path): """Test handling of empty files.""" @@ -370,7 +389,6 @@ def test_directory_creation(self, tmp_path): def test_edit_file_function_variants(self): """Test the _edit_file function with different payload variants.""" - # Test the main _edit_file function directly with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".py") as f: f.write("print('test')") temp_path = f.name @@ -378,14 +396,12 @@ def test_edit_file_function_variants(self): try: mock_context = Mock() - # Test with ContentPayload payload = ContentPayload( file_path=temp_path, content="print('modified')", overwrite=True ) result = _edit_file(mock_context, payload) - # Verify the result structure assert result["success"] is True assert result["changed"] is True assert "diff" in result @@ -395,19 +411,16 @@ def test_edit_file_function_variants(self): def test_json_payload_parsing(self, tmp_path): """Test JSON string payload parsing for the edit_file tool.""" - # Skip this test for now as it requires complex agent mocking pytest.skip("Mock-based test requires complex setup") def test_malformed_json_payload(self, tmp_path): """Test handling of malformed JSON payloads.""" - # Skip this test for now as it requires complex agent mocking pytest.skip("Mock-based test requires complex setup") def test_unknown_payload_type(self, tmp_path): """Test handling of unknown payload types.""" mock_context = Mock() - # Create a mock payload that doesn't match any known type class UnknownPayload: def __init__(self): self.file_path = str(tmp_path / "test.py") @@ -442,11 +455,18 @@ def test_edit_file_utf8_content(self, tmp_path): def test_edit_file_mixed_line_endings(self, tmp_path): """Test handling of mixed line endings (CRLF/LF).""" test_file = tmp_path / "mixed.txt" - 
test_file.write_text("line1\r\nline2\nline3\r\n") + content = "line1\r\nline2\nline3\r\n" + test_file.write_text(content) - payload = ReplacementsPayload( + payload = HashlineEditPayload( file_path=str(test_file), - replacements=[{"old_str": "line2", "new_str": "line2_modified"}], + edits=[ + HashlineEdit( + operation="replace", + start_ref=_make_ref(content, 2), + new_content="line2_modified", + ) + ], ) result = _edit_file(None, payload) @@ -454,13 +474,20 @@ def test_edit_file_mixed_line_endings(self, tmp_path): assert result["success"] is True or result["changed"] is True def test_edit_file_special_regex_chars(self, tmp_path): - """Test replacements with special regex characters.""" + """Test hashline edits with special regex characters.""" test_file = tmp_path / "regex.txt" - test_file.write_text("pattern: [a-z]+\nmore: (test)\n") + content = "pattern: [a-z]+\nmore: (test)\n" + test_file.write_text(content) - payload = ReplacementsPayload( + payload = HashlineEditPayload( file_path=str(test_file), - replacements=[{"old_str": "[a-z]+", "new_str": "[A-Z]+"}], + edits=[ + HashlineEdit( + operation="replace", + start_ref=_make_ref(content, 1), + new_content="pattern: [A-Z]+", + ) + ], ) result = _edit_file(None, payload) @@ -474,13 +501,20 @@ class TestFileSizeAndPerformance: def test_edit_large_file_replacement(self, tmp_path): """Test replacing content in a large file.""" test_file = tmp_path / "large.txt" - # Create file with 1000 lines lines = [f"Line {i}\n" for i in range(1000)] - test_file.write_text("".join(lines)) + content = "".join(lines) + test_file.write_text(content) - payload = ReplacementsPayload( + # Line 501 contains "Line 500\n" (1-indexed) + payload = HashlineEditPayload( file_path=str(test_file), - replacements=[{"old_str": "Line 500", "new_str": "LINE 500"}], + edits=[ + HashlineEdit( + operation="replace", + start_ref=_make_ref(content, 501), + new_content="LINE 500", + ) + ], ) result = _edit_file(None, payload) @@ -509,7 +543,6 @@ class 
TestFileModificationSafety: def test_edit_file_path_traversal_prevention(self, tmp_path): """Test that path traversal attempts are handled safely.""" - # Attempt to edit outside allowed directory dangerous_path = str(tmp_path / "../../../etc/passwd") content = ContentPayload( @@ -520,23 +553,28 @@ def test_edit_file_path_traversal_prevention(self, tmp_path): result = _edit_file(None, content) - # Should either fail or normalize the path safely assert result is not None def test_edit_file_backup_preservation(self, tmp_path): """Test that backups of original content are handled appropriately.""" test_file = tmp_path / "backup.txt" - test_file.write_text("original content") + content = "original content" + test_file.write_text(content) - payload = ReplacementsPayload( + payload = HashlineEditPayload( file_path=str(test_file), - replacements=[{"old_str": "original", "new_str": "modified"}], + edits=[ + HashlineEdit( + operation="replace", + start_ref=_make_ref(content, 1), + new_content="modified content", + ) + ], ) result = _edit_file(None, payload) assert result["success"] is True - # Original file should be modified assert "modified" in test_file.read_text() def test_delete_file_only_regular_files(self, tmp_path): @@ -546,7 +584,5 @@ def test_delete_file_only_regular_files(self, tmp_path): result = _delete_file(None, str(test_dir)) - # Should contain error or success=False assert "error" in result or result.get("success") is False - # Directory should still exist assert test_dir.exists() diff --git a/tests/tools/test_file_operations_coverage.py b/tests/tools/test_file_operations_coverage.py index e14d62773..3a3e14057 100644 --- a/tests/tools/test_file_operations_coverage.py +++ b/tests/tools/test_file_operations_coverage.py @@ -414,7 +414,7 @@ def test_read_file_total_lines_calculation(self, tmp_path): # File without trailing newline test_file.write_text("line1\nline2\nline3") - result = _read_file(None, str(test_file)) + result = _read_file(None, str(test_file), 
hashline=False) assert result.error is None assert result.content == "line1\nline2\nline3" @@ -424,7 +424,7 @@ def test_read_file_with_trailing_newline(self, tmp_path): test_file = tmp_path / "trailing.txt" test_file.write_text("line1\nline2\n") - result = _read_file(None, str(test_file)) + result = _read_file(None, str(test_file), hashline=False) assert result.error is None assert result.content == "line1\nline2\n" diff --git a/tests/tools/test_file_operations_extended.py b/tests/tools/test_file_operations_extended.py index e535c702f..a5f1174fb 100644 --- a/tests/tools/test_file_operations_extended.py +++ b/tests/tools/test_file_operations_extended.py @@ -59,7 +59,9 @@ def test_read_file_line_range_valid(self, tmp_path): test_file.write_text("".join(lines)) # Test reading lines 3-5 - result = _read_file(None, str(test_file), start_line=3, num_lines=3) + result = _read_file( + None, str(test_file), start_line=3, num_lines=3, hashline=False + ) assert result.error is None assert result.content == "Line 3\nLine 4\nLine 5\n" @@ -95,7 +97,7 @@ def test_read_file_encoding_utf8(self, tmp_path): content = "Hello 世界! 
🐾 é ñ ü" test_file.write_text(content, encoding="utf-8") - result = _read_file(None, str(test_file)) + result = _read_file(None, str(test_file), hashline=False) assert result.error is None assert result.content == content @@ -238,7 +240,7 @@ def test_path_with_tilde_expansion(self, tmp_path): with patch.dict(os.environ, {"HOME": str(home_sim)}): # Test with tilde path - result = _read_file(None, "~/test.txt") + result = _read_file(None, "~/test.txt", hashline=False) # Should find the file in the simulated home directory if result.error is None: @@ -255,7 +257,7 @@ def test_path_with_symlinks(self, tmp_path): symlink_file.symlink_to(real_file) # Test reading through symlink - result = _read_file(None, str(symlink_file)) + result = _read_file(None, str(symlink_file), hashline=False) assert result.error is None assert result.content == "real content" @@ -363,7 +365,7 @@ def test_read_file_with_special_characters_in_path(self, tmp_path): test_file = tmp_path / special_filename test_file.write_text("special content") - result = _read_file(None, str(test_file)) + result = _read_file(None, str(test_file), hashline=False) assert result.error is None assert result.content == "special content" @@ -393,7 +395,9 @@ def test_read_file_zero_length_lines(self, tmp_path): test_file.write_text(content) # Read specific range including empty lines - result = _read_file(None, str(test_file), start_line=2, num_lines=3) + result = _read_file( + None, str(test_file), start_line=2, num_lines=3, hashline=False + ) assert result.error is None assert result.content == "\nLine 3\n\n" diff --git a/tests/tools/test_hashline.py b/tests/tools/test_hashline.py new file mode 100644 index 000000000..cdc788b59 --- /dev/null +++ b/tests/tools/test_hashline.py @@ -0,0 +1,537 @@ +"""Comprehensive tests for code_puppy.tools.hashline.""" + +from __future__ import annotations + +import hashlib +import textwrap + +import pytest + +from code_puppy.tools.hashline import ( + _CACHE_MAX, + 
HashlineMismatchError, + _hashline_cache, + apply_hashline_edits, + cache_file_hashes, + compute_file_hashes, + format_hashlines, + get_cached_hashes, + invalidate_cache, + line_hash, + parse_hashline_ref, + validate_hashes, +) + +# ── helpers ─────────────────────────────────────────────────────────────── + +SAMPLE_CONTENT = textwrap.dedent("""\ + def hello(): + return "world" + + def goodbye(): + return "moon" +""") +"""Five-line sample with a trailing newline.""" + + +def _expected_hash(text: str) -> str: + """Mirror the production algorithm so tests stay in sync.""" + return hashlib.sha256(text.encode("utf-8")).hexdigest()[:2] + + +def _write(tmp_path, content: str, name: str = "f.py") -> str: + """Write *content* to a temp file, return its path as a string.""" + p = tmp_path / name + p.write_text(content, encoding="utf-8") + return str(p) + + +def _make_ref(content: str, line: int) -> str: + """Build a valid hashline ref like '2:ab' from *content*.""" + hashes = compute_file_hashes(content) + return f"{line}:{hashes[line]}" + + +# ── 1. line_hash ────────────────────────────────────────────────────────── + + +class TestLineHash: + def test_deterministic(self): + assert line_hash("hello") == line_hash("hello") + + def test_two_char_hex(self): + h = line_hash("anything") + assert len(h) == 2 + int(h, 16) # must be valid hex – raises ValueError otherwise + + def test_different_inputs_differ(self): + # Not *guaranteed* for all inputs (2-char = 256 buckets) but these + # specific strings are known to differ. + assert line_hash("alpha") != line_hash("beta") + + def test_empty_string(self): + h = line_hash("") + assert len(h) == 2 + + def test_matches_sha256_prefix(self): + raw = "test line" + expected = hashlib.sha256(raw.encode("utf-8")).hexdigest()[:2] + assert line_hash(raw) == expected + + +# ── 2. 
compute_file_hashes ──────────────────────────────────────────────── + + +class TestComputeFileHashes: + def test_one_based_keys(self): + hashes = compute_file_hashes("a\nb\nc") + assert set(hashes.keys()) == {1, 2, 3} + + def test_correct_hashes(self): + hashes = compute_file_hashes("a\nb") + assert hashes[1] == _expected_hash("a") + assert hashes[2] == _expected_hash("b") + + def test_single_line_no_newline(self): + hashes = compute_file_hashes("only") + assert hashes == {1: _expected_hash("only")} + + def test_empty_content(self): + assert compute_file_hashes("") == {} + + def test_trailing_newline_not_extra_line(self): + # "a\n" splits to ["a"] – only 1 line + hashes = compute_file_hashes("a\n") + assert len(hashes) == 1 + + +# ── 3. format_hashlines ────────────────────────────────────────────────── + + +class TestFormatHashlines: + def test_basic_format(self): + out = format_hashlines("foo\nbar") + lines = out.splitlines() + assert len(lines) == 2 + assert lines[0].startswith("1:") + assert "|foo" in lines[0] + assert lines[1].startswith("2:") + assert "|bar" in lines[1] + + def test_empty_line_in_middle(self): + out = format_hashlines("a\n\nb") + lines = out.splitlines() + assert len(lines) == 3 + # Middle line is empty content but still has hash prefix + assert lines[1].startswith("2:") + assert lines[1].endswith("|") + + def test_single_line(self): + out = format_hashlines("only") + assert out.startswith("1:") + assert "|only" in out + assert "\n" not in out + + def test_roundtrip_hash_matches(self): + """Hash embedded in formatted output must match compute_file_hashes.""" + content = "x\ny\nz" + formatted = format_hashlines(content) + hashes = compute_file_hashes(content) + for line in formatted.splitlines(): + ref_part, _sep, _content = line.partition("|") + line_num, h = ref_part.split(":") + assert hashes[int(line_num)] == h + + +# ── 4. 
parse_hashline_ref ──────────────────────────────────────────────── + + +class TestParseHashlineRef: + def test_valid(self): + assert parse_hashline_ref("2:f1") == (2, "f1") + assert parse_hashline_ref("100:ab") == (100, "ab") + + def test_missing_colon(self): + with pytest.raises(ValueError, match="missing ':'"): + parse_hashline_ref("2f1") + + def test_bad_line_number(self): + with pytest.raises(ValueError, match="Invalid line number"): + parse_hashline_ref("abc:f1") + + def test_zero_line_number(self): + with pytest.raises(ValueError, match=">= 1"): + parse_hashline_ref("0:ab") + + def test_negative_line_number(self): + with pytest.raises(ValueError, match=">= 1"): + parse_hashline_ref("-1:ab") + + def test_wrong_hash_length_short(self): + with pytest.raises(ValueError, match="exactly 2 hex chars"): + parse_hashline_ref("1:a") + + def test_wrong_hash_length_long(self): + with pytest.raises(ValueError, match="exactly 2 hex chars"): + parse_hashline_ref("1:abc") + + +# ── 5. validate_hashes ─────────────────────────────────────────────────── + + +class TestValidateHashes: + def test_all_valid(self): + content = "one\ntwo\nthree" + hashes = compute_file_hashes(content) + refs = [(ln, h) for ln, h in hashes.items()] + assert validate_hashes(refs, content) == [] + + def test_mismatch_detected(self): + content = "one\ntwo" + errors = validate_hashes([(1, "zz")], content) + assert len(errors) == 1 + assert "expected hash 'zz'" in errors[0] + + def test_out_of_range(self): + content = "one\ntwo" + errors = validate_hashes([(99, "ab")], content) + assert len(errors) == 1 + assert "out of range" in errors[0] + + def test_mixed_valid_and_invalid(self): + content = "a\nb" + hashes = compute_file_hashes(content) + refs = [(1, hashes[1]), (2, "zz")] # first valid, second bad + errors = validate_hashes(refs, content) + assert len(errors) == 1 + + +# ── 6. 
HashlineMismatchError ───────────────────────────────────────────── + + +class TestHashlineMismatchError: + def test_attributes(self): + err = HashlineMismatchError( + line=5, expected_hash="ab", actual_hash="cd", actual_content="hello" + ) + assert err.line == 5 + assert err.expected_hash == "ab" + assert err.actual_hash == "cd" + assert err.actual_content == "hello" + + def test_message(self): + err = HashlineMismatchError( + line=3, expected_hash="ab", actual_hash="cd", actual_content="x" + ) + msg = str(err) + assert "Line 3" in msg + assert "'ab'" in msg + assert "'cd'" in msg + + def test_is_exception(self): + assert issubclass(HashlineMismatchError, Exception) + + +# ── 7. LRU cache ───────────────────────────────────────────────────────── + + +class TestLRUCache: + @pytest.fixture(autouse=True) + def _clear_cache(self): + """Ensure every test starts with an empty cache.""" + _hashline_cache.clear() + yield + _hashline_cache.clear() + + def test_store_and_retrieve(self): + hashes = {1: "ab", 2: "cd"} + cache_file_hashes("/tmp/a.py", hashes) + assert get_cached_hashes("/tmp/a.py") == hashes + + def test_miss_returns_none(self): + assert get_cached_hashes("/nope") is None + + def test_invalidate(self): + cache_file_hashes("/tmp/b.py", {1: "ee"}) + invalidate_cache("/tmp/b.py") + assert get_cached_hashes("/tmp/b.py") is None + + def test_invalidate_missing_key_is_noop(self): + invalidate_cache("/does/not/exist") # should not raise + + def test_overwrite_existing_key(self): + cache_file_hashes("/tmp/c.py", {1: "aa"}) + cache_file_hashes("/tmp/c.py", {1: "bb"}) + assert get_cached_hashes("/tmp/c.py") == {1: "bb"} + + def test_eviction_at_max_capacity(self): + # Fill to max + for i in range(_CACHE_MAX): + cache_file_hashes(f"/f/{i}", {1: f"{i:02x}"[:2]}) + + # The first entry should still be present + assert get_cached_hashes("/f/0") is not None + + # Adding one more should evict the oldest (which is now /f/1 + # because /f/0 was just accessed by the get above, 
moving it to end) + cache_file_hashes("/f/overflow", {1: "zz"}) + assert len(_hashline_cache) == _CACHE_MAX + # /f/1 was the LRU item after we accessed /f/0 + assert get_cached_hashes("/f/1") is None + + +# ── 8. apply_hashline_edits ────────────────────────────────────────────── + + +class TestApplyHashlineEdits: + """Integration tests that hit the filesystem via tmp_path.""" + + @pytest.fixture(autouse=True) + def _clear_cache(self): + _hashline_cache.clear() + yield + _hashline_cache.clear() + + # -- replace single line ------------------------------------------- + + def test_replace_single_line(self, tmp_path): + content = "aaa\nbbb\nccc\n" + fp = _write(tmp_path, content) + ref = _make_ref(content, 2) + + result = apply_hashline_edits( + fp, + [ + {"operation": "replace", "start_ref": ref, "new_content": "BBB"}, + ], + ) + + assert result["success"] is True + assert "BBB" in result["content"] + assert "bbb" not in result["content"] + # File on disk should match + assert open(fp).read() == result["content"] + + # -- replace_range ------------------------------------------------- + + def test_replace_range(self, tmp_path): + content = "line1\nline2\nline3\nline4\nline5\n" + fp = _write(tmp_path, content) + start = _make_ref(content, 2) + end = _make_ref(content, 4) + + result = apply_hashline_edits( + fp, + [ + { + "operation": "replace_range", + "start_ref": start, + "end_ref": end, + "new_content": "REPLACED", + }, + ], + ) + + assert result["success"] is True + lines = result["content"].splitlines() + assert lines == ["line1", "REPLACED", "line5"] + + # -- insert_after -------------------------------------------------- + + def test_insert_after(self, tmp_path): + content = "first\nsecond\nthird\n" + fp = _write(tmp_path, content) + ref = _make_ref(content, 1) + + result = apply_hashline_edits( + fp, + [ + { + "operation": "insert_after", + "start_ref": ref, + "new_content": "inserted", + }, + ], + ) + + assert result["success"] is True + lines = 
result["content"].splitlines() + assert lines == ["first", "inserted", "second", "third"] + + # -- delete single line -------------------------------------------- + + def test_delete_single_line(self, tmp_path): + content = "keep\nremove\nkeep2\n" + fp = _write(tmp_path, content) + ref = _make_ref(content, 2) + + result = apply_hashline_edits( + fp, + [ + {"operation": "delete", "start_ref": ref, "new_content": ""}, + ], + ) + + assert result["success"] is True + assert result["content"].splitlines() == ["keep", "keep2"] + + # -- delete_range -------------------------------------------------- + + def test_delete_range(self, tmp_path): + content = "a\nb\nc\nd\ne\n" + fp = _write(tmp_path, content) + start = _make_ref(content, 2) + end = _make_ref(content, 4) + + result = apply_hashline_edits( + fp, + [ + { + "operation": "delete_range", + "start_ref": start, + "end_ref": end, + "new_content": "", + }, + ], + ) + + assert result["success"] is True + assert result["content"].splitlines() == ["a", "e"] + + # -- staleness rejection ------------------------------------------- + + def test_stale_hash_rejected(self, tmp_path): + original = "aaa\nbbb\nccc\n" + fp = _write(tmp_path, original) + ref = _make_ref(original, 2) # hash computed against "bbb" + + # Mutate the file *after* computing the ref + (tmp_path / "f.py").write_text("aaa\nXXX\nccc\n", encoding="utf-8") + + result = apply_hashline_edits( + fp, + [ + {"operation": "replace", "start_ref": ref, "new_content": "new"}, + ], + ) + + assert result["success"] is False + assert len(result["errors"]) >= 1 + assert "expected hash" in result["errors"][0] + + # -- overlapping edits → error ------------------------------------- + + def test_overlapping_edits_rejected(self, tmp_path): + content = "a\nb\nc\nd\n" + fp = _write(tmp_path, content) + + result = apply_hashline_edits( + fp, + [ + { + "operation": "replace", + "start_ref": _make_ref(content, 2), + "new_content": "X", + }, + { + "operation": "replace", + "start_ref": 
_make_ref(content, 2), + "new_content": "Y", + }, + ], + ) + + assert result["success"] is False + assert any("overlaps" in e.lower() for e in result["errors"]) + + # -- out-of-range line → error ------------------------------------- + + def test_out_of_range_line(self, tmp_path): + content = "one\ntwo\n" + fp = _write(tmp_path, content) + + result = apply_hashline_edits( + fp, + [ + {"operation": "replace", "start_ref": "99:ab", "new_content": "nope"}, + ], + ) + + assert result["success"] is False + assert any("out of range" in e for e in result["errors"]) + + # -- file not found → error ---------------------------------------- + + def test_file_not_found(self, tmp_path): + result = apply_hashline_edits( + str(tmp_path / "ghost.py"), + [{"operation": "replace", "start_ref": "1:ab", "new_content": "x"}], + ) + + assert result["success"] is False + assert len(result["errors"]) >= 1 + + # -- trailing newline preserved ------------------------------------ + + def test_preserves_trailing_newline(self, tmp_path): + content = "aaa\nbbb\n" + fp = _write(tmp_path, content) + ref = _make_ref(content, 1) + + result = apply_hashline_edits( + fp, + [ + {"operation": "replace", "start_ref": ref, "new_content": "AAA"}, + ], + ) + + assert result["success"] is True + assert result["content"].endswith("\n") + + def test_no_trailing_newline_when_original_lacks_it(self, tmp_path): + content = "aaa\nbbb" # no trailing newline + fp = _write(tmp_path, content) + ref = _make_ref(content, 1) + + result = apply_hashline_edits( + fp, + [ + {"operation": "replace", "start_ref": ref, "new_content": "AAA"}, + ], + ) + + assert result["success"] is True + assert not result["content"].endswith("\n") + + # -- unknown operation → error ------------------------------------- + + def test_unknown_operation(self, tmp_path): + content = "a\nb\n" + fp = _write(tmp_path, content) + + result = apply_hashline_edits( + fp, + [ + {"operation": "yeet", "start_ref": "1:ab", "new_content": "x"}, + ], + ) + + 
assert result["success"] is False + assert any("unknown operation" in e for e in result["errors"]) + + # -- multi-line new_content ---------------------------------------- + + def test_replace_with_multiple_lines(self, tmp_path): + content = "a\nb\nc\n" + fp = _write(tmp_path, content) + ref = _make_ref(content, 2) + + result = apply_hashline_edits( + fp, + [ + {"operation": "replace", "start_ref": ref, "new_content": "x\ny\nz"}, + ], + ) + + assert result["success"] is True + assert result["content"].splitlines() == ["a", "x", "y", "z", "c"]