Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
248 changes: 248 additions & 0 deletions backend/rag_chunk_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
#!/usr/bin/env python3
"""
RAG Chunk Testing Script - LangChain Implementation

This script tests RAG indexing chunk effects using LangChain's
MarkdownHeaderTextSplitter and RecursiveCharacterTextSplitter.

Usage:
python rag_chunk_test.py [directory_path]

Example:
python rag_chunk_test.py ./documents
python rag_chunk_test.py
"""

import argparse
import os
import sys
from pathlib import Path
from typing import List, Dict, Any

from docling.document_converter import DocumentConverter
from langchain_text_splitters import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter


class RAGChunkTester:
    """Test RAG chunking with LangChain splitters.

    Pipeline: Docling converts each supported file to markdown, then the
    text is chunked in two stages — first by markdown headers (H1-H3),
    then by a recursive character splitter that enforces the configured
    size/overlap limits.
    """

    def __init__(self, chunk_size: int = 1024, chunk_overlap: int = 50):
        """
        Initialize the chunk tester.

        Args:
            chunk_size: Maximum size of each chunk
            chunk_overlap: Number of characters to overlap between chunks

        Raises:
            ValueError: If chunk_overlap is not smaller than chunk_size.
        """
        # Fail fast with a clear message instead of surfacing an opaque
        # error from the splitter (or producing degenerate chunks) later.
        if chunk_overlap >= chunk_size:
            raise ValueError(
                f"chunk_overlap ({chunk_overlap}) must be smaller than "
                f"chunk_size ({chunk_size})"
            )
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

        # Docling handles file-format -> markdown conversion.
        self.converter = DocumentConverter()

        # Stage 1 splitter: break on H1/H2/H3 markdown headers; each
        # resulting chunk carries the matched headers in its metadata.
        self.md_splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=[
                ("#", "Header 1"),
                ("##", "Header 2"),
                ("###", "Header 3"),
            ]
        )

        # Stage 2 splitter: enforce chunk_size/chunk_overlap, preferring
        # to break at paragraph, then line, then word boundaries.
        self.recursive_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            length_function=len,
            separators=["\n\n", "\n", " ", ""]
        )

    def read_directory(self, directory_path: str) -> List[Dict[str, Any]]:
        """
        Read all supported files from the directory using Docling.

        Args:
            directory_path: Path to the directory containing files

        Returns:
            List of dicts with 'source', 'content' (markdown), and
            'filename' keys. Files that fail to convert are reported
            and skipped (best-effort).

        Raises:
            FileNotFoundError: If the directory does not exist.
            NotADirectoryError: If the path is not a directory.
        """
        documents = []
        directory = Path(directory_path)

        if not directory.exists():
            raise FileNotFoundError(f"Directory not found: {directory_path}")

        if not directory.is_dir():
            raise NotADirectoryError(f"Path is not a directory: {directory_path}")

        # Supported file extensions
        supported_extensions = {'.md', '.txt', '.pdf', '.docx', '.doc', '.html', '.htm'}

        # Recursively collect matching files. The set removes duplicates
        # and sorting makes the processing order (and thus the printed
        # chunk output) deterministic across runs and filesystems.
        files = sorted(
            {path for ext in supported_extensions
             for path in directory.rglob(f"*{ext}")}
        )

        if not files:
            print(f"Warning: No supported files found in {directory_path}")
            return documents

        print(f"Found {len(files)} files to process...")

        for file_path in files:
            try:
                # Convert document using Docling
                result = self.converter.convert(str(file_path))
                content = result.document.export_to_markdown()

                documents.append({
                    'source': str(file_path),
                    'content': content,
                    'filename': file_path.name
                })
                print(f" ✓ Read: {file_path.name}")
            except Exception as e:
                # Best-effort: report the failure and continue with the
                # remaining files rather than aborting the whole run.
                print(f" ✗ Error reading {file_path.name}: {e}")

        return documents

    def chunk_document(self, document: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Apply two-stage chunking to a document.

        Stage 1: MarkdownHeaderTextSplitter (H1, H2, H3)
        Stage 2: RecursiveCharacterTextSplitter

        Args:
            document: Document dictionary with 'content' and 'source'

        Returns:
            List of chunk dicts carrying source path, filename, joined
            header trail, content, length, and both stage indices.
        """
        chunks = []

        # Stage 1: Split by markdown headers
        md_chunks = self.md_splitter.split_text(document['content'])

        # Stage 2: Further split each markdown chunk with recursive splitter
        for i, md_chunk in enumerate(md_chunks):
            # Build the header trail ("Header 1: ...") from the metadata
            # the header splitter attached; empty when no headers matched.
            headers = []
            metadata = getattr(md_chunk, 'metadata', {}) or {}
            for level in ['Header 1', 'Header 2', 'Header 3']:
                if metadata.get(level):
                    headers.append(f"{level}: {metadata[level]}")

            recursive_chunks = self.recursive_splitter.split_documents([md_chunk])

            for j, chunk in enumerate(recursive_chunks):
                chunks.append({
                    'source': document['source'],
                    'filename': document['filename'],
                    'headers': ' | '.join(headers) if headers else 'No headers',
                    'content': chunk.page_content,
                    'length': len(chunk.page_content),
                    'md_chunk_index': i,
                    'recursive_chunk_index': j
                })

        return chunks

    def print_chunks(self, chunks: List[Dict[str, Any]]):
        """
        Print all chunks with metadata to console.

        Args:
            chunks: List of chunk dictionaries
        """
        separator = '=' * 80
        print(f"\n{separator}")
        print(f"Total Chunks: {len(chunks)}")
        print(f"{separator}\n")

        for idx, chunk in enumerate(chunks, 1):
            print(separator)
            print(f"=== Chunk {idx} ===")
            print(f"Source: {chunk['source']}")
            print(f"Headers: {chunk['headers']}")
            print(f"Length: {chunk['length']} characters")
            print('-' * 80)
            print("Content:")
            print(chunk['content'])
            print(f"{separator}\n")

    def run(self, directory_path: str):
        """
        Run the complete chunking pipeline: read, chunk, print.

        Args:
            directory_path: Path to the directory containing files
        """
        banner = '=' * 80
        print("RAG Chunk Testing - LangChain Implementation")
        print(banner)
        print(f"Directory: {directory_path}")
        print(f"Chunk Size: {self.chunk_size}")
        print(f"Chunk Overlap: {self.chunk_overlap}")
        print(f"{banner}\n")

        # Read documents
        documents = self.read_directory(directory_path)

        if not documents:
            print("No documents to process. Exiting.")
            return

        # Chunk all documents, accumulating across files so the final
        # report covers the whole directory.
        all_chunks = []
        for doc in documents:
            print(f"\nChunking: {doc['filename']}...")
            chunks = self.chunk_document(doc)
            all_chunks.extend(chunks)
            print(f" Generated {len(chunks)} chunks")

        # Print results
        self.print_chunks(all_chunks)


def main():
    """Main entry point: parse CLI arguments and run the chunk tester.

    Exits with status 1 (message on stderr) if setup or the pipeline
    fails.
    """
    parser = argparse.ArgumentParser(
        description='Test RAG chunking with LangChain splitters'
    )
    parser.add_argument(
        'directory',
        nargs='?',
        default='./documents',
        help='Path to directory containing documents (default: ./documents)'
    )
    parser.add_argument(
        '--chunk-size',
        type=int,
        default=1024,
        help='Maximum chunk size (default: 1024)'
    )
    parser.add_argument(
        '--chunk-overlap',
        type=int,
        default=50,
        help='Chunk overlap size (default: 50)'
    )

    args = parser.parse_args()

    try:
        # Constructor may raise (e.g. invalid chunk parameters), so it
        # belongs inside the error boundary alongside run() — otherwise
        # the user sees a raw traceback instead of the clean error path.
        tester = RAGChunkTester(
            chunk_size=args.chunk_size,
            chunk_overlap=args.chunk_overlap
        )
        tester.run(args.directory)
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


# Run the CLI only when executed as a script (no side effects on import).
if __name__ == '__main__':
    main()