diff --git a/gptme_rag/cli.py b/gptme_rag/cli.py
index 2c4c1f6..5440b9f 100644
--- a/gptme_rag/cli.py
+++ b/gptme_rag/cli.py
@@ -57,21 +57,7 @@ def index(paths: list[Path], pattern: str, persist_dir: Path):
     try:
         indexer = Indexer(persist_directory=persist_dir, enable_persist=True)
 
-        # Get existing files and their metadata from the index, using absolute paths
-        existing_docs = indexer.get_all_documents()
-        logger.debug("Found %d existing documents in index", len(existing_docs))
-
-        existing_files = {}
-        for doc in existing_docs:
-            if "source" in doc.metadata:
-                abs_path = os.path.abspath(doc.metadata["source"])
-                mtime = doc.metadata.get("mtime", 0)
-                existing_files[abs_path] = mtime
-                logger.debug("Existing file: %s (mtime: %s)", abs_path, mtime)
-
-        logger.debug("Loaded %d existing files from index", len(existing_files))
-
-        # First, collect all documents and filter for new/modified
+        # Collect all documents (indexer handles modification checking)
         all_documents = []
         with console.status("Collecting documents...") as status:
             for path in paths:
@@ -80,45 +66,18 @@ def index(paths: list[Path], pattern: str, persist_dir: Path):
                 else:
                     status.update(f"Processing directory: {path}")
 
-                documents = indexer.collect_documents(path)
-
-                # Filter for new or modified documents
-                filtered_documents = []
-                for doc in documents:
-                    source = doc.metadata.get("source")
-                    if source:
-                        # Resolve to absolute path for consistent comparison
-                        abs_source = os.path.abspath(source)
-                        doc.metadata["source"] = abs_source
-                        current_mtime = os.path.getmtime(abs_source)
-                        doc.metadata["mtime"] = current_mtime
-
-                        # Include if file is new or modified
-                        if abs_source not in existing_files:
-                            logger.debug("New file: %s", abs_source)
-                            filtered_documents.append(doc)
-                        elif current_mtime > existing_files[abs_source]:
-                            logger.debug(
-                                "Modified file: %s (current: %s, stored: %s)",
-                                abs_source,
-                                current_mtime,
-                                existing_files[abs_source],
-                            )
-                            filtered_documents.append(doc)
-                        else:
-                            logger.debug("Unchanged file: %s", abs_source)
-
-                all_documents.extend(filtered_documents)
+                documents = indexer.collect_documents(path, check_modified=True)
+                all_documents.extend(documents)
 
         if not all_documents:
             console.print("No new or modified documents to index", style="yellow")
             return
 
-        # Then process them with a progress bar
+        # Process with progress bar
         n_files = len(set(doc.metadata.get("source", "") for doc in all_documents))
         n_chunks = len(all_documents)
-        logger.info(f"Found {n_files} new/modified files to index ({n_chunks} chunks)")
+        logger.info(f"Processing {n_files} files ({n_chunks} chunks)")
 
         with tqdm(
             total=n_chunks,
diff --git a/gptme_rag/indexing/indexer.py b/gptme_rag/indexing/indexer.py
index 0d06af6..d2e587b 100644
--- a/gptme_rag/indexing/indexer.py
+++ b/gptme_rag/indexing/indexer.py
@@ -1,7 +1,9 @@
 import logging
+import os
 import subprocess
 import time
 from collections.abc import Generator
+from datetime import datetime
 from fnmatch import fnmatch as fnmatch_path
 from logging import Filter
 from pathlib import Path
@@ -92,6 +94,7 @@ def __init__(
         self.persist_directory = Path(persist_directory).expanduser().resolve()
         self.persist_directory.mkdir(parents=True, exist_ok=True)
         logger.info(f"Using persist directory: {self.persist_directory}")
+        settings.persist_directory = str(self.persist_directory)
 
         self.client = chromadb.PersistentClient(
             path=str(self.persist_directory), settings=settings
@@ -175,7 +178,22 @@ def add_documents(self, documents: list[Document], batch_size: int = 10) -> None
             documents: List of documents to add
             batch_size: Number of documents to process in each batch
         """
-        list(self.add_documents_progress(documents, batch_size=batch_size))
+        # Process documents in batches
+        for i in range(0, len(documents), batch_size):
+            batch = documents[i : i + batch_size]
+            self._add_documents(batch)
+
+            # Update stored timestamps after successful indexing
+            for doc in batch:
+                if "source" in doc.metadata:
+                    abs_path = str(Path(doc.metadata["source"]).resolve())
+                    current_mtime = int(os.path.getmtime(abs_path))
+                    doc.metadata["last_modified"] = current_mtime
+                    # Update the document in the collection
+                    self.collection.update(
+                        ids=[doc.doc_id],
+                        metadatas=[doc.metadata],
+                    )
 
     def add_documents_progress(
         self, documents: list[Document], batch_size: int = 10
@@ -200,6 +218,14 @@ def _add_documents(self, documents: list[Document]) -> None:
                 doc = self._generate_doc_id(doc)
             assert doc.doc_id is not None
 
+            # Update timestamp in metadata to current time
+            if "source" in doc.metadata:
+                abs_path = str(Path(doc.metadata["source"]).resolve())
+                current_mtime = os.path.getmtime(abs_path)
+                doc.metadata["last_modified"] = self._normalize_timestamp(
+                    current_mtime
+                )
+
             contents.append(doc.content)
             metadatas.append(doc.metadata)
             ids.append(doc.doc_id)
@@ -516,6 +542,9 @@ def list_documents(self, group_by_source: bool = True) -> list[Document]:
         """
         # Get all documents from collection
        results = self.collection.get()
+        logger.debug("ChromaDB returned %d documents", len(results["ids"]))
+        if results["ids"]:
+            logger.debug("First document metadata: %s", results["metadatas"][0])
 
         if not results["ids"]:
             return []
@@ -865,14 +894,54 @@ def _get_valid_files(
 
         return valid_files
 
+    def _normalize_timestamp(self, timestamp: str | float | int | None) -> str:
+        """Normalize timestamp to ISO format string."""
+        if timestamp is None:
+            return datetime.fromtimestamp(0).isoformat()
+        try:
+            if isinstance(timestamp, int | float):
+                return datetime.fromtimestamp(float(timestamp)).isoformat()
+            # If it's already an ISO string, validate and return
+            if isinstance(timestamp, str):
+                datetime.fromisoformat(timestamp)  # Validate format
+                return timestamp
+            raise ValueError(f"Unsupported timestamp type: {type(timestamp)}")
+        except (ValueError, TypeError) as e:
+            logger.warning("Invalid timestamp format: %s (%s)", timestamp, e)
+            return datetime.fromtimestamp(0).isoformat()
+
+    def _compare_timestamps(self, stored: str, current: float) -> bool:
+        """Compare stored ISO timestamp with current Unix timestamp.
+
+        Returns True if current is newer than stored."""
+        try:
+            stored_ts = datetime.fromisoformat(stored).timestamp()
+            # Round to seconds for comparison
+            return int(current) > int(stored_ts)
+        except (ValueError, TypeError) as e:
+            logger.warning("Error comparing timestamps: %s", e)
+            return True  # If we can't compare, assume modified
+
+    def _get_stored_timestamps(self) -> dict[str, str]:
+        """Get stored timestamps for all indexed files."""
+        stored = {}
+        for doc in self.get_all_documents():
+            if "source" in doc.metadata:
+                abs_path = str(Path(doc.metadata["source"]).resolve())
+                timestamp = self._normalize_timestamp(doc.metadata.get("last_modified"))
+                stored[abs_path] = timestamp
+                logger.debug("Stored timestamp for %s: %s", abs_path, timestamp)
+        return stored
+
     def collect_documents(
-        self, path: Path, glob_pattern: str = "**/*.*"
+        self, path: Path, glob_pattern: str = "**/*.*", check_modified: bool = True
     ) -> list[Document]:
         """Collect documents from a file or directory without processing them.
 
         Args:
             path: Path to collect documents from
             glob_pattern: Pattern to match files (only used for directories)
+            check_modified: Whether to check for modifications (skip unchanged files)
 
         Returns:
             List of documents ready for processing
@@ -884,8 +953,33 @@ def collect_documents(
             logger.debug(f"No valid files found in {path}")
             return documents
 
+        if check_modified:
+            stored_timestamps = self._get_stored_timestamps()
+
         # Process files in order (least deep first)
         for file_path in sorted(valid_files, key=lambda x: len(x.parts)):
+            abs_path = str(file_path.resolve())
+
+            if check_modified:
+                current_mtime = os.path.getmtime(file_path)
+                stored_timestamp = stored_timestamps.get(abs_path)
+
+                if stored_timestamp and not self._compare_timestamps(
+                    stored_timestamp, current_mtime
+                ):
+                    logger.debug("Skipping unchanged file: %s", abs_path)
+                    continue
+
+                if not stored_timestamp:
+                    logger.debug("New file: %s", abs_path)
+                else:
+                    logger.debug(
+                        "Modified file: %s (current: %s, stored: %s)",
+                        abs_path,
+                        self._normalize_timestamp(current_mtime),
+                        stored_timestamp,
+                    )
+
             logger.debug(f"Processing {file_path}")
             documents.extend(Document.from_file(file_path, processor=self.processor))
 
@@ -912,4 +1006,6 @@ def get_all_documents(self) -> list[Document]:
         Returns:
             List of all documents in the index, including all chunks.
         """
-        return self.list_documents(group_by_source=False)
+        logger.debug("Getting all documents from index")
+        docs = self.list_documents(group_by_source=False)
+        return docs
diff --git a/gptme_rag/indexing/watcher.py b/gptme_rag/indexing/watcher.py
index 6148a1a..a1ebd9b 100644
--- a/gptme_rag/indexing/watcher.py
+++ b/gptme_rag/indexing/watcher.py
@@ -3,6 +3,7 @@
 import logging
 import time
 from pathlib import Path
+from datetime import datetime
 
 from watchdog.events import FileSystemEvent, FileSystemEventHandler
 from watchdog.observers import Observer
@@ -321,7 +322,9 @@ def _process_updates(self) -> None:
 
         # Sort updates by modification time to get latest versions
         updates = sorted(
-            existing_updates, key=lambda p: p.stat().st_mtime, reverse=True
+            existing_updates,
+            key=lambda p: datetime.fromtimestamp(p.stat().st_mtime),
+            reverse=True,
         )
         logger.debug(f"Sorted updates: {[str(p) for p in updates]}")
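Note (not part of the patch): the diff stores each file's mtime as an ISO-8601 string under the "last_modified" metadata key and compares it against the file's current mtime when collecting documents. Below is a minimal standalone sketch of that behaviour; the function names are hypothetical stand-ins mirroring Indexer._normalize_timestamp and Indexer._compare_timestamps above, with the diff's error handling omitted.

# Illustration only: timestamp scheme used by the incremental-indexing change.
import os
import tempfile
from datetime import datetime


def normalize_timestamp(ts: float | str) -> str:
    """Return an ISO-8601 string for a Unix timestamp; pass ISO strings through."""
    if isinstance(ts, (int, float)):
        return datetime.fromtimestamp(float(ts)).isoformat()
    datetime.fromisoformat(ts)  # validate format
    return ts


def is_newer(stored_iso: str, current_mtime: float) -> bool:
    """True if the file's current mtime is newer than the stored ISO timestamp."""
    stored_ts = datetime.fromisoformat(stored_iso).timestamp()
    # Whole-second resolution, matching _compare_timestamps in the diff.
    return int(current_mtime) > int(stored_ts)


if __name__ == "__main__":
    with tempfile.NamedTemporaryFile(suffix=".txt") as f:
        mtime = os.path.getmtime(f.name)
        stored = normalize_timestamp(mtime)  # value written to doc.metadata["last_modified"]
        print(stored)                        # e.g. 2024-06-01T12:34:56.789012
        print(is_newer(stored, mtime))       # False -> unchanged file would be skipped
        print(is_newer(stored, mtime + 60))  # True  -> modified file would be re-indexed

With this scheme, collect_documents(path, check_modified=True) only yields documents whose files are new or whose current mtime is strictly newer (at second resolution) than the stored value.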