From 50147e70bf82f1030a4520a860f7205de0c44027 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?=
Date: Fri, 20 Dec 2024 17:38:30 +0100
Subject: [PATCH 1/2] fix: avoid reindexing unchanged files by using
 last_modified timestamp

---
 gptme_rag/cli.py              | 25 ++++++++++++++++++++-----
 gptme_rag/indexing/indexer.py | 10 +++++++++-
 gptme_rag/indexing/watcher.py |  5 ++++-
 3 files changed, 33 insertions(+), 7 deletions(-)

diff --git a/gptme_rag/cli.py b/gptme_rag/cli.py
index 2c4c1f6..89a580a 100644
--- a/gptme_rag/cli.py
+++ b/gptme_rag/cli.py
@@ -4,6 +4,7 @@
 import signal
 import sys
 import time
+from datetime import datetime
 from pathlib import Path
 
 import click
@@ -65,9 +66,21 @@ def index(paths: list[Path], pattern: str, persist_dir: Path):
         for doc in existing_docs:
             if "source" in doc.metadata:
                 abs_path = os.path.abspath(doc.metadata["source"])
-                mtime = doc.metadata.get("mtime", 0)
-                existing_files[abs_path] = mtime
-                logger.debug("Existing file: %s (mtime: %s)", abs_path, mtime)
+                last_modified = doc.metadata.get("last_modified")
+                if last_modified:
+                    try:
+                        # Parse ISO format timestamp to float
+                        existing_files[abs_path] = datetime.fromisoformat(
+                            last_modified
+                        ).timestamp()
+                    except ValueError:
+                        logger.warning(
+                            "Invalid last_modified format: %s", last_modified
+                        )
+                        existing_files[abs_path] = 0
+                else:
+                    existing_files[abs_path] = 0
+                # logger.debug("Existing file: %s", abs_path)  # Too spammy
 
         logger.debug("Loaded %d existing files from index", len(existing_files))
 
@@ -91,13 +104,15 @@ def index(paths: list[Path], pattern: str, persist_dir: Path):
                         abs_source = os.path.abspath(source)
                         doc.metadata["source"] = abs_source
                         current_mtime = os.path.getmtime(abs_source)
-                        doc.metadata["mtime"] = current_mtime
 
                         # Include if file is new or modified
                         if abs_source not in existing_files:
                             logger.debug("New file: %s", abs_source)
                             filtered_documents.append(doc)
-                        elif current_mtime > existing_files[abs_source]:
+                        # Round to microseconds (6 decimal places) for comparison
+                        elif round(current_mtime, 6) > round(
+                            existing_files[abs_source], 6
+                        ):
                             logger.debug(
                                 "Modified file: %s (current: %s, stored: %s)",
                                 abs_source,
diff --git a/gptme_rag/indexing/indexer.py b/gptme_rag/indexing/indexer.py
index 0d06af6..e412231 100644
--- a/gptme_rag/indexing/indexer.py
+++ b/gptme_rag/indexing/indexer.py
@@ -92,6 +92,7 @@ def __init__(
         self.persist_directory = Path(persist_directory).expanduser().resolve()
         self.persist_directory.mkdir(parents=True, exist_ok=True)
         logger.info(f"Using persist directory: {self.persist_directory}")
+        settings.persist_directory = str(self.persist_directory)
 
         self.client = chromadb.PersistentClient(
             path=str(self.persist_directory), settings=settings
@@ -516,6 +517,9 @@ def list_documents(self, group_by_source: bool = True) -> list[Document]:
         """
         # Get all documents from collection
        results = self.collection.get()
+        logger.debug("ChromaDB returned %d documents", len(results["ids"]))
+        if results["ids"]:
+            logger.debug("First document metadata: %s", results["metadatas"][0])
 
         if not results["ids"]:
             return []
@@ -912,4 +916,8 @@ def get_all_documents(self) -> list[Document]:
         Returns:
             List of all documents in the index, including all chunks.
         """
-        return self.list_documents(group_by_source=False)
+        logger.debug("Getting all documents from index")
+        docs = self.list_documents(group_by_source=False)
+        for doc in docs:
+            logger.debug("Retrieved document with metadata: %s", doc.metadata)
+        return docs
diff --git a/gptme_rag/indexing/watcher.py b/gptme_rag/indexing/watcher.py
index 6148a1a..a1ebd9b 100644
--- a/gptme_rag/indexing/watcher.py
+++ b/gptme_rag/indexing/watcher.py
@@ -3,6 +3,7 @@
 import logging
 import time
 from pathlib import Path
+from datetime import datetime
 
 from watchdog.events import FileSystemEvent, FileSystemEventHandler
 from watchdog.observers import Observer
@@ -321,7 +322,9 @@ def _process_updates(self) -> None:
 
         # Sort updates by modification time to get latest versions
         updates = sorted(
-            existing_updates, key=lambda p: p.stat().st_mtime, reverse=True
+            existing_updates,
+            key=lambda p: datetime.fromtimestamp(p.stat().st_mtime),
+            reverse=True,
         )
         logger.debug(f"Sorted updates: {[str(p) for p in updates]}")
 
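Note on patch 1: the round(..., 6) comparison in cli.py exists because a float
mtime only survives the ISO round-trip to microsecond precision. A minimal,
self-contained sketch of that round-trip (the temp file is illustrative, not
part of the patch):

    import os
    import tempfile
    from datetime import datetime

    # Create a throwaway file so the example is self-contained
    with tempfile.NamedTemporaryFile(delete=False) as f:
        path = f.name

    mtime = os.path.getmtime(path)                      # float, possibly sub-microsecond
    stored = datetime.fromtimestamp(mtime).isoformat()  # what gets written to metadata
    restored = datetime.fromisoformat(stored).timestamp()

    # fromtimestamp() keeps only microseconds, so compare with that tolerance;
    # this is what the round(..., 6) in the CLI achieves
    assert abs(mtime - restored) < 1e-6
    os.unlink(path)
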
""" - return self.list_documents(group_by_source=False) + logger.debug("Getting all documents from index") + docs = self.list_documents(group_by_source=False) + for doc in docs: + logger.debug("Retrieved document with metadata: %s", doc.metadata) + return docs diff --git a/gptme_rag/indexing/watcher.py b/gptme_rag/indexing/watcher.py index 6148a1a..a1ebd9b 100644 --- a/gptme_rag/indexing/watcher.py +++ b/gptme_rag/indexing/watcher.py @@ -3,6 +3,7 @@ import logging import time from pathlib import Path +from datetime import datetime from watchdog.events import FileSystemEvent, FileSystemEventHandler from watchdog.observers import Observer @@ -321,7 +322,9 @@ def _process_updates(self) -> None: # Sort updates by modification time to get latest versions updates = sorted( - existing_updates, key=lambda p: p.stat().st_mtime, reverse=True + existing_updates, + key=lambda p: datetime.fromtimestamp(p.stat().st_mtime), + reverse=True, ) logger.debug(f"Sorted updates: {[str(p) for p in updates]}") From 0b71d3c98bb631d891197fa7c7d186a1feb7c87e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Sat, 18 Jan 2025 15:41:59 +0100 Subject: [PATCH 2/2] refactor: moved document timestamp handling from cli into indexer --- gptme_rag/cli.py | 66 ++---------------------- gptme_rag/indexing/indexer.py | 96 +++++++++++++++++++++++++++++++++-- 2 files changed, 97 insertions(+), 65 deletions(-) diff --git a/gptme_rag/cli.py b/gptme_rag/cli.py index 89a580a..5440b9f 100644 --- a/gptme_rag/cli.py +++ b/gptme_rag/cli.py @@ -4,7 +4,6 @@ import signal import sys import time -from datetime import datetime from pathlib import Path import click @@ -58,33 +57,7 @@ def index(paths: list[Path], pattern: str, persist_dir: Path): try: indexer = Indexer(persist_directory=persist_dir, enable_persist=True) - # Get existing files and their metadata from the index, using absolute paths - existing_docs = indexer.get_all_documents() - logger.debug("Found %d existing documents in index", len(existing_docs)) - - existing_files = {} - for doc in existing_docs: - if "source" in doc.metadata: - abs_path = os.path.abspath(doc.metadata["source"]) - last_modified = doc.metadata.get("last_modified") - if last_modified: - try: - # Parse ISO format timestamp to float - existing_files[abs_path] = datetime.fromisoformat( - last_modified - ).timestamp() - except ValueError: - logger.warning( - "Invalid last_modified format: %s", last_modified - ) - existing_files[abs_path] = 0 - else: - existing_files[abs_path] = 0 - # logger.debug("Existing file: %s", abs_path) # Too spammy - - logger.debug("Loaded %d existing files from index", len(existing_files)) - - # First, collect all documents and filter for new/modified + # Collect all documents (indexer handles modification checking) all_documents = [] with console.status("Collecting documents...") as status: for path in paths: @@ -93,47 +66,18 @@ def index(paths: list[Path], pattern: str, persist_dir: Path): else: status.update(f"Processing directory: {path}") - documents = indexer.collect_documents(path) - - # Filter for new or modified documents - filtered_documents = [] - for doc in documents: - source = doc.metadata.get("source") - if source: - # Resolve to absolute path for consistent comparison - abs_source = os.path.abspath(source) - doc.metadata["source"] = abs_source - current_mtime = os.path.getmtime(abs_source) - - # Include if file is new or modified - if abs_source not in existing_files: - logger.debug("New file: %s", abs_source) - filtered_documents.append(doc) - # Round 
diff --git a/gptme_rag/indexing/indexer.py b/gptme_rag/indexing/indexer.py
index e412231..d2e587b 100644
--- a/gptme_rag/indexing/indexer.py
+++ b/gptme_rag/indexing/indexer.py
@@ -1,7 +1,9 @@
 import logging
+import os
 import subprocess
 import time
 from collections.abc import Generator
+from datetime import datetime
 from fnmatch import fnmatch as fnmatch_path
 from logging import Filter
 from pathlib import Path
@@ -176,7 +178,22 @@ def add_documents(self, documents: list[Document], batch_size: int = 10) -> None
             documents: List of documents to add
             batch_size: Number of documents to process in each batch
         """
-        list(self.add_documents_progress(documents, batch_size=batch_size))
+        # Process documents in batches
+        for i in range(0, len(documents), batch_size):
+            batch = documents[i : i + batch_size]
+            self._add_documents(batch)
+
+            # Update stored timestamps after successful indexing
+            for doc in batch:
+                if "source" in doc.metadata:
+                    abs_path = str(Path(doc.metadata["source"]).resolve())
+                    current_mtime = int(os.path.getmtime(abs_path))
+                    doc.metadata["last_modified"] = current_mtime
+                    # Update the document in the collection
+                    self.collection.update(
+                        ids=[doc.doc_id],
+                        metadatas=[doc.metadata],
+                    )
 
     def add_documents_progress(
         self, documents: list[Document], batch_size: int = 10
@@ -201,6 +218,14 @@ def _add_documents(self, documents: list[Document]) -> None:
                 doc = self._generate_doc_id(doc)
             assert doc.doc_id is not None
 
+            # Update timestamp in metadata to current time
+            if "source" in doc.metadata:
+                abs_path = str(Path(doc.metadata["source"]).resolve())
+                current_mtime = os.path.getmtime(abs_path)
+                doc.metadata["last_modified"] = self._normalize_timestamp(
+                    current_mtime
+                )
+
             contents.append(doc.content)
             metadatas.append(doc.metadata)
             ids.append(doc.doc_id)
@@ -869,14 +894,54 @@ def _get_valid_files(
 
         return valid_files
 
+    def _normalize_timestamp(self, timestamp: str | float | int | None) -> str:
+        """Normalize timestamp to ISO format string."""
+        if timestamp is None:
+            return datetime.fromtimestamp(0).isoformat()
+        try:
+            if isinstance(timestamp, int | float):
+                return datetime.fromtimestamp(float(timestamp)).isoformat()
+            # If it's already an ISO string, validate and return
+            if isinstance(timestamp, str):
+                datetime.fromisoformat(timestamp)  # Validate format
+                return timestamp
+            raise ValueError(f"Unsupported timestamp type: {type(timestamp)}")
+        except (ValueError, TypeError) as e:
+            logger.warning("Invalid timestamp format: %s (%s)", timestamp, e)
+            return datetime.fromtimestamp(0).isoformat()
+
+    def _compare_timestamps(self, stored: str, current: float) -> bool:
+        """Compare stored ISO timestamp with current Unix timestamp.
+
+        Returns True if current is newer than stored."""
+        try:
+            stored_ts = datetime.fromisoformat(stored).timestamp()
+            # Round to seconds for comparison
+            return int(current) > int(stored_ts)
+        except (ValueError, TypeError) as e:
+            logger.warning("Error comparing timestamps: %s", e)
+            return True  # If we can't compare, assume modified
+
+    def _get_stored_timestamps(self) -> dict[str, str]:
+        """Get stored timestamps for all indexed files."""
+        stored = {}
+        for doc in self.get_all_documents():
+            if "source" in doc.metadata:
+                abs_path = str(Path(doc.metadata["source"]).resolve())
+                timestamp = self._normalize_timestamp(doc.metadata.get("last_modified"))
+                stored[abs_path] = timestamp
+                logger.debug("Stored timestamp for %s: %s", abs_path, timestamp)
+        return stored
+
     def collect_documents(
-        self, path: Path, glob_pattern: str = "**/*.*"
+        self, path: Path, glob_pattern: str = "**/*.*", check_modified: bool = True
     ) -> list[Document]:
         """Collect documents from a file or directory without processing them.
 
         Args:
             path: Path to collect documents from
             glob_pattern: Pattern to match files (only used for directories)
+            check_modified: Whether to check for modifications (skip unchanged files)
 
         Returns:
             List of documents ready for processing
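A standalone illustration of how the two helpers just added interact,
re-implemented here for demonstration rather than imported: stored values may
be raw epoch numbers (as written by add_documents) or ISO strings (as written
by _add_documents); both normalize to ISO, and the comparison is deliberately
whole-second. Note that the patch's isinstance(timestamp, int | float) form
requires Python 3.10+; the tuple form below is the equivalent for older
interpreters:

    from datetime import datetime

    def normalize(ts):
        # mirrors _normalize_timestamp for the two common input shapes
        if isinstance(ts, (int, float)):
            return datetime.fromtimestamp(float(ts)).isoformat()
        datetime.fromisoformat(ts)  # validate ISO input
        return ts

    def is_newer(stored: str, current: float) -> bool:
        # mirrors _compare_timestamps: truncate both sides to whole seconds
        return int(current) > int(datetime.fromisoformat(stored).timestamp())

    stored = normalize(1734712710)         # epoch int from metadata -> ISO string
    print(is_newer(stored, 1734712710.9))  # False: same second, file is skipped
    print(is_newer(stored, 1734712711.2))  # True: newer second, file is reindexed
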
@@ -888,8 +953,33 @@ def collect_documents(
             logger.debug(f"No valid files found in {path}")
             return documents
 
+        if check_modified:
+            stored_timestamps = self._get_stored_timestamps()
+
         # Process files in order (least deep first)
         for file_path in sorted(valid_files, key=lambda x: len(x.parts)):
+            abs_path = str(file_path.resolve())
+
+            if check_modified:
+                current_mtime = os.path.getmtime(file_path)
+                stored_timestamp = stored_timestamps.get(abs_path)
+
+                if stored_timestamp and not self._compare_timestamps(
+                    stored_timestamp, current_mtime
+                ):
+                    logger.debug("Skipping unchanged file: %s", abs_path)
+                    continue
+
+                if not stored_timestamp:
+                    logger.debug("New file: %s", abs_path)
+                else:
+                    logger.debug(
+                        "Modified file: %s (current: %s, stored: %s)",
+                        abs_path,
+                        self._normalize_timestamp(current_mtime),
+                        stored_timestamp,
+                    )
+
             logger.debug(f"Processing {file_path}")
             documents.extend(Document.from_file(file_path, processor=self.processor))
 
@@ -918,6 +1008,4 @@ def get_all_documents(self) -> list[Document]:
         """
         logger.debug("Getting all documents from index")
         docs = self.list_documents(group_by_source=False)
-        for doc in docs:
-            logger.debug("Retrieved document with metadata: %s", doc.metadata)
         return docs
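At the storage level, the net effect of the series is that every chunk's
metadata carries a last_modified stamp that later runs compare against. A
rough sketch of that write path against ChromaDB directly (collection name,
paths, and values are illustrative; the explicit embedding just avoids pulling
an embedding model):

    import chromadb

    client = chromadb.PersistentClient(path="/tmp/gptme-rag-chroma-demo")
    collection = client.get_or_create_collection("demo")

    # Index a chunk
    collection.add(
        ids=["doc-1"],
        embeddings=[[0.0, 0.0, 1.0]],
        documents=["hello world"],
        metadatas=[{"source": "/tmp/notes.md"}],
    )

    # Stamp the timestamp after successful indexing, as add_documents now does
    collection.update(
        ids=["doc-1"],
        metadatas=[{"source": "/tmp/notes.md", "last_modified": 1734712710}],
    )
    print(collection.get(ids=["doc-1"])["metadatas"])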