From ba75e4cf3d4e01e8b70974f431906eca737223ab Mon Sep 17 00:00:00 2001 From: hankbob Date: Fri, 1 May 2026 21:48:32 +0700 Subject: [PATCH] fix: incremental analysis now skips unchanged components Previously, the sweep CI regenerated ALL component analyses on every run even when --last-sha was provided. Two root causes: 1. analyze_current_depth iterated over ALL components regardless of diff_context.changed_components. The incremental hint was only a text note in the prompt that the LLM could ignore. 2. clone_repo used --depth 1 (shallow clone), so git diff last_sha..head_sha failed silently because the shallow clone didn't contain last_sha, falling back to full analysis. Fixes: - analyze_current_depth now reads analysis_mode and changed_components from pipeline state. In incremental mode, unchanged components with pre-loaded analyses are skipped entirely (no LLM calls). - read_discovery pre-populates component_analyses from existing service_analyses/ for unchanged components, so upstream context is available for changed components that depend on them. - receive_analysis_input passes analysis_mode and changed_components into pipeline state from CLI inputs. - clone_repo uses --depth 100 when last_sha is provided, and fetches the specific SHA if the repo already exists. - git_diff_files falls back to the GitHub Compare API when local git diff fails (e.g. shallow clone missing the base commit). - Artifact copy now merges service_analyses/ in incremental mode instead of nuking and replacing the entire directory. 
--- agent/burr_app.py | 83 ++++++++++++++++++++++++-- agent/cli.py | 145 +++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 206 insertions(+), 22 deletions(-) diff --git a/agent/burr_app.py b/agent/burr_app.py index bcfe355..c1f7db8 100644 --- a/agent/burr_app.py +++ b/agent/burr_app.py @@ -1206,7 +1206,7 @@ def respond(state: State) -> State: @action( - reads=["service_name"], + reads=["service_name", "analysis_mode", "changed_components"], writes=[ "components", "depth_order", @@ -1227,11 +1227,15 @@ def read_discovery(state: State, __tracer: "TracerFactory") -> State: - /tmp/{service_name}/service_discovery/components.json - /tmp/{service_name}/dependency_graphs/analysis_order.json - Populates state with components and depth ordering for analysis. + In incremental mode, also pre-populates component_analyses from + existing artifacts for components that have NOT changed, so that + analyze_current_depth can skip re-running them. """ from pathlib import Path service_name = state.get("service_name", "unknown") + analysis_mode = state.get("analysis_mode", "full") + changed_components = state.get("changed_components", []) work_dir = Path(f"/tmp/{service_name}") components_file = work_dir / "service_discovery" / "components.json" analysis_order_file = work_dir / "dependency_graphs" / "analysis_order.json" @@ -1269,12 +1273,36 @@ def read_discovery(state: State, __tracer: "TracerFactory") -> State: # Build component lookup by name component_map = {c["name"]: c for c in components} + # Pre-populate analyses for unchanged components in incremental mode + # so analyze_current_depth can skip them while still having context + # for downstream dependencies. 
+ preloaded_analyses: Dict[str, str] = {} + if analysis_mode == "incremental" and changed_components: + changed_set = set(changed_components) + analyses_dir = work_dir / "service_analyses" + if analyses_dir.exists(): + for comp_name in component_map: + if comp_name not in changed_set: + md_file = analyses_dir / f"{comp_name}.md" + if md_file.exists(): + try: + preloaded_analyses[comp_name] = md_file.read_text( + encoding="utf-8" + ) + except OSError: + pass + if preloaded_analyses: + logger.info( + "Pre-loaded %d unchanged component analyses for incremental run", + len(preloaded_analyses), + ) + return state.update( components=component_map, depth_order=depth_order, current_depth=0, total_depths=len(depth_order), - component_analyses={}, # Will be populated as we analyze + component_analyses=preloaded_analyses, ) @@ -1285,6 +1313,8 @@ def read_discovery(state: State, __tracer: "TracerFactory") -> State: "current_depth", "component_analyses", "service_name", + "analysis_mode", + "changed_components", ], writes=["component_analyses", "current_depth"], tags=[ @@ -1301,6 +1331,11 @@ def analyze_current_depth(state: State, __tracer: "TracerFactory") -> State: 2. Run a component analyzer ReAct loop 3. Store the analysis result + In incremental mode (analysis_mode == "incremental"), components that + are NOT in changed_components are skipped — their pre-loaded analyses + from read_discovery are kept as-is. Only changed components (and any + unchanged ones without a pre-loaded analysis) get re-analyzed. + Components at the same depth run concurrently on a ThreadPoolExecutor, bounded by FLASHLIGHT_MAX_PARALLEL (default 4). Threads are appropriate because the work is I/O-bound (LLM HTTP calls + subprocess grep/bash). 
@@ -1322,20 +1357,39 @@ def analyze_current_depth(state: State, __tracer: "TracerFactory") -> State: current_depth = state.get("current_depth", 0) analyses = dict(state.get("component_analyses", {})) service_name = state.get("service_name", "unknown") + analysis_mode = state.get("analysis_mode", "full") + changed_components = state.get("changed_components", []) if current_depth >= len(depth_order): # No more depths to analyze return state.update(current_depth=current_depth) + # In incremental mode, determine which components to skip. + # Unchanged components that already have a pre-loaded analysis are + # carried forward without re-running the LLM. + changed_set = set(changed_components) if analysis_mode == "incremental" else None + # Resolve the work items for this depth up-front. Upstream context is # built from the already-complete `analyses` dict before any threads # start, so worker threads never read mutable orchestrator state. work_items: List[tuple[str, Dict[str, Any], str]] = [] + skipped: List[str] = [] for comp_name in depth_order[current_depth]: comp = components.get(comp_name) if not comp: logger.warning(f"Component not found in inventory: {comp_name}") continue + + # Skip unchanged components in incremental mode (they already have + # pre-loaded analysis from read_discovery). 
+ if ( + changed_set is not None + and comp_name not in changed_set + and comp_name in analyses + ): + skipped.append(comp_name) + continue + upstream_context = _build_upstream_context( comp.get("internal_dependencies", []), analyses ) @@ -1348,6 +1402,8 @@ def analyze_current_depth(state: State, __tracer: "TracerFactory") -> State: depth=current_depth, component_count=len(work_items), component_names=[name for name, _, _ in work_items], + skipped_components=skipped, + skipped_count=len(skipped), max_parallel=max_parallel, pool_size=pool_size, ) @@ -1738,9 +1794,22 @@ def build_analysis_pipeline( """ # Simple input action for analysis mode - @action(reads=[], writes=["task"], tags=["phase:input"]) - def receive_analysis_input(state: State, task: str) -> State: - return state.update(task=task) + @action( + reads=[], + writes=["task", "analysis_mode", "changed_components"], + tags=["phase:input"], + ) + def receive_analysis_input( + state: State, + task: str, + analysis_mode: str = "full", + changed_components: list | None = None, + ) -> State: + return state.update( + task=task, + analysis_mode=analysis_mode, + changed_components=changed_components or [], + ) app = ( ApplicationBuilder() @@ -1769,6 +1838,8 @@ def receive_analysis_input(state: State, task: str) -> State: service_name=service_name, synthesis_result="", final_response="", + analysis_mode="full", + changed_components=[], ) .with_tracker(project=project_name) .build() diff --git a/agent/cli.py b/agent/cli.py index 9bdc4b3..3429019 100644 --- a/agent/cli.py +++ b/agent/cli.py @@ -65,7 +65,11 @@ def load_components(artifacts_dir: Path) -> list[dict]: def git_diff_files(repo_path: Path, last_sha: str, head_sha: str) -> list[str]: - """Get list of changed files between two commits.""" + """Get list of changed files between two commits. + + Tries ``git diff`` first. If that fails (e.g. shallow clone missing + last_sha), falls back to the GitHub Compare API. 
+ """ result = subprocess.run( ["git", "diff", "--name-only", f"{last_sha}..{head_sha}"], capture_output=True, @@ -74,14 +78,71 @@ def git_diff_files(repo_path: Path, last_sha: str, head_sha: str) -> list[str]: ) if result.returncode != 0: logger.warning( - "git diff failed (rc=%d): %s — falling back to full analysis", + "git diff failed (rc=%d): %s — trying GitHub Compare API", result.returncode, result.stderr.strip(), ) + # Fallback: use GitHub Compare API if this is a GitHub repo + api_files = _github_diff_files(repo_path, last_sha, head_sha) + if api_files is not None: + return api_files return [] + return [f for f in result.stdout.strip().split("\n") if f] +def _github_diff_files( + repo_path: Path, last_sha: str, head_sha: str +) -> list[str] | None: + """Fetch changed files between two commits via the GitHub Compare API. + + Returns None if the repo is not a GitHub repo or the API call fails. + """ + # Try to extract owner/repo from the remote URL + try: + remote_result = subprocess.run( + ["git", "remote", "get-url", "origin"], + capture_output=True, + text=True, + cwd=repo_path, + ) + if remote_result.returncode != 0: + return None + remote_url = remote_result.stdout.strip() + # Handle both https://github.com/owner/repo.git and git@github.com:owner/repo.git + import re + match = re.search(r"github\.com[:/]([^/]+/[^/\s]+?)(?:\.git)?$", remote_url) + if not match: + return None + repo_slug = match.group(1) + except Exception: + return None + + try: + gh_cmd = ["gh", "api", f"repos/{repo_slug}/compare/{last_sha}...{head_sha}", + "--jq", ".files[].filename"] + # Use private token if available for private repos + private_token = os.environ.get("FLASHLIGHT_REPO_TOKEN", "") + if private_token: + gh_cmd = ["gh", "api", + "-H", f"Authorization: token {private_token}", + f"repos/{repo_slug}/compare/{last_sha}...{head_sha}", + "--jq", ".files[].filename"] + api_result = subprocess.run( + gh_cmd, + capture_output=True, + text=True, + timeout=30, + ) + if 
api_result.returncode != 0: + logger.warning("GitHub Compare API failed: %s", api_result.stderr.strip()) + return None + return [f for f in api_result.stdout.strip().split("\n") if f] + except Exception as exc: + logger.warning("GitHub Compare API error: %s", exc) + return None + + def map_files_to_components( changed_files: list[str], components: list[dict], @@ -409,6 +470,30 @@ def analyze( ) print() + # Prepare pipeline inputs + pipeline_inputs = { + "task": f"Analyze {service_name}", + "service_name": service_name, + } + + # In incremental mode, copy existing service_analyses into the work dir + # so that read_discovery can pre-load unchanged component analyses. + # This also ensures upstream context is available for changed components + # that depend on unchanged ones. + if diff_context["mode"] == "incremental" and output.exists(): + existing_analyses = output / "service_analyses" + target_analyses = work_dir / "service_analyses" + if existing_analyses.exists() and not target_analyses.exists(): + shutil.copytree(existing_analyses, target_analyses) + print(f" Pre-copied existing analyses to {target_analyses}") + pipeline_inputs["analysis_mode"] = "incremental" + pipeline_inputs["changed_components"] = sorted( + diff_context.get("changed_components", []) + ) + print( + f" Incremental: re-analyzing {len(pipeline_inputs['changed_components'])} changed component(s)" + ) + try: transcript.write_to_file(f"\nAnalyzing {service_name}...\n") @@ -416,10 +501,7 @@ def analyze( # The pipeline: receive_input -> read_discovery -> analyze_current_depth (loop) -> synthesize -> respond action, result, state = app.run( halt_after=["respond"], - inputs={ - "task": f"Analyze {service_name}", - "service_name": service_name, - }, + inputs=pipeline_inputs, ) # Get the response @@ -516,9 +598,21 @@ def analyze( src = tmp_artifacts / dirname dst = output / dirname if src.exists(): - if dst.exists(): - shutil.rmtree(dst) - shutil.copytree(src, dst) + if dirname == "service_analyses" and 
diff_context["mode"] == "incremental" and dst.exists(): + # Merge: copy individual files from src into dst, keeping + # unchanged analyses from the prior run intact. + for item in src.iterdir(): + target = dst / item.name + if target.exists(): + target.unlink() + if item.is_dir(): + shutil.copytree(item, target) + else: + shutil.copy2(item, target) + else: + if dst.exists(): + shutil.rmtree(dst) + shutil.copytree(src, dst) # Copy manifest.json manifest_src = tmp_artifacts / "manifest.json" @@ -596,12 +690,15 @@ def _setup_project_dir(repo: Path, work_dir: Path) -> Path: return project_dir -def clone_repo(url: str, target_dir: Path) -> Path: +def clone_repo(url: str, target_dir: Path, last_sha: str = "") -> Path: """Clone a git repository to a target directory. Args: url: Git repository URL (https://github.com/org/repo) target_dir: Directory to clone into + last_sha: Previous commit SHA (for incremental). When provided, + the clone is deepened to include this commit so that + ``git diff last_sha..head_sha`` works. Returns: Path to the cloned repository @@ -625,14 +722,30 @@ def clone_repo(url: str, target_dir: Path) -> Path: check=True, capture_output=True, ) + # If we need a specific SHA for diffing, ensure it's available + if last_sha: + subprocess.run( + ["git", "fetch", "--depth", "100", "origin", last_sha], + cwd=repo_path, + capture_output=True, + ) else: print(f" Cloning {url} to {repo_path}...") target_dir.mkdir(parents=True, exist_ok=True) - subprocess.run( - ["git", "clone", "--depth", "1", url, str(repo_path)], - check=True, - capture_output=True, - ) + if last_sha: + # Clone with enough depth to include last_sha for diffing. + # Note: --depth implies --single-branch, so only the default branch is fetched. 
+ subprocess.run( + ["git", "clone", "--depth", "100", url, str(repo_path)], + check=True, + capture_output=True, + ) + else: + subprocess.run( + ["git", "clone", "--depth", "1", url, str(repo_path)], + check=True, + capture_output=True, + ) return repo_path @@ -704,7 +817,7 @@ def main(): ): print("Detected repository URL, cloning...") clone_dir = Path("/tmp/flashlight-repos") - repo_path = str(clone_repo(repo_path, clone_dir)) + repo_path = str(clone_repo(repo_path, clone_dir, last_sha=args.last_sha)) analyze( repo_path=repo_path,