Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,8 @@ wheels/
.pytest_cache/
.claude/
.qdrant_code_embeddings/

# Popular VibeCoding Agents
.roo/
.augment
.vscode
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,23 @@ The system automatically detects and processes files for all supported languages

### Step 2: Query the Codebase

**Interactive mode:**

Start the interactive RAG CLI:

```bash
python -m codebase_rag.main start --repo-path /path/to/your/repo
```

**Non-interactive mode (single query):**

Run a single query and exit, with output sent to stdout (useful for scripting):

```bash
python -m codebase_rag.main start --repo-path /path/to/your/repo \
--ask-agent "What functions call UserService.create_user?"
```

### Step 2.5: Real-Time Graph Updates (Optional)

For active development, you can keep your knowledge graph automatically synchronized with code changes using the realtime updater. This is particularly useful when you're actively modifying code and want the AI assistant to always work with the latest codebase structure.
Expand Down
56 changes: 52 additions & 4 deletions codebase_rag/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,21 @@ def is_edit_operation_response(response_text: str) -> bool:
return tool_usage or content_indicators or pattern_match


def _setup_common_initialization(repo_path: str) -> Path:
"""Common setup logic for both main and optimize functions."""
def _setup_common_initialization(repo_path: str, question_mode: bool = False) -> Path:
"""Common setup logic for both main and optimize functions.

Args:
repo_path: Path to the repository
question_mode: If True, suppress INFO/DEBUG/WARNING logs (only show errors and direct output)
"""
# Logger initialization
logger.remove()
logger.add(sys.stdout, format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {message}")
if question_mode:
# In question mode, only show ERROR level logs
logger.add(sys.stderr, level="ERROR", format="{message}")
else:
# In interactive mode, show all logs
logger.add(sys.stdout, format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {message}")

# Temporary directory cleanup
project_root = Path(repo_path).resolve()
Expand Down Expand Up @@ -774,6 +784,29 @@ def _validate_provider_config(role: str, config: Any) -> None:
return rag_agent


async def main_async_single_query(
    repo_path: str, batch_size: int, question: str
) -> None:
    """Run one agent query non-interactively and print the answer to stdout.

    Performs the same service bootstrap as the interactive loop, but in
    question mode (log output suppressed except errors), answers a single
    question, and returns — suitable for shell scripting.

    Args:
        repo_path: Path to the repository to query.
        batch_size: Number of buffered nodes/relationships before flushing
            to Memgraph.
        question: The user's question about the codebase.
    """
    project_root = _setup_common_initialization(repo_path, question_mode=True)

    with MemgraphIngestor(
        host=settings.MEMGRAPH_HOST,
        port=settings.MEMGRAPH_PORT,
        batch_size=batch_size,
    ) as ingestor:
        agent = _initialize_services_and_agent(repo_path, ingestor)

        # Resolve any image references embedded in the question before
        # handing it to the agent, then emit only the answer on stdout.
        enriched_question = _handle_chat_images(question, project_root)
        result = await agent.run(enriched_question, message_history=[])
        print(result.output)


async def main_async(repo_path: str, batch_size: int) -> None:
"""Initializes services and runs the main application loop."""
project_root = _setup_common_initialization(repo_path)
Expand Down Expand Up @@ -840,6 +873,12 @@ def start(
min=1,
help="Number of buffered nodes/relationships before flushing to Memgraph",
),
ask_agent: str | None = typer.Option(
None,
"-a",
"--ask-agent",
help="Run a single query and exit (non-interactive mode). Output is sent to stdout.",
),
) -> None:
"""Starts the Codebase RAG CLI."""
global confirm_edits_globally
Expand Down Expand Up @@ -892,7 +931,16 @@ def start(
return

try:
asyncio.run(main_async(target_repo_path, effective_batch_size))
if ask_agent:
# Non-interactive mode: run single query and exit
asyncio.run(
main_async_single_query(
target_repo_path, effective_batch_size, ask_agent
)
)
else:
# Interactive mode: run chat loop
asyncio.run(main_async(target_repo_path, effective_batch_size))
except KeyboardInterrupt:
console.print("\n[bold red]Application terminated by user.[/bold red]")
except ValueError as e:
Expand Down
87 changes: 87 additions & 0 deletions codebase_rag/mcp/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""MCP client for querying the code graph via the MCP server.

This module provides a simple CLI client that connects to the MCP server
and executes the ask_agent tool with a provided question.
"""

import asyncio
import json
import os
import sys
from typing import Any

import typer
from mcp import ClientSession
from mcp.client.stdio import StdioServerParameters, stdio_client

app = typer.Typer()


async def query_mcp_server(question: str) -> dict[str, Any]:
    """Query the MCP server with a question.

    Spawns the MCP server as a subprocess over stdio transport, calls the
    ``ask_agent`` tool, and normalizes the reply into a dictionary.

    Args:
        question: The question to ask about the codebase

    Returns:
        Dictionary with the response from the server. JSON-object replies
        are returned as-is; anything else is wrapped as ``{"output": ...}``.
    """
    # Redirect the server's stderr to /dev/null so its logs cannot pollute
    # the stdout/stdin channel used for MCP communication.
    with open(os.devnull, "w") as devnull:
        # Use the exact interpreter running this client rather than whatever
        # "python" happens to resolve to on PATH — this guarantees the server
        # starts in the same virtualenv/installation as the client.
        server_params = StdioServerParameters(
            command=sys.executable,
            args=["-m", "codebase_rag.main", "mcp-server"],
        )

        async with stdio_client(server=server_params, errlog=devnull) as (read, write):
            async with ClientSession(read, write) as session:
                # Initialize the session
                await session.initialize()

                # Call the ask_agent tool
                result = await session.call_tool("ask_agent", {"question": question})

                if result.content:
                    # The server normally returns TextContent; tolerate
                    # non-text items instead of raising AttributeError.
                    response_text = getattr(result.content[0], "text", None)
                    if response_text is None:
                        return {"output": str(result.content[0])}
                    # Parse JSON response when possible; fall back to raw text.
                    try:
                        parsed = json.loads(response_text)
                        if isinstance(parsed, dict):
                            return parsed
                        return {"output": str(parsed)}
                    except json.JSONDecodeError:
                        return {"output": response_text}
                return {"output": "No response from server"}


@app.command()
def main(
    question: str = typer.Option(
        ..., "--ask-agent", "-a", help="Question to ask about the codebase"
    ),
) -> None:
    """Query the code graph via MCP server.

    Example:
        python -m codebase_rag.mcp.client --ask-agent "What functions call UserService.create_user?"
    """
    try:
        # Drive the async client to completion and unwrap the reply.
        answer = asyncio.run(query_mcp_server(question))

        # Keep stdout machine-friendly: emit just the answer text when the
        # server returned one, otherwise serialize the whole payload.
        if isinstance(answer, dict) and "output" in answer:
            print(answer["output"])
        else:
            print(json.dumps(answer))

    except Exception as exc:
        # Report failures on stderr and signal a non-zero exit for scripts.
        print(f"Error: {str(exc)}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
app()
100 changes: 75 additions & 25 deletions codebase_rag/mcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import json
import os
import sys
from pathlib import Path

from loguru import logger
Expand All @@ -20,14 +19,54 @@
from codebase_rag.services.llm import CypherGenerator


def setup_logging() -> None:
"""Configure logging to stderr for MCP stdio transport."""
def setup_logging(enable_logging: bool = False) -> None:
"""Configure logging for MCP stdio transport.

By default, logging is disabled to prevent token waste in LLM context.
Can be enabled via environment variable MCP_ENABLE_LOGGING=1 for debugging.

When enabled, logs are written to a file to avoid polluting STDIO transport.
The log file path can be configured via MCP_LOG_FILE environment variable.

Args:
enable_logging: Whether to enable logging output. Defaults to False.
Can also be controlled via MCP_ENABLE_LOGGING environment variable.
"""
logger.remove() # Remove default handler
logger.add(
sys.stderr,
level="INFO",
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <level>{message}</level>",

# Check environment variable to override enable_logging parameter
env_enable = os.environ.get("MCP_ENABLE_LOGGING", "").lower() in (
"1",
"true",
"yes",
)
should_enable = enable_logging or env_enable

if should_enable:
# Get log file path from environment or use default
log_file = os.environ.get("MCP_LOG_FILE")
if not log_file:
# Use ~/.cache/code-graph-rag/mcp.log as default
cache_dir = Path.home() / ".cache" / "code-graph-rag"
cache_dir.mkdir(parents=True, exist_ok=True)
log_file = str(cache_dir / "mcp.log")

# Ensure log file directory exists
log_path = Path(log_file)
log_path.parent.mkdir(parents=True, exist_ok=True)

# Add file handler - logs go to file, not STDERR/STDOUT
logger.add(
log_file,
level="INFO",
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {message}",
colorize=False, # Disable ANSI color codes
rotation="10 MB", # Rotate when file reaches 10MB
retention="7 days", # Keep logs for 7 days
)
else:
# Disable all logging by default for MCP mode
logger.disable("codebase_rag")


def get_project_root() -> Path:
Expand Down Expand Up @@ -143,34 +182,45 @@ async def call_tool(name: str, arguments: dict) -> list[TextContent]:

Tool handlers are dynamically resolved from the MCPToolsRegistry,
ensuring consistency with tool definitions.

Logging is suppressed during tool execution to prevent token waste in LLM context.
"""
logger.info(f"[GraphCode MCP] Calling tool: {name}")
import io
from contextlib import redirect_stderr, redirect_stdout

try:
# Resolve handler from registry
handler_info = tools.get_tool_handler(name)
if not handler_info:
error_msg = f"Unknown tool: {name}"
logger.error(f"[GraphCode MCP] {error_msg}")
error_msg = "Unknown tool"
return [TextContent(type="text", text=f"Error: {error_msg}")]

handler, returns_json = handler_info

# Call handler with unpacked arguments
result = await handler(**arguments)

# Format result based on output type
if returns_json:
result_text = json.dumps(result, indent=2)
else:
result_text = str(result)

return [TextContent(type="text", text=result_text)]

except Exception as e:
error_msg = f"Error executing tool '{name}': {str(e)}"
logger.error(f"[GraphCode MCP] {error_msg}", exc_info=True)
return [TextContent(type="text", text=f"Error: {error_msg}")]
# Suppress all logging output during tool execution
with redirect_stdout(io.StringIO()), redirect_stderr(io.StringIO()):
logger.disable("codebase_rag")
try:
# Call handler with unpacked arguments
result = await handler(**arguments)

# Format result based on output type
if returns_json:
result_text = json.dumps(result, indent=2)
else:
result_text = str(result)

return [TextContent(type="text", text=result_text)]
finally:
logger.enable("codebase_rag")

except Exception:
# Fail silently without logging or printing error details
return [
TextContent(
type="text", text="Error: There was an error executing the tool"
)
]

return server, ingestor

Expand Down
Loading