Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions paperbanana/analytics/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""Run analytics helpers for PaperBanana artifacts."""

from paperbanana.analytics.aggregates import AnalyticsSummary, summarize_records
from paperbanana.analytics.loader import load_analytics_records
from paperbanana.analytics.reporting import render_markdown_summary

# Public, re-exported API of the analytics package (sorted alphabetically).
__all__ = [
    "AnalyticsSummary",
    "load_analytics_records",
    "render_markdown_summary",
    "summarize_records",
]
35 changes: 35 additions & 0 deletions paperbanana/analytics/aggregates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Aggregate analytics records into KPI summaries."""

from __future__ import annotations

from paperbanana.analytics.models import AnalyticsRecord, AnalyticsSummary


def _bump(counter: dict[str, int], key: str | None) -> None:
if not key:
return
counter[key] = counter.get(key, 0) + 1


def summarize_records(records: list[AnalyticsRecord]) -> AnalyticsSummary:
    """Compute summary metrics from analytics records.

    Tallies record counts by outcome, accumulates duration and explicit
    cost, and bumps the per-dimension counters for source type and
    providers.
    """
    result = AnalyticsSummary()
    for rec in records:
        result.total_records += 1
        if rec.status == "success":
            result.success_records += 1
        elif rec.status == "failed":
            result.failed_records += 1

        result.total_seconds += float(rec.total_seconds)
        if rec.cost_usd is not None:
            result.total_cost_usd += float(rec.cost_usd)
            result.cost_record_count += 1

        for counter, key in (
            (result.source_type_counts, rec.source_type),
            (result.vlm_provider_counts, rec.vlm_provider),
            (result.image_provider_counts, rec.image_provider),
        ):
            _bump(counter, key)

    # Round once at the end so per-record float noise is not baked in.
    result.total_seconds = round(result.total_seconds, 3)
    result.total_cost_usd = round(result.total_cost_usd, 6)
    return result
176 changes: 176 additions & 0 deletions paperbanana/analytics/loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
"""Load analytics records from run/batch/orchestration artifacts."""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any

from paperbanana.analytics.models import AnalyticsRecord


def _safe_load_json(path: Path) -> dict[str, Any] | None:
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return None
if not isinstance(payload, dict):
return None
return payload


def _as_float(value: Any, default: float = 0.0) -> float:
try:
return float(value)
except (TypeError, ValueError):
return default


def _extract_metadata_cost(payload: dict[str, Any]) -> float | None:
candidates: list[Any] = []
candidates.append(payload.get("total_cost_usd"))
cost_tracking = payload.get("cost_tracking")
if isinstance(cost_tracking, dict):
candidates.append(cost_tracking.get("total_cost"))
candidates.append(cost_tracking.get("total_cost_usd"))
for item in candidates:
if item is None:
continue
try:
return float(item)
except (TypeError, ValueError):
continue
return None


def _load_run_metadata(path: Path) -> list[AnalyticsRecord]:
    """Build one success record from a run's ``metadata.json``.

    Returns an empty list when the file is unreadable or malformed.
    Provider names at the payload top level take precedence over the
    ones recorded in ``config_snapshot``.
    """
    payload = _safe_load_json(path)
    if payload is None:
        return []

    timing = payload.get("timing")
    run_seconds = (
        _as_float(timing.get("total_seconds"), 0.0) if isinstance(timing, dict) else 0.0
    )

    def _provider(key: str) -> str | None:
        # Top-level value wins; fall back to the config snapshot.
        if payload.get(key):
            return str(payload.get(key))
        snapshot = payload.get("config_snapshot")
        if isinstance(snapshot, dict) and snapshot.get(key):
            return str(snapshot[key])
        return None

    record = AnalyticsRecord(
        source_type="run",
        source_path=str(path),
        source_id=str(payload.get("run_id") or path.parent.name),
        status="success",
        total_seconds=run_seconds,
        cost_usd=_extract_metadata_cost(payload),
        vlm_provider=_provider("vlm_provider"),
        image_provider=_provider("image_provider"),
    )
    return [record]


def _load_batch_report(path: Path) -> list[AnalyticsRecord]:
    """Build per-item records from a ``batch_report.json``.

    Returns an empty list when the file is unreadable, malformed, or
    contains no items. The report only carries a batch-level duration,
    so each item is assigned the batch average. Items still marked
    ``"running"`` are normalized to ``"failed"`` — presumably because a
    finished report should not contain in-flight items (TODO confirm
    with the batch writer).
    """
    payload = _safe_load_json(path)
    if payload is None:
        return []
    items = payload.get("items")
    # Single combined guard: the original checked isinstance, emptiness,
    # and then re-checked len() in a ternary — the latter was dead code.
    if not isinstance(items, list) or not items:
        return []

    batch_id = str(payload.get("batch_id") or path.parent.name)
    batch_seconds = _as_float(payload.get("total_seconds"), 0.0)
    # len(items) is guaranteed non-zero here by the guard above.
    avg_item_seconds = batch_seconds / len(items)

    records: list[AnalyticsRecord] = []
    for item in items:
        if not isinstance(item, dict):
            continue
        item_id = str(item.get("id") or "item")
        status = str(item.get("status") or "unknown")
        if status == "running":
            status = "failed"
        records.append(
            AnalyticsRecord(
                source_type="batch_item",
                source_path=str(path),
                source_id=f"{batch_id}:{item_id}",
                status=status,
                total_seconds=avg_item_seconds,
                cost_usd=None,
            )
        )
    return records


def _load_orchestration_report(path: Path) -> list[AnalyticsRecord]:
    """Build per-item records from a ``figure_package.json`` report.

    Generated items become ``success`` records and failures become
    ``failed`` records; the report-level duration is split evenly across
    all items since no per-item timing is available.
    """
    payload = _safe_load_json(path)
    if payload is None:
        return []

    orchestrate_id = str(payload.get("orchestration_id") or path.parent.name)
    total_seconds = _as_float(payload.get("total_seconds"), 0.0)

    generated = payload.get("generated_items")
    failed = payload.get("failures")
    if not isinstance(generated, list):
        generated = []
    if not isinstance(failed, list):
        failed = []

    item_count = len(generated) + len(failed)
    avg_item_seconds = total_seconds / item_count if item_count else 0.0

    records: list[AnalyticsRecord] = []
    # One loop handles both groups; only the status label differs.
    for group, status in ((generated, "success"), (failed, "failed")):
        for item in group:
            if not isinstance(item, dict):
                continue
            records.append(
                AnalyticsRecord(
                    source_type="orchestration_item",
                    source_path=str(path),
                    source_id=f"{orchestrate_id}:{item.get('id', 'item')}",
                    status=status,
                    total_seconds=avg_item_seconds,
                    cost_usd=None,
                )
            )
    return records


def load_analytics_records(root_path: str | Path) -> list[AnalyticsRecord]:
    """Load normalized analytics records from an outputs root.

    Recursively scans *root_path* for the three known artifact file
    names and parses each with its dedicated loader. Raises
    ``FileNotFoundError`` when the root does not exist.
    """
    root = Path(root_path).resolve()
    if not root.exists():
        raise FileNotFoundError(f"Path not found: {root}")

    # Artifact filename -> parser; scanned in this fixed order.
    loaders = (
        ("metadata.json", _load_run_metadata),
        ("batch_report.json", _load_batch_report),
        ("figure_package.json", _load_orchestration_report),
    )
    records: list[AnalyticsRecord] = []
    for pattern, loader in loaders:
        for artifact in root.rglob(pattern):
            records.extend(loader(artifact))

    records.sort(key=lambda rec: (rec.source_type, rec.source_id))
    return records
34 changes: 34 additions & 0 deletions paperbanana/analytics/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Typed models for analytics records and summaries."""

from __future__ import annotations

from dataclasses import dataclass, field


@dataclass(slots=True)
class AnalyticsRecord:
    """Normalized analytics event extracted from run artifacts."""

    # Artifact kind: "run", "batch_item", or "orchestration_item".
    source_type: str
    # Filesystem path of the artifact file this record was parsed from.
    source_path: str
    # Stable identifier; composites use "<parent_id>:<item_id>" form.
    source_id: str
    # Outcome label: "success", "failed", or "unknown" when not recorded.
    status: str
    # Wall-clock duration attributed to this record, in seconds.
    total_seconds: float
    # Explicit USD cost when the artifact reported one; None otherwise.
    cost_usd: float | None = None
    # Provider names, when present in the artifact metadata.
    vlm_provider: str | None = None
    image_provider: str | None = None


@dataclass(slots=True)
class AnalyticsSummary:
    """Aggregated KPIs over a set of analytics records."""

    # Record counts: overall total plus per-outcome tallies.
    total_records: int = 0
    success_records: int = 0
    failed_records: int = 0
    # Sum of record durations (seconds) and of explicit USD costs.
    total_seconds: float = 0.0
    total_cost_usd: float = 0.0
    # How many records carried an explicit cost (denominator for cost stats).
    cost_record_count: int = 0
    # Frequency counters keyed by source type / provider name.
    source_type_counts: dict[str, int] = field(default_factory=dict)
    vlm_provider_counts: dict[str, int] = field(default_factory=dict)
    image_provider_counts: dict[str, int] = field(default_factory=dict)
69 changes: 69 additions & 0 deletions paperbanana/analytics/reporting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Render analytics summaries in JSON-friendly and markdown forms."""

from __future__ import annotations

from paperbanana.analytics.models import AnalyticsSummary


def summary_to_dict(summary: AnalyticsSummary) -> dict[str, object]:
    """Convert *summary* into a JSON-serializable dict of KPI values.

    Derived ratios (success rate, mean seconds) are 0.0 when there are
    no records; counter dicts are emitted with sorted keys for stable
    output.
    """
    total = summary.total_records
    success_rate = summary.success_records / total if total > 0 else 0.0
    mean_seconds = summary.total_seconds / total if total else 0.0
    return {
        "total_records": total,
        "success_records": summary.success_records,
        "failed_records": summary.failed_records,
        "success_rate": round(success_rate, 4),
        "total_seconds": round(summary.total_seconds, 3),
        "mean_seconds": round(mean_seconds, 3),
        "total_cost_usd": round(summary.total_cost_usd, 6),
        "cost_record_count": summary.cost_record_count,
        "source_type_counts": dict(sorted(summary.source_type_counts.items())),
        "vlm_provider_counts": dict(sorted(summary.vlm_provider_counts.items())),
        "image_provider_counts": dict(sorted(summary.image_provider_counts.items())),
    }


def render_markdown_summary(summary: AnalyticsSummary) -> str:
    """Render a compact markdown summary.

    Output: a headline KPI list, then a "Source Types" section, then a
    "Providers" section; empty counters render as "(none)" placeholders.
    """
    data = summary_to_dict(summary)

    def _counter_bullets(counts: dict[str, int], indent: str) -> list[str]:
        # One bullet per counter entry, or a single placeholder bullet.
        if not counts:
            return [f"{indent}- (none)"]
        return [f"{indent}- {name}: {count}" for name, count in counts.items()]

    lines: list[str] = [
        "# Run Analytics Summary",
        "",
        f"- Total records: **{data['total_records']}**",
        f"- Success records: **{data['success_records']}**",
        f"- Failed records: **{data['failed_records']}**",
        f"- Success rate: **{data['success_rate']:.2%}**",
        f"- Total seconds: **{data['total_seconds']}**",
        f"- Mean seconds/record: **{data['mean_seconds']}**",
        f"- Total cost (USD): **{data['total_cost_usd']}**",
        f"- Records with explicit cost: **{data['cost_record_count']}**",
        "",
        "## Source Types",
    ]
    lines.extend(_counter_bullets(data["source_type_counts"], ""))  # type: ignore[arg-type]
    lines.append("")
    lines.append("## Providers")
    lines.append("- VLM providers:")
    lines.extend(_counter_bullets(data["vlm_provider_counts"], "  "))  # type: ignore[arg-type]
    lines.append("- Image providers:")
    lines.extend(_counter_bullets(data["image_provider_counts"], "  "))  # type: ignore[arg-type]

    return "\n".join(lines) + "\n"
42 changes: 42 additions & 0 deletions paperbanana/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
from rich.prompt import Prompt
from rich.table import Table

from paperbanana.analytics import (
load_analytics_records,
render_markdown_summary,
summarize_records,
)
from paperbanana.core.config import Settings
from paperbanana.core.logging import configure_logging
from paperbanana.core.types import (
Expand Down Expand Up @@ -3360,6 +3365,43 @@ def validate_manifest(
raise typer.Exit(1)


@app.command("analytics")
def analytics(
    path: str = typer.Option(
        "outputs",
        "--path",
        "-p",
        help=(
            "Root directory to scan for metadata.json, batch_report.json, and figure_package.json."
        ),
    ),
    format: str = typer.Option(
        "markdown",
        "--format",
        "-f",
        help="Output format: markdown or json.",
    ),
) -> None:
    """Analyze historical run artifacts and report aggregate cost/latency/success KPIs."""
    # Normalize the user-supplied format before validating it.
    chosen = format.lower().strip()
    if chosen not in {"markdown", "json"}:
        console.print(f"[red]Error: --format must be 'markdown' or 'json'. Got: {format}[/red]")
        raise typer.Exit(1)

    try:
        records = load_analytics_records(Path(path))
    except FileNotFoundError as exc:
        console.print(f"[red]Error: {exc}[/red]")
        raise typer.Exit(1)

    summary = summarize_records(records)
    if chosen == "markdown":
        console.print(render_markdown_summary(summary))
    else:
        # Deferred import keeps the JSON path out of the common startup cost.
        from paperbanana.analytics.reporting import summary_to_dict

        console.print_json(json_mod.dumps(summary_to_dict(summary), indent=2))


@app.command("show-config")
def show_config(
json_output: bool = typer.Option(False, "--json", help="Emit resolved config as JSON"),
Expand Down
Loading
Loading