Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,8 @@ wheels/
.pytest_cache/
.claude/
.qdrant_code_embeddings/

# Popular VibeCoding Agents
.roo/
.augment
.vscode
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,23 @@ The system automatically detects and processes files for all supported languages

### Step 2: Query the Codebase

**Interactive mode:**

Start the interactive RAG CLI:

```bash
python -m codebase_rag.main start --repo-path /path/to/your/repo
```

**Non-interactive mode (single query):**

Run a single query and exit, with output sent to stdout (useful for scripting):

```bash
python -m codebase_rag.main start --repo-path /path/to/your/repo \
--question "What functions call UserService.create_user?"
```

### Step 2.5: Real-Time Graph Updates (Optional)

For active development, you can keep your knowledge graph automatically synchronized with code changes using the realtime updater. This is particularly useful when you're actively modifying code and want the AI assistant to always work with the latest codebase structure.
Expand Down
56 changes: 52 additions & 4 deletions codebase_rag/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,21 @@ def is_edit_operation_response(response_text: str) -> bool:
return tool_usage or content_indicators or pattern_match


def _setup_common_initialization(repo_path: str) -> Path:
"""Common setup logic for both main and optimize functions."""
def _setup_common_initialization(repo_path: str, question_mode: bool = False) -> Path:
"""Common setup logic for both main and optimize functions.

Args:
repo_path: Path to the repository
question_mode: If True, suppress INFO/DEBUG/WARNING logs (only show errors and direct output)
"""
# Logger initialization
logger.remove()
logger.add(sys.stdout, format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {message}")
if question_mode:
# In question mode, only show ERROR level logs
logger.add(sys.stderr, level="ERROR", format="{message}")
else:
# In interactive mode, show all logs
logger.add(sys.stdout, format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {message}")

# Temporary directory cleanup
project_root = Path(repo_path).resolve()
Expand Down Expand Up @@ -774,6 +784,29 @@ def _validate_provider_config(role: str, config: Any) -> None:
return rag_agent


async def main_async_single_query(
    repo_path: str, batch_size: int, question: str
) -> None:
    """Answer one question non-interactively and print the answer to stdout.

    Common initialization is run with ``question_mode=True``, then a Memgraph
    ingestor context is opened, the agent is built, the question is passed
    through the chat-image handler, and the agent is run once with an empty
    message history.

    Args:
        repo_path: Path to the repository to query.
        batch_size: Batch size handed to ``MemgraphIngestor``.
        question: The single question to put to the agent.
    """
    root = _setup_common_initialization(repo_path, question_mode=True)

    ingestor_ctx = MemgraphIngestor(
        host=settings.MEMGRAPH_HOST,
        port=settings.MEMGRAPH_PORT,
        batch_size=batch_size,
    )
    with ingestor_ctx as ingestor:
        agent = _initialize_services_and_agent(repo_path, ingestor)

        # Let the image handler rewrite the question before it reaches the agent.
        prompt = _handle_chat_images(question, root)

        # One-shot run: no prior conversation is supplied.
        result = await agent.run(prompt, message_history=[])

        # The answer is the only thing this function writes to stdout.
        print(result.output)


async def main_async(repo_path: str, batch_size: int) -> None:
"""Initializes services and runs the main application loop."""
project_root = _setup_common_initialization(repo_path)
Expand Down Expand Up @@ -840,6 +873,12 @@ def start(
min=1,
help="Number of buffered nodes/relationships before flushing to Memgraph",
),
question: str | None = typer.Option(
None,
"-q",
"--question",
help="Run a single query and exit (non-interactive mode). Output is sent to stdout.",
),
) -> None:
"""Starts the Codebase RAG CLI."""
global confirm_edits_globally
Expand Down Expand Up @@ -892,7 +931,16 @@ def start(
return

try:
asyncio.run(main_async(target_repo_path, effective_batch_size))
if question:
# Non-interactive mode: run single query and exit
asyncio.run(
main_async_single_query(
target_repo_path, effective_batch_size, question
)
)
else:
# Interactive mode: run chat loop
asyncio.run(main_async(target_repo_path, effective_batch_size))
except KeyboardInterrupt:
console.print("\n[bold red]Application terminated by user.[/bold red]")
except ValueError as e:
Expand Down
87 changes: 87 additions & 0 deletions codebase_rag/mcp/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""MCP client for querying the code graph via the MCP server.

This module provides a simple CLI client that connects to the MCP server
and executes the ask_agent tool with a provided question.
"""

import asyncio
import json
import os
import sys
from typing import Any

import typer
from mcp import ClientSession
from mcp.client.stdio import StdioServerParameters, stdio_client

app = typer.Typer()


async def query_mcp_server(question: str) -> dict[str, Any]:
    """Query the MCP server with a question.

    Spawns ``codebase_rag.main mcp-server`` as a stdio subprocess, initializes
    an MCP client session, calls the ``ask_agent`` tool, and normalizes the
    reply into a dictionary.

    Args:
        question: The question to ask about the codebase.

    Returns:
        The tool's JSON object when the reply text parses to a dict;
        ``{"output": <text>}`` when the reply is not JSON or parses to a
        non-dict value; ``{"output": "No response from server"}`` when the
        reply carries no text content.
    """
    server_params = StdioServerParameters(
        # Launch the server with the interpreter running this client so the
        # same environment (and installed packages) is used; a bare "python"
        # depends on PATH and may resolve to a different interpreter or none.
        command=sys.executable or "python",
        args=["-m", "codebase_rag.main", "mcp-server"],
    )

    # The server's stderr is sent to the null device to suppress its logs;
    # stdin/stdout stay reserved for the MCP protocol itself.
    with open(os.devnull, "w") as devnull:
        async with stdio_client(server=server_params, errlog=devnull) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()

                result = await session.call_tool("ask_agent", {"question": question})

                # Content items are not guaranteed to be text; take the first
                # item that actually carries a string ``text`` attribute
                # instead of assuming index 0 does.
                response_text = next(
                    (
                        item.text
                        for item in (result.content or [])
                        if isinstance(getattr(item, "text", None), str)
                    ),
                    None,
                )
                if response_text is None:
                    return {"output": "No response from server"}

                try:
                    parsed = json.loads(response_text)
                except json.JSONDecodeError:
                    # Plain-text reply: pass it through unchanged.
                    return {"output": response_text}

                if isinstance(parsed, dict):
                    return parsed
                return {"output": str(parsed)}


@app.command()
def main(
    question: str = typer.Option(
        ..., "--ask-agent", "-a", help="Question to ask about the codebase"
    ),
) -> None:
    """Query the code graph via MCP server.

    Exits with status 1 and writes to stderr when the query fails.

    Example:
        python -m codebase_rag.mcp.client --ask-agent "What functions call UserService.create_user?"
    """
    try:
        result = asyncio.run(query_mcp_server(question))

        is_mapping = isinstance(result, dict)
        # A truthy "error" key marks a failure reported by the server side;
        # treat it like any other failure so scripts can rely on the exit code.
        failed = is_mapping and bool(result.get("error"))

        # Print only the answer text when present (clean for scripting);
        # otherwise dump the whole payload as JSON.
        if is_mapping and "output" in result:
            text = result["output"]
        else:
            text = json.dumps(result)

        print(text, file=sys.stderr if failed else sys.stdout)
    except Exception as e:
        # Transport/startup failures: report on stderr with a non-zero status.
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)

    if failed:
        sys.exit(1)


if __name__ == "__main__":
app()
95 changes: 94 additions & 1 deletion codebase_rag/mcp/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,25 @@
from codebase_rag.graph_updater import GraphUpdater
from codebase_rag.parser_loader import load_parsers
from codebase_rag.services.graph_service import MemgraphIngestor
from codebase_rag.services.llm import CypherGenerator
from codebase_rag.services.llm import CypherGenerator, create_rag_orchestrator
from codebase_rag.tools.code_retrieval import CodeRetriever, create_code_retrieval_tool
from codebase_rag.tools.codebase_query import create_query_tool
from codebase_rag.tools.directory_lister import (
DirectoryLister,
create_directory_lister_tool,
)
from codebase_rag.tools.document_analyzer import (
DocumentAnalyzer,
create_document_analyzer_tool,
)
from codebase_rag.tools.file_editor import FileEditor, create_file_editor_tool
from codebase_rag.tools.file_reader import FileReader, create_file_reader_tool
from codebase_rag.tools.file_writer import FileWriter, create_file_writer_tool
from codebase_rag.tools.semantic_search import (
create_get_function_source_tool,
create_semantic_search_tool,
)
from codebase_rag.tools.shell_command import ShellCommander, create_shell_command_tool


@dataclass
Expand Down Expand Up @@ -66,6 +75,8 @@ def __init__(
self.file_reader = FileReader(project_root=project_root)
self.file_writer = FileWriter(project_root=project_root)
self.directory_lister = DirectoryLister(project_root=project_root)
self.shell_commander = ShellCommander(project_root=project_root)
self.document_analyzer = DocumentAnalyzer(project_root=project_root)

# Create pydantic-ai tools - we'll call the underlying functions directly
self._query_tool = create_query_tool(
Expand All @@ -78,6 +89,17 @@ def __init__(
self._directory_lister_tool = create_directory_lister_tool(
directory_lister=self.directory_lister
)
self._shell_command_tool = create_shell_command_tool(
shell_commander=self.shell_commander
)
self._document_analyzer_tool = create_document_analyzer_tool(
self.document_analyzer
)
self._semantic_search_tool = create_semantic_search_tool()
self._function_source_tool = create_get_function_source_tool()

# Create RAG orchestrator agent (lazy initialization for testing)
self._rag_agent: Any = None

# Build tool registry - single source of truth for all tool metadata
self._tools: dict[str, ToolMetadata] = {
Expand Down Expand Up @@ -214,8 +236,57 @@ def __init__(
handler=self.list_directory,
returns_json=False,
),
"ask_agent": ToolMetadata(
name="ask_agent",
description="Ask the Code Graph RAG agent a question about the codebase. "
"Use this tool for general questions about the codebase, architecture, functionality, and code relationships. "
"Examples: 'How is the authentication implemented?', "
"'What are the main components of the system?', 'Where is the database connection configured?'",
input_schema={
"type": "object",
"properties": {
"question": {
"type": "string",
"description": "A question about the codebase, architecture, functionality, and code relationships. "
"Examples: 'What functions call UserService.create_user?', "
"'How is error handling implemented?', 'What are the main entry points?'",
}
},
"required": ["question"],
},
handler=self.ask_agent,
returns_json=True,
),
}

@property
def rag_agent(self) -> Any:
    """Return the RAG orchestrator agent, building it on first use.

    The agent is cached in ``self._rag_agent``. While that attribute is
    ``None`` the next access builds the orchestrator from this instance's
    tool objects; a value assigned through the setter (for example a test
    double) is returned as-is and no orchestrator is created.
    """
    if self._rag_agent is not None:
        return self._rag_agent

    # Tools handed to the orchestrator, in a fixed order.
    agent_tools = [
        self._query_tool,
        self._code_tool,
        self._file_reader_tool,
        self._file_writer_tool,
        self._file_editor_tool,
        self._shell_command_tool,
        self._directory_lister_tool,
        self._document_analyzer_tool,
        self._semantic_search_tool,
        self._function_source_tool,
    ]
    self._rag_agent = create_rag_orchestrator(tools=agent_tools)
    return self._rag_agent

@rag_agent.setter
def rag_agent(self, value: Any) -> None:
    """Replace the cached agent (useful for injecting a stand-in in tests)."""
    self._rag_agent = value

async def index_repository(self) -> str:
"""Parse and ingest the repository into the Memgraph knowledge graph.

Expand Down Expand Up @@ -439,6 +510,28 @@ async def list_directory(self, directory_path: str = ".") -> str:
logger.error(f"[MCP] Error listing directory: {e}")
return f"Error: {str(e)}"

async def ask_agent(self, question: str) -> dict[str, Any]:
    """Ask a single question about the codebase and get an answer.

    Runs the question through ``self.rag_agent`` with an empty message
    history and wraps the result in a dictionary.

    Args:
        question: The question to ask about the codebase.

    Returns:
        ``{"output": <answer>}`` on success. On any exception the error is
        logged and ``{"output": "Error: <message>", "error": True}`` is
        returned instead of raising.
    """
    logger.info(f"[MCP] ask_agent: {question}")
    try:
        response = await self.rag_agent.run(question, message_history=[])

        return {"output": response.output}
    except Exception as e:
        # logger.exception logs at ERROR level with the active traceback under
        # both the stdlib logging API and loguru. The previous
        # `exc_info=True` keyword is stdlib-only; a loguru-style logger would
        # treat it as a str.format() argument, dropping the traceback and
        # risking a formatting error when the exception text contains braces.
        # NOTE(review): main.py configures loguru-style sinks; confirm this
        # module's `logger` is the same object.
        logger.exception(f"[MCP] Error asking code graph: {e}")
        return {"output": f"Error: {str(e)}", "error": True}

def get_tool_schemas(self) -> list[dict[str, Any]]:
"""Get MCP tool schemas for all registered tools.

Expand Down