Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 77 additions & 6 deletions agent/burr_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -1206,7 +1206,7 @@ def respond(state: State) -> State:


@action(
reads=["service_name"],
reads=["service_name", "analysis_mode", "changed_components"],
writes=[
"components",
"depth_order",
Expand All @@ -1227,11 +1227,15 @@ def read_discovery(state: State, __tracer: "TracerFactory") -> State:
- /tmp/{service_name}/service_discovery/components.json
- /tmp/{service_name}/dependency_graphs/analysis_order.json

Populates state with components and depth ordering for analysis.
In incremental mode, also pre-populates component_analyses from
existing artifacts for components that have NOT changed, so that
analyze_current_depth can skip re-running them.
"""
from pathlib import Path

service_name = state.get("service_name", "unknown")
analysis_mode = state.get("analysis_mode", "full")
changed_components = state.get("changed_components", [])
work_dir = Path(f"/tmp/{service_name}")
components_file = work_dir / "service_discovery" / "components.json"
analysis_order_file = work_dir / "dependency_graphs" / "analysis_order.json"
Expand Down Expand Up @@ -1269,12 +1273,36 @@ def read_discovery(state: State, __tracer: "TracerFactory") -> State:
# Build component lookup by name
component_map = {c["name"]: c for c in components}

# Pre-populate analyses for unchanged components in incremental mode
# so analyze_current_depth can skip them while still having context
# for downstream dependencies.
preloaded_analyses: Dict[str, str] = {}
if analysis_mode == "incremental" and changed_components:
changed_set = set(changed_components)
analyses_dir = work_dir / "service_analyses"
if analyses_dir.exists():
for comp_name in component_map:
if comp_name not in changed_set:
md_file = analyses_dir / f"{comp_name}.md"
if md_file.exists():
try:
preloaded_analyses[comp_name] = md_file.read_text(
encoding="utf-8"
)
except OSError:
pass
if preloaded_analyses:
logger.info(
"Pre-loaded %d unchanged component analyses for incremental run",
len(preloaded_analyses),
)

return state.update(
components=component_map,
depth_order=depth_order,
current_depth=0,
total_depths=len(depth_order),
component_analyses={}, # Will be populated as we analyze
component_analyses=preloaded_analyses,
)


Expand All @@ -1285,6 +1313,8 @@ def read_discovery(state: State, __tracer: "TracerFactory") -> State:
"current_depth",
"component_analyses",
"service_name",
"analysis_mode",
"changed_components",
],
writes=["component_analyses", "current_depth"],
tags=[
Expand All @@ -1301,6 +1331,11 @@ def analyze_current_depth(state: State, __tracer: "TracerFactory") -> State:
2. Run a component analyzer ReAct loop
3. Store the analysis result

In incremental mode (analysis_mode == "incremental"), components that
are NOT in changed_components are skipped — their pre-loaded analyses
from read_discovery are kept as-is. Only changed components (and any
at depth 0 without pre-loaded analysis) get re-analyzed.

Components at the same depth run concurrently on a ThreadPoolExecutor,
bounded by FLASHLIGHT_MAX_PARALLEL (default 4). Threads are appropriate
because the work is I/O-bound (LLM HTTP calls + subprocess grep/bash).
Expand All @@ -1322,20 +1357,39 @@ def analyze_current_depth(state: State, __tracer: "TracerFactory") -> State:
current_depth = state.get("current_depth", 0)
analyses = dict(state.get("component_analyses", {}))
service_name = state.get("service_name", "unknown")
analysis_mode = state.get("analysis_mode", "full")
changed_components = state.get("changed_components", [])

if current_depth >= len(depth_order):
# No more depths to analyze
return state.update(current_depth=current_depth)

# In incremental mode, determine which components to skip.
# Unchanged components that already have a pre-loaded analysis are
# carried forward without re-running the LLM.
changed_set = set(changed_components) if analysis_mode == "incremental" else None

# Resolve the work items for this depth up-front. Upstream context is
# built from the already-complete `analyses` dict before any threads
# start, so worker threads never read mutable orchestrator state.
work_items: List[tuple[str, Dict[str, Any], str]] = []
skipped: List[str] = []
for comp_name in depth_order[current_depth]:
comp = components.get(comp_name)
if not comp:
logger.warning(f"Component not found in inventory: {comp_name}")
continue

# Skip unchanged components in incremental mode (they already have
# pre-loaded analysis from read_discovery).
if (
changed_set is not None
and comp_name not in changed_set
and comp_name in analyses
):
skipped.append(comp_name)
continue

upstream_context = _build_upstream_context(
comp.get("internal_dependencies", []), analyses
)
Expand All @@ -1348,6 +1402,8 @@ def analyze_current_depth(state: State, __tracer: "TracerFactory") -> State:
depth=current_depth,
component_count=len(work_items),
component_names=[name for name, _, _ in work_items],
skipped_components=skipped,
skipped_count=len(skipped),
max_parallel=max_parallel,
pool_size=pool_size,
)
Expand Down Expand Up @@ -1738,9 +1794,22 @@ def build_analysis_pipeline(
"""

# Simple input action for analysis mode
@action(reads=[], writes=["task"], tags=["phase:input"])
def receive_analysis_input(state: State, task: str) -> State:
return state.update(task=task)
@action(
    reads=[],
    writes=["task", "analysis_mode", "changed_components"],
    tags=["phase:input"],
)
def receive_analysis_input(
    state: State,
    task: str,
    analysis_mode: str = "full",
    changed_components: list | None = None,
) -> State:
    """Record the pipeline inputs in state.

    Args:
        state: Current pipeline state.
        task: Task description to store under ``task``.
        analysis_mode: Stored under ``analysis_mode``; defaults to ``"full"``.
        changed_components: Stored under ``changed_components``. ``None``
            (or any other falsy value) is stored as an empty list.

    Returns:
        The state updated with the three input fields.
    """
    # Normalise a missing/empty selection to an empty list so readers of
    # state never have to handle None.
    selected_components = changed_components if changed_components else []
    return state.update(
        task=task,
        analysis_mode=analysis_mode,
        changed_components=selected_components,
    )

app = (
ApplicationBuilder()
Expand Down Expand Up @@ -1769,6 +1838,8 @@ def receive_analysis_input(state: State, task: str) -> State:
service_name=service_name,
synthesis_result="",
final_response="",
analysis_mode="full",
changed_components=[],
)
.with_tracker(project=project_name)
.build()
Expand Down
145 changes: 129 additions & 16 deletions agent/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ def load_components(artifacts_dir: Path) -> list[dict]:


def git_diff_files(repo_path: Path, last_sha: str, head_sha: str) -> list[str]:
"""Get list of changed files between two commits."""
"""Get list of changed files between two commits.

Tries ``git diff`` first. If that fails (e.g. shallow clone missing
last_sha), falls back to the GitHub Compare API.
"""
result = subprocess.run(
["git", "diff", "--name-only", f"{last_sha}..{head_sha}"],
capture_output=True,
Expand All @@ -74,14 +78,71 @@ def git_diff_files(repo_path: Path, last_sha: str, head_sha: str) -> list[str]:
)
if result.returncode != 0:
logger.warning(
"git diff failed (rc=%d): %s — falling back to full analysis",
"git diff failed (rc=%d): %s — trying GitHub Compare API",
result.returncode,
result.stderr.strip(),
)
# Fallback: use GitHub Compare API if this is a GitHub repo
api_files = _github_diff_files(repo_path, last_sha, head_sha)
if api_files is not None:
return api_files
return []

return [f for f in result.stdout.strip().split("\n") if f]


def _github_diff_files(
repo_path: Path, last_sha: str, head_sha: str
) -> list[str] | None:
"""Fetch changed files between two commits via the GitHub Compare API.

Returns None if the repo is not a GitHub repo or the API call fails.
"""
# Try to extract owner/repo from the remote URL
try:
remote_result = subprocess.run(
["git", "remote", "get-url", "origin"],
capture_output=True,
text=True,
cwd=repo_path,
)
if remote_result.returncode != 0:
return None
remote_url = remote_result.stdout.strip()
# Handle both https://github.com/owner/repo.git and git@github.com:owner/repo.git
import re
match = re.search(r"github\.com[:/]([^/]+/[^/\s]+?)(?:\.git)?$", remote_url)
if not match:
return None
repo_slug = match.group(1)
except Exception:
return None

try:
gh_cmd = ["gh", "api", f"repos/{repo_slug}/compare/{last_sha}...{head_sha}",
"--jq", ".files[].filename"]
# Use private token if available for private repos
private_token = os.environ.get("FLASHLIGHT_REPO_TOKEN", "")
if private_token:
gh_cmd = ["gh", "api",
"-H", f"Authorization: token {private_token}",
f"repos/{repo_slug}/compare/{last_sha}...{head_sha}",
"--jq", ".files[].filename"]
api_result = subprocess.run(
gh_cmd,
capture_output=True,
text=True,
timeout=30,
)
if api_result.returncode != 0:
logger.warning("GitHub Compare API failed: %s", api_result.stderr.strip())
return None
return [f for f in api_result.stdout.strip().split("\n") if f]
except Exception as exc:
logger.warning("GitHub Compare API error: %s", exc)
return None


def map_files_to_components(
changed_files: list[str],
components: list[dict],
Expand Down Expand Up @@ -409,17 +470,38 @@ def analyze(
)
print()

# Prepare pipeline inputs
pipeline_inputs = {
"task": f"Analyze {service_name}",
"service_name": service_name,
}

# In incremental mode, copy existing service_analyses into the work dir
# so that read_discovery can pre-load unchanged component analyses.
# This also ensures upstream context is available for changed components
# that depend on unchanged ones.
if diff_context["mode"] == "incremental" and output.exists():
existing_analyses = output / "service_analyses"
target_analyses = work_dir / "service_analyses"
if existing_analyses.exists() and not target_analyses.exists():
shutil.copytree(existing_analyses, target_analyses)
print(f" Pre-copied existing analyses to {target_analyses}")
pipeline_inputs["analysis_mode"] = "incremental"
pipeline_inputs["changed_components"] = sorted(
diff_context.get("changed_components", [])
)
print(
f" Incremental: re-analyzing {len(pipeline_inputs['changed_components'])} changed component(s)"
)

try:
transcript.write_to_file(f"\nAnalyzing {service_name}...\n")

# Run the Burr analysis pipeline
# The pipeline: receive_input -> read_discovery -> analyze_current_depth (loop) -> synthesize -> respond
action, result, state = app.run(
halt_after=["respond"],
inputs={
"task": f"Analyze {service_name}",
"service_name": service_name,
},
inputs=pipeline_inputs,
)

# Get the response
Expand Down Expand Up @@ -516,9 +598,21 @@ def analyze(
src = tmp_artifacts / dirname
dst = output / dirname
if src.exists():
if dst.exists():
shutil.rmtree(dst)
shutil.copytree(src, dst)
if dirname == "service_analyses" and diff_context["mode"] == "incremental" and dst.exists():
# Merge: copy individual files from src into dst, keeping
# unchanged analyses from the prior run intact.
for item in src.iterdir():
target = dst / item.name
if target.exists():
target.unlink()
if item.is_dir():
shutil.copytree(item, target)
else:
shutil.copy2(item, target)
else:
if dst.exists():
shutil.rmtree(dst)
shutil.copytree(src, dst)

# Copy manifest.json
manifest_src = tmp_artifacts / "manifest.json"
Expand Down Expand Up @@ -596,12 +690,15 @@ def _setup_project_dir(repo: Path, work_dir: Path) -> Path:
return project_dir


def clone_repo(url: str, target_dir: Path) -> Path:
def clone_repo(url: str, target_dir: Path, last_sha: str = "") -> Path:
"""Clone a git repository to a target directory.

Args:
url: Git repository URL (https://github.com/org/repo)
target_dir: Directory to clone into
last_sha: Previous commit SHA (for incremental). When provided,
the clone is deepened to include this commit so that
``git diff last_sha..head_sha`` works.

Returns:
Path to the cloned repository
Expand All @@ -625,14 +722,30 @@ def clone_repo(url: str, target_dir: Path) -> Path:
check=True,
capture_output=True,
)
# If we need a specific SHA for diffing, ensure it's available
if last_sha:
subprocess.run(
["git", "fetch", "--depth", "100", "origin", last_sha],
cwd=repo_path,
capture_output=True,
)
else:
print(f" Cloning {url} to {repo_path}...")
target_dir.mkdir(parents=True, exist_ok=True)
subprocess.run(
["git", "clone", "--depth", "1", url, str(repo_path)],
check=True,
capture_output=True,
)
if last_sha:
# Clone with enough depth to include last_sha for diffing. A --depth
# clone is single-branch, so last_sha is only present if it lies within
# the most recent 100 commits of the remote's default branch.
subprocess.run(
["git", "clone", "--depth", "100", url, str(repo_path)],
check=True,
capture_output=True,
)
else:
subprocess.run(
["git", "clone", "--depth", "1", url, str(repo_path)],
check=True,
capture_output=True,
)

return repo_path

Expand Down Expand Up @@ -704,7 +817,7 @@ def main():
):
print("Detected repository URL, cloning...")
clone_dir = Path("/tmp/flashlight-repos")
repo_path = str(clone_repo(repo_path, clone_dir))
repo_path = str(clone_repo(repo_path, clone_dir, last_sha=args.last_sha))

analyze(
repo_path=repo_path,
Expand Down
Loading