diff --git a/GOALS.md b/GOALS.md index 5b799a7..befa744 100644 --- a/GOALS.md +++ b/GOALS.md @@ -8,8 +8,8 @@ Be the default benchmark for new process-mining methods. Within 18 months, - 7 datasets fetchable + hash-verified — fetch/hash machinery shipped (`pm-bench fetch [--pin]`); per-dataset hash pins pending the one-time TOS-gated downloads -- 5 tasks with fixed scoring scripts (next-event ✅, remaining-time ✅; - outcome, conformance, bottleneck pending) +- 5 tasks with fixed scoring scripts (next-event ✅, remaining-time ✅, + outcome ✅; conformance, bottleneck pending) - `gnn` runs end-to-end as the reference baseline (Markov reference ✅; `gnn` integration pending the first pinned dataset) - End-to-end loop runs on `synthetic-toy` ✅ — split → prefixes → diff --git a/README.md b/README.md index f8a356e..432dfe9 100644 --- a/README.md +++ b/README.md @@ -221,8 +221,10 @@ honesty. The point of the benchmark is to make the comparison real. `$PM_BENCH_CACHE` resolution); per-dataset hash-pinning PRs pending the one-time TOS-gated downloads from 4TU and Mendeley. - [x] v0.2 — splits + targets for next-event ✅ and remaining-time ✅ -- [🟡] v0.3 — scoring scripts for all 5 tasks. next-event ✅ and - remaining-time ✅; outcome / conformance / bottleneck remain. +- [🟡] v0.3 — scoring scripts for all 5 tasks. next-event ✅, + remaining-time ✅, outcome ✅ (AUC scoring + prior baseline + + pipeline; leaderboard entry waits on a dataset whose test split + has both classes); conformance / bottleneck remain. - [🟡] v0.4 — leaderboard CI + landing page. Standings format, reference Markov entry, `pm-bench leaderboard [--all] --verify`, and the dedicated `leaderboard.yml` GitHub workflow shipped; diff --git a/STATUS.md b/STATUS.md index 49cfa20..7051d50 100644 --- a/STATUS.md +++ b/STATUS.md @@ -60,6 +60,22 @@ pm-bench fetch bpi2020 --pin ## Recently shipped +- **Outcome task (binary AUC)** (`outcome-task` branch). + - `score_outcome` — pure-CPython rank-sum AUC, with average-rank + tie-breaking; degenerate single-class case returns 0.5 by + convention rather than NaN. + - `pm_bench/baselines/prior_outcome.py` — last-activity-conditioned + positive rate (with global-rate fallback for unseen activities). + The dumbest baseline that uses *any* prefix signal. + - CLI: `--task outcome`, `--baseline prior`, end-to-end through + `prefixes / predict / score`. + - Per-dataset outcome rule registered for synthetic-toy + (`is_positive_outcome`: case ends with `delivery_confirmed`). + - **No leaderboard entry yet** — synthetic-toy with seed=42 happens + to have zero positives in the test partition, so AUC degenerates. + The pipeline runs end-to-end and the test asserts it; a real + leaderboard entry waits on a pinned BPI dataset. + - 8 new tests; 73 total, ruff clean. - **Remaining-time task** (`remaining-time` branch). - `score_remaining_time` (MAE in days), prefixes/predictions formats parallel to next-event so models share a loader. 
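Reviewer note: the outcome pieces listed above also compose without the CLI. Here is a minimal sketch against the public API this diff adds; the four-case toy log and the `is_pay` rule are invented for illustration, the imports and signatures are the shipped ones.

```python
import datetime as dt

from pm_bench import extract_outcome_targets, score_outcome
from pm_bench.baselines.prior_outcome import fit_prior_outcome, predict_prior_outcome

# Hand-built log: events are (case_id, activity, timestamp) tuples.
base = dt.datetime(2024, 1, 1)
events = []
for cid, last in [("c1", "pay"), ("c2", "pay"), ("c3", "cancel"), ("c4", "cancel")]:
    events += [
        (cid, "start", base),
        (cid, "review", base + dt.timedelta(hours=1)),
        (cid, last, base + dt.timedelta(hours=2)),
    ]


def is_pay(acts: list[str]) -> bool:
    # Per-dataset outcome rule: positive iff the case ends with "pay".
    return bool(acts) and acts[-1] == "pay"


# Fit on two cases (one per class), score on the other two.
model = fit_prior_outcome(events, ["c1", "c3"], is_pay)
targets = list(extract_outcome_targets(events, ["c2", "c4"], is_pay))
seq_by_case = {"c2": ["start", "review", "pay"], "c4": ["start", "review", "cancel"]}
preds = predict_prior_outcome(model, targets, seq_by_case)
s = score_outcome([p.score for p in preds], [t.outcome for t in targets])
print(s.auc, s.n, s.n_pos)  # 0.5 4 2: the prior can't separate these prefixes
```

Every prediction ties at 0.5 here, so the AUC is 0.5 via average ranks; note this is the tie behaviour `score_outcome` documents, distinct from its single-class 0.5 convention.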
diff --git a/pm_bench/__init__.py b/pm_bench/__init__.py index 348e7b8..3ad5ea2 100644 --- a/pm_bench/__init__.py +++ b/pm_bench/__init__.py @@ -5,20 +5,26 @@ from pm_bench.predictions import Prediction, read_predictions_csv, write_predictions_csv from pm_bench.prefixes import ( + OutcomeTarget, Prefix, TimeTarget, + extract_outcome_targets, extract_prefixes, extract_remaining_time_targets, + read_outcome_targets_csv, read_prefixes_csv, read_time_targets_csv, + write_outcome_targets_csv, write_prefixes_csv, write_time_targets_csv, ) from pm_bench.registry import Dataset, get_dataset, load_registry from pm_bench.score import ( NextEventScore, + OutcomeScore, RemainingTimeScore, score_next_event, + score_outcome, score_remaining_time, ) from pm_bench.split import Event, Split, case_chrono_split @@ -27,21 +33,27 @@ "Dataset", "Event", "NextEventScore", + "OutcomeScore", + "OutcomeTarget", "Prediction", "Prefix", "RemainingTimeScore", "Split", "TimeTarget", "case_chrono_split", + "extract_outcome_targets", "extract_prefixes", "extract_remaining_time_targets", "get_dataset", "load_registry", + "read_outcome_targets_csv", "read_predictions_csv", "read_prefixes_csv", "read_time_targets_csv", "score_next_event", + "score_outcome", "score_remaining_time", + "write_outcome_targets_csv", "write_predictions_csv", "write_prefixes_csv", "write_time_targets_csv", diff --git a/pm_bench/_synth.py b/pm_bench/_synth.py index 47f8d92..e2fad3a 100644 --- a/pm_bench/_synth.py +++ b/pm_bench/_synth.py @@ -48,3 +48,13 @@ def synthetic_log(n_cases: int = 50, seed: int = 42) -> Iterator[Event]: for activity in path: yield (str(case_id), activity, t) t += dt.timedelta(hours=rng.randint(1, 48)) + + +def is_positive_outcome(activities: list[str]) -> bool: + """Synthetic-toy outcome rule: case ends with `delivery_confirmed`. + + This corresponds to the happy path (PATHS[4]) — a fully delivered + order. Cancelled, refunded, and shipped-but-unconfirmed cases are + all negative. + """ + return bool(activities) and activities[-1] == "delivery_confirmed" diff --git a/pm_bench/baselines/__init__.py b/pm_bench/baselines/__init__.py index 5913fdf..0ef5b65 100644 --- a/pm_bench/baselines/__init__.py +++ b/pm_bench/baselines/__init__.py @@ -16,14 +16,28 @@ read_time_predictions_csv, write_time_predictions_csv, ) +from pm_bench.baselines.prior_outcome import ( + OutcomePrediction, + PriorOutcomeBaseline, + fit_prior_outcome, + predict_prior_outcome, + read_outcome_predictions_csv, + write_outcome_predictions_csv, +) __all__ = [ "MarkovBaseline", "MeanTimeBaseline", + "OutcomePrediction", + "PriorOutcomeBaseline", "TimePrediction", "fit_mean_time", + "fit_prior_outcome", "predict_markov", "predict_mean_time", + "predict_prior_outcome", + "read_outcome_predictions_csv", "read_time_predictions_csv", + "write_outcome_predictions_csv", "write_time_predictions_csv", ] diff --git a/pm_bench/baselines/prior_outcome.py b/pm_bench/baselines/prior_outcome.py new file mode 100644 index 0000000..44e80c3 --- /dev/null +++ b/pm_bench/baselines/prior_outcome.py @@ -0,0 +1,132 @@ +"""Last-activity-conditioned reference baseline for outcome prediction. + +For every (last_activity_in_prefix → case_outcome) pair observed on the +training cases, store the empirical positive rate. At test time, look +up the prefix's last activity and return its rate. This is the dumbest +baseline that uses *any* prefix information — a model that ties this +isn't conditioning on the trace at all. 
+ +Falls back to the global positive rate when a prefix ends in an +activity unseen during training. +""" +from __future__ import annotations + +from collections import defaultdict +from collections.abc import Callable, Iterable +from dataclasses import dataclass + +from pm_bench.prefixes import OutcomeTarget +from pm_bench.split import Activity, CaseId, Event + + +@dataclass(frozen=True) +class PriorOutcomeBaseline: + """Last-activity → positive rate, plus a global fallback.""" + + by_last: dict[Activity, float] + global_rate: float + + +@dataclass(frozen=True) +class OutcomePrediction: + case_id: CaseId + prefix_idx: int + score: float + + +def fit_prior_outcome( + events: Iterable[Event], + train_case_ids: Iterable[CaseId], + is_positive: Callable[[list[Activity]], bool], +) -> PriorOutcomeBaseline: + """Aggregate per-last-activity outcome rates over training prefixes.""" + keep = set(train_case_ids) + by_case: dict[CaseId, list[tuple[Activity, object]]] = {} + for case_id, activity, ts in events: + if case_id not in keep: + continue + by_case.setdefault(case_id, []).append((activity, ts)) + + counts: dict[Activity, list[int]] = defaultdict(lambda: [0, 0]) # [pos, total] + pos_cases = 0 + total_cases = 0 + for rows in by_case.values(): + rows.sort(key=lambda r: r[1]) + activities = [a for a, _ in rows] + if len(activities) < 2: + continue + total_cases += 1 + outcome = 1 if is_positive(activities) else 0 + if outcome: + pos_cases += 1 + for k in range(1, len(activities)): + last = activities[k - 1] + counts[last][1] += 1 + if outcome: + counts[last][0] += 1 + + by_last = { + last: (pos / total) if total else 0.0 + for last, (pos, total) in counts.items() + } + global_rate = (pos_cases / total_cases) if total_cases else 0.0 + return PriorOutcomeBaseline(by_last=by_last, global_rate=global_rate) + + +def predict_prior_outcome( + model: PriorOutcomeBaseline, + targets: Iterable[OutcomeTarget], + events_by_case: dict[CaseId, list[Activity]] | None = None, +) -> list[OutcomePrediction]: + """Score each target by its prefix's last-activity training rate. + + `events_by_case` maps every case_id we'll be asked about to its + full ordered activity list (test cases included). Looking up the + prefix's last activity needs the full sequence; the targets file + by itself only carries `(case_id, prefix_idx)`. + """ + out: list[OutcomePrediction] = [] + for t in targets: + score = model.global_rate + if events_by_case is not None: + seq = events_by_case.get(t.case_id) + if seq is not None and t.prefix_idx <= len(seq): + last = seq[t.prefix_idx - 1] + score = model.by_last.get(last, model.global_rate) + out.append(OutcomePrediction(case_id=t.case_id, prefix_idx=t.prefix_idx, score=score)) + return out + + +def write_outcome_predictions_csv( + predictions: Iterable[OutcomePrediction], + path: str, +) -> int: + """Write outcome predictions to CSV. 
Returns row count.""" + import csv + + n = 0 + with open(path, "w", newline="") as f: + w = csv.writer(f) + w.writerow(["case_id", "prefix_idx", "score"]) + for p in predictions: + w.writerow([p.case_id, p.prefix_idx, repr(p.score)]) + n += 1 + return n + + +def read_outcome_predictions_csv(path: str) -> list[OutcomePrediction]: + """Read an outcome predictions CSV.""" + import csv + + out: list[OutcomePrediction] = [] + with open(path, newline="") as f: + r = csv.DictReader(f) + for row in r: + out.append( + OutcomePrediction( + case_id=row["case_id"], + prefix_idx=int(row["prefix_idx"]), + score=float(row["score"]), + ) + ) + return out diff --git a/pm_bench/cli.py b/pm_bench/cli.py index f8e7d33..563b8b1 100644 --- a/pm_bench/cli.py +++ b/pm_bench/cli.py @@ -14,6 +14,12 @@ read_time_predictions_csv, write_time_predictions_csv, ) +from pm_bench.baselines.prior_outcome import ( + fit_prior_outcome, + predict_prior_outcome, + read_outcome_predictions_csv, + write_outcome_predictions_csv, +) from pm_bench.fetch import ( FetchError, ManualFetchRequired, @@ -23,15 +29,18 @@ from pm_bench.leaderboard import load_board, standings, verify from pm_bench.predictions import read_predictions_csv, write_predictions_csv from pm_bench.prefixes import ( + extract_outcome_targets, extract_prefixes, extract_remaining_time_targets, + read_outcome_targets_csv, read_prefixes_csv, read_time_targets_csv, + write_outcome_targets_csv, write_prefixes_csv, write_time_targets_csv, ) from pm_bench.registry import get_dataset, load_registry -from pm_bench.score import score_next_event, score_remaining_time +from pm_bench.score import score_next_event, score_outcome, score_remaining_time from pm_bench.split import case_chrono_split @@ -50,6 +59,16 @@ def _load_events(name: str) -> list: return list(_synth.synthetic_log()) +def _outcome_rule(name: str): + """Return the per-dataset positive-outcome predicate.""" + if name == "synthetic-toy": + return _synth.is_positive_outcome + raise click.UsageError( + f"outcome rule for {name!r} not yet defined; pin a dataset hash and " + "register its outcome rule" + ) + + @click.group() @click.version_option() def main() -> None: @@ -203,7 +222,7 @@ def split(name: str, task: str) -> None: ) @click.option( "--task", - type=click.Choice(["next-event", "remaining-time"]), + type=click.Choice(["next-event", "remaining-time", "outcome"]), default="next-event", show_default=True, ) @@ -220,10 +239,15 @@ def prefixes(name: str, split_path: str, out_path: str, partition: str, task: st case_ids = split_data[partition] if task == "next-event": n = write_prefixes_csv(extract_prefixes(events, case_ids), out_path) - else: + elif task == "remaining-time": n = write_time_targets_csv( extract_remaining_time_targets(events, case_ids), out_path ) + else: + rule = _outcome_rule(name) + n = write_outcome_targets_csv( + extract_outcome_targets(events, case_ids, rule), out_path + ) click.echo(f"wrote {n} prefixes to {out_path} (task={task} partition={partition})") @@ -250,14 +274,14 @@ def prefixes(name: str, split_path: str, out_path: str, partition: str, task: st ) @click.option( "--baseline", - type=click.Choice(["markov", "mean"]), + type=click.Choice(["markov", "mean", "prior"]), default="markov", show_default=True, - help="markov → next-event; mean → remaining-time.", + help="markov → next-event; mean → remaining-time; prior → outcome.", ) @click.option( "--task", - type=click.Choice(["next-event", "remaining-time"]), + type=click.Choice(["next-event", "remaining-time", "outcome"]), 
default="next-event", show_default=True, ) @@ -281,13 +305,27 @@ def predict( targets = read_prefixes_csv(prefixes_path) preds = predict_markov(model, targets) n = write_predictions_csv(preds, out_path) - else: + elif task == "remaining-time": if baseline != "mean": raise click.UsageError(f"baseline {baseline!r} doesn't apply to remaining-time") time_model = fit_mean_time(events, split_data["train"]) time_targets = read_time_targets_csv(prefixes_path) time_preds = predict_mean_time(time_model, time_targets) n = write_time_predictions_csv(time_preds, out_path) + else: + # outcome + if baseline != "prior": + raise click.UsageError(f"baseline {baseline!r} doesn't apply to outcome") + rule = _outcome_rule(name) + outcome_model = fit_prior_outcome(events, split_data["train"], rule) + outcome_targets = read_outcome_targets_csv(prefixes_path) + # Build full activity sequences keyed by case_id so the baseline + # can read off each prefix's last activity. + seq_by_case: dict[str, list[str]] = {} + for cid, act, _ts in sorted(events, key=lambda e: e[2]): + seq_by_case.setdefault(cid, []).append(act) + outcome_preds = predict_prior_outcome(outcome_model, outcome_targets, seq_by_case) + n = write_outcome_predictions_csv(outcome_preds, out_path) click.echo(f"wrote {n} predictions to {out_path} (task={task} baseline={baseline})") @@ -302,7 +340,7 @@ def predict( ) @click.option( "--task", - type=click.Choice(["next-event", "remaining-time"]), + type=click.Choice(["next-event", "remaining-time", "outcome"]), default="next-event", show_default=True, ) @@ -335,14 +373,40 @@ def score(predictions_path: str, prefixes_path: str, task: str) -> None: ) return - # remaining-time - truth_time = read_time_targets_csv(prefixes_path) - pred_time = read_time_predictions_csv(predictions_path) - pred_t_lookup = {(p.case_id, p.prefix_idx): p.predicted_days for p in pred_time} + if task == "remaining-time": + truth_time = read_time_targets_csv(prefixes_path) + pred_time = read_time_predictions_csv(predictions_path) + pred_t_lookup = {(p.case_id, p.prefix_idx): p.predicted_days for p in pred_time} + missing = [ + (t.case_id, t.prefix_idx) + for t in truth_time + if (t.case_id, t.prefix_idx) not in pred_t_lookup + ] + if missing: + click.echo( + f"predictions is missing {len(missing)} target(s); first: {missing[0]}", + err=True, + ) + sys.exit(2) + preds_floats = [pred_t_lookup[(t.case_id, t.prefix_idx)] for t in truth_time] + truth_floats = [t.remaining_days for t in truth_time] + rt = score_remaining_time(preds_floats, truth_floats) + click.echo( + json.dumps( + {"task": task, "mae_days": rt.mae_days, "n": rt.n}, + indent=2, + ), + ) + return + + # outcome + truth_o = read_outcome_targets_csv(prefixes_path) + pred_o = read_outcome_predictions_csv(predictions_path) + pred_o_lookup = {(p.case_id, p.prefix_idx): p.score for p in pred_o} missing = [ (t.case_id, t.prefix_idx) - for t in truth_time - if (t.case_id, t.prefix_idx) not in pred_t_lookup + for t in truth_o + if (t.case_id, t.prefix_idx) not in pred_o_lookup ] if missing: click.echo( @@ -350,12 +414,12 @@ def score(predictions_path: str, prefixes_path: str, task: str) -> None: err=True, ) sys.exit(2) - preds_floats = [pred_t_lookup[(t.case_id, t.prefix_idx)] for t in truth_time] - truth_floats = [t.remaining_days for t in truth_time] - rt = score_remaining_time(preds_floats, truth_floats) + preds_o = [pred_o_lookup[(t.case_id, t.prefix_idx)] for t in truth_o] + truth_o_int = [t.outcome for t in truth_o] + os_ = score_outcome(preds_o, truth_o_int) click.echo( 
json.dumps( - {"task": task, "mae_days": rt.mae_days, "n": rt.n}, + {"task": task, "auc": os_.auc, "n": os_.n, "n_pos": os_.n_pos}, indent=2, ), ) diff --git a/pm_bench/prefixes.py b/pm_bench/prefixes.py index 3fb3b4c..408971f 100644 --- a/pm_bench/prefixes.py +++ b/pm_bench/prefixes.py @@ -19,12 +19,18 @@ case_id,prefix_idx,remaining_days -— shape parallels the next-event format so models that handle both -tasks share most of the loader. +For outcome, the truth is a binary integer (the case's final 0/1 +outcome, repeated for every prefix of that case so predictions can +condition on prefix length): + + case_id,prefix_idx,outcome + +— all three formats share `case_id, prefix_idx` so models that handle +multiple tasks share most of the loader. """ from __future__ import annotations -from collections.abc import Iterable, Iterator +from collections.abc import Callable, Iterable, Iterator from dataclasses import dataclass from datetime import datetime @@ -56,6 +62,21 @@ class TimeTarget: remaining_days: float +@dataclass(frozen=True) +class OutcomeTarget: + """Binary outcome prediction target. + + `outcome` is the case's final 0/1 outcome — defined per-dataset + (synthetic-toy: 1 iff the case ends with `delivery_confirmed`). + The same value is repeated across every prefix of a case so a model + can score how its prediction sharpens as the prefix grows. + """ + + case_id: CaseId + prefix_idx: int + outcome: int + + def extract_prefixes( events: Iterable[Event], case_ids: Iterable[CaseId], @@ -157,6 +178,69 @@ def extract_remaining_time_targets( ) +def extract_outcome_targets( + events: Iterable[Event], + case_ids: Iterable[CaseId], + is_positive: Callable[[list[Activity]], bool], +) -> Iterator[OutcomeTarget]: + """Yield outcome targets for the given case ids. + + `is_positive` is the per-dataset rule that decides a case's outcome + from its full activity sequence (in chronological order). Targets + are emitted at every prefix length 1..L-1, all carrying the same + case-level outcome — so a model predicts the same target with + progressively more context. + """ + keep = set(case_ids) + by_case: dict[CaseId, list[tuple[Activity, datetime]]] = {} + for case_id, activity, ts in events: + if case_id not in keep: + continue + by_case.setdefault(case_id, []).append((activity, ts)) + + for case_id in keep: + rows = by_case.get(case_id) + if not rows or len(rows) < 2: + continue + rows.sort(key=lambda r: r[1]) + activities = [a for a, _ in rows] + outcome = 1 if is_positive(activities) else 0 + for k in range(1, len(activities)): + yield OutcomeTarget(case_id=case_id, prefix_idx=k, outcome=outcome) + + +def write_outcome_targets_csv(targets: Iterable[OutcomeTarget], path: str) -> int: + """Write outcome targets to a CSV file. 
Returns the number of rows.""" + import csv + + n = 0 + with open(path, "w", newline="") as f: + w = csv.writer(f) + w.writerow(["case_id", "prefix_idx", "outcome"]) + for t in targets: + w.writerow([t.case_id, t.prefix_idx, t.outcome]) + n += 1 + return n + + +def read_outcome_targets_csv(path: str) -> list[OutcomeTarget]: + """Read an outcome-targets CSV emitted by `write_outcome_targets_csv`.""" + import csv + + out: list[OutcomeTarget] = [] + with open(path, newline="") as f: + r = csv.DictReader(f) + for row in r: + out.append( + OutcomeTarget( + case_id=row["case_id"], + prefix_idx=int(row["prefix_idx"]), + outcome=int(row["outcome"]), + ) + ) + return out + + def write_time_targets_csv(targets: Iterable[TimeTarget], path: str) -> int: """Write remaining-time targets to a CSV file. Returns the number of rows.""" import csv diff --git a/pm_bench/score.py b/pm_bench/score.py index 6b794bb..fdd2ac0 100644 --- a/pm_bench/score.py +++ b/pm_bench/score.py @@ -1,7 +1,7 @@ """Scoring scripts for pm-bench tasks. -v0: next-event prediction (top-1 / top-3 accuracy) and remaining-time -prediction (MAE in days). +v0: next-event prediction (top-1 / top-3 accuracy), remaining-time +prediction (MAE in days), and outcome prediction (AUC). """ from __future__ import annotations @@ -51,6 +51,66 @@ def score_next_event( return NextEventScore(top1=top1 / n, top3=top3 / n, n=n) +@dataclass(frozen=True) +class OutcomeScore: + """AUC for binary outcome prediction. + + `auc` is the area under the ROC curve, computed via the rank-sum + identity (no scipy / sklearn needed). Ties in the score get the + average rank — so a model that predicts the same probability for + every case scores 0.5 by construction. + """ + + auc: float + n: int + n_pos: int + + +def score_outcome( + predictions: Sequence[float], + truth: Sequence[int], +) -> OutcomeScore: + """ROC AUC for binary outcome prediction. + + `predictions[i]` is the model's predicted P(outcome=1) for the + i-th held-out target; `truth[i]` is the actual outcome (0 or 1). + + Implementation: sort by score ascending, assign average ranks for + ties, then AUC = (sum_of_ranks_of_positives - n_pos*(n_pos+1)/2) / + (n_pos * n_neg). Edge cases: if either class is missing, AUC is + undefined; we return 0.5 in that degenerate case so the + leaderboard doesn't NaN out a row. + """ + if len(predictions) != len(truth): + raise ValueError("predictions and truth must be the same length") + if len(predictions) == 0: + raise ValueError("nothing to score") + + n = len(predictions) + n_pos = sum(1 for t in truth if t == 1) + n_neg = n - n_pos + + if n_pos == 0 or n_neg == 0: + return OutcomeScore(auc=0.5, n=n, n_pos=n_pos) + + # Indices sorted by predicted score ascending; ties get average ranks. + order = sorted(range(n), key=lambda i: predictions[i]) + ranks = [0.0] * n + i = 0 + while i < n: + j = i + while j + 1 < n and predictions[order[j + 1]] == predictions[order[i]]: + j += 1 + avg_rank = (i + j) / 2 + 1 # 1-based ranks + for k in range(i, j + 1): + ranks[order[k]] = avg_rank + i = j + 1 + + sum_pos_ranks = sum(ranks[i] for i in range(n) if truth[i] == 1) + auc = (sum_pos_ranks - n_pos * (n_pos + 1) / 2) / (n_pos * n_neg) + return OutcomeScore(auc=auc, n=n, n_pos=n_pos) + + def score_remaining_time( predictions: Sequence[float], truth: Sequence[float], diff --git a/tests/test_outcome.py b/tests/test_outcome.py new file mode 100644 index 0000000..e6e727e --- /dev/null +++ b/tests/test_outcome.py @@ -0,0 +1,160 @@ +"""End-to-end + targeted tests for the outcome task. 
+
+Synthetic-toy with seed=42 doesn't put any `delivery_confirmed` cases
+in the test split (path-4 is 10% of cases and the chronological tail
+happens to have none), so we test the outcome machinery on a hand-built
+event set with controlled class balance instead. The CLI smoke test
+runs against synthetic-toy and asserts the pipeline executes cleanly,
+even though the AUC degenerates to 0.5 (n_pos=0 in test).
+"""
+from __future__ import annotations
+
+import datetime as dt
+import json
+
+from click.testing import CliRunner
+
+from pm_bench import (
+    OutcomeTarget,
+    extract_outcome_targets,
+    read_outcome_targets_csv,
+    score_outcome,
+    write_outcome_targets_csv,
+)
+from pm_bench.baselines.prior_outcome import (
+    OutcomePrediction,
+    fit_prior_outcome,
+    predict_prior_outcome,
+    read_outcome_predictions_csv,
+    write_outcome_predictions_csv,
+)
+from pm_bench.cli import main
+
+
+def _events_with_outcomes() -> list[tuple[str, str, dt.datetime]]:
+    """4 cases, 2 positive (end with `pay`), 2 negative (end with `cancel`)."""
+    base = dt.datetime(2024, 1, 1)
+    out: list[tuple[str, str, dt.datetime]] = []
+    for cid, last in [("c1", "pay"), ("c2", "pay"), ("c3", "cancel"), ("c4", "cancel")]:
+        out.append((cid, "start", base))
+        out.append((cid, "review", base + dt.timedelta(hours=1)))
+        out.append((cid, last, base + dt.timedelta(hours=2)))
+    return out
+
+
+def _is_pay(activities: list[str]) -> bool:
+    return bool(activities) and activities[-1] == "pay"
+
+
+def test_extract_outcome_targets_repeats_outcome_per_prefix() -> None:
+    out = list(extract_outcome_targets(_events_with_outcomes(), ["c1", "c3"], _is_pay))
+    # 2 cases × (3-1) prefixes = 4 targets
+    assert len(out) == 4
+    assert {t.outcome for t in out if t.case_id == "c1"} == {1}
+    assert {t.outcome for t in out if t.case_id == "c3"} == {0}
+
+
+def test_round_trip_csv(tmp_path) -> None:
+    targets = [
+        OutcomeTarget(case_id="c1", prefix_idx=1, outcome=1),
+        OutcomeTarget(case_id="c1", prefix_idx=2, outcome=1),
+    ]
+    p = tmp_path / "o.csv"
+    n = write_outcome_targets_csv(targets, str(p))
+    assert n == 2
+    back = read_outcome_targets_csv(str(p))
+    assert back == targets
+
+
+def test_prior_baseline_global_rate() -> None:
+    model = fit_prior_outcome(_events_with_outcomes(), ["c1", "c2", "c3", "c4"], _is_pay)
+    # 2 of 4 cases are positive → global rate 0.5
+    assert abs(model.global_rate - 0.5) < 1e-9
+
+
+def test_prior_baseline_per_last_activity() -> None:
+    """Training prefixes only ever end in 'start' or 'review' (prefixes stop
+    at length L-1, so final activities never appear); each carries its
+    empirical positive rate."""
+    model = fit_prior_outcome(_events_with_outcomes(), ["c1", "c2", "c3", "c4"], _is_pay)
+    # All training prefixes ending in 'review' come from cases that go on
+    # to either 'pay' or 'cancel' — half of each, so rate = 0.5.
+    assert abs(model.by_last["review"] - 0.5) < 1e-9
+    # Prefix ending in 'start' has the same logic: every case starts.
+    assert abs(model.by_last["start"] - 0.5) < 1e-9
+
+
+def test_predict_prior_with_seq_lookup() -> None:
+    events = _events_with_outcomes()
+    model = fit_prior_outcome(events, ["c1", "c2", "c3", "c4"], _is_pay)
+    targets = [OutcomeTarget(case_id="c1", prefix_idx=2, outcome=1)]
+    seq_by_case = {"c1": ["start", "review", "pay"]}
+    preds = predict_prior_outcome(model, targets, seq_by_case)
+    # Last activity in prefix at idx 2 is 'review' → rate 0.5
+    assert abs(preds[0].score - 0.5) < 1e-9
+
+
+def test_predictions_csv_round_trip(tmp_path) -> None:
+    preds = [
+        OutcomePrediction(case_id="c1", prefix_idx=1, score=0.8),
+        OutcomePrediction(case_id="c2", prefix_idx=1, score=0.2),
+    ]
+    p = tmp_path / "p.csv"
+    write_outcome_predictions_csv(preds, str(p))
+    back = read_outcome_predictions_csv(str(p))
+    assert back == preds
+
+
+def test_score_outcome_from_targets_and_predictions() -> None:
+    targets = [
+        OutcomeTarget(case_id="c1", prefix_idx=1, outcome=1),
+        OutcomeTarget(case_id="c2", prefix_idx=1, outcome=0),
+    ]
+    preds = [
+        OutcomePrediction(case_id="c1", prefix_idx=1, score=0.9),
+        OutcomePrediction(case_id="c2", prefix_idx=1, score=0.1),
+    ]
+    s = score_outcome(
+        [p.score for p in preds],
+        [t.outcome for t in targets],
+    )
+    assert s.auc == 1.0
+
+
+def test_full_outcome_pipeline_on_synthetic_toy(tmp_path) -> None:
+    """Pipeline runs cleanly end-to-end. AUC degenerates because seed=42's
+    test partition has no positives, but the contract still holds."""
+    runner = CliRunner()
+    split_path = tmp_path / "split.json"
+    prefixes_path = tmp_path / "prefixes.csv"
+    preds_path = tmp_path / "predictions.csv"
+
+    r = runner.invoke(main, ["split", "synthetic-toy"])
+    assert r.exit_code == 0
+    split_path.write_text(r.output)
+
+    r = runner.invoke(
+        main,
+        ["prefixes", "synthetic-toy", "--split", str(split_path),
+         "--out", str(prefixes_path), "--task", "outcome"],
+    )
+    assert r.exit_code == 0, r.output
+
+    r = runner.invoke(
+        main,
+        ["predict", "synthetic-toy", "--split", str(split_path),
+         "--prefixes", str(prefixes_path), "--out", str(preds_path),
+         "--baseline", "prior", "--task", "outcome"],
+    )
+    assert r.exit_code == 0, r.output
+
+    r = runner.invoke(
+        main,
+        ["score", str(preds_path), "--prefixes", str(prefixes_path),
+         "--task", "outcome"],
+    )
+    assert r.exit_code == 0, r.output
+    result = json.loads(r.output)
+    assert result["task"] == "outcome"
+    assert result["n"] > 0
+    # synthetic-toy with seed=42 happens to have n_pos=0 in test
+    # → degenerate AUC = 0.5 by convention. The pipeline still runs.
+ assert 0.0 <= result["auc"] <= 1.0 diff --git a/tests/test_score.py b/tests/test_score.py index 9af5f34..5db51fa 100644 --- a/tests/test_score.py +++ b/tests/test_score.py @@ -1,6 +1,6 @@ import pytest -from pm_bench import score_next_event, score_remaining_time +from pm_bench import score_next_event, score_outcome, score_remaining_time def test_top1_perfect() -> None: @@ -55,3 +55,42 @@ def test_remaining_time_lengths_must_match() -> None: def test_remaining_time_empty_raises() -> None: with pytest.raises(ValueError): score_remaining_time([], []) + + +def test_outcome_perfect_separation_is_one() -> None: + s = score_outcome([0.1, 0.2, 0.9, 0.8], [0, 0, 1, 1]) + assert s.auc == 1.0 + assert s.n == 4 + assert s.n_pos == 2 + + +def test_outcome_perfect_inversion_is_zero() -> None: + s = score_outcome([0.9, 0.8, 0.1, 0.2], [0, 0, 1, 1]) + assert s.auc == 0.0 + + +def test_outcome_constant_predictions_is_half() -> None: + """All-equal scores → tied ranks → AUC = 0.5.""" + s = score_outcome([0.5, 0.5, 0.5, 0.5], [0, 0, 1, 1]) + assert s.auc == 0.5 + + +def test_outcome_single_class_returns_half() -> None: + s = score_outcome([0.1, 0.9], [1, 1]) + assert s.auc == 0.5 + + +def test_outcome_lengths_must_match() -> None: + with pytest.raises(ValueError): + score_outcome([0.1], [0, 1]) + + +def test_outcome_known_value() -> None: + """Hand-checked: 3 pos, 3 neg, one swap → AUC = 8/9.""" + # ranks ascending: [neg=0.1→1, neg=0.2→2, pos=0.3→3, neg=0.4→4, pos=0.5→5, pos=0.6→6] + # sum_pos_ranks = 3+5+6 = 14; n_pos*(n_pos+1)/2 = 6; n_pos*n_neg = 9 + # auc = (14 - 6) / 9 = 8/9 + preds = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6] + truth = [0, 0, 1, 0, 1, 1] + s = score_outcome(preds, truth) + assert abs(s.auc - 8 / 9) < 1e-9