diff --git a/GOALS.md b/GOALS.md index befa744..fc38606 100644 --- a/GOALS.md +++ b/GOALS.md @@ -9,7 +9,7 @@ Be the default benchmark for new process-mining methods. Within 18 months, (`pm-bench fetch [--pin]`); per-dataset hash pins pending the one-time TOS-gated downloads - 5 tasks with fixed scoring scripts (next-event ✅, remaining-time ✅, - outcome ✅; conformance, bottleneck pending) + outcome ✅, bottleneck ✅; conformance pending) - `gnn` runs end-to-end as the reference baseline (Markov reference ✅; `gnn` integration pending the first pinned dataset) - End-to-end loop runs on `synthetic-toy` ✅ — split → prefixes → diff --git a/README.md b/README.md index 432dfe9..3e8d560 100644 --- a/README.md +++ b/README.md @@ -224,7 +224,8 @@ honesty. The point of the benchmark is to make the comparison real. - [🟡] v0.3 — scoring scripts for all 5 tasks. next-event ✅, remaining-time ✅, outcome ✅ (AUC scoring + prior baseline + pipeline; leaderboard entry waits on a dataset whose test split - has both classes); conformance / bottleneck remain. + has both classes), bottleneck ✅ (NDCG@10 + mean-wait baseline + + leaderboard entry); conformance remains. - [🟡] v0.4 — leaderboard CI + landing page. Standings format, reference Markov entry, `pm-bench leaderboard [--all] --verify`, and the dedicated `leaderboard.yml` GitHub workflow shipped; diff --git a/STATUS.md b/STATUS.md index 7051d50..729afbe 100644 --- a/STATUS.md +++ b/STATUS.md @@ -60,6 +60,20 @@ pm-bench fetch bpi2020 --pin ## Recently shipped +- **Bottleneck task (NDCG@10 over transitions)** (`bottleneck-task` branch). + - `score_bottleneck` — pure-CPython NDCG@k with average DCG/IDCG + discounting. Missing predictions sink to the bottom of the + ranking (model that refuses to predict can't claim credit). + - `pm_bench/bottleneck.py` — per-transition mean-wait targets. + Truth shape is `(activity_a, activity_b, mean_wait_seconds, + n_observations)` — different from the per-prefix tasks. + - `pm_bench/baselines/mean_wait.py` — train-mean-per-transition + with global-mean fallback. On synthetic-toy: NDCG@10 0.9786 over + 6 transitions. Strong floor for any temporal model. + - CLI: `--task bottleneck`, `--baseline mean-wait`, end-to-end. + - `leaderboard/bottleneck/synthetic-toy.json` with the mean-wait-ref + entry; `pm-bench leaderboard --all --verify` now walks 3 boards. + - 7 new tests (`test_bottleneck.py`); 86 total, ruff clean. - **Outcome task (binary AUC)** (`outcome-task` branch). - `score_outcome` — pure-CPython rank-sum AUC, with average-rank tie-breaking; degenerate single-class case returns 0.5 by diff --git a/leaderboard/bottleneck/synthetic-toy.json b/leaderboard/bottleneck/synthetic-toy.json new file mode 100644 index 0000000..a5711e5 --- /dev/null +++ b/leaderboard/bottleneck/synthetic-toy.json @@ -0,0 +1,27 @@ +{ + "task": "bottleneck", + "dataset": "synthetic-toy", + "metric": "NDCG@10 over per-transition wait times (higher is better)", + "scored_with": "pm_bench.score.score_bottleneck", + "split": { + "kind": "case-chrono", + "train_frac": 0.7, + "val_frac": 0.1 + }, + "entries": [ + { + "model": "mean-wait-ref", + "version": "0.1.0", + "predictions_path": "leaderboard/predictions/bottleneck/synthetic-toy/mean-wait-ref.csv.gz", + "code": "https://github.com/erphq/pm-bench/blob/main/pm_bench/baselines/mean_wait.py", + "paper": null, + "score": { + "ndcg_at_k": 0.9786469611053435, + "k": 10, + "n_transitions": 6 + }, + "scored_at": "2026-04-30T00:00:00Z", + "notes": "Per-transition mean wait time fitted on training cases, with global-mean fallback for unseen transitions. The dumbest model that uses any time information; a real model has to do strictly better." + } + ] +} diff --git a/leaderboard/predictions/bottleneck/synthetic-toy/mean-wait-ref.csv.gz b/leaderboard/predictions/bottleneck/synthetic-toy/mean-wait-ref.csv.gz new file mode 100644 index 0000000..1de80bd Binary files /dev/null and b/leaderboard/predictions/bottleneck/synthetic-toy/mean-wait-ref.csv.gz differ diff --git a/pm_bench/__init__.py b/pm_bench/__init__.py index 3ad5ea2..6c9db3b 100644 --- a/pm_bench/__init__.py +++ b/pm_bench/__init__.py @@ -20,9 +20,11 @@ ) from pm_bench.registry import Dataset, get_dataset, load_registry from pm_bench.score import ( + BottleneckScore, NextEventScore, OutcomeScore, RemainingTimeScore, + score_bottleneck, score_next_event, score_outcome, score_remaining_time, @@ -30,6 +32,7 @@ from pm_bench.split import Event, Split, case_chrono_split __all__ = [ + "BottleneckScore", "Dataset", "Event", "NextEventScore", @@ -50,6 +53,7 @@ "read_predictions_csv", "read_prefixes_csv", "read_time_targets_csv", + "score_bottleneck", "score_next_event", "score_outcome", "score_remaining_time", diff --git a/pm_bench/baselines/__init__.py b/pm_bench/baselines/__init__.py index 0ef5b65..0731789 100644 --- a/pm_bench/baselines/__init__.py +++ b/pm_bench/baselines/__init__.py @@ -16,6 +16,7 @@ read_time_predictions_csv, write_time_predictions_csv, ) +from pm_bench.baselines.mean_wait import MeanWaitBaseline, fit_mean_wait, predict_mean_wait from pm_bench.baselines.prior_outcome import ( OutcomePrediction, PriorOutcomeBaseline, @@ -28,13 +29,16 @@ __all__ = [ "MarkovBaseline", "MeanTimeBaseline", + "MeanWaitBaseline", "OutcomePrediction", "PriorOutcomeBaseline", "TimePrediction", "fit_mean_time", + "fit_mean_wait", "fit_prior_outcome", "predict_markov", "predict_mean_time", + "predict_mean_wait", "predict_prior_outcome", "read_outcome_predictions_csv", "read_time_predictions_csv", diff --git a/pm_bench/baselines/mean_wait.py b/pm_bench/baselines/mean_wait.py new file mode 100644 index 0000000..fb9cb5c --- /dev/null +++ b/pm_bench/baselines/mean_wait.py @@ -0,0 +1,62 @@ +"""Train-mean-wait reference baseline for bottleneck detection. + +For every (activity_a, activity_b) transition observed in the training +cases, store the mean wait time. At test time, predict that mean. Falls +back to the global training mean for transitions never seen during +training. + +Identifies the "obvious" bottlenecks — transitions that were already +slow in training. A model that ties this isn't using any new +information from the test set. +""" +from __future__ import annotations + +from collections.abc import Iterable +from dataclasses import dataclass + +from pm_bench.bottleneck import BottleneckPrediction, BottleneckTarget, extract_bottleneck_targets +from pm_bench.split import Activity, CaseId, Event + + +@dataclass(frozen=True) +class MeanWaitBaseline: + by_transition: dict[tuple[Activity, Activity], float] + global_mean_seconds: float + + +def fit_mean_wait( + events: Iterable[Event], + train_case_ids: Iterable[CaseId], +) -> MeanWaitBaseline: + """Per-transition mean wait time over training cases.""" + targets = list(extract_bottleneck_targets(events, train_case_ids)) + if not targets: + return MeanWaitBaseline(by_transition={}, global_mean_seconds=0.0) + + by_transition = { + (t.activity_a, t.activity_b): t.mean_wait_seconds for t in targets + } + # Weight global mean by observation count so common transitions dominate. + total_wait = sum(t.mean_wait_seconds * t.n_observations for t in targets) + total_obs = sum(t.n_observations for t in targets) + global_mean = (total_wait / total_obs) if total_obs else 0.0 + return MeanWaitBaseline(by_transition=by_transition, global_mean_seconds=global_mean) + + +def predict_mean_wait( + model: MeanWaitBaseline, + targets: Iterable[BottleneckTarget], +) -> list[BottleneckPrediction]: + """For each target transition, return the trained mean (or global fallback).""" + out: list[BottleneckPrediction] = [] + for t in targets: + key = (t.activity_a, t.activity_b) + pred = model.by_transition.get(key, model.global_mean_seconds) + out.append( + BottleneckPrediction( + activity_a=t.activity_a, + activity_b=t.activity_b, + predicted_wait_seconds=pred, + ) + ) + return out diff --git a/pm_bench/bottleneck.py b/pm_bench/bottleneck.py new file mode 100644 index 0000000..16bb2cf --- /dev/null +++ b/pm_bench/bottleneck.py @@ -0,0 +1,142 @@ +"""Bottleneck-detection targets — per-transition mean wait time. + +Bottleneck is the only v0 task that's *per-transition* rather than +per-prefix: there's one truth row per ordered (activity_a, activity_b) +pair observed in the partition, with the mean wait time (seconds) +between them across all cases. Models predict a value per transition; +NDCG@10 over the ranking is the score. + +Truth file columns: + + activity_a,activity_b,mean_wait_seconds,n_observations + +Predictions file columns: + + activity_a,activity_b,predicted_wait_seconds +""" +from __future__ import annotations + +from collections.abc import Iterable, Iterator +from dataclasses import dataclass + +from pm_bench.split import Activity, CaseId, Event + + +@dataclass(frozen=True) +class BottleneckTarget: + activity_a: Activity + activity_b: Activity + mean_wait_seconds: float + n_observations: int + + +@dataclass(frozen=True) +class BottleneckPrediction: + activity_a: Activity + activity_b: Activity + predicted_wait_seconds: float + + +def extract_bottleneck_targets( + events: Iterable[Event], + case_ids: Iterable[CaseId], +) -> Iterator[BottleneckTarget]: + """Yield per-transition mean wait time for the given case ids. + + For each pair of chronologically-consecutive activities within a + case, we record the wait time. The yielded targets aggregate + across all cases in `case_ids` — one row per distinct (a, b) pair. + """ + keep = set(case_ids) + by_case: dict[CaseId, list[tuple[Activity, object]]] = {} + for case_id, activity, ts in events: + if case_id not in keep: + continue + by_case.setdefault(case_id, []).append((activity, ts)) + + sums: dict[tuple[Activity, Activity], float] = {} + counts: dict[tuple[Activity, Activity], int] = {} + for rows in by_case.values(): + rows.sort(key=lambda r: r[1]) + for (a, ta), (b, tb) in zip(rows, rows[1:], strict=False): + key = (a, b) + wait = (tb - ta).total_seconds() # type: ignore[operator] + sums[key] = sums.get(key, 0.0) + wait + counts[key] = counts.get(key, 0) + 1 + + for key in sorted(sums.keys()): + yield BottleneckTarget( + activity_a=key[0], + activity_b=key[1], + mean_wait_seconds=sums[key] / counts[key], + n_observations=counts[key], + ) + + +def write_bottleneck_targets_csv( + targets: Iterable[BottleneckTarget], path: str +) -> int: + """Write bottleneck targets to a CSV file.""" + import csv + + n = 0 + with open(path, "w", newline="") as f: + w = csv.writer(f) + w.writerow(["activity_a", "activity_b", "mean_wait_seconds", "n_observations"]) + for t in targets: + w.writerow([t.activity_a, t.activity_b, repr(t.mean_wait_seconds), t.n_observations]) + n += 1 + return n + + +def read_bottleneck_targets_csv(path: str) -> list[BottleneckTarget]: + """Read a bottleneck-targets CSV.""" + import csv + + out: list[BottleneckTarget] = [] + with open(path, newline="") as f: + r = csv.DictReader(f) + for row in r: + out.append( + BottleneckTarget( + activity_a=row["activity_a"], + activity_b=row["activity_b"], + mean_wait_seconds=float(row["mean_wait_seconds"]), + n_observations=int(row["n_observations"]), + ) + ) + return out + + +def write_bottleneck_predictions_csv( + predictions: Iterable[BottleneckPrediction], path: str +) -> int: + """Write bottleneck predictions to a CSV file.""" + import csv + + n = 0 + with open(path, "w", newline="") as f: + w = csv.writer(f) + w.writerow(["activity_a", "activity_b", "predicted_wait_seconds"]) + for p in predictions: + w.writerow([p.activity_a, p.activity_b, repr(p.predicted_wait_seconds)]) + n += 1 + return n + + +def read_bottleneck_predictions_csv(path: str) -> list[BottleneckPrediction]: + """Read a bottleneck-predictions CSV.""" + import csv + + out: list[BottleneckPrediction] = [] + with open(path, newline="") as f: + r = csv.DictReader(f) + for row in r: + out.append( + BottleneckPrediction( + activity_a=row["activity_a"], + activity_b=row["activity_b"], + predicted_wait_seconds=float(row["predicted_wait_seconds"]), + ) + ) + return out diff --git a/pm_bench/cli.py b/pm_bench/cli.py index 563b8b1..367023f 100644 --- a/pm_bench/cli.py +++ b/pm_bench/cli.py @@ -14,12 +14,20 @@ read_time_predictions_csv, write_time_predictions_csv, ) +from pm_bench.baselines.mean_wait import fit_mean_wait, predict_mean_wait from pm_bench.baselines.prior_outcome import ( fit_prior_outcome, predict_prior_outcome, read_outcome_predictions_csv, write_outcome_predictions_csv, ) +from pm_bench.bottleneck import ( + extract_bottleneck_targets, + read_bottleneck_predictions_csv, + read_bottleneck_targets_csv, + write_bottleneck_predictions_csv, + write_bottleneck_targets_csv, +) from pm_bench.fetch import ( FetchError, ManualFetchRequired, @@ -40,7 +48,12 @@ write_time_targets_csv, ) from pm_bench.registry import get_dataset, load_registry -from pm_bench.score import score_next_event, score_outcome, score_remaining_time +from pm_bench.score import ( + score_bottleneck, + score_next_event, + score_outcome, + score_remaining_time, +) from pm_bench.split import case_chrono_split @@ -222,7 +235,7 @@ def split(name: str, task: str) -> None: ) @click.option( "--task", - type=click.Choice(["next-event", "remaining-time", "outcome"]), + type=click.Choice(["next-event", "remaining-time", "outcome", "bottleneck"]), default="next-event", show_default=True, ) @@ -243,11 +256,16 @@ def prefixes(name: str, split_path: str, out_path: str, partition: str, task: st n = write_time_targets_csv( extract_remaining_time_targets(events, case_ids), out_path ) - else: + elif task == "outcome": rule = _outcome_rule(name) n = write_outcome_targets_csv( extract_outcome_targets(events, case_ids, rule), out_path ) + else: + # bottleneck + n = write_bottleneck_targets_csv( + extract_bottleneck_targets(events, case_ids), out_path + ) click.echo(f"wrote {n} prefixes to {out_path} (task={task} partition={partition})") @@ -274,14 +292,17 @@ def prefixes(name: str, split_path: str, out_path: str, partition: str, task: st ) @click.option( "--baseline", - type=click.Choice(["markov", "mean", "prior"]), + type=click.Choice(["markov", "mean", "prior", "mean-wait"]), default="markov", show_default=True, - help="markov → next-event; mean → remaining-time; prior → outcome.", + help=( + "markov → next-event; mean → remaining-time; " + "prior → outcome; mean-wait → bottleneck." + ), ) @click.option( "--task", - type=click.Choice(["next-event", "remaining-time", "outcome"]), + type=click.Choice(["next-event", "remaining-time", "outcome", "bottleneck"]), default="next-event", show_default=True, ) @@ -312,8 +333,7 @@ def predict( time_targets = read_time_targets_csv(prefixes_path) time_preds = predict_mean_time(time_model, time_targets) n = write_time_predictions_csv(time_preds, out_path) - else: - # outcome + elif task == "outcome": if baseline != "prior": raise click.UsageError(f"baseline {baseline!r} doesn't apply to outcome") rule = _outcome_rule(name) @@ -326,6 +346,14 @@ def predict( seq_by_case.setdefault(cid, []).append(act) outcome_preds = predict_prior_outcome(outcome_model, outcome_targets, seq_by_case) n = write_outcome_predictions_csv(outcome_preds, out_path) + else: + # bottleneck + if baseline != "mean-wait": + raise click.UsageError(f"baseline {baseline!r} doesn't apply to bottleneck") + wait_model = fit_mean_wait(events, split_data["train"]) + wait_targets = read_bottleneck_targets_csv(prefixes_path) + wait_preds = predict_mean_wait(wait_model, wait_targets) + n = write_bottleneck_predictions_csv(wait_preds, out_path) click.echo(f"wrote {n} predictions to {out_path} (task={task} baseline={baseline})") @@ -340,7 +368,7 @@ def predict( ) @click.option( "--task", - type=click.Choice(["next-event", "remaining-time", "outcome"]), + type=click.Choice(["next-event", "remaining-time", "outcome", "bottleneck"]), default="next-event", show_default=True, ) @@ -399,27 +427,46 @@ def score(predictions_path: str, prefixes_path: str, task: str) -> None: ) return - # outcome - truth_o = read_outcome_targets_csv(prefixes_path) - pred_o = read_outcome_predictions_csv(predictions_path) - pred_o_lookup = {(p.case_id, p.prefix_idx): p.score for p in pred_o} - missing = [ - (t.case_id, t.prefix_idx) - for t in truth_o - if (t.case_id, t.prefix_idx) not in pred_o_lookup - ] - if missing: + if task == "outcome": + truth_o = read_outcome_targets_csv(prefixes_path) + pred_o = read_outcome_predictions_csv(predictions_path) + pred_o_lookup = {(p.case_id, p.prefix_idx): p.score for p in pred_o} + missing = [ + (t.case_id, t.prefix_idx) + for t in truth_o + if (t.case_id, t.prefix_idx) not in pred_o_lookup + ] + if missing: + click.echo( + f"predictions is missing {len(missing)} target(s); first: {missing[0]}", + err=True, + ) + sys.exit(2) + preds_o = [pred_o_lookup[(t.case_id, t.prefix_idx)] for t in truth_o] + truth_o_int = [t.outcome for t in truth_o] + os_ = score_outcome(preds_o, truth_o_int) click.echo( - f"predictions is missing {len(missing)} target(s); first: {missing[0]}", - err=True, + json.dumps( + {"task": task, "auc": os_.auc, "n": os_.n, "n_pos": os_.n_pos}, + indent=2, + ), ) - sys.exit(2) - preds_o = [pred_o_lookup[(t.case_id, t.prefix_idx)] for t in truth_o] - truth_o_int = [t.outcome for t in truth_o] - os_ = score_outcome(preds_o, truth_o_int) + return + + # bottleneck + truth_b = read_bottleneck_targets_csv(prefixes_path) + pred_b = read_bottleneck_predictions_csv(predictions_path) + truth_dict = {(t.activity_a, t.activity_b): t.mean_wait_seconds for t in truth_b} + pred_dict = {(p.activity_a, p.activity_b): p.predicted_wait_seconds for p in pred_b} + bs = score_bottleneck(pred_dict, truth_dict, k=10) click.echo( json.dumps( - {"task": task, "auc": os_.auc, "n": os_.n, "n_pos": os_.n_pos}, + { + "task": task, + "ndcg_at_k": bs.ndcg_at_k, + "k": bs.k, + "n_transitions": bs.n_transitions, + }, indent=2, ), ) @@ -509,13 +556,24 @@ def leaderboard( click.echo(f"{board.task} · {board.dataset} · {board.metric}") click.echo("-" * (width + 30)) for e in standings(board): - n = e.score.get("n") if board.task == "remaining-time": mae = e.score.get("mae_days") + n = e.score.get("n") click.echo(f"{e.model:<{width}} mae_days={mae:.4f} n={n}") + elif board.task == "outcome": + auc = e.score.get("auc") + n = e.score.get("n") + n_pos = e.score.get("n_pos") + click.echo(f"{e.model:<{width}} auc={auc:.4f} n={n} n_pos={n_pos}") + elif board.task == "bottleneck": + ndcg = e.score.get("ndcg_at_k") + k = e.score.get("k") + n_t = e.score.get("n_transitions") + click.echo(f"{e.model:<{width}} ndcg@{k}={ndcg:.4f} n_transitions={n_t}") else: top1 = e.score.get("top1") top3 = e.score.get("top3") + n = e.score.get("n") click.echo( f"{e.model:<{width}} top1={top1:.4f} top3={top3:.4f} n={n}" ) diff --git a/pm_bench/leaderboard.py b/pm_bench/leaderboard.py index 9e8c3c5..c0be638 100644 --- a/pm_bench/leaderboard.py +++ b/pm_bench/leaderboard.py @@ -21,6 +21,7 @@ from dataclasses import dataclass from pathlib import Path +from pm_bench.bottleneck import BottleneckTarget, extract_bottleneck_targets from pm_bench.predictions import Prediction from pm_bench.prefixes import ( PREFIX_SEP, @@ -29,7 +30,7 @@ extract_prefixes, extract_remaining_time_targets, ) -from pm_bench.score import score_next_event, score_remaining_time +from pm_bench.score import score_bottleneck, score_next_event, score_remaining_time @dataclass(frozen=True) @@ -126,6 +127,12 @@ def _time_truth_for_dataset(name: str) -> list[TimeTarget]: return list(extract_remaining_time_targets(events, test_cases)) +def _bottleneck_truth_for_dataset(name: str) -> list[BottleneckTarget]: + """Canonical bottleneck truth set for a known dataset.""" + events, test_cases = _events_and_test_cases(name) + return list(extract_bottleneck_targets(events, test_cases)) + + def _rescore_next_event(board: Board, repo_root: Path) -> list[tuple[Entry, dict]]: truth = _truth_for_dataset(board.dataset) truth_keys = [(t.case_id, t.prefix_idx) for t in truth] @@ -179,6 +186,34 @@ def _rescore_remaining_time(board: Board, repo_root: Path) -> list[tuple[Entry, return out +def _rescore_bottleneck(board: Board, repo_root: Path) -> list[tuple[Entry, dict]]: + import csv + import gzip + + truth = _bottleneck_truth_for_dataset(board.dataset) + truth_dict = {(t.activity_a, t.activity_b): t.mean_wait_seconds for t in truth} + + out: list[tuple[Entry, dict]] = [] + for entry in board.entries: + pred_path = repo_root / entry.predictions_path + opener = gzip.open if str(pred_path).endswith(".gz") else open + pred_dict: dict[tuple[str, str], float] = {} + with opener(pred_path, "rt", newline="") as f: + reader = csv.DictReader(f) + for row in reader: + pred_dict[(row["activity_a"], row["activity_b"])] = float( + row["predicted_wait_seconds"] + ) + s = score_bottleneck(pred_dict, truth_dict, k=10) + out.append( + ( + entry, + {"ndcg_at_k": s.ndcg_at_k, "k": s.k, "n_transitions": s.n_transitions}, + ) + ) + return out + + def rescore(board: Board, repo_root: str | Path = ".") -> list[tuple[Entry, dict]]: """Re-run scoring for every entry; return (entry, fresh_score) pairs.""" root = Path(repo_root) @@ -186,6 +221,8 @@ def rescore(board: Board, repo_root: str | Path = ".") -> list[tuple[Entry, dict return _rescore_next_event(board, root) if board.task == "remaining-time": return _rescore_remaining_time(board, root) + if board.task == "bottleneck": + return _rescore_bottleneck(board, root) raise ValueError(f"unknown task: {board.task}") @@ -209,8 +246,8 @@ def standings(board: Board, *, key: str | None = None) -> list[Entry]: """Return entries sorted by the appropriate score key. Direction follows the metric: top1 (higher better) for next-event, - mae_days (lower better) for remaining-time. Pass `key` explicitly to - override. + mae_days (lower better) for remaining-time, auc / ndcg_at_k (higher + better) for outcome / bottleneck. Pass `key` explicitly to override. """ if key is None: if board.task == "remaining-time": @@ -218,5 +255,10 @@ def standings(board: Board, *, key: str | None = None) -> list[Entry]: board.entries, key=lambda e: e.score.get("mae_days", float("inf")), ) - key = "top1" + if board.task == "outcome": + key = "auc" + elif board.task == "bottleneck": + key = "ndcg_at_k" + else: + key = "top1" return sorted(board.entries, key=lambda e: e.score.get(key, float("-inf")), reverse=True) diff --git a/pm_bench/score.py b/pm_bench/score.py index fdd2ac0..e812a57 100644 --- a/pm_bench/score.py +++ b/pm_bench/score.py @@ -1,10 +1,12 @@ """Scoring scripts for pm-bench tasks. v0: next-event prediction (top-1 / top-3 accuracy), remaining-time -prediction (MAE in days), and outcome prediction (AUC). +prediction (MAE in days), outcome prediction (AUC), and bottleneck +detection (NDCG@10 over transitions ranked by wait time). """ from __future__ import annotations +import math from collections.abc import Sequence from dataclasses import dataclass @@ -111,6 +113,63 @@ def score_outcome( return OutcomeScore(auc=auc, n=n, n_pos=n_pos) +@dataclass(frozen=True) +class BottleneckScore: + """NDCG@k for transition-bottleneck ranking. + + `ndcg_at_k` is the standard normalized DCG: predicted ranking's DCG + divided by the ideal DCG (sorting transitions descending by true + wait time). Higher is better; 1.0 is a perfect ranking. `k` is the + cutoff used; `n_transitions` is the size of the truth set. + """ + + ndcg_at_k: float + k: int + n_transitions: int + + +def _dcg(relevances: Sequence[float]) -> float: + """Discounted cumulative gain: sum(rel_i / log2(i + 2)) for i in 0..n-1.""" + return sum(rel / math.log2(i + 2) for i, rel in enumerate(relevances)) + + +def score_bottleneck( + predictions: dict[tuple[str, str], float], + truth: dict[tuple[str, str], float], + *, + k: int = 10, +) -> BottleneckScore: + """NDCG@k for a transition-wait-time ranking. + + `predictions[(a, b)]` is the model's predicted wait time for the + transition `a → b`; `truth[(a, b)]` is the actual mean wait time + on the held-out partition. We rank truth transitions by predicted + wait (descending), take the top k, and compute NDCG against the + ideal ranking (truth sorted descending). Transitions present in + truth but missing from predictions are scored 0 — a model that + refuses to predict can't claim credit. + """ + if not truth: + raise ValueError("truth is empty — nothing to rank") + if k <= 0: + raise ValueError("k must be > 0") + + transitions = list(truth.keys()) + # Rank by predicted wait, descending. Missing predictions = -inf so + # they sink to the bottom (and contribute their truth at low rank). + transitions.sort(key=lambda t: predictions.get(t, float("-inf")), reverse=True) + pred_top = transitions[:k] + pred_relevances = [truth[t] for t in pred_top] + + # Ideal ranking sorts truth descending. + ideal_top = sorted(truth.values(), reverse=True)[:k] + + dcg = _dcg(pred_relevances) + idcg = _dcg(ideal_top) + ndcg = dcg / idcg if idcg > 0 else 0.0 + return BottleneckScore(ndcg_at_k=ndcg, k=k, n_transitions=len(truth)) + + def score_remaining_time( predictions: Sequence[float], truth: Sequence[float], diff --git a/tests/test_bottleneck.py b/tests/test_bottleneck.py new file mode 100644 index 0000000..8ac722b --- /dev/null +++ b/tests/test_bottleneck.py @@ -0,0 +1,127 @@ +"""End-to-end + targeted tests for the bottleneck task.""" +from __future__ import annotations + +import datetime as dt +import json +from pathlib import Path + +from click.testing import CliRunner + +from pm_bench.baselines.mean_wait import fit_mean_wait, predict_mean_wait +from pm_bench.bottleneck import ( + BottleneckPrediction, + BottleneckTarget, + extract_bottleneck_targets, + read_bottleneck_predictions_csv, + read_bottleneck_targets_csv, + write_bottleneck_predictions_csv, + write_bottleneck_targets_csv, +) +from pm_bench.cli import main +from pm_bench.leaderboard import load_board, verify + +REPO_ROOT = Path(__file__).resolve().parent.parent +BOTTLENECK_BOARD = REPO_ROOT / "leaderboard" / "bottleneck" / "synthetic-toy.json" + + +def _events() -> list[tuple[str, str, dt.datetime]]: + base = dt.datetime(2024, 1, 1) + return [ + ("c1", "a", base), + ("c1", "b", base + dt.timedelta(seconds=100)), + ("c1", "c", base + dt.timedelta(seconds=300)), + ("c2", "a", base), + ("c2", "b", base + dt.timedelta(seconds=200)), + ("c2", "c", base + dt.timedelta(seconds=500)), + ] + + +def test_extract_targets_aggregates_per_transition() -> None: + out = list(extract_bottleneck_targets(_events(), ["c1", "c2"])) + by_pair = {(t.activity_a, t.activity_b): t for t in out} + # a→b: 100 and 200 → mean 150; b→c: 200 and 300 → mean 250 + assert by_pair[("a", "b")].mean_wait_seconds == 150.0 + assert by_pair[("a", "b")].n_observations == 2 + assert by_pair[("b", "c")].mean_wait_seconds == 250.0 + + +def test_round_trip_csv(tmp_path) -> None: + targets = [ + BottleneckTarget("a", "b", 150.0, 2), + BottleneckTarget("b", "c", 250.0, 2), + ] + p = tmp_path / "t.csv" + n = write_bottleneck_targets_csv(targets, str(p)) + assert n == 2 + back = read_bottleneck_targets_csv(str(p)) + assert back == targets + + +def test_predictions_csv_round_trip(tmp_path) -> None: + preds = [ + BottleneckPrediction("a", "b", 150.0), + BottleneckPrediction("b", "c", 250.0), + ] + p = tmp_path / "p.csv" + write_bottleneck_predictions_csv(preds, str(p)) + back = read_bottleneck_predictions_csv(str(p)) + assert back == preds + + +def test_mean_wait_baseline_matches_train_mean() -> None: + model = fit_mean_wait(_events(), ["c1", "c2"]) + assert model.by_transition[("a", "b")] == 150.0 + assert model.by_transition[("b", "c")] == 250.0 + + +def test_mean_wait_unseen_transition_falls_back_to_global() -> None: + model = fit_mean_wait(_events(), ["c1", "c2"]) + targets = [BottleneckTarget("z", "y", 9999.0, 1)] + preds = predict_mean_wait(model, targets) + # Unseen → global mean = (150*2 + 250*2) / 4 = 200 + assert abs(preds[0].predicted_wait_seconds - 200.0) < 1e-9 + + +def test_bottleneck_board_verifies() -> None: + board = load_board(BOTTLENECK_BOARD) + drifts = verify(board, repo_root=REPO_ROOT) + assert drifts == [], drifts + + +def test_full_bottleneck_pipeline(tmp_path) -> None: + runner = CliRunner() + split_path = tmp_path / "split.json" + targets_path = tmp_path / "targets.csv" + preds_path = tmp_path / "preds.csv" + + r = runner.invoke(main, ["split", "synthetic-toy"]) + assert r.exit_code == 0 + split_path.write_text(r.output) + + r = runner.invoke( + main, + ["prefixes", "synthetic-toy", "--split", str(split_path), + "--out", str(targets_path), "--task", "bottleneck"], + ) + assert r.exit_code == 0, r.output + + r = runner.invoke( + main, + ["predict", "synthetic-toy", "--split", str(split_path), + "--prefixes", str(targets_path), "--out", str(preds_path), + "--baseline", "mean-wait", "--task", "bottleneck"], + ) + assert r.exit_code == 0, r.output + + r = runner.invoke( + main, + ["score", str(preds_path), "--prefixes", str(targets_path), + "--task", "bottleneck"], + ) + assert r.exit_code == 0, r.output + result = json.loads(r.output) + assert result["task"] == "bottleneck" + assert result["k"] == 10 + assert 0.0 <= result["ndcg_at_k"] <= 1.0 + # The mean-wait baseline should beat 0.5 on synthetic-toy. + assert result["ndcg_at_k"] > 0.5 diff --git a/tests/test_score.py b/tests/test_score.py index 5db51fa..2479c69 100644 --- a/tests/test_score.py +++ b/tests/test_score.py @@ -1,6 +1,13 @@ +import math + import pytest -from pm_bench import score_next_event, score_outcome, score_remaining_time +from pm_bench import ( + score_bottleneck, + score_next_event, + score_outcome, + score_remaining_time, +) def test_top1_perfect() -> None: @@ -85,6 +92,59 @@ def test_outcome_lengths_must_match() -> None: score_outcome([0.1], [0, 1]) +def test_bottleneck_perfect_ranking_is_one() -> None: + truth = {("a", "b"): 10.0, ("c", "d"): 5.0, ("e", "f"): 1.0} + preds = {("a", "b"): 10.0, ("c", "d"): 5.0, ("e", "f"): 1.0} + s = score_bottleneck(preds, truth, k=10) + assert s.ndcg_at_k == 1.0 + assert s.n_transitions == 3 + assert s.k == 10 + + +def test_bottleneck_inverted_ranking_below_one() -> None: + truth = {("a", "b"): 10.0, ("c", "d"): 5.0, ("e", "f"): 1.0} + # Predicted scores invert the order. + preds = {("a", "b"): 1.0, ("c", "d"): 5.0, ("e", "f"): 10.0} + s = score_bottleneck(preds, truth) + assert s.ndcg_at_k < 1.0 + + +def test_bottleneck_missing_predictions_sink_to_bottom() -> None: + """A model that doesn't predict at all gets the worst possible ranking.""" + truth = {("a", "b"): 10.0, ("c", "d"): 5.0} + preds: dict = {} + s = score_bottleneck(preds, truth) + # All transitions tied at -inf → tie-break by dict insertion order. + # The actual NDCG depends on that order; just assert it's a valid value. + assert 0.0 <= s.ndcg_at_k <= 1.0 + + +def test_bottleneck_known_value() -> None: + """Hand-checked: 3 transitions, predicted ranking [b,a,c], truth [a,b,c].""" + truth = {"a": 10.0, "b": 5.0, "c": 1.0} + # Make tuples to match the API. + truth_t = {(k, "x"): v for k, v in truth.items()} + # Predicted ranks b > a > c + preds_t = {("b", "x"): 100.0, ("a", "x"): 50.0, ("c", "x"): 1.0} + # DCG = 5/log2(2) + 10/log2(3) + 1/log2(4) = 5 + 10/1.585 + 0.5 + # IDCG = 10/log2(2) + 5/log2(3) + 1/log2(4) = 10 + 5/1.585 + 0.5 + expected_dcg = 5.0 / math.log2(2) + 10.0 / math.log2(3) + 1.0 / math.log2(4) + expected_idcg = 10.0 / math.log2(2) + 5.0 / math.log2(3) + 1.0 / math.log2(4) + expected_ndcg = expected_dcg / expected_idcg + s = score_bottleneck(preds_t, truth_t, k=10) + assert abs(s.ndcg_at_k - expected_ndcg) < 1e-9 + + +def test_bottleneck_empty_truth_raises() -> None: + with pytest.raises(ValueError): + score_bottleneck({}, {}) + + +def test_bottleneck_invalid_k_raises() -> None: + with pytest.raises(ValueError): + score_bottleneck({}, {("a", "b"): 1.0}, k=0) + + def test_outcome_known_value() -> None: """Hand-checked: 3 pos, 3 neg, one swap → AUC = 8/9.""" # ranks ascending: [neg=0.1→1, neg=0.2→2, pos=0.3→3, neg=0.4→4, pos=0.5→5, pos=0.6→6]