From de897067fd71930c09f1b59c214d7cbaa65a625b Mon Sep 17 00:00:00 2001
From: 2561056571 <112464849+2561056571@users.noreply.github.com>
Date: Tue, 10 Feb 2026 16:26:58 +0800
Subject: [PATCH] feat(rag-test): add RAG Pipeline testing and evaluation module
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add a comprehensive RAG Pipeline testing module for evaluating different
document conversion methods and splitting strategies.

Key features:
- Document converters: Docling, Marker, Pandoc, PyPDF, Unstructured
- Markdown splitters: LangChain (Header+Recursive), LlamaIndex (Node+Sentence)
- Vector store: FAISS for efficient similarity search
- Embedding: Qwen3 Embedding (reuses existing RAGAS config)
- LLM: GLM for query rewriting and answer generation
- Evaluation: RAGAS metrics (faithfulness, relevancy, precision)

CLI commands:
- convert: Document to Markdown conversion
- index: Split and build vector index
- evaluate: Run RAGAS evaluation
- run: Full pipeline test
- compare: Test all converter×splitter combinations
- report: Generate comparison report

The module operates independently as a CLI tool, reusing existing RAGAS LLM
and Embedding configurations from the main project.
---
 backend/app/services/rag_test/__init__.py          |  15 +
 backend/app/services/rag_test/cli.py               | 606 ++++++++++++++++++
 backend/app/services/rag_test/config.py            |  94 +++
 .../services/rag_test/converters/__init__.py       |  48 ++
 .../app/services/rag_test/converters/base.py       | 182 ++++++
 .../rag_test/converters/docling_converter.py       |  75 +++
 .../rag_test/converters/marker_converter.py        |  70 ++
 .../rag_test/converters/pandoc_converter.py        |  93 +++
 .../rag_test/converters/pypdf_converter.py         |  85 +++
 .../converters/unstructured_converter.py           |  95 +++
 .../services/rag_test/embeddings/__init__.py       |   7 +
 .../rag_test/embeddings/qwen_embedding.py          | 115 ++++
 .../services/rag_test/evaluation/__init__.py       |   7 +
 .../rag_test/evaluation/ragas_evaluator.py         | 195 ++++++
 backend/app/services/rag_test/llm/__init__.py      |   7 +
 .../app/services/rag_test/llm/glm_client.py        | 149 +++++
 backend/app/services/rag_test/output.py            | 255 ++++++++
 backend/app/services/rag_test/pipeline.py          | 497 ++++++++++++++
 .../services/rag_test/retrieval/__init__.py        |   7 +
 .../services/rag_test/retrieval/retriever.py       |  86 +++
 .../services/rag_test/splitters/__init__.py        |  44 ++
 .../app/services/rag_test/splitters/base.py        | 153 +++++
 .../rag_test/splitters/langchain_splitter.py       | 133 ++++
 .../rag_test/splitters/llamaindex_splitter.py      | 113 ++++
 backend/app/services/rag_test/utils.py             | 127 ++++
 .../services/rag_test/vectorstore/__init__.py      |   7 +
 .../rag_test/vectorstore/faiss_store.py            | 218 +++++++
 backend/pyproject.toml                             |  19 +
 28 files changed, 3502 insertions(+)
 create mode 100644 backend/app/services/rag_test/__init__.py
 create mode 100644 backend/app/services/rag_test/cli.py
 create mode 100644 backend/app/services/rag_test/config.py
 create mode 100644 backend/app/services/rag_test/converters/__init__.py
 create mode 100644 backend/app/services/rag_test/converters/base.py
 create mode 100644 backend/app/services/rag_test/converters/docling_converter.py
 create mode 100644 backend/app/services/rag_test/converters/marker_converter.py
 create mode 100644 backend/app/services/rag_test/converters/pandoc_converter.py
 create mode 100644 backend/app/services/rag_test/converters/pypdf_converter.py
 create mode 100644 backend/app/services/rag_test/converters/unstructured_converter.py
 create mode 100644 backend/app/services/rag_test/embeddings/__init__.py
create mode 100644 backend/app/services/rag_test/embeddings/qwen_embedding.py create mode 100644 backend/app/services/rag_test/evaluation/__init__.py create mode 100644 backend/app/services/rag_test/evaluation/ragas_evaluator.py create mode 100644 backend/app/services/rag_test/llm/__init__.py create mode 100644 backend/app/services/rag_test/llm/glm_client.py create mode 100644 backend/app/services/rag_test/output.py create mode 100644 backend/app/services/rag_test/pipeline.py create mode 100644 backend/app/services/rag_test/retrieval/__init__.py create mode 100644 backend/app/services/rag_test/retrieval/retriever.py create mode 100644 backend/app/services/rag_test/splitters/__init__.py create mode 100644 backend/app/services/rag_test/splitters/base.py create mode 100644 backend/app/services/rag_test/splitters/langchain_splitter.py create mode 100644 backend/app/services/rag_test/splitters/llamaindex_splitter.py create mode 100644 backend/app/services/rag_test/utils.py create mode 100644 backend/app/services/rag_test/vectorstore/__init__.py create mode 100644 backend/app/services/rag_test/vectorstore/faiss_store.py diff --git a/backend/app/services/rag_test/__init__.py b/backend/app/services/rag_test/__init__.py new file mode 100644 index 0000000..5cfe14f --- /dev/null +++ b/backend/app/services/rag_test/__init__.py @@ -0,0 +1,15 @@ +""" +RAG Pipeline Testing and Evaluation Module. + +This module provides tools for testing different document conversion methods, +splitting strategies, and evaluating RAG generation quality using RAGAS framework. +""" + +from app.services.rag_test.config import RAGTestConfig, SplitterConfig +from app.services.rag_test.pipeline import RAGTestPipeline + +__all__ = [ + "RAGTestConfig", + "SplitterConfig", + "RAGTestPipeline", +] diff --git a/backend/app/services/rag_test/cli.py b/backend/app/services/rag_test/cli.py new file mode 100644 index 0000000..1a78aa4 --- /dev/null +++ b/backend/app/services/rag_test/cli.py @@ -0,0 +1,606 @@ +""" +CLI interface for RAG Pipeline Testing and Evaluation. 
+ +Usage: + python -m app.services.rag_test.cli [options] + +Commands: + convert - Convert documents to Markdown + index - Split and index Markdown files + evaluate - Run evaluation on queries + run - Run complete test pipeline + compare - Run comparison tests across all combinations + report - Generate comparison report +""" + +import json +import sys +from pathlib import Path +from typing import List, Optional + +import structlog +import typer +from rich.console import Console +from rich.progress import ( + BarColumn, + Progress, + SpinnerColumn, + TaskProgressColumn, + TextColumn, + TimeElapsedColumn, +) +from rich.table import Table + +from app.services.rag_test.config import ( + AVAILABLE_CONVERTERS, + AVAILABLE_SPLITTERS, + RAGTestConfig, + SplitterConfig, +) +from app.services.rag_test.converters import get_converter +from app.services.rag_test.output import ComparisonReport, TestReport, load_test_reports +from app.services.rag_test.pipeline import RAGTestPipeline +from app.services.rag_test.splitters import get_splitter +from app.services.rag_test.utils import ensure_directory, get_timestamp + +logger = structlog.get_logger(__name__) + +app = typer.Typer( + name="rag_test", + help="RAG Pipeline Testing and Evaluation Tool", + add_completion=False, +) +console = Console() + + +def create_progress() -> Progress: + """Create a Rich progress bar.""" + return Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TaskProgressColumn(), + TimeElapsedColumn(), + console=console, + ) + + +@app.command() +def convert( + source_dir: Path = typer.Option( + ..., "--source-dir", "-s", help="Source documents directory" + ), + converter: str = typer.Option( + "docling", + "--converter", + "-c", + help=f"Converter: {', '.join(AVAILABLE_CONVERTERS)}", + ), + output_dir: Path = typer.Option( + ..., "--output-dir", "-o", help="Output directory for Markdown files" + ), +): + """Convert documents to Markdown format.""" + console.print(f"\n[bold blue]RAG Test: Document Conversion[/bold blue]") + console.print(f"Converter: [cyan]{converter}[/cyan]") + console.print(f"Source: [cyan]{source_dir}[/cyan]") + console.print(f"Output: [cyan]{output_dir}[/cyan]\n") + + if converter not in AVAILABLE_CONVERTERS: + console.print( + f"[red]Error: Unknown converter '{converter}'. 
" + f"Available: {', '.join(AVAILABLE_CONVERTERS)}[/red]" + ) + raise typer.Exit(1) + + try: + conv = get_converter(converter) + + with create_progress() as progress: + task = progress.add_task("[cyan]Converting documents...", total=None) + + results, success, failure = conv.convert_directory( + source_dir=source_dir, + output_dir=output_dir, + recursive=True, + ) + + progress.update(task, completed=True) + + console.print(f"\n[green]Conversion complete![/green]") + console.print(f" Success: [green]{success}[/green]") + console.print(f" Failed: [red]{failure}[/red]") + console.print(f" Output: [cyan]{output_dir}[/cyan]") + + except Exception as e: + console.print(f"\n[red]Error: {e}[/red]") + raise typer.Exit(1) + + +@app.command() +def index( + markdown_dir: Path = typer.Option( + ..., "--markdown-dir", "-m", help="Markdown files directory" + ), + splitter: str = typer.Option( + "langchain", + "--splitter", + "-sp", + help=f"Splitter: {', '.join(AVAILABLE_SPLITTERS)}", + ), + chunk_size: int = typer.Option(1024, "--chunk-size", help="Chunk size"), + chunk_overlap: int = typer.Option(50, "--chunk-overlap", help="Chunk overlap"), + output_index: Path = typer.Option( + ..., "--output-index", "-o", help="Output index directory" + ), +): + """Split Markdown files and build vector index.""" + console.print(f"\n[bold blue]RAG Test: Indexing[/bold blue]") + console.print(f"Splitter: [cyan]{splitter}[/cyan]") + console.print(f"Chunk size: [cyan]{chunk_size}[/cyan]") + console.print(f"Chunk overlap: [cyan]{chunk_overlap}[/cyan]\n") + + if splitter not in AVAILABLE_SPLITTERS: + console.print( + f"[red]Error: Unknown splitter '{splitter}'. " + f"Available: {', '.join(AVAILABLE_SPLITTERS)}[/red]" + ) + raise typer.Exit(1) + + try: + # Create minimal config for pipeline + config = RAGTestConfig( + source_directory=str(markdown_dir), + queries_file="", + output_directory=str(output_index), + ) + pipeline = RAGTestPipeline(config) + + with create_progress() as progress: + # Track progress + current_stage = {"name": "", "task": None} + + def progress_callback(stage: str, current: int, total: int): + if current_stage["name"] != stage: + if current_stage["task"]: + progress.update(current_stage["task"], completed=total) + current_stage["name"] = stage + current_stage["task"] = progress.add_task( + f"[cyan]{stage.capitalize()}...", total=total + ) + progress.update(current_stage["task"], completed=current) + + pipeline.set_progress_callback(progress_callback) + + chunks, index_path = pipeline.split_and_index( + splitter_name=splitter, + markdown_dir=markdown_dir, + index_dir=output_index, + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + ) + + console.print(f"\n[green]Indexing complete![/green]") + console.print(f" Total chunks: [cyan]{len(chunks)}[/cyan]") + console.print(f" Index saved to: [cyan]{index_path}[/cyan]") + + except Exception as e: + console.print(f"\n[red]Error: {e}[/red]") + raise typer.Exit(1) + + +@app.command() +def evaluate( + index_path: Path = typer.Option( + ..., "--index-path", "-i", help="FAISS index path" + ), + queries: Path = typer.Option( + ..., "--queries", "-q", help="Queries file (txt, one per line)" + ), + output: Path = typer.Option(..., "--output", "-o", help="Output JSON file"), + converter: str = typer.Option( + "unknown", "--converter", "-c", help="Converter name for report" + ), + splitter: str = typer.Option( + "unknown", "--splitter", "-sp", help="Splitter name for report" + ), +): + """Run evaluation on queries using pre-built index.""" + 
console.print(f"\n[bold blue]RAG Test: Evaluation[/bold blue]") + console.print(f"Index: [cyan]{index_path}[/cyan]") + console.print(f"Queries: [cyan]{queries}[/cyan]\n") + + try: + config = RAGTestConfig( + source_directory="", + queries_file=str(queries), + output_directory=str(output.parent), + ) + pipeline = RAGTestPipeline(config) + + with create_progress() as progress: + task = progress.add_task("[cyan]Evaluating queries...", total=None) + + def progress_callback(stage: str, current: int, total: int): + progress.update(task, total=total, completed=current) + + pipeline.set_progress_callback(progress_callback) + + report = pipeline.evaluate_queries( + index_path=index_path, + queries_file=queries, + output_file=output, + converter_name=converter, + splitter_name=splitter, + ) + + # Display summary + summary = report.calculate_summary() + console.print(f"\n[green]Evaluation complete![/green]") + console.print(f" Total queries: [cyan]{summary['total_queries']}[/cyan]") + console.print( + f" Avg Faithfulness: [cyan]{summary['avg_faithfulness'] or 'N/A'}[/cyan]" + ) + console.print( + f" Avg Answer Relevancy: [cyan]{summary['avg_answer_relevancy'] or 'N/A'}[/cyan]" + ) + console.print( + f" Avg Context Precision: [cyan]{summary['avg_context_precision'] or 'N/A'}[/cyan]" + ) + console.print( + f" Avg Overall Score: [cyan]{summary['avg_overall_score'] or 'N/A'}[/cyan]" + ) + + except Exception as e: + console.print(f"\n[red]Error: {e}[/red]") + raise typer.Exit(1) + + +@app.command() +def run( + source_dir: Path = typer.Option( + ..., "--source-dir", "-s", help="Source documents directory" + ), + queries: Path = typer.Option(..., "--queries", "-q", help="Queries file"), + converter: str = typer.Option("docling", "--converter", "-c"), + splitter: str = typer.Option("langchain", "--splitter", "-sp"), + chunk_size: int = typer.Option(1024, "--chunk-size"), + chunk_overlap: int = typer.Option(50, "--chunk-overlap"), + output: Path = typer.Option( + ..., "--output", "-o", help="Output directory for results" + ), +): + """Run complete test pipeline.""" + console.print(f"\n[bold blue]RAG Test: Full Pipeline[/bold blue]") + console.print(f"Converter: [cyan]{converter}[/cyan]") + console.print(f"Splitter: [cyan]{splitter}[/cyan]") + console.print(f"Chunk size: [cyan]{chunk_size}[/cyan]") + console.print(f"Chunk overlap: [cyan]{chunk_overlap}[/cyan]\n") + + if converter not in AVAILABLE_CONVERTERS: + console.print(f"[red]Error: Unknown converter '{converter}'[/red]") + raise typer.Exit(1) + + if splitter not in AVAILABLE_SPLITTERS: + console.print(f"[red]Error: Unknown splitter '{splitter}'[/red]") + raise typer.Exit(1) + + try: + config = RAGTestConfig( + source_directory=str(source_dir), + queries_file=str(queries), + output_directory=str(output), + ) + pipeline = RAGTestPipeline(config) + + with create_progress() as progress: + tasks = {} + + def progress_callback(stage: str, current: int, total: int): + if stage not in tasks: + tasks[stage] = progress.add_task( + f"[cyan]{stage.capitalize()}...", total=total + ) + progress.update(tasks[stage], completed=current, total=total) + + pipeline.set_progress_callback(progress_callback) + + report = pipeline.run_full_test( + converter_name=converter, + splitter_name=splitter, + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + ) + + # Display results + summary = report.calculate_summary() + console.print(f"\n[green]Test complete![/green]") + _print_summary_table(summary, converter, splitter) + + except Exception as e: + console.print(f"\n[red]Error: 
{e}[/red]") + logger.exception("Pipeline failed", error=str(e)) + raise typer.Exit(1) + + +@app.command() +def compare( + source_dir: Path = typer.Option( + ..., "--source-dir", "-s", help="Source documents directory" + ), + queries: Path = typer.Option(..., "--queries", "-q", help="Queries file"), + converters: Optional[List[str]] = typer.Option( + None, "--converters", help="Converters to test (comma-separated or multiple flags)" + ), + splitters: Optional[List[str]] = typer.Option( + None, "--splitters", help="Splitters to test (comma-separated or multiple flags)" + ), + chunk_size: int = typer.Option(1024, "--chunk-size"), + chunk_overlap: int = typer.Option(50, "--chunk-overlap"), + output_dir: Path = typer.Option( + ..., "--output-dir", "-o", help="Output directory for results" + ), +): + """Run comparison tests across all converter+splitter combinations.""" + # Default to all converters and splitters if not specified + conv_list = converters or AVAILABLE_CONVERTERS + split_list = splitters or AVAILABLE_SPLITTERS + + total_tests = len(conv_list) * len(split_list) + + console.print(f"\n[bold blue]RAG Test: Comparison[/bold blue]") + console.print(f"Converters: [cyan]{', '.join(conv_list)}[/cyan]") + console.print(f"Splitters: [cyan]{', '.join(split_list)}[/cyan]") + console.print(f"Total tests: [cyan]{total_tests}[/cyan]\n") + + ensure_directory(output_dir) + reports = [] + + with create_progress() as progress: + main_task = progress.add_task( + f"[bold blue]Running {total_tests} test combinations...", + total=total_tests, + ) + + test_num = 0 + for conv in conv_list: + for split in split_list: + test_num += 1 + console.print( + f"\n[yellow]Test {test_num}/{total_tests}: {conv} + {split}[/yellow]" + ) + + try: + config = RAGTestConfig( + source_directory=str(source_dir), + queries_file=str(queries), + output_directory=str(output_dir), + ) + pipeline = RAGTestPipeline(config) + + report = pipeline.run_full_test( + converter_name=conv, + splitter_name=split, + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + ) + reports.append(report) + + summary = report.calculate_summary() + console.print( + f" [green]Overall: {summary['avg_overall_score'] or 'N/A'}[/green]" + ) + + except Exception as e: + console.print(f" [red]Failed: {e}[/red]") + logger.exception(f"Test {conv}+{split} failed", error=str(e)) + + progress.update(main_task, advance=1) + + # Generate comparison report + if reports: + comparison = ComparisonReport(reports) + report_path = output_dir / f"comparison_report_{get_timestamp()}.json" + comparison.save(report_path) + + console.print(f"\n[green]All {total_tests} tests completed![/green]") + console.print(f"Results saved to: [cyan]{output_dir}[/cyan]") + console.print(f"Comparison report: [cyan]{report_path}[/cyan]") + + # Print comparison table + _print_comparison_table(reports) + else: + console.print(f"\n[red]All tests failed![/red]") + raise typer.Exit(1) + + +@app.command() +def report( + results_dir: Path = typer.Option( + ..., + "--results-dir", + "-r", + help="Directory containing test result JSON files", + ), + output: Path = typer.Option( + ..., "--output", "-o", help="Output comparison report JSON" + ), +): + """Generate comparison report from existing test results.""" + console.print(f"\n[bold blue]RAG Test: Generate Report[/bold blue]") + console.print(f"Results directory: [cyan]{results_dir}[/cyan]\n") + + try: + # Load existing reports + report_data = load_test_reports(results_dir) + + if not report_data: + console.print(f"[red]No test reports found in 
{results_dir}[/red]") + raise typer.Exit(1) + + console.print(f"Found [cyan]{len(report_data)}[/cyan] test reports") + + # Create comparison data + comparison = { + "generated_time": get_timestamp(), + "total_tests": len(report_data), + "comparisons": [], + } + + for data in report_data: + summary = data.get("summary", {}) + comparison["comparisons"].append({ + "converter": data.get("test_config", {}).get("converter", "unknown"), + "splitter": data.get("test_config", {}).get("splitter", "unknown"), + "statistics": data.get("statistics", {}), + "scores": { + "avg_faithfulness": summary.get("avg_faithfulness"), + "avg_answer_relevancy": summary.get("avg_answer_relevancy"), + "avg_context_precision": summary.get("avg_context_precision"), + "avg_overall_score": summary.get("avg_overall_score"), + }, + "total_queries": summary.get("total_queries", 0), + }) + + # Sort by overall score + comparison["comparisons"].sort( + key=lambda x: x["scores"]["avg_overall_score"] or 0, + reverse=True, + ) + + if comparison["comparisons"]: + comparison["best_combination"] = comparison["comparisons"][0] + + # Save comparison report + ensure_directory(output.parent) + with open(output, "w", encoding="utf-8") as f: + json.dump(comparison, f, ensure_ascii=False, indent=2) + + console.print(f"\n[green]Report generated![/green]") + console.print(f"Saved to: [cyan]{output}[/cyan]") + + # Print table + _print_comparison_data(comparison["comparisons"]) + + except Exception as e: + console.print(f"\n[red]Error: {e}[/red]") + raise typer.Exit(1) + + +def _print_summary_table(summary: dict, converter: str, splitter: str): + """Print a summary table.""" + table = Table(title="Test Results Summary") + table.add_column("Metric", style="cyan") + table.add_column("Value", style="green") + + table.add_row("Converter", converter) + table.add_row("Splitter", splitter) + table.add_row("Total Queries", str(summary.get("total_queries", 0))) + table.add_row( + "Avg Faithfulness", + f"{summary.get('avg_faithfulness', 'N/A'):.4f}" + if summary.get("avg_faithfulness") + else "N/A", + ) + table.add_row( + "Avg Answer Relevancy", + f"{summary.get('avg_answer_relevancy', 'N/A'):.4f}" + if summary.get("avg_answer_relevancy") + else "N/A", + ) + table.add_row( + "Avg Context Precision", + f"{summary.get('avg_context_precision', 'N/A'):.4f}" + if summary.get("avg_context_precision") + else "N/A", + ) + table.add_row( + "Avg Overall Score", + f"{summary.get('avg_overall_score', 'N/A'):.4f}" + if summary.get("avg_overall_score") + else "N/A", + ) + + console.print(table) + + +def _print_comparison_table(reports: List[TestReport]): + """Print a comparison table from test reports.""" + table = Table(title="Comparison Results") + table.add_column("Converter", style="cyan") + table.add_column("Splitter", style="cyan") + table.add_column("Faithfulness", style="green") + table.add_column("Relevancy", style="green") + table.add_column("Precision", style="green") + table.add_column("Overall", style="bold green") + + # Sort by overall score + sorted_reports = sorted( + reports, + key=lambda r: r.calculate_summary().get("avg_overall_score") or 0, + reverse=True, + ) + + for report in sorted_reports: + summary = report.calculate_summary() + table.add_row( + report.converter, + report.splitter, + f"{summary.get('avg_faithfulness', 0):.4f}" + if summary.get("avg_faithfulness") + else "N/A", + f"{summary.get('avg_answer_relevancy', 0):.4f}" + if summary.get("avg_answer_relevancy") + else "N/A", + f"{summary.get('avg_context_precision', 0):.4f}" + if 
summary.get("avg_context_precision") + else "N/A", + f"{summary.get('avg_overall_score', 0):.4f}" + if summary.get("avg_overall_score") + else "N/A", + ) + + console.print(table) + + +def _print_comparison_data(comparisons: List[dict]): + """Print comparison data from loaded reports.""" + table = Table(title="Comparison Results") + table.add_column("Rank", style="bold") + table.add_column("Converter", style="cyan") + table.add_column("Splitter", style="cyan") + table.add_column("Faithfulness", style="green") + table.add_column("Relevancy", style="green") + table.add_column("Precision", style="green") + table.add_column("Overall", style="bold green") + + for idx, comp in enumerate(comparisons, 1): + scores = comp.get("scores", {}) + table.add_row( + str(idx), + comp.get("converter", "unknown"), + comp.get("splitter", "unknown"), + f"{scores.get('avg_faithfulness', 0):.4f}" + if scores.get("avg_faithfulness") + else "N/A", + f"{scores.get('avg_answer_relevancy', 0):.4f}" + if scores.get("avg_answer_relevancy") + else "N/A", + f"{scores.get('avg_context_precision', 0):.4f}" + if scores.get("avg_context_precision") + else "N/A", + f"{scores.get('avg_overall_score', 0):.4f}" + if scores.get("avg_overall_score") + else "N/A", + ) + + console.print(table) + + +def main(): + """Main entry point.""" + app() + + +if __name__ == "__main__": + main() diff --git a/backend/app/services/rag_test/config.py b/backend/app/services/rag_test/config.py new file mode 100644 index 0000000..fd2c692 --- /dev/null +++ b/backend/app/services/rag_test/config.py @@ -0,0 +1,94 @@ +""" +Configuration models for RAG Pipeline Testing. + +This module defines Pydantic models for configuring RAG tests, +reusing existing RAGAS LLM and Embedding configurations from the main project. +""" + +import os +from typing import List, Literal, Optional, Tuple + +from pydantic import BaseModel, Field + +from app.core.config import settings + + +class SplitterConfig(BaseModel): + """Splitter configuration.""" + + name: Literal["langchain", "llamaindex"] = "langchain" + chunk_size: int = 1024 + chunk_overlap: int = 50 + headers_to_split_on: List[Tuple[str, str]] = [ + ("#", "Header 1"), + ("##", "Header 2"), + ("###", "Header 3"), + ] + + class Config: + frozen = False + + +class EmbeddingConfig(BaseModel): + """Embedding configuration - reuses existing RAGAS configuration.""" + + model: str = Field(default_factory=lambda: settings.RAGAS_EMBEDDING_MODEL) + api_key: str = Field(default_factory=lambda: settings.RAGAS_EMBEDDING_API_KEY) + base_url: str = Field(default_factory=lambda: settings.RAGAS_EMBEDDING_BASE_URL) + + +class LLMConfig(BaseModel): + """LLM configuration for query rewriting, answer generation, and RAGAS evaluation.""" + + # GLM for query rewriting and answer generation + glm_model: str = Field(default_factory=lambda: os.getenv("GLM_MODEL", "glm-4")) + glm_api_key: str = Field(default_factory=lambda: os.getenv("GLM_API_KEY", "")) + glm_base_url: str = Field( + default_factory=lambda: os.getenv( + "GLM_BASE_URL", "https://open.bigmodel.cn/api/paas/v4" + ) + ) + + # RAGAS evaluation LLM - reuses existing configuration + ragas_model: str = Field(default_factory=lambda: settings.RAGAS_LLM_MODEL) + ragas_api_key: str = Field(default_factory=lambda: settings.RAGAS_LLM_API_KEY) + ragas_base_url: str = Field(default_factory=lambda: settings.RAGAS_LLM_BASE_URL) + + +class RetrievalConfig(BaseModel): + """Retrieval configuration.""" + + top_k: int = 5 + + +class RAGTestConfig(BaseModel): + """Complete RAG test configuration.""" + + # 
Required paths + source_directory: str + queries_file: str + output_directory: str + + # Converter and splitter selection + converters: List[ + Literal["docling", "marker", "pandoc", "pypdf", "unstructured"] + ] = ["docling"] + splitter: SplitterConfig = Field(default_factory=SplitterConfig) + + # Component configurations + embedding: EmbeddingConfig = Field(default_factory=EmbeddingConfig) + llm: LLMConfig = Field(default_factory=LLMConfig) + retrieval: RetrievalConfig = Field(default_factory=RetrievalConfig) + + # Optional: temporary directory for intermediate files + temp_directory: Optional[str] = None + + class Config: + arbitrary_types_allowed = True + + +# Available converters +AVAILABLE_CONVERTERS = ["docling", "marker", "pandoc", "pypdf", "unstructured"] + +# Available splitters +AVAILABLE_SPLITTERS = ["langchain", "llamaindex"] diff --git a/backend/app/services/rag_test/converters/__init__.py b/backend/app/services/rag_test/converters/__init__.py new file mode 100644 index 0000000..08d38d7 --- /dev/null +++ b/backend/app/services/rag_test/converters/__init__.py @@ -0,0 +1,48 @@ +""" +Document converters package for converting various document formats to Markdown. +""" + +from app.services.rag_test.converters.base import BaseConverter +from app.services.rag_test.converters.docling_converter import DoclingConverter +from app.services.rag_test.converters.marker_converter import MarkerConverter +from app.services.rag_test.converters.pandoc_converter import PandocConverter +from app.services.rag_test.converters.pypdf_converter import PyPDFConverter +from app.services.rag_test.converters.unstructured_converter import UnstructuredConverter + +__all__ = [ + "BaseConverter", + "DoclingConverter", + "MarkerConverter", + "PandocConverter", + "PyPDFConverter", + "UnstructuredConverter", +] + + +def get_converter(name: str) -> BaseConverter: + """ + Get a converter instance by name. + + Args: + name: Converter name ('docling', 'marker', 'pandoc', 'pypdf', 'unstructured') + + Returns: + Converter instance + + Raises: + ValueError: If converter name is not recognized + """ + converters = { + "docling": DoclingConverter, + "marker": MarkerConverter, + "pandoc": PandocConverter, + "pypdf": PyPDFConverter, + "unstructured": UnstructuredConverter, + } + + if name not in converters: + raise ValueError( + f"Unknown converter: {name}. Available converters: {list(converters.keys())}" + ) + + return converters[name]() diff --git a/backend/app/services/rag_test/converters/base.py b/backend/app/services/rag_test/converters/base.py new file mode 100644 index 0000000..6eeb81b --- /dev/null +++ b/backend/app/services/rag_test/converters/base.py @@ -0,0 +1,182 @@ +""" +Base class for document to Markdown converters. 
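+
+Converter instances are normally obtained through the factory in
+converters/__init__.py rather than instantiated directly. A minimal usage
+sketch (file paths are illustrative):
+
+    from pathlib import Path
+    from app.services.rag_test.converters import get_converter
+
+    converter = get_converter("pypdf")
+    result = converter.convert_file(Path("docs/sample.pdf"), Path("markdown_out"))
+    if result.success:
+        print(result.output_file)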
+""" + +from abc import ABC, abstractmethod +from pathlib import Path +from typing import List, Optional, Tuple + +import structlog + +from app.services.rag_test.utils import ensure_directory, sanitize_filename + +logger = structlog.get_logger(__name__) + + +class ConversionResult: + """Result of a document conversion.""" + + def __init__( + self, + source_file: Path, + output_file: Optional[Path] = None, + success: bool = True, + error: Optional[str] = None, + markdown_content: Optional[str] = None, + ): + self.source_file = source_file + self.output_file = output_file + self.success = success + self.error = error + self.markdown_content = markdown_content + + +class BaseConverter(ABC): + """Abstract base class for document to Markdown converters.""" + + @property + @abstractmethod + def name(self) -> str: + """Get the converter name.""" + pass + + @property + @abstractmethod + def supported_extensions(self) -> List[str]: + """Get list of supported file extensions (with dot, e.g., '.pdf').""" + pass + + @abstractmethod + def convert(self, file_path: Path) -> str: + """ + Convert a single file to Markdown. + + Args: + file_path: Path to the file to convert + + Returns: + Markdown content as string + + Raises: + Exception: If conversion fails + """ + pass + + def is_supported(self, file_path: Path) -> bool: + """Check if the file is supported by this converter.""" + return file_path.suffix.lower() in [ + ext.lower() for ext in self.supported_extensions + ] + + def convert_file( + self, file_path: Path, output_dir: Optional[Path] = None + ) -> ConversionResult: + """ + Convert a single file and optionally save to output directory. + + Args: + file_path: Path to the file to convert + output_dir: Optional output directory to save the Markdown file + + Returns: + ConversionResult with conversion status and output + """ + file_path = Path(file_path) + + if not file_path.exists(): + return ConversionResult( + source_file=file_path, + success=False, + error=f"File not found: {file_path}", + ) + + if not self.is_supported(file_path): + return ConversionResult( + source_file=file_path, + success=False, + error=f"Unsupported file format: {file_path.suffix}", + ) + + try: + logger.info(f"Converting file with {self.name}", file=str(file_path)) + markdown_content = self.convert(file_path) + + output_file = None + if output_dir: + output_dir = ensure_directory(output_dir) + output_filename = sanitize_filename(file_path.stem) + ".md" + output_file = output_dir / output_filename + with open(output_file, "w", encoding="utf-8") as f: + f.write(markdown_content) + logger.info(f"Saved markdown to {output_file}") + + return ConversionResult( + source_file=file_path, + output_file=output_file, + success=True, + markdown_content=markdown_content, + ) + + except Exception as e: + logger.exception(f"Conversion failed for {file_path}", error=str(e)) + return ConversionResult( + source_file=file_path, + success=False, + error=str(e), + ) + + def convert_directory( + self, source_dir: Path, output_dir: Path, recursive: bool = True + ) -> Tuple[List[ConversionResult], int, int]: + """ + Convert all supported files in a directory. 
+ + Args: + source_dir: Source directory containing documents + output_dir: Output directory for Markdown files + recursive: Whether to search recursively + + Returns: + Tuple of (results list, success count, failure count) + """ + source_dir = Path(source_dir) + output_dir = ensure_directory(output_dir) + + if not source_dir.exists(): + logger.error(f"Source directory does not exist: {source_dir}") + return [], 0, 0 + + # Find all supported files + files = [] + for ext in self.supported_extensions: + pattern = f"**/*{ext}" if recursive else f"*{ext}" + files.extend(source_dir.glob(pattern)) + + files = sorted(set(files)) + logger.info( + f"Found {len(files)} files to convert with {self.name}", + source_dir=str(source_dir), + ) + + results = [] + success_count = 0 + failure_count = 0 + + for file_path in files: + # Maintain directory structure in output + relative_path = file_path.relative_to(source_dir) + file_output_dir = output_dir / relative_path.parent + ensure_directory(file_output_dir) + + result = self.convert_file(file_path, file_output_dir) + results.append(result) + + if result.success: + success_count += 1 + else: + failure_count += 1 + + logger.info( + f"Conversion complete: {success_count} succeeded, {failure_count} failed" + ) + return results, success_count, failure_count diff --git a/backend/app/services/rag_test/converters/docling_converter.py b/backend/app/services/rag_test/converters/docling_converter.py new file mode 100644 index 0000000..e36604c --- /dev/null +++ b/backend/app/services/rag_test/converters/docling_converter.py @@ -0,0 +1,75 @@ +""" +Docling converter for converting documents to Markdown. + +Docling supports: PDF, PPT, Word, MD, TXT with complex document structure extraction. +""" + +from pathlib import Path +from typing import List + +import structlog + +from app.services.rag_test.converters.base import BaseConverter + +logger = structlog.get_logger(__name__) + + +class DoclingConverter(BaseConverter): + """ + Docling-based document converter. + + Supports PDF, PowerPoint, Word, Markdown, and plain text files. + Preserves document structure including headings, tables, and lists. + """ + + @property + def name(self) -> str: + return "docling" + + @property + def supported_extensions(self) -> List[str]: + return [".pdf", ".pptx", ".ppt", ".docx", ".doc", ".md", ".txt"] + + def __init__(self): + self._converter = None + + def _get_converter(self): + """Lazy initialization of docling converter.""" + if self._converter is None: + try: + from docling.document_converter import DocumentConverter + + self._converter = DocumentConverter() + except ImportError as e: + raise ImportError( + "docling is not installed. Please install it with: pip install docling" + ) from e + return self._converter + + def convert(self, file_path: Path) -> str: + """ + Convert a document to Markdown using Docling. 
+ + Args: + file_path: Path to the document + + Returns: + Markdown content + """ + file_path = Path(file_path) + converter = self._get_converter() + + logger.info(f"Converting with Docling: {file_path}") + + # For plain text and markdown, read directly + if file_path.suffix.lower() in [".txt", ".md"]: + with open(file_path, "r", encoding="utf-8") as f: + return f.read() + + # Use Docling for other formats + result = converter.convert(str(file_path)) + + # Export to markdown + markdown_content = result.document.export_to_markdown() + + return markdown_content diff --git a/backend/app/services/rag_test/converters/marker_converter.py b/backend/app/services/rag_test/converters/marker_converter.py new file mode 100644 index 0000000..fc7c563 --- /dev/null +++ b/backend/app/services/rag_test/converters/marker_converter.py @@ -0,0 +1,70 @@ +""" +Marker converter for converting PDF documents to Markdown. + +Marker is specialized for PDF conversion with good structure preservation. +""" + +from pathlib import Path +from typing import List + +import structlog + +from app.services.rag_test.converters.base import BaseConverter + +logger = structlog.get_logger(__name__) + + +class MarkerConverter(BaseConverter): + """ + Marker-based PDF converter. + + Specialized for PDF to Markdown conversion with structure preservation. + """ + + @property + def name(self) -> str: + return "marker" + + @property + def supported_extensions(self) -> List[str]: + return [".pdf"] + + def __init__(self): + self._converter = None + + def _get_converter(self): + """Lazy initialization of marker converter.""" + if self._converter is None: + try: + from marker.converters.pdf import PdfConverter + from marker.models import create_model_dict + + # Create model dict for marker + self._model_dict = create_model_dict() + self._converter = PdfConverter(artifact_dict=self._model_dict) + except ImportError as e: + raise ImportError( + "marker-pdf is not installed. Please install it with: pip install marker-pdf" + ) from e + return self._converter + + def convert(self, file_path: Path) -> str: + """ + Convert a PDF to Markdown using Marker. + + Args: + file_path: Path to the PDF file + + Returns: + Markdown content + """ + file_path = Path(file_path) + converter = self._get_converter() + + logger.info(f"Converting with Marker: {file_path}") + + # Convert PDF to markdown + rendered = converter(str(file_path)) + markdown_content = rendered.markdown + + return markdown_content diff --git a/backend/app/services/rag_test/converters/pandoc_converter.py b/backend/app/services/rag_test/converters/pandoc_converter.py new file mode 100644 index 0000000..e30530a --- /dev/null +++ b/backend/app/services/rag_test/converters/pandoc_converter.py @@ -0,0 +1,93 @@ +""" +Pandoc converter for converting documents to Markdown. + +Pandoc is a universal document converter supporting many formats. +""" + +from pathlib import Path +from typing import List + +import structlog + +from app.services.rag_test.converters.base import BaseConverter + +logger = structlog.get_logger(__name__) + + +class PandocConverter(BaseConverter): + """ + Pandoc-based document converter. + + Uses pypandoc to convert various document formats to Markdown. + Requires pandoc to be installed on the system. 
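+
+    Note: pandoc does not accept PDF as an input format, and legacy Office
+    files (.doc, .ppt) as well as .pptx are likewise not supported as pandoc
+    inputs, so conversion of those extensions may fail at runtime even when
+    pypandoc and pandoc are installed.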
+ """ + + @property + def name(self) -> str: + return "pandoc" + + @property + def supported_extensions(self) -> List[str]: + return [".pdf", ".pptx", ".ppt", ".docx", ".doc", ".md", ".txt", ".html", ".rst"] + + def __init__(self): + self._pypandoc = None + + def _get_pypandoc(self): + """Lazy initialization of pypandoc.""" + if self._pypandoc is None: + try: + import pypandoc + + self._pypandoc = pypandoc + except ImportError as e: + raise ImportError( + "pypandoc is not installed. Please install it with: pip install pypandoc" + ) from e + return self._pypandoc + + def convert(self, file_path: Path) -> str: + """ + Convert a document to Markdown using Pandoc. + + Args: + file_path: Path to the document + + Returns: + Markdown content + """ + file_path = Path(file_path) + pypandoc = self._get_pypandoc() + + logger.info(f"Converting with Pandoc: {file_path}") + + # For plain text, read directly + if file_path.suffix.lower() == ".txt": + with open(file_path, "r", encoding="utf-8") as f: + return f.read() + + # For markdown, read directly + if file_path.suffix.lower() == ".md": + with open(file_path, "r", encoding="utf-8") as f: + return f.read() + + # Determine input format + format_map = { + ".pdf": "pdf", + ".docx": "docx", + ".doc": "doc", + ".pptx": "pptx", + ".ppt": "ppt", + ".html": "html", + ".rst": "rst", + } + input_format = format_map.get(file_path.suffix.lower()) + + # Convert using pandoc + markdown_content = pypandoc.convert_file( + str(file_path), + "markdown", + format=input_format, + ) + + return markdown_content diff --git a/backend/app/services/rag_test/converters/pypdf_converter.py b/backend/app/services/rag_test/converters/pypdf_converter.py new file mode 100644 index 0000000..843ab22 --- /dev/null +++ b/backend/app/services/rag_test/converters/pypdf_converter.py @@ -0,0 +1,85 @@ +""" +PyPDF converter for extracting text from PDF documents. + +Uses PyMuPDF (fitz) for lightweight PDF text extraction. +""" + +from pathlib import Path +from typing import List + +import structlog + +from app.services.rag_test.converters.base import BaseConverter + +logger = structlog.get_logger(__name__) + + +class PyPDFConverter(BaseConverter): + """ + PyMuPDF-based PDF converter. + + Lightweight PDF text extraction using PyMuPDF (fitz). + Good for simple PDFs without complex formatting needs. + """ + + @property + def name(self) -> str: + return "pypdf" + + @property + def supported_extensions(self) -> List[str]: + return [".pdf"] + + def __init__(self): + self._fitz = None + + def _get_fitz(self): + """Lazy initialization of PyMuPDF.""" + if self._fitz is None: + try: + import fitz # PyMuPDF + + self._fitz = fitz + except ImportError as e: + raise ImportError( + "PyMuPDF is not installed. Please install it with: pip install pymupdf" + ) from e + return self._fitz + + def convert(self, file_path: Path) -> str: + """ + Convert a PDF to Markdown using PyMuPDF. 
+ + Args: + file_path: Path to the PDF file + + Returns: + Markdown content (plain text with basic formatting) + """ + file_path = Path(file_path) + fitz = self._get_fitz() + + logger.info(f"Converting with PyMuPDF: {file_path}") + + doc = fitz.open(str(file_path)) + markdown_parts = [] + + for page_num, page in enumerate(doc, 1): + # Add page header + markdown_parts.append(f"\n## Page {page_num}\n") + + # Extract text with basic formatting + text = page.get_text("text") + + # Clean up the text + lines = [] + for line in text.split("\n"): + line = line.strip() + if line: + lines.append(line) + + markdown_parts.append("\n".join(lines)) + + doc.close() + + return "\n\n".join(markdown_parts) diff --git a/backend/app/services/rag_test/converters/unstructured_converter.py b/backend/app/services/rag_test/converters/unstructured_converter.py new file mode 100644 index 0000000..c07cbfa --- /dev/null +++ b/backend/app/services/rag_test/converters/unstructured_converter.py @@ -0,0 +1,95 @@ +""" +Unstructured.io converter for converting documents to Markdown. + +Unstructured provides general-purpose document parsing. +""" + +from pathlib import Path +from typing import List + +import structlog + +from app.services.rag_test.converters.base import BaseConverter + +logger = structlog.get_logger(__name__) + + +class UnstructuredConverter(BaseConverter): + """ + Unstructured.io-based document converter. + + Uses the unstructured library for general document parsing. + Supports multiple file formats with element-based extraction. + """ + + @property + def name(self) -> str: + return "unstructured" + + @property + def supported_extensions(self) -> List[str]: + return [".pdf", ".pptx", ".ppt", ".docx", ".doc", ".md", ".txt", ".html"] + + def convert(self, file_path: Path) -> str: + """ + Convert a document to Markdown using Unstructured. + + Args: + file_path: Path to the document + + Returns: + Markdown content + """ + file_path = Path(file_path) + + logger.info(f"Converting with Unstructured: {file_path}") + + # For plain text and markdown, read directly + if file_path.suffix.lower() in [".txt", ".md"]: + with open(file_path, "r", encoding="utf-8") as f: + return f.read() + + try: + from unstructured.partition.auto import partition + except ImportError as e: + raise ImportError( + "unstructured is not installed. 
Please install it with: pip install unstructured" + ) from e + + # Partition the document into elements + elements = partition(str(file_path)) + + # Convert elements to markdown + markdown_parts = [] + current_heading_level = 0 + + for element in elements: + element_type = type(element).__name__ + text = str(element) + + if not text.strip(): + continue + + # Format based on element type + if element_type == "Title": + markdown_parts.append(f"\n# {text}\n") + current_heading_level = 1 + elif element_type == "Header": + # Increment heading level for nested headers + level = min(current_heading_level + 1, 6) + markdown_parts.append(f"\n{'#' * level} {text}\n") + elif element_type == "ListItem": + markdown_parts.append(f"- {text}") + elif element_type == "NarrativeText": + markdown_parts.append(f"\n{text}\n") + elif element_type == "Table": + # Tables are already formatted by unstructured + markdown_parts.append(f"\n{text}\n") + elif element_type == "Image": + # Skip images or add placeholder + markdown_parts.append(f"\n[Image: {text}]\n") + else: + # Default: treat as paragraph + markdown_parts.append(f"\n{text}\n") + + return "\n".join(markdown_parts) diff --git a/backend/app/services/rag_test/embeddings/__init__.py b/backend/app/services/rag_test/embeddings/__init__.py new file mode 100644 index 0000000..be6901d --- /dev/null +++ b/backend/app/services/rag_test/embeddings/__init__.py @@ -0,0 +1,7 @@ +""" +Embeddings package for text embedding generation. +""" + +from app.services.rag_test.embeddings.qwen_embedding import QwenEmbedding + +__all__ = ["QwenEmbedding"] diff --git a/backend/app/services/rag_test/embeddings/qwen_embedding.py b/backend/app/services/rag_test/embeddings/qwen_embedding.py new file mode 100644 index 0000000..06da71d --- /dev/null +++ b/backend/app/services/rag_test/embeddings/qwen_embedding.py @@ -0,0 +1,115 @@ +""" +Qwen Embedding client for text embedding generation. + +Reuses the existing RAGAS embedding configuration from the main project. +""" + +from typing import List, Optional + +import structlog +from langchain_openai import OpenAIEmbeddings + +from app.core.config import settings + +logger = structlog.get_logger(__name__) + + +class QwenEmbedding: + """ + Qwen Embedding client using OpenAI-compatible API. + + Reuses the existing RAGAS embedding configuration. + """ + + def __init__( + self, + model: Optional[str] = None, + api_key: Optional[str] = None, + base_url: Optional[str] = None, + ): + """ + Initialize the embedding client. 
+ + Args: + model: Embedding model name (default: from settings) + api_key: API key (default: from settings) + base_url: API base URL (default: from settings) + """ + self.model = model or settings.RAGAS_EMBEDDING_MODEL + self.api_key = api_key or settings.RAGAS_EMBEDDING_API_KEY + self.base_url = base_url or settings.RAGAS_EMBEDDING_BASE_URL + self._client = None + self._dimension = None + + @property + def client(self) -> OpenAIEmbeddings: + """Get or create the embedding client.""" + if self._client is None: + self._client = OpenAIEmbeddings( + model=self.model, + api_key=self.api_key, + base_url=self.base_url, + ) + logger.info( + f"Initialized Qwen embedding client", + model=self.model, + base_url=self.base_url, + ) + return self._client + + @property + def dimension(self) -> int: + """Get the embedding dimension.""" + if self._dimension is None: + # Generate a test embedding to determine dimension + test_embedding = self.embed("test") + self._dimension = len(test_embedding) + logger.info(f"Embedding dimension: {self._dimension}") + return self._dimension + + def embed(self, text: str) -> List[float]: + """ + Generate embedding for a single text. + + Args: + text: Text to embed + + Returns: + Embedding vector as list of floats + """ + try: + embedding = self.client.embed_query(text) + return embedding + except Exception as e: + logger.exception(f"Embedding failed for text", error=str(e)) + raise + + def embed_batch( + self, texts: List[str], batch_size: int = 32 + ) -> List[List[float]]: + """ + Generate embeddings for multiple texts in batches. + + Args: + texts: List of texts to embed + batch_size: Number of texts per batch + + Returns: + List of embedding vectors + """ + all_embeddings = [] + + for i in range(0, len(texts), batch_size): + batch = texts[i : i + batch_size] + try: + embeddings = self.client.embed_documents(batch) + all_embeddings.extend(embeddings) + logger.debug( + f"Embedded batch {i // batch_size + 1}", + batch_size=len(batch), + ) + except Exception as e: + logger.exception(f"Batch embedding failed", error=str(e), batch_index=i) + raise + + return all_embeddings diff --git a/backend/app/services/rag_test/evaluation/__init__.py b/backend/app/services/rag_test/evaluation/__init__.py new file mode 100644 index 0000000..8d0a53e --- /dev/null +++ b/backend/app/services/rag_test/evaluation/__init__.py @@ -0,0 +1,7 @@ +""" +Evaluation package for RAGAS-based RAG evaluation. +""" + +from app.services.rag_test.evaluation.ragas_evaluator import RAGTestEvaluator + +__all__ = ["RAGTestEvaluator"] diff --git a/backend/app/services/rag_test/evaluation/ragas_evaluator.py b/backend/app/services/rag_test/evaluation/ragas_evaluator.py new file mode 100644 index 0000000..8a4bb60 --- /dev/null +++ b/backend/app/services/rag_test/evaluation/ragas_evaluator.py @@ -0,0 +1,195 @@ +""" +RAGAS evaluator for RAG test evaluation. + +Reuses the existing RAGAS configuration from the main project. +""" + +import time +from typing import Any, Dict, List, Optional + +import structlog +from langchain_openai import ChatOpenAI, OpenAIEmbeddings +from ragas import EvaluationDataset, SingleTurnSample, evaluate +from ragas.embeddings import LangchainEmbeddingsWrapper +from ragas.llms import LangchainLLMWrapper +from ragas.metrics import ( + Faithfulness, + LLMContextPrecisionWithoutReference, + ResponseRelevancy, +) + +from app.core.config import settings + +logger = structlog.get_logger(__name__) + + +class RAGTestEvaluator: + """ + RAGAS evaluator for RAG test evaluation. 
+ + Reuses existing RAGAS LLM and embedding configurations. + """ + + def __init__( + self, + llm_model: Optional[str] = None, + llm_api_key: Optional[str] = None, + llm_base_url: Optional[str] = None, + embedding_model: Optional[str] = None, + embedding_api_key: Optional[str] = None, + embedding_base_url: Optional[str] = None, + ): + """ + Initialize the evaluator with optional custom configurations. + + Args: + llm_model: LLM model name (default: from settings) + llm_api_key: LLM API key (default: from settings) + llm_base_url: LLM API base URL (default: from settings) + embedding_model: Embedding model name (default: from settings) + embedding_api_key: Embedding API key (default: from settings) + embedding_base_url: Embedding API base URL (default: from settings) + """ + # LLM configuration - reuse from settings if not provided + self.llm_model = llm_model or settings.RAGAS_LLM_MODEL + self.llm_api_key = llm_api_key or settings.RAGAS_LLM_API_KEY + self.llm_base_url = llm_base_url or settings.RAGAS_LLM_BASE_URL + + # Embedding configuration - reuse from settings if not provided + self.embedding_model = embedding_model or settings.RAGAS_EMBEDDING_MODEL + self.embedding_api_key = embedding_api_key or settings.RAGAS_EMBEDDING_API_KEY + self.embedding_base_url = embedding_base_url or settings.RAGAS_EMBEDDING_BASE_URL + + self._llm = None + self._embeddings = None + + @property + def llm(self) -> LangchainLLMWrapper: + """Get or create LLM instance.""" + if self._llm is None: + base_llm = ChatOpenAI( + model=self.llm_model, + api_key=self.llm_api_key, + base_url=self.llm_base_url, + temperature=0, + ) + self._llm = LangchainLLMWrapper(base_llm) + logger.info( + f"Initialized RAGAS LLM", + model=self.llm_model, + ) + return self._llm + + @property + def embeddings(self) -> LangchainEmbeddingsWrapper: + """Get or create embeddings instance.""" + if self._embeddings is None: + base_embeddings = OpenAIEmbeddings( + model=self.embedding_model, + api_key=self.embedding_api_key, + base_url=self.embedding_base_url, + ) + self._embeddings = LangchainEmbeddingsWrapper(base_embeddings) + logger.info( + f"Initialized RAGAS embeddings", + model=self.embedding_model, + ) + return self._embeddings + + def evaluate_single( + self, + question: str, + answer: str, + contexts: List[str], + ) -> Dict[str, Any]: + """ + Evaluate a single RAG response using RAGAS metrics. 
+ + Args: + question: The user's question + answer: The generated answer + contexts: List of retrieved context texts + + Returns: + Dictionary containing: + - faithfulness: How faithful the answer is to the context + - answer_relevancy: How relevant the answer is to the question + - context_precision: Quality of the retrieved context + - overall_score: Average of all scores + - duration_ms: Evaluation duration + """ + start_time = time.time() + + try: + # Create RAGAS sample + sample = SingleTurnSample( + user_input=question, + response=answer, + retrieved_contexts=contexts, + ) + + # Create dataset + dataset = EvaluationDataset(samples=[sample]) + + # Define metrics + metrics = [ + Faithfulness(llm=self.llm), + ResponseRelevancy(llm=self.llm, embeddings=self.embeddings), + LLMContextPrecisionWithoutReference(llm=self.llm), + ] + + # Run evaluation + result = evaluate( + dataset=dataset, + metrics=metrics, + ) + + # Extract scores + result_df = result.to_pandas() + row = result_df.iloc[0] + + faithfulness = self._safe_float(row.get("faithfulness")) + answer_relevancy = self._safe_float(row.get("answer_relevancy")) + context_precision = self._safe_float( + row.get("llm_context_precision_without_reference") + ) + + # Calculate overall score + scores = [s for s in [faithfulness, answer_relevancy, context_precision] if s is not None] + overall_score = sum(scores) / len(scores) if scores else None + + duration_ms = int((time.time() - start_time) * 1000) + + return { + "faithfulness": faithfulness, + "answer_relevancy": answer_relevancy, + "context_precision": context_precision, + "overall_score": overall_score, + "duration_ms": duration_ms, + } + + except Exception as e: + logger.exception("RAGAS evaluation failed", error=str(e)) + duration_ms = int((time.time() - start_time) * 1000) + return { + "faithfulness": None, + "answer_relevancy": None, + "context_precision": None, + "overall_score": None, + "duration_ms": duration_ms, + "error": str(e), + } + + def _safe_float(self, value) -> Optional[float]: + """Safely convert value to float.""" + if value is None: + return None + try: + import math + + f = float(value) + if math.isnan(f) or math.isinf(f): + return None + return f + except (ValueError, TypeError): + return None diff --git a/backend/app/services/rag_test/llm/__init__.py b/backend/app/services/rag_test/llm/__init__.py new file mode 100644 index 0000000..218cb4f --- /dev/null +++ b/backend/app/services/rag_test/llm/__init__.py @@ -0,0 +1,7 @@ +""" +LLM package for language model interactions. +""" + +from app.services.rag_test.llm.glm_client import GLMClient + +__all__ = ["GLMClient"] diff --git a/backend/app/services/rag_test/llm/glm_client.py b/backend/app/services/rag_test/llm/glm_client.py new file mode 100644 index 0000000..a9d5c99 --- /dev/null +++ b/backend/app/services/rag_test/llm/glm_client.py @@ -0,0 +1,149 @@ +""" +GLM (Zhipu AI) client for query rewriting and answer generation. +""" + +import os +from typing import List, Optional + +import structlog + +logger = structlog.get_logger(__name__) + + +class GLMClient: + """ + Zhipu GLM client for query rewriting and answer generation. + """ + + def __init__( + self, + api_key: Optional[str] = None, + base_url: Optional[str] = None, + model: str = "glm-4", + ): + """ + Initialize the GLM client. 
+ + Args: + api_key: Zhipu API key (default: from GLM_API_KEY env var) + base_url: API base URL (default: from GLM_BASE_URL env var) + model: Model name (default: glm-4) + """ + self.api_key = api_key or os.getenv("GLM_API_KEY", "") + self.base_url = base_url or os.getenv( + "GLM_BASE_URL", "https://open.bigmodel.cn/api/paas/v4" + ) + self.model = model or os.getenv("GLM_MODEL", "glm-4") + self._client = None + + @property + def client(self): + """Get or create the Zhipu client.""" + if self._client is None: + try: + from zhipuai import ZhipuAI + + self._client = ZhipuAI(api_key=self.api_key) + logger.info(f"Initialized GLM client", model=self.model) + except ImportError as e: + raise ImportError( + "zhipuai is not installed. Please install it with: pip install zhipuai" + ) from e + return self._client + + def rewrite_query(self, original_query: str) -> str: + """ + Rewrite/optimize a query for better RAG retrieval. + + Args: + original_query: Original user query + + Returns: + Optimized query string + """ + prompt = f"""请对以下用户查询进行改写优化,使其更适合用于RAG检索: + +原始查询:{original_query} + +要求: +1. 保持原意 +2. 补充可能的上下文 +3. 使用更精确的关键词 + +请直接输出改写后的查询,不要有其他说明。""" + + try: + response = self.client.chat.completions.create( + model=self.model, + messages=[{"role": "user", "content": prompt}], + temperature=0.3, + ) + rewritten = response.choices[0].message.content.strip() + logger.debug( + f"Query rewritten", + original=original_query[:50], + rewritten=rewritten[:50], + ) + return rewritten + except Exception as e: + logger.warning(f"Query rewriting failed, using original: {e}") + return original_query + + def generate_answer( + self, + query: str, + contexts: List[str], + max_context_length: int = 8000, + ) -> str: + """ + Generate an answer based on retrieved contexts. + + Args: + query: User query + contexts: List of retrieved context texts + max_context_length: Maximum total context length + + Returns: + Generated answer string + """ + # Format contexts + context_parts = [] + total_length = 0 + for i, ctx in enumerate(contexts): + if total_length + len(ctx) > max_context_length: + break + context_parts.append(f"[文档{i+1}]\n{ctx}") + total_length += len(ctx) + + context_text = "\n\n".join(context_parts) + + prompt = f"""请基于以下检索到的文档内容回答用户问题。 + +检索到的文档: +{context_text} + +用户问题:{query} + +要求: +1. 仅基于提供的文档内容回答 +2. 如果文档中没有相关信息,请明确说明 +3. 回答要准确、完整 + +请直接回答:""" + + try: + response = self.client.chat.completions.create( + model=self.model, + messages=[{"role": "user", "content": prompt}], + temperature=0.1, + ) + answer = response.choices[0].message.content.strip() + logger.debug( + f"Answer generated", + query_preview=query[:50], + answer_preview=answer[:100], + ) + return answer + except Exception as e: + logger.exception(f"Answer generation failed", error=str(e)) + raise diff --git a/backend/app/services/rag_test/output.py b/backend/app/services/rag_test/output.py new file mode 100644 index 0000000..bc8d2f1 --- /dev/null +++ b/backend/app/services/rag_test/output.py @@ -0,0 +1,255 @@ +""" +Output handling for RAG test results. 
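+
+A minimal sketch of how these classes fit together (all literal values below
+are illustrative):
+
+    report = TestReport(
+        test_id="demo",
+        converter="docling",
+        splitter="langchain",
+        splitter_config={"chunk_size": 1024, "chunk_overlap": 50},
+        embedding_model="qwen3-embedding",
+        llm_model="glm-4",
+        top_k=5,
+        source_directory="./docs",
+    )
+    report.add_result(TestResult(
+        query_id=1,
+        original_query="example question",
+        rewritten_query="example question (rewritten)",
+        retrieved_chunks=[],
+        generated_answer="example answer",
+        ragas_scores={"overall_score": None},
+    ))
+    report.save(Path("./results"))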
+""" + +import json +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional + +import structlog + +from app.services.rag_test.utils import ensure_directory, get_timestamp + +logger = structlog.get_logger(__name__) + + +class TestResult: + """Represents a single query test result.""" + + def __init__( + self, + query_id: int, + original_query: str, + rewritten_query: str, + retrieved_chunks: List[Dict[str, Any]], + generated_answer: str, + ragas_scores: Dict[str, Any], + ): + self.query_id = query_id + self.original_query = original_query + self.rewritten_query = rewritten_query + self.retrieved_chunks = retrieved_chunks + self.generated_answer = generated_answer + self.ragas_scores = ragas_scores + self.overall_score = ragas_scores.get("overall_score") + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary.""" + return { + "query_id": self.query_id, + "original_query": self.original_query, + "rewritten_query": self.rewritten_query, + "retrieved_chunks": self.retrieved_chunks, + "generated_answer": self.generated_answer, + "ragas_scores": self.ragas_scores, + "overall_score": self.overall_score, + } + + +class TestReport: + """Complete test report for a single converter+splitter combination.""" + + def __init__( + self, + test_id: str, + converter: str, + splitter: str, + splitter_config: Dict[str, Any], + embedding_model: str, + llm_model: str, + top_k: int, + source_directory: str, + ): + self.test_id = test_id + self.test_time = datetime.now().isoformat() + self.converter = converter + self.splitter = splitter + self.splitter_config = splitter_config + self.embedding_model = embedding_model + self.llm_model = llm_model + self.top_k = top_k + self.source_directory = source_directory + self.statistics: Dict[str, Any] = {} + self.results: List[TestResult] = [] + + def add_result(self, result: TestResult): + """Add a test result.""" + self.results.append(result) + + def set_statistics( + self, + total_source_files: int, + total_markdown_files: int, + total_chunks: int, + ): + """Set statistics.""" + self.statistics = { + "total_source_files": total_source_files, + "total_markdown_files": total_markdown_files, + "total_chunks": total_chunks, + } + + def calculate_summary(self) -> Dict[str, Any]: + """Calculate summary statistics.""" + if not self.results: + return { + "total_queries": 0, + "avg_faithfulness": None, + "avg_answer_relevancy": None, + "avg_context_precision": None, + "avg_overall_score": None, + } + + # Collect scores + faithfulness_scores = [] + relevancy_scores = [] + precision_scores = [] + overall_scores = [] + + for r in self.results: + if r.ragas_scores.get("faithfulness") is not None: + faithfulness_scores.append(r.ragas_scores["faithfulness"]) + if r.ragas_scores.get("answer_relevancy") is not None: + relevancy_scores.append(r.ragas_scores["answer_relevancy"]) + if r.ragas_scores.get("context_precision") is not None: + precision_scores.append(r.ragas_scores["context_precision"]) + if r.overall_score is not None: + overall_scores.append(r.overall_score) + + def safe_avg(scores: List[float]) -> Optional[float]: + return round(sum(scores) / len(scores), 4) if scores else None + + return { + "total_queries": len(self.results), + "avg_faithfulness": safe_avg(faithfulness_scores), + "avg_answer_relevancy": safe_avg(relevancy_scores), + "avg_context_precision": safe_avg(precision_scores), + "avg_overall_score": safe_avg(overall_scores), + } + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary.""" + 
return { + "test_id": self.test_id, + "test_config": { + "converter": self.converter, + "splitter": self.splitter, + "splitter_config": self.splitter_config, + "embedding_model": self.embedding_model, + "llm_model": self.llm_model, + "top_k": self.top_k, + }, + "test_time": self.test_time, + "source_directory": self.source_directory, + "statistics": self.statistics, + "results": [r.to_dict() for r in self.results], + "summary": self.calculate_summary(), + } + + def save(self, output_dir: Path) -> Path: + """ + Save the report to a JSON file. + + Args: + output_dir: Output directory + + Returns: + Path to the saved file + """ + output_dir = ensure_directory(output_dir) + timestamp = get_timestamp() + filename = f"rag_test_{self.converter}_{self.splitter}_{timestamp}.json" + output_path = output_dir / filename + + with open(output_path, "w", encoding="utf-8") as f: + json.dump(self.to_dict(), f, ensure_ascii=False, indent=2) + + logger.info(f"Saved test report to {output_path}") + return output_path + + +class ComparisonReport: + """Comparison report aggregating multiple test reports.""" + + def __init__(self, test_reports: List[TestReport]): + self.test_reports = test_reports + self.generated_time = datetime.now().isoformat() + + def generate_comparison(self) -> Dict[str, Any]: + """Generate comparison data.""" + comparisons = [] + + for report in self.test_reports: + summary = report.calculate_summary() + comparisons.append({ + "converter": report.converter, + "splitter": report.splitter, + "statistics": report.statistics, + "scores": { + "avg_faithfulness": summary["avg_faithfulness"], + "avg_answer_relevancy": summary["avg_answer_relevancy"], + "avg_context_precision": summary["avg_context_precision"], + "avg_overall_score": summary["avg_overall_score"], + }, + "total_queries": summary["total_queries"], + }) + + # Sort by overall score descending + comparisons.sort( + key=lambda x: x["scores"]["avg_overall_score"] or 0, + reverse=True, + ) + + return { + "generated_time": self.generated_time, + "total_tests": len(self.test_reports), + "comparisons": comparisons, + "best_combination": comparisons[0] if comparisons else None, + } + + def save(self, output_path: Path) -> Path: + """ + Save the comparison report. + + Args: + output_path: Output file path + + Returns: + Path to the saved file + """ + output_path = Path(output_path) + ensure_directory(output_path.parent) + + with open(output_path, "w", encoding="utf-8") as f: + json.dump(self.generate_comparison(), f, ensure_ascii=False, indent=2) + + logger.info(f"Saved comparison report to {output_path}") + return output_path + + +def load_test_reports(results_dir: Path) -> List[Dict[str, Any]]: + """ + Load all test report JSON files from a directory. 
+ + Args: + results_dir: Directory containing test result JSON files + + Returns: + List of report dictionaries + """ + results_dir = Path(results_dir) + if not results_dir.exists(): + raise FileNotFoundError(f"Results directory not found: {results_dir}") + + reports = [] + for json_file in results_dir.glob("rag_test_*.json"): + try: + with open(json_file, "r", encoding="utf-8") as f: + report = json.load(f) + reports.append(report) + logger.info(f"Loaded report: {json_file.name}") + except Exception as e: + logger.warning(f"Failed to load {json_file}: {e}") + + return reports diff --git a/backend/app/services/rag_test/pipeline.py b/backend/app/services/rag_test/pipeline.py new file mode 100644 index 0000000..9398ecc --- /dev/null +++ b/backend/app/services/rag_test/pipeline.py @@ -0,0 +1,497 @@ +""" +RAG Test Pipeline orchestration. + +Coordinates the complete testing pipeline: +1. Document conversion to Markdown +2. Markdown splitting into chunks +3. Embedding generation and indexing +4. Query processing and retrieval +5. Answer generation +6. RAGAS evaluation +""" + +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional, Tuple + +import structlog + +from app.services.rag_test.config import RAGTestConfig, SplitterConfig +from app.services.rag_test.converters import get_converter +from app.services.rag_test.converters.base import BaseConverter, ConversionResult +from app.services.rag_test.embeddings.qwen_embedding import QwenEmbedding +from app.services.rag_test.evaluation.ragas_evaluator import RAGTestEvaluator +from app.services.rag_test.llm.glm_client import GLMClient +from app.services.rag_test.output import TestReport, TestResult +from app.services.rag_test.retrieval.retriever import Retriever +from app.services.rag_test.splitters import get_splitter +from app.services.rag_test.splitters.base import BaseSplitter, Chunk +from app.services.rag_test.utils import ( + ensure_directory, + get_temp_directory, + get_timestamp, + load_queries_from_file, +) +from app.services.rag_test.vectorstore.faiss_store import FAISSStore + +logger = structlog.get_logger(__name__) + + +class RAGTestPipeline: + """ + Complete RAG testing pipeline. + + Orchestrates document conversion, splitting, indexing, retrieval, + generation, and evaluation. + """ + + def __init__(self, config: RAGTestConfig): + """ + Initialize the pipeline with configuration. 
+ + Args: + config: Pipeline configuration + """ + self.config = config + self.converter: Optional[BaseConverter] = None + self.splitter: Optional[BaseSplitter] = None + self.embedding_client: Optional[QwenEmbedding] = None + self.vectorstore: Optional[FAISSStore] = None + self.llm_client: Optional[GLMClient] = None + self.retriever: Optional[Retriever] = None + self.evaluator: Optional[RAGTestEvaluator] = None + + # Progress callback for UI updates + self._progress_callback: Optional[Callable[[str, int, int], None]] = None + + def set_progress_callback( + self, callback: Callable[[str, int, int], None] + ): + """Set a progress callback function(stage, current, total).""" + self._progress_callback = callback + + def _report_progress(self, stage: str, current: int, total: int): + """Report progress through callback if set.""" + if self._progress_callback: + self._progress_callback(stage, current, total) + + def _init_components( + self, + converter_name: str, + splitter_name: str, + chunk_size: int = 1024, + chunk_overlap: int = 50, + ): + """Initialize pipeline components.""" + logger.info( + f"Initializing pipeline components", + converter=converter_name, + splitter=splitter_name, + ) + + # Initialize converter + self.converter = get_converter(converter_name) + + # Initialize splitter + self.splitter = get_splitter( + splitter_name, + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + ) + + # Initialize embedding client + self.embedding_client = QwenEmbedding( + model=self.config.embedding.model, + api_key=self.config.embedding.api_key, + base_url=self.config.embedding.base_url, + ) + + # Initialize vector store + self.vectorstore = FAISSStore() + + # Initialize LLM client + self.llm_client = GLMClient( + api_key=self.config.llm.glm_api_key, + base_url=self.config.llm.glm_base_url, + model=self.config.llm.glm_model, + ) + + # Initialize evaluator + self.evaluator = RAGTestEvaluator( + llm_model=self.config.llm.ragas_model, + llm_api_key=self.config.llm.ragas_api_key, + llm_base_url=self.config.llm.ragas_base_url, + ) + + def convert_documents( + self, + converter_name: str, + source_dir: Path, + output_dir: Path, + ) -> Tuple[List[ConversionResult], int, int]: + """ + Convert documents to Markdown. + + Args: + converter_name: Name of the converter to use + source_dir: Source directory containing documents + output_dir: Output directory for Markdown files + + Returns: + Tuple of (results, success_count, failure_count) + """ + converter = get_converter(converter_name) + logger.info( + f"Converting documents", + converter=converter_name, + source_dir=str(source_dir), + ) + + results, success, failure = converter.convert_directory( + source_dir=Path(source_dir), + output_dir=Path(output_dir), + recursive=True, + ) + + logger.info( + f"Conversion complete", + success=success, + failure=failure, + ) + + return results, success, failure + + def split_and_index( + self, + splitter_name: str, + markdown_dir: Path, + index_dir: Path, + chunk_size: int = 1024, + chunk_overlap: int = 50, + ) -> Tuple[List[Chunk], Path]: + """ + Split Markdown files and build vector index. 
+ + Args: + splitter_name: Name of the splitter to use + markdown_dir: Directory containing Markdown files + index_dir: Directory to save the index + chunk_size: Maximum chunk size + chunk_overlap: Chunk overlap + + Returns: + Tuple of (chunks, index_path) + """ + markdown_dir = Path(markdown_dir) + index_dir = ensure_directory(index_dir) + + # Get all markdown files + md_files = list(markdown_dir.glob("**/*.md")) + logger.info(f"Found {len(md_files)} Markdown files to split") + + # Initialize splitter + splitter = get_splitter(splitter_name, chunk_size, chunk_overlap) + + # Split files + all_chunks = [] + for i, md_file in enumerate(md_files): + self._report_progress("splitting", i + 1, len(md_files)) + try: + chunks = splitter.split_file(md_file) + all_chunks.extend(chunks) + except Exception as e: + logger.warning(f"Failed to split {md_file}: {e}") + + logger.info(f"Created {len(all_chunks)} chunks") + + if not all_chunks: + raise ValueError("No chunks created from Markdown files") + + # Initialize embedding client + if self.embedding_client is None: + self.embedding_client = QwenEmbedding() + + # Generate embeddings + logger.info("Generating embeddings...") + texts = [chunk.content for chunk in all_chunks] + embeddings = [] + batch_size = 32 + + for i in range(0, len(texts), batch_size): + batch = texts[i : i + batch_size] + self._report_progress("embedding", i + len(batch), len(texts)) + batch_embeddings = self.embedding_client.embed_batch(batch) + embeddings.extend(batch_embeddings) + + # Build index + logger.info("Building FAISS index...") + vectorstore = FAISSStore(dimension=len(embeddings[0])) + vectorstore.build_index(all_chunks, embeddings) + + # Save index + vectorstore.save(index_dir) + + return all_chunks, index_dir + + def evaluate_queries( + self, + index_path: Path, + queries_file: Path, + output_file: Path, + converter_name: str = "unknown", + splitter_name: str = "unknown", + chunk_size: int = 1024, + chunk_overlap: int = 50, + ) -> TestReport: + """ + Run evaluation on queries using a pre-built index. 
+ + Args: + index_path: Path to the FAISS index + queries_file: Path to the queries file + output_file: Path to save results + converter_name: Converter name for report + splitter_name: Splitter name for report + chunk_size: Chunk size used + chunk_overlap: Chunk overlap used + + Returns: + TestReport with evaluation results + """ + # Load index + vectorstore = FAISSStore() + vectorstore.load(index_path) + + # Initialize components if not already done + if self.embedding_client is None: + self.embedding_client = QwenEmbedding() + if self.llm_client is None: + self.llm_client = GLMClient() + if self.evaluator is None: + self.evaluator = RAGTestEvaluator() + + # Create retriever + retriever = Retriever( + embedding_client=self.embedding_client, + vectorstore=vectorstore, + top_k=self.config.retrieval.top_k, + ) + + # Load queries + queries = load_queries_from_file(queries_file) + logger.info(f"Loaded {len(queries)} queries") + + # Create report + report = TestReport( + test_id=f"test_{get_timestamp()}", + converter=converter_name, + splitter=splitter_name, + splitter_config={ + "chunk_size": chunk_size, + "chunk_overlap": chunk_overlap, + }, + embedding_model=self.config.embedding.model, + llm_model=self.config.llm.glm_model, + top_k=self.config.retrieval.top_k, + source_directory=self.config.source_directory, + ) + + # Set statistics + stats = vectorstore.get_stats() + report.set_statistics( + total_source_files=stats["num_chunks"], + total_markdown_files=stats["num_chunks"], + total_chunks=stats["num_chunks"], + ) + + # Process each query + for idx, query in enumerate(queries): + self._report_progress("evaluating", idx + 1, len(queries)) + + try: + result = self._process_single_query( + query=query, + query_id=idx + 1, + retriever=retriever, + ) + report.add_result(result) + except Exception as e: + logger.exception(f"Failed to process query {idx + 1}", error=str(e)) + + # Save report + output_path = report.save(Path(output_file).parent) + logger.info(f"Evaluation complete, results saved to {output_path}") + + return report + + def run_full_test( + self, + converter_name: str, + splitter_name: str, + chunk_size: int = 1024, + chunk_overlap: int = 50, + ) -> TestReport: + """ + Run a complete test with specified converter and splitter. 
+ + Args: + converter_name: Converter to use + splitter_name: Splitter to use + chunk_size: Chunk size + chunk_overlap: Chunk overlap + + Returns: + TestReport with complete results + """ + logger.info( + f"Starting full test", + converter=converter_name, + splitter=splitter_name, + ) + + # Initialize components + self._init_components( + converter_name=converter_name, + splitter_name=splitter_name, + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + ) + + # Setup directories + source_dir = Path(self.config.source_directory) + output_dir = ensure_directory(Path(self.config.output_directory)) + temp_dir = get_temp_directory() + md_output_dir = temp_dir / f"markdown_{converter_name}" + index_dir = temp_dir / f"index_{converter_name}_{splitter_name}" + + # Step 1: Convert documents + self._report_progress("converting", 0, 1) + conversion_results, conv_success, conv_failure = self.converter.convert_directory( + source_dir=source_dir, + output_dir=md_output_dir, + recursive=True, + ) + self._report_progress("converting", 1, 1) + + # Step 2: Split and index + all_chunks, index_path = self.split_and_index( + splitter_name=splitter_name, + markdown_dir=md_output_dir, + index_dir=index_dir, + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + ) + + # Create retriever + self.vectorstore.load(index_path) + self.retriever = Retriever( + embedding_client=self.embedding_client, + vectorstore=self.vectorstore, + top_k=self.config.retrieval.top_k, + ) + + # Load queries + queries = load_queries_from_file(Path(self.config.queries_file)) + + # Create report + report = TestReport( + test_id=f"test_{get_timestamp()}", + converter=converter_name, + splitter=splitter_name, + splitter_config={ + "chunk_size": chunk_size, + "chunk_overlap": chunk_overlap, + }, + embedding_model=self.config.embedding.model, + llm_model=self.config.llm.glm_model, + top_k=self.config.retrieval.top_k, + source_directory=str(source_dir), + ) + + report.set_statistics( + total_source_files=conv_success + conv_failure, + total_markdown_files=conv_success, + total_chunks=len(all_chunks), + ) + + # Step 3: Process queries + for idx, query in enumerate(queries): + self._report_progress("evaluating", idx + 1, len(queries)) + + try: + result = self._process_single_query( + query=query, + query_id=idx + 1, + retriever=self.retriever, + ) + report.add_result(result) + except Exception as e: + logger.exception(f"Failed to process query {idx + 1}", error=str(e)) + + # Save report + report.save(output_dir) + + logger.info( + f"Test complete", + converter=converter_name, + splitter=splitter_name, + total_queries=len(queries), + ) + + return report + + def _process_single_query( + self, + query: str, + query_id: int, + retriever: Retriever, + ) -> TestResult: + """ + Process a single query through the RAG pipeline. 
+ + Args: + query: Original query + query_id: Query ID + retriever: Retriever instance + + Returns: + TestResult with evaluation scores + """ + logger.debug(f"Processing query {query_id}", query_preview=query[:50]) + + # Step 1: Query rewriting + rewritten_query = self.llm_client.rewrite_query(query) + + # Step 2: Retrieval + retrieved = retriever.retrieve(rewritten_query) + + # Format retrieved chunks + retrieved_chunks = [] + contexts = [] + for chunk, score in retrieved: + retrieved_chunks.append({ + "chunk_id": chunk.chunk_id, + "content": chunk.content, + "source_file": chunk.source_file, + "metadata": chunk.metadata, + "similarity_score": round(score, 4), + }) + contexts.append(chunk.content) + + # Step 3: Answer generation + answer = self.llm_client.generate_answer(rewritten_query, contexts) + + # Step 4: RAGAS evaluation + ragas_scores = self.evaluator.evaluate_single( + question=rewritten_query, + answer=answer, + contexts=contexts, + ) + + return TestResult( + query_id=query_id, + original_query=query, + rewritten_query=rewritten_query, + retrieved_chunks=retrieved_chunks, + generated_answer=answer, + ragas_scores=ragas_scores, + ) diff --git a/backend/app/services/rag_test/retrieval/__init__.py b/backend/app/services/rag_test/retrieval/__init__.py new file mode 100644 index 0000000..2758e75 --- /dev/null +++ b/backend/app/services/rag_test/retrieval/__init__.py @@ -0,0 +1,7 @@ +""" +Retrieval package for vector-based document retrieval. +""" + +from app.services.rag_test.retrieval.retriever import Retriever + +__all__ = ["Retriever"] diff --git a/backend/app/services/rag_test/retrieval/retriever.py b/backend/app/services/rag_test/retrieval/retriever.py new file mode 100644 index 0000000..e6c7951 --- /dev/null +++ b/backend/app/services/rag_test/retrieval/retriever.py @@ -0,0 +1,86 @@ +""" +Retriever for vector-based document retrieval. +""" + +from typing import List, Tuple + +import structlog + +from app.services.rag_test.embeddings.qwen_embedding import QwenEmbedding +from app.services.rag_test.splitters.base import Chunk +from app.services.rag_test.vectorstore.faiss_store import FAISSStore + +logger = structlog.get_logger(__name__) + + +class Retriever: + """ + Document retriever combining embedding and vector store. + """ + + def __init__( + self, + embedding_client: QwenEmbedding, + vectorstore: FAISSStore, + top_k: int = 5, + ): + """ + Initialize the retriever. + + Args: + embedding_client: Embedding client for query encoding + vectorstore: Vector store for similarity search + top_k: Default number of results to return + """ + self.embedding_client = embedding_client + self.vectorstore = vectorstore + self.top_k = top_k + + def retrieve( + self, + query: str, + top_k: int = None, + ) -> List[Tuple[Chunk, float]]: + """ + Retrieve relevant chunks for a query. + + Args: + query: Query text + top_k: Number of results (default: self.top_k) + + Returns: + List of (Chunk, similarity_score) tuples + """ + top_k = top_k or self.top_k + + # Embed query + query_embedding = self.embedding_client.embed(query) + + # Search vector store + results = self.vectorstore.search(query_embedding, top_k=top_k) + + logger.debug( + f"Retrieved {len(results)} chunks for query", + query_preview=query[:50], + top_k=top_k, + ) + + return results + + def retrieve_texts( + self, + query: str, + top_k: int = None, + ) -> List[str]: + """ + Retrieve relevant chunk texts for a query. 
+ + Args: + query: Query text + top_k: Number of results (default: self.top_k) + + Returns: + List of chunk content strings + """ + results = self.retrieve(query, top_k) + return [chunk.content for chunk, _ in results] diff --git a/backend/app/services/rag_test/splitters/__init__.py b/backend/app/services/rag_test/splitters/__init__.py new file mode 100644 index 0000000..06c2fcd --- /dev/null +++ b/backend/app/services/rag_test/splitters/__init__.py @@ -0,0 +1,44 @@ +""" +Markdown splitters package for splitting Markdown content into chunks. +""" + +from app.services.rag_test.splitters.base import BaseSplitter, Chunk +from app.services.rag_test.splitters.langchain_splitter import LangChainSplitter +from app.services.rag_test.splitters.llamaindex_splitter import LlamaIndexSplitter + +__all__ = [ + "BaseSplitter", + "Chunk", + "LangChainSplitter", + "LlamaIndexSplitter", +] + + +def get_splitter( + name: str, chunk_size: int = 1024, chunk_overlap: int = 50 +) -> BaseSplitter: + """ + Get a splitter instance by name. + + Args: + name: Splitter name ('langchain' or 'llamaindex') + chunk_size: Maximum chunk size + chunk_overlap: Overlap between chunks + + Returns: + Splitter instance + + Raises: + ValueError: If splitter name is not recognized + """ + splitters = { + "langchain": LangChainSplitter, + "llamaindex": LlamaIndexSplitter, + } + + if name not in splitters: + raise ValueError( + f"Unknown splitter: {name}. Available splitters: {list(splitters.keys())}" + ) + + return splitters[name](chunk_size=chunk_size, chunk_overlap=chunk_overlap) diff --git a/backend/app/services/rag_test/splitters/base.py b/backend/app/services/rag_test/splitters/base.py new file mode 100644 index 0000000..18d4ef5 --- /dev/null +++ b/backend/app/services/rag_test/splitters/base.py @@ -0,0 +1,153 @@ +""" +Base class for Markdown text splitters. +""" + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional + +import structlog + +from app.services.rag_test.utils import generate_chunk_id + +logger = structlog.get_logger(__name__) + + +@dataclass +class Chunk: + """Represents a chunk of text after splitting.""" + + chunk_id: str + content: str + source_file: str + metadata: Dict[str, Any] = field(default_factory=dict) + + def __post_init__(self): + """Ensure metadata is a dict.""" + if self.metadata is None: + self.metadata = {} + + def to_dict(self) -> Dict[str, Any]: + """Convert chunk to dictionary.""" + return { + "chunk_id": self.chunk_id, + "content": self.content, + "source_file": self.source_file, + "metadata": self.metadata, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "Chunk": + """Create chunk from dictionary.""" + return cls( + chunk_id=data["chunk_id"], + content=data["content"], + source_file=data["source_file"], + metadata=data.get("metadata", {}), + ) + + +class BaseSplitter(ABC): + """Abstract base class for Markdown text splitters.""" + + def __init__(self, chunk_size: int = 1024, chunk_overlap: int = 50): + """ + Initialize the splitter. + + Args: + chunk_size: Maximum size of each chunk + chunk_overlap: Overlap between consecutive chunks + """ + self.chunk_size = chunk_size + self.chunk_overlap = chunk_overlap + + @property + @abstractmethod + def name(self) -> str: + """Get the splitter name.""" + pass + + @abstractmethod + def split(self, markdown_content: str, source_file: str) -> List[Chunk]: + """ + Split Markdown content into chunks. 
+ + Args: + markdown_content: Markdown text to split + source_file: Source file name for metadata + + Returns: + List of Chunk objects + """ + pass + + def split_file(self, file_path: Path) -> List[Chunk]: + """ + Split a Markdown file into chunks. + + Args: + file_path: Path to the Markdown file + + Returns: + List of Chunk objects + """ + file_path = Path(file_path) + if not file_path.exists(): + raise FileNotFoundError(f"File not found: {file_path}") + + with open(file_path, "r", encoding="utf-8") as f: + content = f.read() + + return self.split(content, str(file_path)) + + def split_files(self, file_paths: List[Path]) -> List[Chunk]: + """ + Split multiple Markdown files into chunks. + + Args: + file_paths: List of paths to Markdown files + + Returns: + Combined list of Chunk objects from all files + """ + all_chunks = [] + for file_path in file_paths: + try: + chunks = self.split_file(file_path) + all_chunks.extend(chunks) + logger.info( + f"Split {file_path} into {len(chunks)} chunks", + splitter=self.name, + ) + except Exception as e: + logger.exception(f"Failed to split {file_path}", error=str(e)) + + return all_chunks + + def _create_chunk( + self, + content: str, + source_file: str, + index: int, + metadata: Optional[Dict[str, Any]] = None, + ) -> Chunk: + """ + Create a Chunk object with generated ID. + + Args: + content: Chunk content + source_file: Source file name + index: Chunk index + metadata: Optional metadata dict + + Returns: + Chunk object + """ + chunk_id = generate_chunk_id(content, source_file, index) + return Chunk( + chunk_id=chunk_id, + content=content, + source_file=source_file, + metadata=metadata or {}, + ) diff --git a/backend/app/services/rag_test/splitters/langchain_splitter.py b/backend/app/services/rag_test/splitters/langchain_splitter.py new file mode 100644 index 0000000..7590baa --- /dev/null +++ b/backend/app/services/rag_test/splitters/langchain_splitter.py @@ -0,0 +1,133 @@ +""" +LangChain-based Markdown splitter. + +Uses MarkdownHeaderTextSplitter followed by RecursiveCharacterTextSplitter. +""" + +from typing import List, Tuple + +import structlog + +from app.services.rag_test.splitters.base import BaseSplitter, Chunk + +logger = structlog.get_logger(__name__) + + +class LangChainSplitter(BaseSplitter): + """ + LangChain-based Markdown splitter. + + Splitting strategy: + 1. First split by Markdown headers using MarkdownHeaderTextSplitter + 2. Then recursively split large sections using RecursiveCharacterTextSplitter + """ + + def __init__( + self, + chunk_size: int = 1024, + chunk_overlap: int = 50, + headers_to_split_on: List[Tuple[str, str]] = None, + ): + """ + Initialize the LangChain splitter. + + Args: + chunk_size: Maximum size of each chunk + chunk_overlap: Overlap between consecutive chunks + headers_to_split_on: List of (header_marker, header_name) tuples + """ + super().__init__(chunk_size, chunk_overlap) + self.headers_to_split_on = headers_to_split_on or [ + ("#", "Header 1"), + ("##", "Header 2"), + ("###", "Header 3"), + ] + + @property + def name(self) -> str: + return "langchain" + + def split(self, markdown_content: str, source_file: str) -> List[Chunk]: + """ + Split Markdown content using LangChain splitters. 
+ + Args: + markdown_content: Markdown text to split + source_file: Source file name for metadata + + Returns: + List of Chunk objects + """ + try: + from langchain_text_splitters import ( + MarkdownHeaderTextSplitter, + RecursiveCharacterTextSplitter, + ) + except ImportError: + try: + from langchain.text_splitter import ( + MarkdownHeaderTextSplitter, + RecursiveCharacterTextSplitter, + ) + except ImportError as e: + raise ImportError( + "langchain is not installed. Please install it with: pip install langchain" + ) from e + + logger.info( + f"Splitting with LangChain", + source_file=source_file, + chunk_size=self.chunk_size, + chunk_overlap=self.chunk_overlap, + ) + + # Step 1: Split by markdown headers + markdown_splitter = MarkdownHeaderTextSplitter( + headers_to_split_on=self.headers_to_split_on, + strip_headers=False, # Keep headers in content + ) + + try: + md_header_splits = markdown_splitter.split_text(markdown_content) + except Exception as e: + logger.warning(f"Header splitting failed, using full content: {e}") + # Fallback: treat entire content as single document + from langchain_core.documents import Document + + md_header_splits = [ + Document(page_content=markdown_content, metadata={"source": source_file}) + ] + + # Step 2: Recursively split large sections + text_splitter = RecursiveCharacterTextSplitter( + chunk_size=self.chunk_size, + chunk_overlap=self.chunk_overlap, + length_function=len, + separators=["\n\n", "\n", ". ", " ", ""], + ) + + final_docs = text_splitter.split_documents(md_header_splits) + + # Convert to Chunk objects + chunks = [] + for idx, doc in enumerate(final_docs): + metadata = dict(doc.metadata) if doc.metadata else {} + metadata["source"] = source_file + metadata["splitter"] = self.name + metadata["chunk_index"] = idx + + chunk = self._create_chunk( + content=doc.page_content, + source_file=source_file, + index=idx, + metadata=metadata, + ) + chunks.append(chunk) + + logger.info( + f"LangChain split complete", + source_file=source_file, + num_chunks=len(chunks), + ) + + return chunks diff --git a/backend/app/services/rag_test/splitters/llamaindex_splitter.py b/backend/app/services/rag_test/splitters/llamaindex_splitter.py new file mode 100644 index 0000000..8bf7722 --- /dev/null +++ b/backend/app/services/rag_test/splitters/llamaindex_splitter.py @@ -0,0 +1,113 @@ +""" +LlamaIndex-based Markdown splitter. + +Uses MarkdownNodeParser followed by SentenceSplitter. +""" + +from typing import List + +import structlog + +from app.services.rag_test.splitters.base import BaseSplitter, Chunk + +logger = structlog.get_logger(__name__) + + +class LlamaIndexSplitter(BaseSplitter): + """ + LlamaIndex-based Markdown splitter. + + Splitting strategy: + 1. First parse Markdown structure using MarkdownNodeParser + 2. Then split by sentences using SentenceSplitter + """ + + @property + def name(self) -> str: + return "llamaindex" + + def split(self, markdown_content: str, source_file: str) -> List[Chunk]: + """ + Split Markdown content using LlamaIndex splitters. + + Args: + markdown_content: Markdown text to split + source_file: Source file name for metadata + + Returns: + List of Chunk objects + """ + try: + from llama_index.core import Document + from llama_index.core.node_parser import ( + MarkdownNodeParser, + SentenceSplitter, + ) + except ImportError as e: + raise ImportError( + "llama-index-core is not installed. 
Please install it with: pip install llama-index-core" + ) from e + + logger.info( + f"Splitting with LlamaIndex", + source_file=source_file, + chunk_size=self.chunk_size, + chunk_overlap=self.chunk_overlap, + ) + + # Create LlamaIndex Document + documents = [ + Document( + text=markdown_content, + metadata={"source": source_file}, + ) + ] + + # Step 1: Parse Markdown structure + markdown_parser = MarkdownNodeParser() + try: + nodes = markdown_parser.get_nodes_from_documents(documents) + except Exception as e: + logger.warning(f"Markdown parsing failed, using raw document: {e}") + nodes = documents + + # Step 2: Split by sentences + sentence_splitter = SentenceSplitter( + chunk_size=self.chunk_size, + chunk_overlap=self.chunk_overlap, + ) + + final_nodes = sentence_splitter.get_nodes_from_documents(nodes) + + # Convert to Chunk objects + chunks = [] + for idx, node in enumerate(final_nodes): + # Extract metadata from node + metadata = dict(node.metadata) if hasattr(node, "metadata") else {} + metadata["source"] = source_file + metadata["splitter"] = self.name + metadata["chunk_index"] = idx + + # Get text content + if hasattr(node, "text"): + content = node.text + elif hasattr(node, "get_content"): + content = node.get_content() + else: + content = str(node) + + chunk = self._create_chunk( + content=content, + source_file=source_file, + index=idx, + metadata=metadata, + ) + chunks.append(chunk) + + logger.info( + f"LlamaIndex split complete", + source_file=source_file, + num_chunks=len(chunks), + ) + + return chunks diff --git a/backend/app/services/rag_test/utils.py b/backend/app/services/rag_test/utils.py new file mode 100644 index 0000000..554d846 --- /dev/null +++ b/backend/app/services/rag_test/utils.py @@ -0,0 +1,127 @@ +""" +Utility functions for RAG Pipeline Testing. +""" + +import hashlib +import os +import tempfile +from datetime import datetime +from pathlib import Path +from typing import List, Optional + +import structlog + +logger = structlog.get_logger(__name__) + + +def get_timestamp() -> str: + """Get current timestamp in format: YYYYMMDD_HHMMSS""" + return datetime.now().strftime("%Y%m%d_%H%M%S") + + +def generate_chunk_id(content: str, source_file: str, index: int) -> str: + """Generate a unique chunk ID based on content hash and index.""" + hash_input = f"{source_file}:{index}:{content[:100]}" + hash_value = hashlib.md5(hash_input.encode()).hexdigest()[:8] + return f"chunk_{hash_value}_{index:04d}" + + +def ensure_directory(path: Path) -> Path: + """Ensure directory exists, create if not.""" + path = Path(path) + path.mkdir(parents=True, exist_ok=True) + return path + + +def get_temp_directory() -> Path: + """Get or create a temporary directory for RAG test.""" + temp_dir = Path(tempfile.gettempdir()) / "rag_test" + temp_dir.mkdir(parents=True, exist_ok=True) + return temp_dir + + +def get_supported_files( + directory: Path, extensions: List[str], recursive: bool = True +) -> List[Path]: + """ + Get all files with specified extensions from a directory. 
+ + Args: + directory: Directory to search + extensions: List of file extensions (e.g., ['.pdf', '.docx']) + recursive: Whether to search recursively + + Returns: + List of file paths + """ + directory = Path(directory) + if not directory.exists(): + logger.warning(f"Directory does not exist: {directory}") + return [] + + files = [] + extensions_lower = [ext.lower() for ext in extensions] + + if recursive: + for ext in extensions_lower: + # Handle both with and without dot prefix + pattern = f"**/*{ext}" if ext.startswith(".") else f"**/*.{ext}" + files.extend(directory.glob(pattern)) + else: + for ext in extensions_lower: + pattern = f"*{ext}" if ext.startswith(".") else f"*.{ext}" + files.extend(directory.glob(pattern)) + + # Sort by file path for consistent ordering + return sorted(set(files)) + + +def sanitize_filename(filename: str) -> str: + """Sanitize filename by removing/replacing invalid characters.""" + invalid_chars = '<>:"/\\|?*' + for char in invalid_chars: + filename = filename.replace(char, "_") + return filename + + +def calculate_file_hash(file_path: Path) -> str: + """Calculate MD5 hash of a file.""" + hash_md5 = hashlib.md5() + with open(file_path, "rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + hash_md5.update(chunk) + return hash_md5.hexdigest() + + +def format_file_size(size_bytes: int) -> str: + """Format file size in human-readable format.""" + for unit in ["B", "KB", "MB", "GB"]: + if size_bytes < 1024: + return f"{size_bytes:.2f} {unit}" + size_bytes /= 1024 + return f"{size_bytes:.2f} TB" + + +def load_queries_from_file(file_path: Path) -> List[str]: + """ + Load queries from a text file (one query per line). + + Args: + file_path: Path to the queries file + + Returns: + List of query strings + """ + file_path = Path(file_path) + if not file_path.exists(): + raise FileNotFoundError(f"Queries file not found: {file_path}") + + queries = [] + with open(file_path, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if line and not line.startswith("#"): # Skip empty lines and comments + queries.append(line) + + logger.info(f"Loaded {len(queries)} queries from {file_path}") + return queries diff --git a/backend/app/services/rag_test/vectorstore/__init__.py b/backend/app/services/rag_test/vectorstore/__init__.py new file mode 100644 index 0000000..ebb1bc7 --- /dev/null +++ b/backend/app/services/rag_test/vectorstore/__init__.py @@ -0,0 +1,7 @@ +""" +Vector store package for storing and retrieving embeddings. +""" + +from app.services.rag_test.vectorstore.faiss_store import FAISSStore + +__all__ = ["FAISSStore"] diff --git a/backend/app/services/rag_test/vectorstore/faiss_store.py b/backend/app/services/rag_test/vectorstore/faiss_store.py new file mode 100644 index 0000000..a91de72 --- /dev/null +++ b/backend/app/services/rag_test/vectorstore/faiss_store.py @@ -0,0 +1,218 @@ +""" +FAISS vector store for storing and retrieving embeddings. +""" + +import json +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +import numpy as np +import structlog + +from app.services.rag_test.splitters.base import Chunk +from app.services.rag_test.utils import ensure_directory + +logger = structlog.get_logger(__name__) + + +class FAISSStore: + """ + FAISS-based vector store for efficient similarity search. + """ + + def __init__(self, dimension: int = 1536): + """ + Initialize the FAISS store. 
+ + Args: + dimension: Embedding dimension + """ + self.dimension = dimension + self.index = None + self.chunks: List[Chunk] = [] + self._faiss = None + + @property + def faiss(self): + """Lazy import of faiss.""" + if self._faiss is None: + try: + import faiss + + self._faiss = faiss + except ImportError as e: + raise ImportError( + "faiss is not installed. Please install it with: pip install faiss-cpu" + ) from e + return self._faiss + + def build_index( + self, + chunks: List[Chunk], + embeddings: List[List[float]], + index_type: str = "flat", + ): + """ + Build FAISS index from chunks and their embeddings. + + Args: + chunks: List of Chunk objects + embeddings: Corresponding embedding vectors + index_type: Type of FAISS index ('flat' for exact search) + """ + if len(chunks) != len(embeddings): + raise ValueError( + f"Number of chunks ({len(chunks)}) must match number of embeddings ({len(embeddings)})" + ) + + if not chunks: + logger.warning("No chunks to index") + return + + # Determine dimension from embeddings + self.dimension = len(embeddings[0]) + + # Convert to numpy array + vectors = np.array(embeddings).astype("float32") + + # Create index + if index_type == "flat": + # Exact L2 search + self.index = self.faiss.IndexFlatL2(self.dimension) + elif index_type == "ivf": + # IVF index for larger datasets + nlist = min(100, len(embeddings)) + quantizer = self.faiss.IndexFlatL2(self.dimension) + self.index = self.faiss.IndexIVFFlat(quantizer, self.dimension, nlist) + self.index.train(vectors) + else: + raise ValueError(f"Unknown index type: {index_type}") + + # Add vectors to index + self.index.add(vectors) + self.chunks = chunks + + logger.info( + f"Built FAISS index", + num_vectors=len(chunks), + dimension=self.dimension, + index_type=index_type, + ) + + def search( + self, + query_embedding: List[float], + top_k: int = 5, + ) -> List[Tuple[Chunk, float]]: + """ + Search for similar chunks. + + Args: + query_embedding: Query embedding vector + top_k: Number of results to return + + Returns: + List of (Chunk, similarity_score) tuples + """ + if self.index is None: + raise RuntimeError("Index not built. Call build_index() first.") + + # Ensure we don't request more than available + top_k = min(top_k, len(self.chunks)) + + # Convert query to numpy array + query_vector = np.array([query_embedding]).astype("float32") + + # Search + distances, indices = self.index.search(query_vector, top_k) + + # Build results + results = [] + for idx, distance in zip(indices[0], distances[0]): + if idx >= 0 and idx < len(self.chunks): + # Convert L2 distance to similarity score (0-1 range) + # Using: similarity = 1 / (1 + distance) + similarity = 1.0 / (1.0 + float(distance)) + results.append((self.chunks[idx], similarity)) + + return results + + def save(self, index_path: Path): + """ + Save the index and chunks to disk. + + Args: + index_path: Directory to save the index + """ + if self.index is None: + raise RuntimeError("No index to save. 
Call build_index() first.") + + index_path = ensure_directory(index_path) + + # Save FAISS index + faiss_path = index_path / "index.faiss" + self.faiss.write_index(self.index, str(faiss_path)) + + # Save chunks metadata + chunks_data = [chunk.to_dict() for chunk in self.chunks] + chunks_path = index_path / "chunks.json" + with open(chunks_path, "w", encoding="utf-8") as f: + json.dump( + { + "dimension": self.dimension, + "num_chunks": len(self.chunks), + "chunks": chunks_data, + }, + f, + ensure_ascii=False, + indent=2, + ) + + logger.info( + f"Saved FAISS index", + path=str(index_path), + num_chunks=len(self.chunks), + ) + + def load(self, index_path: Path): + """ + Load the index and chunks from disk. + + Args: + index_path: Directory containing the saved index + """ + index_path = Path(index_path) + + # Load FAISS index + faiss_path = index_path / "index.faiss" + if not faiss_path.exists(): + raise FileNotFoundError(f"FAISS index not found: {faiss_path}") + + self.index = self.faiss.read_index(str(faiss_path)) + + # Load chunks metadata + chunks_path = index_path / "chunks.json" + if not chunks_path.exists(): + raise FileNotFoundError(f"Chunks metadata not found: {chunks_path}") + + with open(chunks_path, "r", encoding="utf-8") as f: + data = json.load(f) + + self.dimension = data["dimension"] + self.chunks = [Chunk.from_dict(c) for c in data["chunks"]] + + logger.info( + f"Loaded FAISS index", + path=str(index_path), + num_chunks=len(self.chunks), + dimension=self.dimension, + ) + + def get_stats(self) -> Dict[str, Any]: + """Get statistics about the index.""" + return { + "dimension": self.dimension, + "num_chunks": len(self.chunks), + "index_trained": self.index is not None, + "total_content_length": sum(len(c.content) for c in self.chunks), + } diff --git a/backend/pyproject.toml b/backend/pyproject.toml index c82d0af..7ec1d32 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -33,6 +33,25 @@ dependencies = [ "orjson>=3.9.0", "structlog>=23.1.0", "python-dateutil>=2.8.2", + # RAG Test Pipeline - Document Converters + "docling>=2.0.0", + "marker-pdf>=0.2.0", + "pypandoc>=1.12", + "pymupdf>=1.23.0", + "unstructured>=0.10.0", + "python-pptx>=0.6.21", + "python-docx>=1.0.0", + # RAG Test Pipeline - LlamaIndex Splitters + "llama-index-core>=0.10.0", + # RAG Test Pipeline - Vector Store + "faiss-cpu>=1.7.4", + # RAG Test Pipeline - LLM Client + "zhipuai>=2.0.0", + # RAG Test Pipeline - CLI Tools + "typer>=0.9.0", + "rich>=13.0.0", + # RAG Test Pipeline - Text Splitters + "langchain-text-splitters>=0.2.0", ] [tool.setuptools.packages.find]
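
Usage notes (reviewer aid, not part of the patch). Below is a minimal sketch of driving the new pipeline programmatically; it assumes RAGTestConfig can be constructed with defaults and that credentials (GLM_API_KEY and the embedding keys) come from the environment, since config.py is not reproduced here. The converter key "pypdf" is likewise an assumption — use any key registered in converters/__init__.py.

    # Illustrative sketch, not part of the patch.
    from app.services.rag_test.config import RAGTestConfig
    from app.services.rag_test.pipeline import RAGTestPipeline

    config = RAGTestConfig()                          # hypothetical default construction
    config.source_directory = "./docs"                # documents to convert
    config.output_directory = "./rag_test_results"    # where TestReport JSON lands
    config.queries_file = "./queries.txt"             # one query per line, "#" lines skipped

    pipeline = RAGTestPipeline(config)
    pipeline.set_progress_callback(lambda stage, cur, total: print(f"{stage}: {cur}/{total}"))

    report = pipeline.run_full_test(
        converter_name="pypdf",        # assumed converter key
        splitter_name="langchain",     # "langchain" or "llamaindex"
        chunk_size=1024,
        chunk_overlap=50,
    )
    print(report.calculate_summary())  # avg faithfulness / relevancy / precision / overall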
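
The lower-level split → embed → index → retrieve flow used by split_and_index() can also be exercised on its own. The sketch below stubs the embedding vectors so it stays self-contained (QwenEmbedding would normally produce them) and needs langchain-text-splitters plus faiss-cpu installed:

    # Self-contained sketch of the flow inside split_and_index(); embeddings are stubbed.
    from app.services.rag_test.splitters import get_splitter
    from app.services.rag_test.vectorstore.faiss_store import FAISSStore

    splitter = get_splitter("langchain", chunk_size=512, chunk_overlap=50)
    chunks = splitter.split("# Title\n\nSome body text.", source_file="demo.md")

    dim = 8
    fake_embeddings = [
        [1.0 if j == i % dim else 0.0 for j in range(dim)]
        for i, _ in enumerate(chunks)
    ]

    store = FAISSStore(dimension=dim)
    store.build_index(chunks, fake_embeddings)

    # FAISSStore.search maps L2 distance d to a score of 1 / (1 + d):
    # an exact match (d = 0) scores 1.0, and the score decays toward 0 as d grows.
    for chunk, score in store.search(fake_embeddings[0], top_k=2):
        print(chunk.chunk_id, round(score, 3))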
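
Multiple runs can then be folded into a single ranking. ComparisonReport takes in-memory TestReport objects and sorts combinations by average overall score; load_test_reports() instead returns plain dicts for JSON files already on disk, so previously saved runs would need their own aggregation step. A sketch, reusing `report` from the first example:

    from pathlib import Path
    from app.services.rag_test.output import ComparisonReport

    comparison = ComparisonReport([report])    # one entry per converter+splitter run
    comparison.save(Path("./rag_test_results/comparison.json"))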