Skip to content

Commit ba75e4c

Browse files
fix: incremental analysis now skips unchanged components
Previously, the sweep CI regenerated ALL component analyses on every run even when --last-sha was provided.

Two root causes:
1. analyze_current_depth iterated over ALL components regardless of diff_context.changed_components. The incremental hint was only a text note in the prompt that the LLM could ignore.
2. clone_repo used --depth 1 (shallow clone), so git diff last_sha..head_sha failed silently because the shallow clone didn't contain last_sha, falling back to full analysis.

Fixes:
- analyze_current_depth now reads analysis_mode and changed_components from pipeline state. In incremental mode, unchanged components with pre-loaded analyses are skipped entirely (no LLM calls).
- read_discovery pre-populates component_analyses from existing service_analyses/ for unchanged components, so upstream context is available for changed components that depend on them.
- receive_analysis_input passes analysis_mode and changed_components into pipeline state from CLI inputs.
- clone_repo uses --depth 100 when last_sha is provided, and fetches the specific SHA if the repo already exists.
- git_diff_files falls back to the GitHub Compare API when local git diff fails (e.g. shallow clone missing the base commit).
- Artifact copy now merges service_analyses/ in incremental mode instead of deleting and replacing the entire directory.
1 parent a066fb4 commit ba75e4c

2 files changed

Lines changed: 206 additions & 22 deletions

File tree

agent/burr_app.py

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1206,7 +1206,7 @@ def respond(state: State) -> State:
12061206

12071207

12081208
@action(
1209-
reads=["service_name"],
1209+
reads=["service_name", "analysis_mode", "changed_components"],
12101210
writes=[
12111211
"components",
12121212
"depth_order",
@@ -1227,11 +1227,15 @@ def read_discovery(state: State, __tracer: "TracerFactory") -> State:
12271227
- /tmp/{service_name}/service_discovery/components.json
12281228
- /tmp/{service_name}/dependency_graphs/analysis_order.json
12291229
1230-
Populates state with components and depth ordering for analysis.
1230+
In incremental mode, also pre-populates component_analyses from
1231+
existing artifacts for components that have NOT changed, so that
1232+
analyze_current_depth can skip re-running them.
12311233
"""
12321234
from pathlib import Path
12331235

12341236
service_name = state.get("service_name", "unknown")
1237+
analysis_mode = state.get("analysis_mode", "full")
1238+
changed_components = state.get("changed_components", [])
12351239
work_dir = Path(f"/tmp/{service_name}")
12361240
components_file = work_dir / "service_discovery" / "components.json"
12371241
analysis_order_file = work_dir / "dependency_graphs" / "analysis_order.json"
@@ -1269,12 +1273,36 @@ def read_discovery(state: State, __tracer: "TracerFactory") -> State:
12691273
# Build component lookup by name
12701274
component_map = {c["name"]: c for c in components}
12711275

1276+
# Pre-populate analyses for unchanged components in incremental mode
1277+
# so analyze_current_depth can skip them while still having context
1278+
# for downstream dependencies.
1279+
preloaded_analyses: Dict[str, str] = {}
1280+
if analysis_mode == "incremental" and changed_components:
1281+
changed_set = set(changed_components)
1282+
analyses_dir = work_dir / "service_analyses"
1283+
if analyses_dir.exists():
1284+
for comp_name in component_map:
1285+
if comp_name not in changed_set:
1286+
md_file = analyses_dir / f"{comp_name}.md"
1287+
if md_file.exists():
1288+
try:
1289+
preloaded_analyses[comp_name] = md_file.read_text(
1290+
encoding="utf-8"
1291+
)
1292+
except OSError:
1293+
pass
1294+
if preloaded_analyses:
1295+
logger.info(
1296+
"Pre-loaded %d unchanged component analyses for incremental run",
1297+
len(preloaded_analyses),
1298+
)
1299+
12721300
return state.update(
12731301
components=component_map,
12741302
depth_order=depth_order,
12751303
current_depth=0,
12761304
total_depths=len(depth_order),
1277-
component_analyses={}, # Will be populated as we analyze
1305+
component_analyses=preloaded_analyses,
12781306
)
12791307

12801308

@@ -1285,6 +1313,8 @@ def read_discovery(state: State, __tracer: "TracerFactory") -> State:
12851313
"current_depth",
12861314
"component_analyses",
12871315
"service_name",
1316+
"analysis_mode",
1317+
"changed_components",
12881318
],
12891319
writes=["component_analyses", "current_depth"],
12901320
tags=[
@@ -1301,6 +1331,11 @@ def analyze_current_depth(state: State, __tracer: "TracerFactory") -> State:
13011331
2. Run a component analyzer ReAct loop
13021332
3. Store the analysis result
13031333
1334+
In incremental mode (analysis_mode == "incremental"), components that
1335+
are NOT in changed_components are skipped — their pre-loaded analyses
1336+
from read_discovery are kept as-is. Only changed components (and any
1337+
unchanged component lacking a pre-loaded analysis) get analyzed.
1338+
13041339
Components at the same depth run concurrently on a ThreadPoolExecutor,
13051340
bounded by FLASHLIGHT_MAX_PARALLEL (default 4). Threads are appropriate
13061341
because the work is I/O-bound (LLM HTTP calls + subprocess grep/bash).
@@ -1322,20 +1357,39 @@ def analyze_current_depth(state: State, __tracer: "TracerFactory") -> State:
13221357
current_depth = state.get("current_depth", 0)
13231358
analyses = dict(state.get("component_analyses", {}))
13241359
service_name = state.get("service_name", "unknown")
1360+
analysis_mode = state.get("analysis_mode", "full")
1361+
changed_components = state.get("changed_components", [])
13251362

13261363
if current_depth >= len(depth_order):
13271364
# No more depths to analyze
13281365
return state.update(current_depth=current_depth)
13291366

1367+
# In incremental mode, determine which components to skip.
1368+
# Unchanged components that already have a pre-loaded analysis are
1369+
# carried forward without re-running the LLM.
1370+
changed_set = set(changed_components) if analysis_mode == "incremental" else None
1371+
13301372
# Resolve the work items for this depth up-front. Upstream context is
13311373
# built from the already-complete `analyses` dict before any threads
13321374
# start, so worker threads never read mutable orchestrator state.
13331375
work_items: List[tuple[str, Dict[str, Any], str]] = []
1376+
skipped: List[str] = []
13341377
for comp_name in depth_order[current_depth]:
13351378
comp = components.get(comp_name)
13361379
if not comp:
13371380
logger.warning(f"Component not found in inventory: {comp_name}")
13381381
continue
1382+
1383+
# Skip unchanged components in incremental mode (they already have
1384+
# pre-loaded analysis from read_discovery).
1385+
if (
1386+
changed_set is not None
1387+
and comp_name not in changed_set
1388+
and comp_name in analyses
1389+
):
1390+
skipped.append(comp_name)
1391+
continue
1392+
13391393
upstream_context = _build_upstream_context(
13401394
comp.get("internal_dependencies", []), analyses
13411395
)
@@ -1348,6 +1402,8 @@ def analyze_current_depth(state: State, __tracer: "TracerFactory") -> State:
13481402
depth=current_depth,
13491403
component_count=len(work_items),
13501404
component_names=[name for name, _, _ in work_items],
1405+
skipped_components=skipped,
1406+
skipped_count=len(skipped),
13511407
max_parallel=max_parallel,
13521408
pool_size=pool_size,
13531409
)
@@ -1738,9 +1794,22 @@ def build_analysis_pipeline(
17381794
"""
17391795

17401796
# Simple input action for analysis mode
1741-
@action(reads=[], writes=["task"], tags=["phase:input"])
1742-
def receive_analysis_input(state: State, task: str) -> State:
1743-
return state.update(task=task)
1797+
@action(
1798+
reads=[],
1799+
writes=["task", "analysis_mode", "changed_components"],
1800+
tags=["phase:input"],
1801+
)
1802+
def receive_analysis_input(
1803+
state: State,
1804+
task: str,
1805+
analysis_mode: str = "full",
1806+
changed_components: list | None = None,
1807+
) -> State:
1808+
return state.update(
1809+
task=task,
1810+
analysis_mode=analysis_mode,
1811+
changed_components=changed_components or [],
1812+
)
17441813

17451814
app = (
17461815
ApplicationBuilder()
@@ -1769,6 +1838,8 @@ def receive_analysis_input(state: State, task: str) -> State:
17691838
service_name=service_name,
17701839
synthesis_result="",
17711840
final_response="",
1841+
analysis_mode="full",
1842+
changed_components=[],
17721843
)
17731844
.with_tracker(project=project_name)
17741845
.build()

agent/cli.py

Lines changed: 129 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,11 @@ def load_components(artifacts_dir: Path) -> list[dict]:
6565

6666

6767
def git_diff_files(repo_path: Path, last_sha: str, head_sha: str) -> list[str]:
68-
"""Get list of changed files between two commits."""
68+
"""Get list of changed files between two commits.
69+
70+
Tries ``git diff`` first. If that fails (e.g. shallow clone missing
71+
last_sha), falls back to the GitHub Compare API; returns an empty list if that also fails.
72+
"""
6973
result = subprocess.run(
7074
["git", "diff", "--name-only", f"{last_sha}..{head_sha}"],
7175
capture_output=True,
@@ -74,14 +78,71 @@ def git_diff_files(repo_path: Path, last_sha: str, head_sha: str) -> list[str]:
7478
)
7579
if result.returncode != 0:
7680
logger.warning(
77-
"git diff failed (rc=%d): %s — falling back to full analysis",
81+
"git diff failed (rc=%d): %s — trying GitHub Compare API",
7882
result.returncode,
7983
result.stderr.strip(),
8084
)
85+
# Fallback: use GitHub Compare API if this is a GitHub repo
86+
api_files = _github_diff_files(repo_path, last_sha, head_sha)
87+
if api_files is not None:
88+
return api_files
8189
return []
90+
8291
return [f for f in result.stdout.strip().split("\n") if f]
8392

8493

94+
def _github_diff_files(
95+
repo_path: Path, last_sha: str, head_sha: str
96+
) -> list[str] | None:
97+
"""Fetch changed files between two commits via the GitHub Compare API.
98+
99+
Returns None if the repo is not a GitHub repo or the API call fails.
100+
"""
101+
# Try to extract owner/repo from the remote URL
102+
try:
103+
remote_result = subprocess.run(
104+
["git", "remote", "get-url", "origin"],
105+
capture_output=True,
106+
text=True,
107+
cwd=repo_path,
108+
)
109+
if remote_result.returncode != 0:
110+
return None
111+
remote_url = remote_result.stdout.strip()
112+
# Handle both https://github.com/owner/repo.git and git@github.com:owner/repo.git
113+
import re
114+
match = re.search(r"github\.com[:/]([^/]+/[^/\s]+?)(?:\.git)?$", remote_url)
115+
if not match:
116+
return None
117+
repo_slug = match.group(1)
118+
except Exception:
119+
return None
120+
121+
try:
122+
gh_cmd = ["gh", "api", f"repos/{repo_slug}/compare/{last_sha}...{head_sha}",
123+
"--jq", ".files[].filename"]
124+
# Use private token if available for private repos
125+
private_token = os.environ.get("FLASHLIGHT_REPO_TOKEN", "")
126+
if private_token:
127+
gh_cmd = ["gh", "api",
128+
"-H", f"Authorization: token {private_token}",
129+
f"repos/{repo_slug}/compare/{last_sha}...{head_sha}",
130+
"--jq", ".files[].filename"]
131+
api_result = subprocess.run(
132+
gh_cmd,
133+
capture_output=True,
134+
text=True,
135+
timeout=30,
136+
)
137+
if api_result.returncode != 0:
138+
logger.warning("GitHub Compare API failed: %s", api_result.stderr.strip())
139+
return None
140+
return [f for f in api_result.stdout.strip().split("\n") if f]
141+
except Exception as exc:
142+
logger.warning("GitHub Compare API error: %s", exc)
143+
return None
144+
145+
85146
def map_files_to_components(
86147
changed_files: list[str],
87148
components: list[dict],
@@ -409,17 +470,38 @@ def analyze(
409470
)
410471
print()
411472

473+
# Prepare pipeline inputs
474+
pipeline_inputs = {
475+
"task": f"Analyze {service_name}",
476+
"service_name": service_name,
477+
}
478+
479+
# In incremental mode, copy existing service_analyses into the work dir
480+
# so that read_discovery can pre-load unchanged component analyses.
481+
# This also ensures upstream context is available for changed components
482+
# that depend on unchanged ones.
483+
if diff_context["mode"] == "incremental" and output.exists():
484+
existing_analyses = output / "service_analyses"
485+
target_analyses = work_dir / "service_analyses"
486+
if existing_analyses.exists() and not target_analyses.exists():
487+
shutil.copytree(existing_analyses, target_analyses)
488+
print(f" Pre-copied existing analyses to {target_analyses}")
489+
pipeline_inputs["analysis_mode"] = "incremental"
490+
pipeline_inputs["changed_components"] = sorted(
491+
diff_context.get("changed_components", [])
492+
)
493+
print(
494+
f" Incremental: re-analyzing {len(pipeline_inputs['changed_components'])} changed component(s)"
495+
)
496+
412497
try:
413498
transcript.write_to_file(f"\nAnalyzing {service_name}...\n")
414499

415500
# Run the Burr analysis pipeline
416501
# The pipeline: receive_input -> read_discovery -> analyze_current_depth (loop) -> synthesize -> respond
417502
action, result, state = app.run(
418503
halt_after=["respond"],
419-
inputs={
420-
"task": f"Analyze {service_name}",
421-
"service_name": service_name,
422-
},
504+
inputs=pipeline_inputs,
423505
)
424506

425507
# Get the response
@@ -516,9 +598,21 @@ def analyze(
516598
src = tmp_artifacts / dirname
517599
dst = output / dirname
518600
if src.exists():
519-
if dst.exists():
520-
shutil.rmtree(dst)
521-
shutil.copytree(src, dst)
601+
if dirname == "service_analyses" and diff_context["mode"] == "incremental" and dst.exists():
602+
# Merge: copy individual files from src into dst, keeping
603+
# unchanged analyses from the prior run intact.
604+
for item in src.iterdir():
605+
target = dst / item.name
606+
if target.exists():
607+
target.unlink()
608+
if item.is_dir():
609+
shutil.copytree(item, target)
610+
else:
611+
shutil.copy2(item, target)
612+
else:
613+
if dst.exists():
614+
shutil.rmtree(dst)
615+
shutil.copytree(src, dst)
522616

523617
# Copy manifest.json
524618
manifest_src = tmp_artifacts / "manifest.json"
@@ -596,12 +690,15 @@ def _setup_project_dir(repo: Path, work_dir: Path) -> Path:
596690
return project_dir
597691

598692

599-
def clone_repo(url: str, target_dir: Path) -> Path:
693+
def clone_repo(url: str, target_dir: Path, last_sha: str = "") -> Path:
600694
"""Clone a git repository to a target directory.
601695
602696
Args:
603697
url: Git repository URL (https://github.com/org/repo)
604698
target_dir: Directory to clone into
699+
last_sha: Previous commit SHA (for incremental). When provided,
700+
the clone uses --depth 100 (best effort to include this commit) so that
701+
``git diff last_sha..head_sha`` works.
605702
606703
Returns:
607704
Path to the cloned repository
@@ -625,14 +722,30 @@ def clone_repo(url: str, target_dir: Path) -> Path:
625722
check=True,
626723
capture_output=True,
627724
)
725+
# If we need a specific SHA for diffing, ensure it's available
726+
if last_sha:
727+
subprocess.run(
728+
["git", "fetch", "--depth", "100", "origin", last_sha],
729+
cwd=repo_path,
730+
capture_output=True,
731+
)
628732
else:
629733
print(f" Cloning {url} to {repo_path}...")
630734
target_dir.mkdir(parents=True, exist_ok=True)
631-
subprocess.run(
632-
["git", "clone", "--depth", "1", url, str(repo_path)],
633-
check=True,
634-
capture_output=True,
635-
)
735+
if last_sha:
736+
# Clone with enough depth to include last_sha for diffing.
737+
# NOTE: --no-single-branch is NOT passed here; --depth implies --single-branch.
738+
subprocess.run(
739+
["git", "clone", "--depth", "100", url, str(repo_path)],
740+
check=True,
741+
capture_output=True,
742+
)
743+
else:
744+
subprocess.run(
745+
["git", "clone", "--depth", "1", url, str(repo_path)],
746+
check=True,
747+
capture_output=True,
748+
)
636749

637750
return repo_path
638751

@@ -704,7 +817,7 @@ def main():
704817
):
705818
print("Detected repository URL, cloning...")
706819
clone_dir = Path("/tmp/flashlight-repos")
707-
repo_path = str(clone_repo(repo_path, clone_dir))
820+
repo_path = str(clone_repo(repo_path, clone_dir, last_sha=args.last_sha))
708821

709822
analyze(
710823
repo_path=repo_path,

0 commit comments

Comments
 (0)