fix: fixed reindexing of unchanged files, now uses last_modified stamp #8

Open · wants to merge 2 commits into master
51 changes: 5 additions & 46 deletions gptme_rag/cli.py
@@ -57,21 +57,7 @@ def index(paths: list[Path], pattern: str, persist_dir: Path):
try:
indexer = Indexer(persist_directory=persist_dir, enable_persist=True)

# Get existing files and their metadata from the index, using absolute paths
existing_docs = indexer.get_all_documents()
logger.debug("Found %d existing documents in index", len(existing_docs))

existing_files = {}
for doc in existing_docs:
if "source" in doc.metadata:
abs_path = os.path.abspath(doc.metadata["source"])
mtime = doc.metadata.get("mtime", 0)
existing_files[abs_path] = mtime
logger.debug("Existing file: %s (mtime: %s)", abs_path, mtime)

logger.debug("Loaded %d existing files from index", len(existing_files))

# First, collect all documents and filter for new/modified
# Collect all documents (indexer handles modification checking)
all_documents = []
with console.status("Collecting documents...") as status:
for path in paths:
@@ -80,45 +66,18 @@ def index(paths: list[Path], pattern: str, persist_dir: Path):
else:
status.update(f"Processing directory: {path}")

documents = indexer.collect_documents(path)

# Filter for new or modified documents
filtered_documents = []
for doc in documents:
source = doc.metadata.get("source")
if source:
# Resolve to absolute path for consistent comparison
abs_source = os.path.abspath(source)
doc.metadata["source"] = abs_source
current_mtime = os.path.getmtime(abs_source)
doc.metadata["mtime"] = current_mtime

# Include if file is new or modified
if abs_source not in existing_files:
logger.debug("New file: %s", abs_source)
filtered_documents.append(doc)
elif current_mtime > existing_files[abs_source]:
logger.debug(
"Modified file: %s (current: %s, stored: %s)",
abs_source,
current_mtime,
existing_files[abs_source],
)
filtered_documents.append(doc)
else:
logger.debug("Unchanged file: %s", abs_source)

all_documents.extend(filtered_documents)
documents = indexer.collect_documents(path, check_modified=True)
all_documents.extend(documents)

if not all_documents:
console.print("No new or modified documents to index", style="yellow")
return

# Then process them with a progress bar
# Process with progress bar
n_files = len(set(doc.metadata.get("source", "") for doc in all_documents))
n_chunks = len(all_documents)

logger.info(f"Found {n_files} new/modified files to index ({n_chunks} chunks)")
logger.info(f"Processing {n_files} files ({n_chunks} chunks)")

with tqdm(
total=n_chunks,
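For reference, a minimal sketch of how the simplified CLI flow reads after this change: the indexer now owns the new/modified check, so the caller only collects and adds documents. The docs path and persist directory below are placeholders, and the import path is assumed from the repository layout.

```python
# Sketch only: placeholder paths; Indexer API as shown in this diff.
from pathlib import Path

from gptme_rag.indexing.indexer import Indexer

indexer = Indexer(persist_directory=Path("~/.cache/gptme-rag"), enable_persist=True)

# collect_documents(check_modified=True) skips files whose last_modified
# stamp in the index is not older than the file on disk.
documents = indexer.collect_documents(Path("docs"), check_modified=True)

if documents:
    # add_documents() also refreshes the last_modified metadata it stores.
    indexer.add_documents(documents)
else:
    print("No new or modified documents to index")
```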
102 changes: 99 additions & 3 deletions gptme_rag/indexing/indexer.py
@@ -1,7 +1,9 @@
import logging
import os
import subprocess
import time
from collections.abc import Generator
from datetime import datetime
from fnmatch import fnmatch as fnmatch_path
from logging import Filter
from pathlib import Path
@@ -92,6 +94,7 @@ def __init__(
self.persist_directory = Path(persist_directory).expanduser().resolve()
self.persist_directory.mkdir(parents=True, exist_ok=True)
logger.info(f"Using persist directory: {self.persist_directory}")

settings.persist_directory = str(self.persist_directory)
self.client = chromadb.PersistentClient(
path=str(self.persist_directory), settings=settings
@@ -175,7 +178,22 @@ def add_documents(self, documents: list[Document], batch_size: int = 10) -> None
documents: List of documents to add
batch_size: Number of documents to process in each batch
"""
list(self.add_documents_progress(documents, batch_size=batch_size))
# Process documents in batches
for i in range(0, len(documents), batch_size):
batch = documents[i : i + batch_size]
self._add_documents(batch)

# Update stored timestamps after successful indexing
for doc in batch:
if "source" in doc.metadata:
abs_path = str(Path(doc.metadata["source"]).resolve())
current_mtime = int(os.path.getmtime(abs_path))
doc.metadata["last_modified"] = current_mtime
# Update the document in the collection
self.collection.update(
ids=[doc.doc_id],
metadatas=[doc.metadata],
)

def add_documents_progress(
self, documents: list[Document], batch_size: int = 10
@@ -200,6 +218,14 @@ def _add_documents(self, documents: list[Document]) -> None:
doc = self._generate_doc_id(doc)
assert doc.doc_id is not None

# Update timestamp in metadata to current time
if "source" in doc.metadata:
abs_path = str(Path(doc.metadata["source"]).resolve())
current_mtime = os.path.getmtime(abs_path)
doc.metadata["last_modified"] = self._normalize_timestamp(
current_mtime
)

contents.append(doc.content)
metadatas.append(doc.metadata)
ids.append(doc.doc_id)
@@ -516,6 +542,9 @@ def list_documents(self, group_by_source: bool = True) -> list[Document]:
"""
# Get all documents from collection
results = self.collection.get()
logger.debug("ChromaDB returned %d documents", len(results["ids"]))
if results["ids"]:
logger.debug("First document metadata: %s", results["metadatas"][0])

if not results["ids"]:
return []
@@ -865,14 +894,54 @@ def _get_valid_files(

return valid_files

def _normalize_timestamp(self, timestamp: str | float | int | None) -> str:
"""Normalize timestamp to ISO format string."""
if timestamp is None:
return datetime.fromtimestamp(0).isoformat()
try:
if isinstance(timestamp, int | float):
return datetime.fromtimestamp(float(timestamp)).isoformat()
# If it's already an ISO string, validate and return
if isinstance(timestamp, str):
datetime.fromisoformat(timestamp) # Validate format
return timestamp
raise ValueError(f"Unsupported timestamp type: {type(timestamp)}")
except (ValueError, TypeError) as e:
logger.warning("Invalid timestamp format: %s (%s)", timestamp, e)
return datetime.fromtimestamp(0).isoformat()

def _compare_timestamps(self, stored: str, current: float) -> bool:
"""Compare stored ISO timestamp with current Unix timestamp.

Returns True if current is newer than stored."""
try:
stored_ts = datetime.fromisoformat(stored).timestamp()
# Round to seconds for comparison
return int(current) > int(stored_ts)
except (ValueError, TypeError) as e:
logger.warning("Error comparing timestamps: %s", e)
return True # If we can't compare, assume modified

def _get_stored_timestamps(self) -> dict[str, str]:
"""Get stored timestamps for all indexed files."""
stored = {}
for doc in self.get_all_documents():
if "source" in doc.metadata:
abs_path = str(Path(doc.metadata["source"]).resolve())
timestamp = self._normalize_timestamp(doc.metadata.get("last_modified"))
stored[abs_path] = timestamp
logger.debug("Stored timestamp for %s: %s", abs_path, timestamp)
return stored

def collect_documents(
self, path: Path, glob_pattern: str = "**/*.*"
self, path: Path, glob_pattern: str = "**/*.*", check_modified: bool = True
) -> list[Document]:
"""Collect documents from a file or directory without processing them.

Args:
path: Path to collect documents from
glob_pattern: Pattern to match files (only used for directories)
check_modified: Whether to check for modifications (skip unchanged files)

Returns:
List of documents ready for processing
@@ -884,8 +953,33 @@ def collect_documents(
logger.debug(f"No valid files found in {path}")
return documents

if check_modified:
stored_timestamps = self._get_stored_timestamps()

# Process files in order (least deep first)
for file_path in sorted(valid_files, key=lambda x: len(x.parts)):
abs_path = str(file_path.resolve())

if check_modified:
current_mtime = os.path.getmtime(file_path)
stored_timestamp = stored_timestamps.get(abs_path)

if stored_timestamp and not self._compare_timestamps(
stored_timestamp, current_mtime
):
logger.debug("Skipping unchanged file: %s", abs_path)
continue

if not stored_timestamp:
logger.debug("New file: %s", abs_path)
else:
logger.debug(
"Modified file: %s (current: %s, stored: %s)",
abs_path,
self._normalize_timestamp(current_mtime),
stored_timestamp,
)

logger.debug(f"Processing {file_path}")
documents.extend(Document.from_file(file_path, processor=self.processor))

@@ -912,4 +1006,6 @@ def get_all_documents(self) -> list[Document]:
Returns:
List of all documents in the index, including all chunks.
"""
return self.list_documents(group_by_source=False)
logger.debug("Getting all documents from index")
docs = self.list_documents(group_by_source=False)
return docs
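A standalone sketch of the timestamp round-trip introduced above, using only the standard library: modification times are stored as ISO strings under `last_modified` and compared against fresh `os.path.getmtime()` values, rounded to whole seconds. The helper names here are illustrative, and unlike `_normalize_timestamp` this simplified version raises on malformed input instead of falling back to the epoch.

```python
import os
from datetime import datetime


def normalize(ts: float | int | str | None) -> str:
    """Simplified mirror of _normalize_timestamp: return an ISO string."""
    if ts is None:
        return datetime.fromtimestamp(0).isoformat()
    if isinstance(ts, (int, float)):
        return datetime.fromtimestamp(float(ts)).isoformat()
    datetime.fromisoformat(ts)  # validate; raises ValueError if malformed
    return ts


def is_modified(stored_iso: str, current_mtime: float) -> bool:
    """Simplified mirror of _compare_timestamps: newer on disk than in index?"""
    stored = datetime.fromisoformat(stored_iso).timestamp()
    return int(current_mtime) > int(stored)


path = "README.md"  # any existing file
stored = normalize(os.path.getmtime(path))           # what indexing would record
print(is_modified(stored, os.path.getmtime(path)))   # False: file unchanged
```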
5 changes: 4 additions & 1 deletion gptme_rag/indexing/watcher.py
Expand Up @@ -3,6 +3,7 @@
import logging
import time
from pathlib import Path
from datetime import datetime

from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer
@@ -321,7 +322,9 @@ def _process_updates(self) -> None:

# Sort updates by modification time to get latest versions
updates = sorted(
existing_updates, key=lambda p: p.stat().st_mtime, reverse=True
existing_updates,
key=lambda p: datetime.fromtimestamp(p.stat().st_mtime),
reverse=True,
)
logger.debug(f"Sorted updates: {[str(p) for p in updates]}")

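The watcher change is behaviour-preserving: `datetime.fromtimestamp()` is monotonic in its argument, so sorting on the wrapped value orders updates exactly as sorting on the raw `st_mtime` float; it just keeps the timestamp handling consistent with the ISO-style stamps used elsewhere in this PR. A small sketch with placeholder file names:

```python
from datetime import datetime
from pathlib import Path

updates = [Path("notes/a.md"), Path("notes/b.md")]  # placeholder pending updates
existing_updates = [p for p in updates if p.exists()]

# Newest modification first, mirroring _process_updates above.
ordered = sorted(
    existing_updates,
    key=lambda p: datetime.fromtimestamp(p.stat().st_mtime),
    reverse=True,
)
print([str(p) for p in ordered])
```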