Skip to content

Commit a066fb4

Browse files
authored
fix: discovery auditing && updated README (#15)
1 parent 3d73793 commit a066fb4

16 files changed

Lines changed: 2026 additions & 352 deletions

README.md

Lines changed: 114 additions & 246 deletions
Large diffs are not rendered by default.

agent/burr_app.py

Lines changed: 170 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,23 @@
1111
receive_input -> call_llm -> [execute_tools -> call_llm]* -> respond
1212
1313
ANALYSIS MODE (headless codebase analysis):
14-
receive_input -> read_discovery -> analyze_depth_N -> ... -> synthesize -> respond
14+
receive_input -> read_discovery -> analyze_current_depth (loop) -> synthesize -> respond
1515
16-
Each analyze_depth_N action:
17-
- Runs component analyzers in parallel for all components at that depth
16+
Each analyze_current_depth invocation:
17+
- Runs component analyzers for all components at the current depth,
18+
bounded by FLASHLIGHT_MAX_PARALLEL (default 4) via a ThreadPoolExecutor
1819
- Each component analyzer is a ReAct loop (call_llm <-> execute_tools)
19-
- Waits for all to complete before transitioning to next depth
20+
wrapped in its own Burr Application for per-subagent UI visibility
21+
- Waits for all components at the current depth to complete before
22+
advancing to the next depth
2023
2124
Multi-agent visibility:
2225
The Burr UI shows:
2326
- receive_input: Initial task
2427
- read_discovery: Load components.json and analysis_order.json
25-
- analyze_depth_0: Parallel analysis of depth-0 components
26-
- analyze_depth_1: Parallel analysis of depth-1 components (with upstream context)
27-
- ... (more depth levels as needed)
28+
- analyze_current_depth (once per depth): components at that depth, capped
29+
by FLASHLIGHT_MAX_PARALLEL. Per-component detail lives in each subagent's
30+
own tracked Application.
2831
- synthesize: Architecture documentation synthesis
2932
- respond: Final output
3033
"""
@@ -890,7 +893,26 @@ def _chat_completion(
890893
response=response,
891894
)
892895

893-
# Non-retryable errors (4xx except 429)
896+
# Non-retryable errors (4xx except 429) — surface the
897+
# upstream error body so failures are actionable instead of
898+
# a bare "400 Bad Request".
899+
if response.status_code >= 400:
900+
try:
901+
err_body: Any = response.json()
902+
except Exception:
903+
err_body = response.text[:2000]
904+
approx_chars = sum(
905+
len(str(m.get("content", ""))) for m in messages
906+
)
907+
logger.error(
908+
"LLM %d from %s (model=%s, msgs=%d, approx_chars=%d): %s",
909+
response.status_code,
910+
base_url,
911+
model,
912+
len(messages),
913+
approx_chars,
914+
err_body,
915+
)
894916
response.raise_for_status()
895917
data = response.json()
896918

@@ -1272,17 +1294,28 @@ def read_discovery(state: State, __tracer: "TracerFactory") -> State:
12721294
],
12731295
)
12741296
def analyze_current_depth(state: State, __tracer: "TracerFactory") -> State:
1275-
"""Analyze all components at the current depth level.
1297+
"""Analyze all components at the current depth level in parallel.
12761298
12771299
For each component at this depth:
12781300
1. Build upstream context from already-analyzed dependencies
12791301
2. Run a component analyzer ReAct loop
12801302
3. Store the analysis result
12811303
1282-
Components at the same depth are analyzed sequentially in this implementation.
1283-
(Future: could use threading for true parallelism)
1304+
Components at the same depth run concurrently on a ThreadPoolExecutor,
1305+
bounded by FLASHLIGHT_MAX_PARALLEL (default 4). Threads are appropriate
1306+
because the work is I/O-bound (LLM HTTP calls + subprocess grep/bash).
1307+
Each subagent is an independent Burr Application with its own httpx
1308+
client, so no shared-state locking is required.
1309+
1310+
Results are merged back into ``component_analyses`` in deterministic
1311+
(depth-order) sequence so the downstream synthesis prompt is
1312+
reproducible even though futures complete in arbitrary order.
1313+
1314+
Set ``FLASHLIGHT_MAX_PARALLEL=1`` to opt out of parallelism (useful for
1315+
single-instance local backends like Ollama, or to avoid rate limits on
1316+
small API tiers).
12841317
"""
1285-
from pathlib import Path
1318+
from concurrent.futures import ThreadPoolExecutor, as_completed
12861319

12871320
components = state.get("components", {})
12881321
depth_order = state.get("depth_order", [])
@@ -1294,64 +1327,125 @@ def analyze_current_depth(state: State, __tracer: "TracerFactory") -> State:
12941327
# No more depths to analyze
12951328
return state.update(current_depth=current_depth)
12961329

1297-
component_names = depth_order[current_depth]
1298-
1299-
__tracer.log_attributes(
1300-
depth=current_depth,
1301-
component_count=len(component_names),
1302-
component_names=component_names,
1303-
)
1304-
1305-
for comp_name in component_names:
1330+
# Resolve the work items for this depth up-front. Upstream context is
1331+
# built from the already-complete `analyses` dict before any threads
1332+
# start, so worker threads never read mutable orchestrator state.
1333+
work_items: List[tuple[str, Dict[str, Any], str]] = []
1334+
for comp_name in depth_order[current_depth]:
13061335
comp = components.get(comp_name)
13071336
if not comp:
13081337
logger.warning(f"Component not found in inventory: {comp_name}")
13091338
continue
1339+
upstream_context = _build_upstream_context(
1340+
comp.get("internal_dependencies", []), analyses
1341+
)
1342+
work_items.append((comp_name, comp, upstream_context))
13101343

1311-
with __tracer(f"analyze:{comp_name}") as t:
1312-
# Build upstream context from dependencies
1313-
upstream_context = ""
1314-
deps = comp.get("internal_dependencies", [])
1315-
if deps:
1316-
context_parts = []
1317-
for dep_name in deps:
1318-
if dep_name in analyses:
1319-
# Extract summary from the analysis
1320-
analysis = analyses[dep_name]
1321-
summary = _extract_summary(analysis)
1322-
context_parts.append(f"### {dep_name}\n{summary}")
1323-
upstream_context = "\n\n".join(context_parts)
1324-
1325-
t.log_attributes(
1326-
component_kind=comp.get("kind", "unknown"),
1327-
component_type=comp.get("type", "unknown"),
1328-
dependency_count=len(deps),
1329-
has_upstream_context=bool(upstream_context),
1330-
)
1344+
max_parallel = _get_max_parallel()
1345+
pool_size = max(1, min(max_parallel, len(work_items))) if work_items else 1
13311346

1332-
# Run the component analyzer
1333-
analysis_result = _run_component_analyzer(
1334-
component=comp,
1335-
service_name=service_name,
1336-
upstream_context=upstream_context,
1337-
tracer=t,
1338-
)
1347+
__tracer.log_attributes(
1348+
depth=current_depth,
1349+
component_count=len(work_items),
1350+
component_names=[name for name, _, _ in work_items],
1351+
max_parallel=max_parallel,
1352+
pool_size=pool_size,
1353+
)
13391354

1340-
analyses[comp_name] = analysis_result
1355+
results: Dict[str, str] = {}
1356+
if work_items:
1357+
with ThreadPoolExecutor(
1358+
max_workers=pool_size,
1359+
thread_name_prefix=f"flashlight-depth{current_depth}",
1360+
) as pool:
1361+
future_to_name = {
1362+
pool.submit(
1363+
_run_component_analyzer,
1364+
component=comp,
1365+
service_name=service_name,
1366+
upstream_context=upstream_context,
1367+
tracer=None,
1368+
): name
1369+
for name, comp, upstream_context in work_items
1370+
}
1371+
for future in as_completed(future_to_name):
1372+
name = future_to_name[future]
1373+
try:
1374+
results[name] = future.result()
1375+
except Exception as exc:
1376+
logger.error(
1377+
"Component analyzer for %s crashed: %s",
1378+
name,
1379+
exc,
1380+
exc_info=True,
1381+
)
1382+
# Record the failure as an Error-prefixed string so the
1383+
# rest of the pipeline (synthesis, logging) behaves
1384+
# identically to the sequential-path error shape.
1385+
results[name] = f"Error: component analyzer crashed: {exc}"
1386+
1387+
# Merge results back in deterministic work-item order so downstream
1388+
# synthesis sees a stable component ordering across runs.
1389+
successes: List[str] = []
1390+
failures: List[str] = []
1391+
for comp_name, comp, _ in work_items:
1392+
result = results.get(comp_name, "")
1393+
analyses[comp_name] = result
1394+
if result and not result.startswith("Error"):
1395+
successes.append(comp_name)
1396+
else:
1397+
failures.append(comp_name)
13411398

1342-
t.log_attributes(
1343-
analysis_length=len(analysis_result) if analysis_result else 0,
1344-
success=bool(
1345-
analysis_result and not analysis_result.startswith("Error")
1346-
),
1347-
)
1399+
__tracer.log_attributes(
1400+
successful_components=successes,
1401+
failed_components=failures,
1402+
success_count=len(successes),
1403+
failure_count=len(failures),
1404+
)
13481405

13491406
return state.update(
13501407
component_analyses=analyses,
13511408
current_depth=current_depth + 1,
13521409
)
13531410

13541411

1412+
def _build_upstream_context(deps: List[str], analyses: Dict[str, str]) -> str:
1413+
"""Build an upstream_context prompt fragment from prior analyses.
1414+
1415+
For each dependency that has a completed analysis in ``analyses``,
1416+
emit a section with the dependency name and its summary. Dependencies
1417+
without an analysis (missing from inventory, failed earlier) are
1418+
silently skipped — the subagent simply gets less context.
1419+
"""
1420+
if not deps:
1421+
return ""
1422+
parts: List[str] = []
1423+
for dep_name in deps:
1424+
analysis = analyses.get(dep_name)
1425+
if analysis:
1426+
parts.append(f"### {dep_name}\n{_extract_summary(analysis)}")
1427+
return "\n\n".join(parts)
1428+
1429+
1430+
def _get_max_parallel() -> int:
1431+
"""Read FLASHLIGHT_MAX_PARALLEL, clamped to sensible bounds.
1432+
1433+
Defaults to 4 (reasonable for paid OpenAI-compatible tiers and
1434+
most self-hosted vLLM/LM Studio backends). Set to 1 to serialize.
1435+
Negative / non-integer values fall back to the default.
1436+
"""
1437+
raw = os.environ.get("FLASHLIGHT_MAX_PARALLEL", "4")
1438+
try:
1439+
n = int(raw)
1440+
except (TypeError, ValueError):
1441+
logger.warning(
1442+
"FLASHLIGHT_MAX_PARALLEL=%r is not an integer; using default 4",
1443+
raw,
1444+
)
1445+
return 4
1446+
return max(1, n)
1447+
1448+
13551449
def _extract_summary(analysis: str) -> str:
13561450
"""Extract a brief summary from a component analysis."""
13571451
# Look for a summary section or take first few paragraphs
@@ -1434,7 +1528,7 @@ def _run_component_analyzer(
14341528
system_prompt = f"You are a component-analyzer for the {service_name} codebase."
14351529

14361530
# Run as a proper Burr application for UI visibility
1437-
return _run_subagent_as_app(
1531+
analysis = _run_subagent_as_app(
14381532
system_prompt=system_prompt,
14391533
user_prompt=prompt,
14401534
subagent_type="component-analyzer",
@@ -1443,6 +1537,26 @@ def _run_component_analyzer(
14431537
parent_sequence_id=parent_sequence_id,
14441538
)
14451539

1540+
# Persist the analysis to disk ourselves. We don't trust the LLM to call
1541+
# write_file — empirically it drops that tool call ~75% of the time,
1542+
# especially on smaller models, silently throwing away the analysis.
1543+
# Saving in the orchestrator guarantees every completed subagent produces
1544+
# a .md on disk regardless of which tool calls the model chose to make.
1545+
if analysis and not analysis.startswith("Error"):
1546+
out_path = Path(f"/tmp/{service_name}/service_analyses/{comp_name}.md")
1547+
try:
1548+
out_path.parent.mkdir(parents=True, exist_ok=True)
1549+
out_path.write_text(analysis, encoding="utf-8")
1550+
except OSError as exc:
1551+
logger.error(
1552+
"Failed to persist analysis for %s to %s: %s",
1553+
comp_name,
1554+
out_path,
1555+
exc,
1556+
)
1557+
1558+
return analysis
1559+
14461560

14471561
@action(
14481562
reads=["component_analyses", "service_name"],

0 commit comments

Comments
 (0)