diff --git a/codebase_rag/cli_help.py b/codebase_rag/cli_help.py
index 01e9ffe80..c2051e66f 100644
--- a/codebase_rag/cli_help.py
+++ b/codebase_rag/cli_help.py
@@ -49,6 +49,11 @@ class CLICommandName(StrEnum):
 HELP_REPO_PATH_OPTIMIZE = "Path to the repository to optimize"
 HELP_REPO_PATH_WATCH = "Path to the repository to watch."
+HELP_DEBOUNCE = "Debounce delay in seconds. Set to 0 to disable debouncing."
+HELP_MAX_WAIT = (
+    "Maximum wait time in seconds before forcing an update during continuous edits."
+)
+
 HELP_UPDATE_GRAPH = "Update the knowledge graph by parsing the repository"
 HELP_CLEAN_DB = "Clean the database before updating (use when adding first repo)"
 HELP_OUTPUT_GRAPH = "Export graph to JSON file after updating (requires --update-graph)"
diff --git a/codebase_rag/constants.py b/codebase_rag/constants.py
index cfaf834f2..c13cd1f57 100644
--- a/codebase_rag/constants.py
+++ b/codebase_rag/constants.py
@@ -691,23 +691,61 @@ class TreeSitterModule(StrEnum):
 # (H) File/directory ignore patterns
 IGNORE_PATTERNS = frozenset(
     {
+        # Version control
         ".git",
+        ".git-local",
+        # Python
         "venv",
         ".venv",
+        "env",
+        "ENV",
         "__pycache__",
-        "node_modules",
-        "build",
-        "dist",
         ".eggs",
         ".pytest_cache",
         ".mypy_cache",
         ".ruff_cache",
-        ".claude",
+        "develop-eggs",
+        ".tox",
+        ".nox",
+        ".coverage",
+        "htmlcov",
+        # Rust
+        "target",
+        ".fingerprint",
+        "incremental",
+        # TypeScript / JavaScript / Node
+        "node_modules",
+        ".npm",
+        ".yarn",
+        ".pnpm-store",
+        # Build outputs
+        "build",
+        "dist",
+        "out",
+        "gen",
+        # Coverage
+        "coverage",
+        "criterion",
+        # IDE / Editor
         ".idea",
         ".vscode",
+        ".fleet",
+        ".history",
+        # AI / Agent tools
+        ".claude",
+        ".waves",
+        ".agents",
+        ".agents2",
+        ".wagents",
+        ".codex",
+        ".opencode",
+        ".sisyphus",
+        # Misc
+        ".tmp",
+        ".pids",
     }
 )
-IGNORE_SUFFIXES = frozenset({".tmp", "~"})
+IGNORE_SUFFIXES = frozenset({".tmp", "~", ".bak", ".swp", ".pyc", ".pyo"})

 PAYLOAD_NODE_ID = "node_id"
 PAYLOAD_QUALIFIED_NAME = "qualified_name"
@@ -731,6 +769,10 @@ class EventType(StrEnum):
 WATCHER_SLEEP_INTERVAL = 1
 LOG_LEVEL_INFO = "INFO"

+# (H) Debounce settings for realtime watcher
+DEFAULT_DEBOUNCE_SECONDS = 5
+DEFAULT_MAX_WAIT_SECONDS = 30
+

 class Architecture(StrEnum):
     X86_64 = "x86_64"
diff --git a/codebase_rag/logs.py b/codebase_rag/logs.py
index f97ba10c7..9a474f45b 100644
--- a/codebase_rag/logs.py
+++ b/codebase_rag/logs.py
@@ -91,8 +91,25 @@

 # (H) File watcher logs
 WATCHER_ACTIVE = "File watcher is now active."
+WATCHER_DEBOUNCE_ACTIVE = (
+    "File watcher active with debouncing (debounce={debounce}s, max_wait={max_wait}s)"
+)
 WATCHER_SKIP_NO_QUERY = "Ingestor does not support querying, skipping real-time update."
 CHANGE_DETECTED = "Change detected: {event_type} on {path}. Updating graph."
+CHANGE_DEBOUNCING = (
+    "Change detected: {event_type} on {name} (debouncing for {debounce}s)"
+)
+DEBOUNCE_RESET = "Reset debounce timer for {path}"
+DEBOUNCE_MAX_WAIT = "Max wait ({max_wait}s) exceeded for {path}, processing now"
+DEBOUNCE_SCHEDULED = (
+    "Scheduled update for {path} in {debounce}s (max wait: {remaining}s remaining)"
+)
+DEBOUNCE_PROCESSING = "Processing debounced change: {path}"
+DEBOUNCE_NO_EVENT = "No pending event for {path}, skipping"
+DEBOUNCE_MAX_WAIT_ADJUSTED = (
+    "max_wait ({max_wait}s) is less than debounce ({debounce}s). "
+    "Setting max_wait to debounce value."
+)
 DELETION_QUERY = "Ran deletion query for path: {path}"
 RECALC_CALLS = "Recalculating all function call relationships for consistency..."
GRAPH_UPDATED = "Graph updated successfully for change in: {name}" diff --git a/codebase_rag/tests/test_realtime_debounce.py b/codebase_rag/tests/test_realtime_debounce.py new file mode 100644 index 000000000..1dd2bf83b --- /dev/null +++ b/codebase_rag/tests/test_realtime_debounce.py @@ -0,0 +1,459 @@ +""" +Tests for the realtime_updater debouncing functionality. + +These tests verify the hybrid debounce strategy that prevents redundant +graph updates during rapid file saves. +""" + +from __future__ import annotations + +import threading +import time +from pathlib import Path +from typing import Any +from unittest.mock import MagicMock + +import pytest +from watchdog.events import FileCreatedEvent, FileDeletedEvent, FileModifiedEvent + +from codebase_rag.constants import DEFAULT_DEBOUNCE_SECONDS, DEFAULT_MAX_WAIT_SECONDS +from codebase_rag.services import QueryProtocol + + +class MockQueryIngestor: + """Mock ingestor that satisfies both IngestorProtocol and QueryProtocol.""" + + def __init__(self) -> None: + self.execute_write = MagicMock() + self.flush_all = MagicMock() + self.fetch_all = MagicMock(return_value=[]) + self.ensure_node_batch = MagicMock() + self.ensure_relationship_batch = MagicMock() + + def __enter__(self) -> MockQueryIngestor: + return self + + def __exit__(self, *args: Any) -> None: + pass + + +# Register MockQueryIngestor as implementing QueryProtocol for isinstance checks +QueryProtocol.register(MockQueryIngestor) + + +class TestCodeChangeEventHandlerDebounce: + """Tests for the CodeChangeEventHandler debouncing logic.""" + + @pytest.fixture + def mock_ingestor(self) -> MockQueryIngestor: + """Create a mock ingestor that satisfies QueryProtocol.""" + return MockQueryIngestor() + + @pytest.fixture + def mock_updater( + self, tmp_path: Path, mock_ingestor: MockQueryIngestor + ) -> MagicMock: + """Create a mock GraphUpdater with required attributes.""" + updater = MagicMock() + updater.repo_path = tmp_path + updater.ingestor = mock_ingestor + updater.remove_file_from_state = MagicMock() + updater.factory = MagicMock() + updater.factory.definition_processor.process_file = MagicMock(return_value=None) + updater._process_function_calls = MagicMock() + updater.parsers = {} + updater.queries = {} + updater.ast_cache = {} + return updater + + @pytest.fixture + def sample_file(self, tmp_path: Path) -> Path: + """Create a sample file for testing.""" + test_file = tmp_path / "test.py" + test_file.write_text("# test file") + return test_file + + def test_handler_initialization_with_debounce( + self, mock_updater: MagicMock + ) -> None: + """Test that handler initializes with correct debounce settings.""" + from realtime_updater import CodeChangeEventHandler + + handler = CodeChangeEventHandler( + mock_updater, debounce_seconds=5, max_wait_seconds=30 + ) + + assert handler.debounce_seconds == 5 + assert handler.max_wait_seconds == 30 + assert handler.debounce_enabled is True + assert len(handler.timers) == 0 + assert len(handler.first_event_time) == 0 + assert len(handler.pending_events) == 0 + + def test_handler_initialization_without_debounce( + self, mock_updater: MagicMock + ) -> None: + """Test that handler initializes correctly when debouncing is disabled.""" + from realtime_updater import CodeChangeEventHandler + + handler = CodeChangeEventHandler( + mock_updater, debounce_seconds=0, max_wait_seconds=30 + ) + + assert handler.debounce_seconds == 0 + assert handler.debounce_enabled is False + + def test_handler_uses_default_constants(self, mock_updater: MagicMock) -> None: + 
"""Test that handler uses default constants when not specified.""" + from realtime_updater import CodeChangeEventHandler + + handler = CodeChangeEventHandler(mock_updater) + + assert handler.debounce_seconds == DEFAULT_DEBOUNCE_SECONDS + assert handler.max_wait_seconds == DEFAULT_MAX_WAIT_SECONDS + + def test_is_relevant_filters_ignored_patterns( + self, mock_updater: MagicMock, tmp_path: Path + ) -> None: + """Test that _is_relevant correctly filters out ignored paths.""" + from realtime_updater import CodeChangeEventHandler + + handler = CodeChangeEventHandler(mock_updater) + + # Should be ignored (directories in ignore patterns) + assert handler._is_relevant(str(tmp_path / ".git" / "config")) is False + assert handler._is_relevant(str(tmp_path / "node_modules" / "pkg.js")) is False + assert handler._is_relevant(str(tmp_path / "__pycache__" / "mod.pyc")) is False + + # Should be relevant + assert handler._is_relevant(str(tmp_path / "main.py")) is True + assert handler._is_relevant(str(tmp_path / "src" / "lib.rs")) is True + assert handler._is_relevant(str(tmp_path / "app.js")) is True + + def test_dispatch_ignores_directories( + self, mock_updater: MagicMock, mock_ingestor: MockQueryIngestor, tmp_path: Path + ) -> None: + """Test that dispatch ignores directory events.""" + from realtime_updater import CodeChangeEventHandler + + handler = CodeChangeEventHandler( + mock_updater, debounce_seconds=0.1, max_wait_seconds=1 + ) + + # Create event that is marked as directory + event = FileModifiedEvent(str(tmp_path / "some_dir")) + # The is_directory property is set by watchdog based on the event type + # For FileModifiedEvent, we need to check is_directory attribute + object.__setattr__(event, "is_directory", True) + + handler.dispatch(event) + + # No timer should be created for directory events + assert len(handler.timers) == 0 + mock_ingestor.execute_write.assert_not_called() + + def test_debounce_batches_rapid_events( + self, + mock_updater: MagicMock, + mock_ingestor: MockQueryIngestor, + sample_file: Path, + ) -> None: + """Test that rapid events are batched into a single update.""" + from realtime_updater import CodeChangeEventHandler + + handler = CodeChangeEventHandler( + mock_updater, debounce_seconds=0.2, max_wait_seconds=5 + ) + + # Simulate 5 rapid saves + for _ in range(5): + event = FileModifiedEvent(str(sample_file)) + handler.dispatch(event) + time.sleep(0.05) # 50ms between saves + + # Should have one pending event + assert len(handler.pending_events) == 1 + + # Wait for debounce to complete + time.sleep(0.4) + + # After debounce, ingestor should have been called only once + mock_ingestor.flush_all.assert_called_once() + + def test_no_debounce_processes_immediately( + self, + mock_updater: MagicMock, + mock_ingestor: MockQueryIngestor, + sample_file: Path, + ) -> None: + """Test that events are processed immediately when debounce is disabled.""" + from realtime_updater import CodeChangeEventHandler + + handler = CodeChangeEventHandler( + mock_updater, debounce_seconds=0, max_wait_seconds=30 + ) + + event = FileModifiedEvent(str(sample_file)) + handler.dispatch(event) + + # Should process immediately (no pending events) + assert len(handler.pending_events) == 0 + assert len(handler.timers) == 0 + mock_ingestor.flush_all.assert_called_once() + + def test_max_wait_forces_update( + self, + mock_updater: MagicMock, + mock_ingestor: MockQueryIngestor, + sample_file: Path, + ) -> None: + """Test that max_wait forces an update even during continuous editing.""" + from realtime_updater 
+
+        handler = CodeChangeEventHandler(
+            mock_updater, debounce_seconds=0.5, max_wait_seconds=0.3
+        )
+
+        # First event
+        event = FileModifiedEvent(str(sample_file))
+        handler.dispatch(event)
+
+        # Wait until max_wait is exceeded
+        time.sleep(0.4)
+
+        # Second event should trigger immediate processing due to max_wait
+        event2 = FileModifiedEvent(str(sample_file))
+        handler.dispatch(event2)
+
+        # Give time for processing
+        time.sleep(0.15)
+
+        # Should have processed at least once due to max_wait
+        assert mock_ingestor.flush_all.call_count >= 1
+
+    def test_different_files_tracked_separately(
+        self, mock_updater: MagicMock, tmp_path: Path
+    ) -> None:
+        """Test that different files are debounced independently."""
+        from realtime_updater import CodeChangeEventHandler
+
+        file1 = tmp_path / "file1.py"
+        file2 = tmp_path / "file2.py"
+        file1.write_text("# file 1")
+        file2.write_text("# file 2")
+
+        handler = CodeChangeEventHandler(
+            mock_updater, debounce_seconds=0.2, max_wait_seconds=5
+        )
+
+        # Events for different files
+        event1 = FileModifiedEvent(str(file1))
+        event2 = FileModifiedEvent(str(file2))
+
+        handler.dispatch(event1)
+        handler.dispatch(event2)
+
+        # Should have two pending events
+        assert len(handler.pending_events) == 2
+        assert len(handler.timers) == 2
+
+    def test_timer_cleanup_after_processing(
+        self,
+        mock_updater: MagicMock,
+        mock_ingestor: MockQueryIngestor,
+        sample_file: Path,
+    ) -> None:
+        """Test that timers and state are cleaned up after processing."""
+        from realtime_updater import CodeChangeEventHandler
+
+        handler = CodeChangeEventHandler(
+            mock_updater, debounce_seconds=0.1, max_wait_seconds=5
+        )
+
+        event = FileModifiedEvent(str(sample_file))
+        handler.dispatch(event)
+
+        # Should have pending state
+        assert len(handler.pending_events) == 1
+        assert len(handler.first_event_time) == 1
+
+        # Wait for processing
+        time.sleep(0.25)
+
+        # State should be cleaned up
+        assert len(handler.pending_events) == 0
+        assert len(handler.first_event_time) == 0
+        assert len(handler.timers) == 0
+
+    def test_created_event_triggers_debounce(
+        self, mock_updater: MagicMock, tmp_path: Path
+    ) -> None:
+        """Test that created events are also debounced."""
+        from realtime_updater import CodeChangeEventHandler
+
+        new_file = tmp_path / "new_file.py"
+        new_file.write_text("# new file")
+
+        handler = CodeChangeEventHandler(
+            mock_updater, debounce_seconds=0.2, max_wait_seconds=5
+        )
+
+        event = FileCreatedEvent(str(new_file))
+        handler.dispatch(event)
+
+        assert len(handler.pending_events) == 1
+
+    def test_deleted_event_triggers_debounce(
+        self, mock_updater: MagicMock, sample_file: Path
+    ) -> None:
+        """Test that deleted events are also debounced."""
+        from realtime_updater import CodeChangeEventHandler
+
+        handler = CodeChangeEventHandler(
+            mock_updater, debounce_seconds=0.2, max_wait_seconds=5
+        )
+
+        event = FileDeletedEvent(str(sample_file))
+        handler.dispatch(event)
+
+        assert len(handler.pending_events) == 1
+
+    def test_thread_safety_concurrent_events(
+        self, mock_updater: MagicMock, tmp_path: Path
+    ) -> None:
+        """Test thread safety when multiple events arrive concurrently."""
+        from realtime_updater import CodeChangeEventHandler
+
+        handler = CodeChangeEventHandler(
+            mock_updater, debounce_seconds=0.3, max_wait_seconds=5
+        )
+
+        files = [tmp_path / f"file{i}.py" for i in range(10)]
+        for f in files:
+            f.write_text(f"# {f.name}")
+
+        def send_events(file_path: Path) -> None:
+            for _ in range(5):
+                event = FileModifiedEvent(str(file_path))
+                handler.dispatch(event)
+                time.sleep(0.02)
+
+        # Send events from multiple threads
+        threads = [threading.Thread(target=send_events, args=(f,)) for f in files[:5]]
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
+
+        # Should have 5 pending events (one per file)
+        assert len(handler.pending_events) == 5
+
+
+class TestDebounceValidation:
+    """Tests for CLI validation of debounce parameters."""
+
+    def test_validate_non_negative_float_accepts_zero(self) -> None:
+        """Test that zero is accepted as a valid debounce value."""
+        from realtime_updater import _validate_non_negative_float
+
+        assert _validate_non_negative_float(0) == 0
+        assert _validate_non_negative_float(0.0) == 0.0
+
+    def test_validate_non_negative_float_accepts_positive(self) -> None:
+        """Test that positive values are accepted."""
+        from realtime_updater import _validate_non_negative_float
+
+        assert _validate_non_negative_float(5) == 5
+        assert _validate_non_negative_float(0.5) == 0.5
+        assert _validate_non_negative_float(100) == 100
+
+    def test_validate_non_negative_float_rejects_negative(self) -> None:
+        """Test that negative values are rejected."""
+        import typer
+
+        from realtime_updater import _validate_non_negative_float
+
+        with pytest.raises(typer.BadParameter):
+            _validate_non_negative_float(-1)
+
+        with pytest.raises(typer.BadParameter):
+            _validate_non_negative_float(-0.1)
+
+
+class TestDebounceIntegration:
+    """Integration tests for debounce with real timing."""
+
+    @pytest.fixture
+    def mock_ingestor(self) -> MockQueryIngestor:
+        """Create a mock ingestor that satisfies QueryProtocol."""
+        return MockQueryIngestor()
+
+    @pytest.fixture
+    def mock_updater(
+        self, tmp_path: Path, mock_ingestor: MockQueryIngestor
+    ) -> MagicMock:
+        """Create a mock GraphUpdater."""
+        updater = MagicMock()
+        updater.repo_path = tmp_path
+        updater.ingestor = mock_ingestor
+        updater.remove_file_from_state = MagicMock()
+        updater.factory = MagicMock()
+        updater.factory.definition_processor.process_file = MagicMock(return_value=None)
+        updater._process_function_calls = MagicMock()
+        updater.parsers = {}
+        updater.queries = {}
+        updater.ast_cache = {}
+        return updater
+
+    def test_realistic_rapid_save_scenario(
+        self, mock_updater: MagicMock, mock_ingestor: MockQueryIngestor, tmp_path: Path
+    ) -> None:
+        """
+        Simulate realistic rapid save scenario:
+        - User saves file 10 times over 3 seconds
+        - With 0.5s debounce and 2s max_wait, should result in ~2-4 updates
+        """
+        from realtime_updater import CodeChangeEventHandler
+
+        test_file = tmp_path / "editor.py"
+        test_file.write_text("# editing")
+
+        handler = CodeChangeEventHandler(
+            mock_updater, debounce_seconds=0.5, max_wait_seconds=2
+        )
+
+        # Simulate 10 saves over 3 seconds
+        for i in range(10):
+            event = FileModifiedEvent(str(test_file))
+            handler.dispatch(event)
+            time.sleep(0.3)
+
+        # Wait for final debounce
+        time.sleep(0.7)
+
+        # Should have batched into fewer updates due to max_wait and debounce
+        # With max_wait=2s and 3s total time, expect ~2-4 updates
+        call_count = mock_ingestor.flush_all.call_count
+        assert 1 <= call_count <= 4, f"Expected 1-4 updates, got {call_count}"
+
+    def test_single_edit_after_quiet_period(
+        self, mock_updater: MagicMock, mock_ingestor: MockQueryIngestor, tmp_path: Path
+    ) -> None:
+        """Test that a single edit after quiet period is processed correctly."""
+        from realtime_updater import CodeChangeEventHandler
+
+        test_file = tmp_path / "single.py"
+        test_file.write_text("# single edit")
+
+        handler = CodeChangeEventHandler(
+            mock_updater, debounce_seconds=0.1, max_wait_seconds=5
+        )
+
+        event = FileModifiedEvent(str(test_file))
+        handler.dispatch(event)
+
+        # Wait for debounce
+        time.sleep(0.25)
+
+        # Should have exactly one update
+        mock_ingestor.flush_all.assert_called_once()
diff --git a/codebase_rag/tool_errors.py b/codebase_rag/tool_errors.py
index bc5386867..7a26dbc4e 100644
--- a/codebase_rag/tool_errors.py
+++ b/codebase_rag/tool_errors.py
@@ -62,3 +62,4 @@

 # (H) CLI validation errors
 INVALID_POSITIVE_INT = "{value!r} is not a valid positive integer"
+INVALID_NON_NEGATIVE_FLOAT = "Value must be non-negative, got {value}"
diff --git a/realtime_updater.py b/realtime_updater.py
index 4fd95d5bc..0b778b0ca 100644
--- a/realtime_updater.py
+++ b/realtime_updater.py
@@ -1,4 +1,5 @@
 import sys
+import threading
 import time
 from pathlib import Path
 from typing import Annotated
@@ -15,6 +16,8 @@
 from codebase_rag.constants import (
     CYPHER_DELETE_CALLS,
     CYPHER_DELETE_MODULE,
+    DEFAULT_DEBOUNCE_SECONDS,
+    DEFAULT_MAX_WAIT_SECONDS,
     IGNORE_PATTERNS,
     IGNORE_SUFFIXES,
     KEY_PATH,
@@ -32,11 +35,47 @@


 class CodeChangeEventHandler(FileSystemEventHandler):
-    def __init__(self, updater: GraphUpdater):
+    """
+    Handles file system events with debouncing to prevent redundant graph updates.
+
+    The handler implements a hybrid debounce strategy:
+    - Debounce: Waits for a quiet period after the last change before processing
+    - Max wait: Ensures updates happen within a maximum time window, even during
+      continuous editing
+
+    This prevents the graph update process from running repeatedly when a file
+    is saved multiple times in quick succession (common during active development).
+    """
+
+    def __init__(
+        self,
+        updater: GraphUpdater,
+        debounce_seconds: float = DEFAULT_DEBOUNCE_SECONDS,
+        max_wait_seconds: float = DEFAULT_MAX_WAIT_SECONDS,
+    ):
         self.updater = updater
         self.ignore_patterns = IGNORE_PATTERNS
         self.ignore_suffixes = IGNORE_SUFFIXES
-        logger.info(logs.WATCHER_ACTIVE)
+
+        # (H) Debounce configuration
+        self.debounce_seconds = debounce_seconds
+        self.max_wait_seconds = max_wait_seconds
+        self.debounce_enabled = debounce_seconds > 0
+
+        # (H) Thread-safe state for tracking pending changes
+        self.timers: dict[str, threading.Timer] = {}
+        self.first_event_time: dict[str, float] = {}
+        self.pending_events: dict[str, FileSystemEvent] = {}
+        self.lock = threading.Lock()
+
+        if self.debounce_enabled:
+            logger.info(
+                logs.WATCHER_DEBOUNCE_ACTIVE.format(
+                    debounce=debounce_seconds, max_wait=max_wait_seconds
+                )
+            )
+        else:
+            logger.info(logs.WATCHER_ACTIVE)

     def _is_relevant(self, path_str: str) -> bool:
         path = Path(path_str)
@@ -65,6 +104,96 @@ def dispatch(self, event: FileSystemEvent) -> None:
         if event.is_directory or not self._is_relevant(src_path):
             return

+        if not self.debounce_enabled:
+            # (H) No debouncing - process immediately (legacy behavior)
+            self._process_change(event)
+            return
+
+        # (H) Debounced processing with hybrid approach
+        path = Path(src_path)
+        relative_path_str = str(path.relative_to(self.updater.repo_path))
+        current_time = time.time()
+
+        with self.lock:
+            # (H) Track the first event time for max-wait calculation
+            if relative_path_str not in self.first_event_time:
+                self.first_event_time[relative_path_str] = current_time
+                logger.info(
+                    logs.CHANGE_DEBOUNCING.format(
+                        event_type=event.event_type,
+                        name=path.name,
+                        debounce=self.debounce_seconds,
+                    )
+                )
+
+            # (H) Always store the latest event for this file
+            self.pending_events[relative_path_str] = event
+
+            # (H) Cancel any existing timer for this file
+            if relative_path_str in self.timers:
+                self.timers[relative_path_str].cancel()
+                logger.debug(logs.DEBOUNCE_RESET.format(path=relative_path_str))
+
+            # (H) Check if max wait time has been exceeded
+            time_since_first = current_time - self.first_event_time[relative_path_str]
+
+            if time_since_first >= self.max_wait_seconds:
+                # (H) Max wait exceeded - process immediately
+                logger.info(
+                    logs.DEBOUNCE_MAX_WAIT.format(
+                        max_wait=self.max_wait_seconds, path=relative_path_str
+                    )
+                )
+                self._schedule_immediate_processing(relative_path_str)
+            else:
+                # (H) Schedule debounced processing
+                remaining_wait = self.max_wait_seconds - time_since_first
+                timer = threading.Timer(
+                    self.debounce_seconds,
+                    self._process_debounced_change,
+                    args=[relative_path_str],
+                )
+                self.timers[relative_path_str] = timer
+                timer.start()
+
+                logger.debug(
+                    logs.DEBOUNCE_SCHEDULED.format(
+                        path=relative_path_str,
+                        debounce=self.debounce_seconds,
+                        remaining=f"{remaining_wait:.1f}",
+                    )
+                )
+
+    def _schedule_immediate_processing(self, relative_path_str: str) -> None:
+        """Process a file change immediately (called when max wait is exceeded)."""
+        # (H) Use a zero-delay timer to process in the timer thread
+        timer = threading.Timer(
+            0, self._process_debounced_change, args=[relative_path_str]
+        )
+        self.timers[relative_path_str] = timer
+        timer.start()
+
+    def _process_debounced_change(self, relative_path_str: str) -> None:
+        """Process a debounced file change after the timer fires."""
+        with self.lock:
+            # (H) Retrieve and clear pending state for this file
+            event = self.pending_events.pop(relative_path_str, None)
+            self.first_event_time.pop(relative_path_str, None)
+            self.timers.pop(relative_path_str, None)
+
+        if event is None:
+            logger.warning(logs.DEBOUNCE_NO_EVENT.format(path=relative_path_str))
+            return
+
+        logger.info(logs.DEBOUNCE_PROCESSING.format(path=relative_path_str))
+        self._process_change(event)
+
+    def _process_change(self, event: FileSystemEvent) -> None:
+        """Execute the actual graph update for a file change."""
+        src_path = event.src_path
+        if isinstance(src_path, bytes):
+            src_path = src_path.decode()
+
         ingestor = self.updater.ingestor
         if not isinstance(ingestor, QueryProtocol):
             logger.warning(logs.WATCHER_SKIP_NO_QUERY)
@@ -77,14 +206,14 @@ def dispatch(self, event: FileSystemEvent) -> None:
             logs.CHANGE_DETECTED.format(event_type=event.event_type, path=path)
         )

-        # (H) Step 1
+        # (H) Step 1: Delete old data from graph
         ingestor.execute_write(CYPHER_DELETE_MODULE, {KEY_PATH: relative_path_str})
         logger.debug(logs.DELETION_QUERY.format(path=relative_path_str))

-        # (H) Step 2
+        # (H) Step 2: Clear in-memory state
         self.updater.remove_file_from_state(path)

-        # (H) Step 3
+        # (H) Step 3: Re-parse if modified/created
         if event.event_type in (EventType.MODIFIED, EventType.CREATED):
             lang_config = get_language_spec(path.suffix)
             if (
@@ -101,18 +230,23 @@ def dispatch(self, event: FileSystemEvent) -> None:
                     root_node, language = result
                     self.updater.ast_cache[path] = (root_node, language)

-        # (H) Step 4
+        # (H) Step 4: Recalculate all function calls
         logger.info(logs.RECALC_CALLS)
         ingestor.execute_write(CYPHER_DELETE_CALLS)
         self.updater._process_function_calls()

-        # (H) Step 5
+        # (H) Step 5: Flush changes to database
         self.updater.ingestor.flush_all()
         logger.success(logs.GRAPH_UPDATED.format(name=path.name))


 def start_watcher(
-    repo_path: str, host: str, port: int, batch_size: int | None = None
+    repo_path: str,
+    host: str,
+    port: int,
+    batch_size: int | None = None,
+    debounce_seconds: float = DEFAULT_DEBOUNCE_SECONDS,
+    max_wait_seconds: float = DEFAULT_MAX_WAIT_SECONDS,
 ) -> None:
     repo_path_obj = Path(repo_path).resolve()
     parsers, queries = load_parsers()
@@ -124,10 +258,24 @@ def start_watcher(
         port=port,
         batch_size=effective_batch_size,
     ) as ingestor:
-        _run_watcher_loop(ingestor, repo_path_obj, parsers, queries)
+        _run_watcher_loop(
+            ingestor,
+            repo_path_obj,
+            parsers,
+            queries,
+            debounce_seconds,
+            max_wait_seconds,
+        )


-def _run_watcher_loop(ingestor, repo_path_obj, parsers, queries):
+def _run_watcher_loop(
+    ingestor,
+    repo_path_obj,
+    parsers,
+    queries,
+    debounce_seconds: float,
+    max_wait_seconds: float,
+):
     updater = GraphUpdater(ingestor, repo_path_obj, parsers, queries)

     # (H) Initial full scan builds the complete context for real-time updates
@@ -135,7 +283,11 @@ def _run_watcher_loop(ingestor, repo_path_obj, parsers, queries):
     updater.run()
     logger.success(logs.INITIAL_SCAN_DONE)

-    event_handler = CodeChangeEventHandler(updater)
+    event_handler = CodeChangeEventHandler(
+        updater,
+        debounce_seconds=debounce_seconds,
+        max_wait_seconds=max_wait_seconds,
+    )
     observer = Observer()
     observer.schedule(event_handler, str(repo_path_obj), recursive=True)
     observer.start()
@@ -157,6 +309,12 @@ def _validate_positive_int(value: int | None) -> int | None:
     return value


+def _validate_non_negative_float(value: float) -> float:
+    if value < 0:
+        raise typer.BadParameter(te.INVALID_NON_NEGATIVE_FLOAT.format(value=value))
+    return value
+
+
 def main(
     repo_path: Annotated[str, typer.Argument(help=ch.HELP_REPO_PATH_WATCH)],
     host: Annotated[
@@ -172,11 +330,62 @@ def main(
             callback=_validate_positive_int,
         ),
     ] = None,
+    debounce: Annotated[
+        float,
+        typer.Option(
+            "--debounce",
+            "-d",
+            help=ch.HELP_DEBOUNCE,
+            callback=_validate_non_negative_float,
+        ),
+    ] = DEFAULT_DEBOUNCE_SECONDS,
+    max_wait: Annotated[
+        float,
+        typer.Option(
+            "--max-wait",
+            "-m",
+            help=ch.HELP_MAX_WAIT,
+            callback=_validate_non_negative_float,
+        ),
+    ] = DEFAULT_MAX_WAIT_SECONDS,
 ) -> None:
+    """
+    Watch a repository for file changes and update the knowledge graph in real-time.
+
+    The watcher uses a hybrid debouncing strategy to efficiently handle rapid file saves:
+
+    - DEBOUNCE: After a file change, waits for a quiet period before processing.
+      This batches rapid saves into a single update.
+
+    - MAX_WAIT: Ensures updates happen within a maximum time window, even during
+      continuous editing. Prevents indefinite delays.
+
+    Examples:
+
+        # Default settings (5s debounce, 30s max wait)
+        python realtime_updater.py /path/to/repo
+
+        # More aggressive batching for background monitoring
+        python realtime_updater.py /path/to/repo --debounce 10 --max-wait 60
+
+        # Quick feedback for demos
+        python realtime_updater.py /path/to/repo --debounce 2 --max-wait 10
+
+        # Disable debouncing (legacy behavior)
+        python realtime_updater.py /path/to/repo --debounce 0
+    """
     logger.remove()
     logger.add(sys.stdout, format=REALTIME_LOGGER_FORMAT, level=LOG_LEVEL_INFO)
     logger.info(logs.LOGGER_CONFIGURED)
-    start_watcher(repo_path, host, port, batch_size)
+
+    # (H) Validate max_wait is greater than debounce when both are enabled
+    if debounce > 0 and max_wait > 0 and max_wait < debounce:
+        logger.warning(
+            logs.DEBOUNCE_MAX_WAIT_ADJUSTED.format(max_wait=max_wait, debounce=debounce)
+        )
+        max_wait = debounce
+
+    start_watcher(repo_path, host, port, batch_size, debounce, max_wait)


 if __name__ == "__main__":
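
For reference, the hybrid debounce strategy that CodeChangeEventHandler implements above can be reduced to the following minimal, self-contained sketch. It is not part of the patch: the HybridDebouncer class and its on_event/_fire helpers are illustrative names only, the sketch uses just the standard library, and it mirrors the per-file timer reset plus max-wait cutoff shown in dispatch().

# Illustrative sketch only; assumes nothing from codebase_rag.
import threading
import time


class HybridDebouncer:
    """Coalesce bursts of events per key; never delay longer than max_wait."""

    def __init__(self, callback, debounce: float = 5.0, max_wait: float = 30.0) -> None:
        self.callback = callback
        self.debounce = debounce
        self.max_wait = max_wait
        self.timers: dict[str, threading.Timer] = {}
        self.first_seen: dict[str, float] = {}
        self.lock = threading.Lock()

    def on_event(self, key: str) -> None:
        now = time.time()
        with self.lock:
            self.first_seen.setdefault(key, now)
            # Reset the quiet-period timer on every new event for this key.
            if key in self.timers:
                self.timers[key].cancel()
            # Fire immediately once the burst has lasted longer than max_wait.
            delay = 0.0 if now - self.first_seen[key] >= self.max_wait else self.debounce
            timer = threading.Timer(delay, self._fire, args=[key])
            self.timers[key] = timer
            timer.start()

    def _fire(self, key: str) -> None:
        with self.lock:
            self.first_seen.pop(key, None)
            self.timers.pop(key, None)
        self.callback(key)


if __name__ == "__main__":
    debouncer = HybridDebouncer(print, debounce=0.5, max_wait=2.0)
    for _ in range(10):          # ten rapid "saves" over ~3 seconds
        debouncer.on_event("editor.py")
        time.sleep(0.3)
    time.sleep(1.0)              # roughly 2-3 callbacks fire instead of 10

The per-key timer reset provides the quiet-period batching, while the first-seen timestamp caps how long a continuous burst can postpone an update; these are the same trade-offs the new --debounce and --max-wait flags expose.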