From c1610ab89006d923fcd3309d722f867c0519079f Mon Sep 17 00:00:00 2001 From: Denny Schaedig Date: Fri, 15 May 2026 16:26:25 -0600 Subject: [PATCH 1/2] =?UTF-8?q?feat(0.9.1):=20Stages=200b=20+=200c=20?= =?UTF-8?q?=E2=80=94=20action=20JSONL=20telemetry=20+=20recommend=5Faction?= =?UTF-8?q?=20emission?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Telemetry instrumentation prereqs from release_0_9_1.md (lifted verbatim from bio_emergent_persona_foundations.md Stage 0). Ships the measurement infrastructure Roy-3 needs to disambiguate whether Wire-A's annotation actually reached the LLM proposer's decision pathway — without this, Roy-3 reads tool counts at the action layer but can't trace WHY each call was made. Stage 0b — Action JSONL telemetry: - ActionRecord (CC3-additive on the frozen dataclass): appends agent_id, session_id, entity_class optional fields with None defaults. Existing third-party ActionSink consumers continue to work unchanged. - RequestContext binding at sim orchestrator entry: AUT thread binds in _aut_worker; main-thread orchestrator binds in the orchestrator-loop try/finally. Per-thread ContextVar scoping means each worker sees its own agent_id ("sim_aut" vs "sim_orchestrator") + the shared session_id from the entry-built timestamp. reset_context in finally guards against per-call leak. - InstrumentedExecutor.execute() reads utils/http.py::current_context() + derives entity_class via best-effort heuristic (params["entity_class"] → params["target"]/"entity"/"object" → tool-name verb-prefix-strip + role-suffix-strip). Verb-only tools (respond, examine) return None — Roy-3 normalization aggregates with None skipped from exposure counts. - save_action_log writes a _format_version header line at the head of actions.jsonl per CC1 contract, plus the three new telemetry fields per record. - RecordingSink._compress_oldest preserves telemetry through compression (tiny fields, full attribution survives audit trail). Stage 0c — recommend_action emission: - NAc.recommend_action emits exactly one sim_log("NAc_RECOMMEND", ...) event per call, including all three early-exit paths (no scores, sub-threshold, success). Per the plan: "the event MUST emit even when recommend_action returns None" — Roy-3 needs to distinguish "gate fired, consumer did nothing" from "consumer didn't run at all." - Fields: tick (int(time.time()) bucket), current_cluster_id, cluster_reward_bias_consulted (the value read from _cluster_reward_bias for the active cluster only — NOT the agent-wide Wire-A aggregate; mismatch between rendered Wire-A signal and consulted recommend_action signal is the H1 failure mode), best_tool, best_score, min_confidence, passed_gate. - _emit_recommend_action_event helper at top of nac.py is fail-soft — non-sim runtime calls (e.g., headless API, unit tests without sim logging enabled) don't crash on missing telemetry plumbing. Stage 0b/0c NAc snapshots interpretation: per-stage save_aut_state calls (already wired since PR #248) satisfy "session boundary" for Roy's multi-stage harness pattern. Each Roy stage produces its own session_id with its own aut_nac.json — reward_bias evolution is plottable across the priming-stage sequence. Intra-session checkpoints (within a single sim_id) are a follow-up if needed. Sim-action interface change: sim_action() grows entity_class + **kwargs keyword-only parameters. Existing 2/3-arg positional calls keep working; new callers can pass entity_class. Field omitted from JSONL when None (avoids null-noise in Roy-3 records). Test surface (27 tests, 8 layers): - Layer 1 (ActionRecord): back-compat shape + new fields populated. - Layer 2 (entity_class derivation): all 4 fallback paths, priority order, verb-only tools → None, non-dict params → None. - Layer 3 (InstrumentedExecutor): context-bound + context-unbound paths; record_block also carries telemetry. - Layer 4 (compression): tiny fields survive _compress_oldest. - Layer 5 (save_action_log): _format_version header, telemetry fields per record, header even with 0 records. - Layer 6 (sim_action): legacy call shape, entity_class threading, None-omission. - Layer 7 (sim_recommend_action): emission on all 3 paths + fail-soft when sim logging disabled. - Layer 8 (RequestContext binding): round-trip + reset-on-exception. All passing. ruff clean on touched files (2 pre-existing ruff errors in orchestrator.py are unrelated to this PR). Frozen contract impact: ActionRecord SHAPE-FROZEN at 1.0 (CC3) — optional fields appended at end with defaults are non-breaking; docstring updated to declare the new fields. Co-Authored-By: Claude Opus 4.7 (1M context) --- CLAUDE.md | 4 + src/maxim/decisions/nac.py | 93 +++ src/maxim/simulation/instrumented_executor.py | 79 ++- src/maxim/simulation/orchestrator.py | 27 + src/maxim/simulation/report.py | 30 +- src/maxim/simulation/sim_logger.py | 29 +- src/maxim/simulation/sinks.py | 32 +- tests/unit/test_stage_0b_0c_telemetry.py | 536 ++++++++++++++++++ 8 files changed, 824 insertions(+), 6 deletions(-) create mode 100644 tests/unit/test_stage_0b_0c_telemetry.py diff --git a/CLAUDE.md b/CLAUDE.md index 1aec5b6f..5ae202b1 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -398,6 +398,10 @@ MAXIM_NAC_MIN_CONFIDENCE=0.0 # Override propose_via_substrate's min_confiden # EC activation instrumentation (release_0_9_1.md Stage 0d, cross_modal_substrate_binding.md Stage 1) MAXIM_EC_TRACE_ACTIVATIONS=1 # Gate per-tick `sim_ec_activation` JSONL events from EntorhinalCortex.pattern_complete_or_separate. Fields: agent_id, tick (int second bucket), active_node_id, activation_strength, modality_tag (linguistic/drive/sensor), modality, is_new. Off by default — Roy-4 sets it in the runner environment for the cross-modal binding pre-implementation validation experiment (scripts/analyze_roy_4_coactivation.py is the post-hoc analyzer). Falsy values ("0", "false", "no", "off", empty) disable. The instrumentation fires even on cold-start when active_node_id is freshly allocated, so pattern-separation events are visible in the co-activation matrix. +# Action JSONL + recommend_action telemetry (release_0_9_1.md Stages 0b + 0c) — no env var; structural +# Stage 0b: actions.jsonl gains a header line with `_format_version: "1.0"` + per-record `agent_id` / `session_id` / `entity_class` fields populated from utils/http.py::current_context() (bound at the sim orchestrator entry on both AUT + orchestrator threads via set_context/reset_context). InstrumentedExecutor derives entity_class best-effort from tool params (params["entity_class"] → params["target"]/["entity"]/["object"] → tool-name heuristic stripping verb prefixes + role suffixes); None when not derivable. ActionRecord is shape-frozen at 1.0 (CC3) with three optional fields appended at the end — back-compat with existing ActionSink consumers. +# Stage 0c: NAc.recommend_action emits one `sim_log("NAc_RECOMMEND", ...)` event per call (including empty-scores and sub-threshold early-return paths — Roy-3 needs to distinguish "gate fired, consumer did nothing" from "consumer didn't run at all"). Fields: tick, current_cluster_id, cluster_reward_bias_consulted, best_tool, best_score, min_confidence, passed_gate. Routes through the standard sim_log JSONL writer + the MAXIM_LOG_FILE bridge. Fail-soft when sim_logger is unavailable — non-sim runtime calls don't crash on missing telemetry plumbing. + # Leader proxy admission control MAXIM_PROXY_MAX_CONCURRENT=4 # Max in-flight requests to upstream (0=unlimited) MAXIM_PROXY_RATE_LIMIT_RPM=0 # Per-peer requests/minute (0=unlimited) diff --git a/src/maxim/decisions/nac.py b/src/maxim/decisions/nac.py index b01d229e..31aef2ea 100644 --- a/src/maxim/decisions/nac.py +++ b/src/maxim/decisions/nac.py @@ -29,6 +29,53 @@ logger = logging.getLogger(__name__) +def _emit_recommend_action_event( + *, + agent_id: str, + current_cluster_id: str | None, + cluster_reward_bias_consulted: float | None, + best_tool: str | None, + best_score: float, + min_confidence: float, + passed_gate: bool, +) -> None: + """Emit a ``sim_recommend_action`` event for Stage 0c telemetry. + + Per release_0_9_1.md Stage 0c, every ``recommend_action`` call MUST + emit exactly one event — even the early-return paths (empty scores, + sub-threshold) — so Roy-3 measurement can distinguish "gate fired + but consumer did nothing" from "consumer ran and proposed nothing." + + The event lands on the ``sim_log("NAc_RECOMMEND", ...)`` channel, + which routes through the standard sim_log JSONL writer + the + MAXIM_LOG_FILE bridge. The emission is fail-soft: any import or + sim-log error returns silently so a NAc decision in a non-sim + process never crashes on missing telemetry plumbing. + """ + try: + from maxim.simulation.sim_logger import sim_log + + sim_log( + "NAc_RECOMMEND", + f"recommend_action: passed_gate={passed_gate}", + { + "tick": int(time.time()), + "current_cluster_id": current_cluster_id, + "cluster_reward_bias_consulted": cluster_reward_bias_consulted, + "best_tool": best_tool, + "best_score": round(best_score, 4), + "min_confidence": min_confidence, + "passed_gate": passed_gate, + }, + agent_id=agent_id, + ) + except Exception: + # sim_logger may not be available (non-sim runtime) or sim + # logging may not be active — Stage 0c is observability only, + # not load-bearing for correctness. Swallow silently. + pass + + @dataclass(frozen=True) class NACConfig: """Configuration for Nucleus Accumbens.""" @@ -1292,14 +1339,60 @@ def recommend_action( scores[tool_name] = score reasoning_parts[tool_name] = parts + # Stage 0c (release_0_9_1.md): emit `sim_recommend_action` for + # post-hoc Roy-3 measurement. Every recommend_action call emits + # exactly one event — even on the early-return paths (no scores, + # sub-threshold) — so Roy iterations can distinguish "gate fired + # but consumer didn't run" from "consumer ran and proposed + # nothing." Per the plan: "the event MUST emit even when + # recommend_action returns None." if not scores: + _emit_recommend_action_event( + agent_id=agent_id, + current_cluster_id=current_cluster_id, + cluster_reward_bias_consulted=None, + best_tool=None, + best_score=0.0, + min_confidence=min_confidence, + passed_gate=False, + ) return None best_tool = max(scores, key=lambda t: (scores[t], t)) best_score = scores[best_tool] + + # Record the cluster_reward_bias consulted for the best tool — + # informative for Roy-3 because Wire-A renders aggregate biases + # across all clusters, but recommend_action only consults the + # active-cluster value. Mismatch between rendered Wire-A signal + # and consulted recommend_action signal is the failure mode the + # H1 sub-hypothesis branches (cross_modal_substrate_binding.md / + # jepa_cross_modal_alignment.md) eventually address. + consulted_bias: float | None = None + if current_cluster_id: + consulted_bias = self.cluster_reward_bias(agent_id, current_cluster_id, f"tool:{best_tool}") + if best_score < min_confidence: + _emit_recommend_action_event( + agent_id=agent_id, + current_cluster_id=current_cluster_id, + cluster_reward_bias_consulted=consulted_bias, + best_tool=best_tool, + best_score=best_score, + min_confidence=min_confidence, + passed_gate=False, + ) return None + _emit_recommend_action_event( + agent_id=agent_id, + current_cluster_id=current_cluster_id, + cluster_reward_bias_consulted=consulted_bias, + best_tool=best_tool, + best_score=best_score, + min_confidence=min_confidence, + passed_gate=True, + ) return { "tool_name": best_tool, "params": {}, diff --git a/src/maxim/simulation/instrumented_executor.py b/src/maxim/simulation/instrumented_executor.py index 65334a7d..aff1a0c0 100644 --- a/src/maxim/simulation/instrumented_executor.py +++ b/src/maxim/simulation/instrumented_executor.py @@ -3,6 +3,13 @@ Captures every tool execution (success, failure, and autonomy rejections) as ActionRecords in a RecordingSink. Transparently wraps an existing Executor without changing its interface. + +Stage 0b (release_0_9_1.md) telemetry: each record carries +``agent_id`` / ``session_id`` from the ``utils/http.py::current_context`` +ContextVar (bound at the sim orchestrator entry) and a best-effort +``entity_class`` derived from the action's params. The fields default +to ``None`` when context isn't bound (e.g., unit tests, headless API), +so the producer never raises. """ from __future__ import annotations @@ -12,6 +19,63 @@ from maxim.simulation.sinks import ActionRecord, ActionSink from maxim.tools.base import ToolOutput +from maxim.utils.http import current_context + + +def _derive_entity_class(tool_name: str, params: dict[str, Any]) -> str | None: + """Best-effort entity-class extraction for Stage 0b telemetry. + + Roy-3 analysis normalizes pain-aversion counts per entity_class — + knowing the agent encountered "food" 50 times but felt pain 3 times + matters very differently from encountering "food" 5 times and feeling + pain 3 times. The exact derivation is fuzzy because Maxim's tool + surface mixes verb-only tools (``respond``, ``examine``) with + entity-bound tools (``infant_humanoid_pick_up``, ``sense_food_source``). + + Heuristics (lowest-cost-to-highest): + 1. ``params["entity_class"]`` — explicit caller override (highest fidelity). + 2. ``params["target"]`` / ``params["entity"]`` / ``params["object"]`` — + the conventional param names entity-binding tools use. + 3. Tool-name prefix split — ``infant_humanoid_pick_up`` → ``infant_humanoid``; + ``sense_food_source`` → ``food`` (heuristic: skip leading verb token). + + Returns ``None`` when nothing in the action surface looks + entity-bound (e.g., ``respond``, ``examine``, sleep tools). The + field is best-effort metadata, never load-bearing for correctness; + Roy-3 analysis aggregates with ``None`` skipped from + exposure-count normalization. + """ + if not isinstance(params, dict): + return None + # 1. Explicit caller override. + explicit = params.get("entity_class") + if isinstance(explicit, str) and explicit: + return explicit + # 2. Conventional param names. + for key in ("target", "entity", "object"): + val = params.get(key) + if isinstance(val, str) and val: + return val + # 3. Tool-name heuristic. Strip leading verb tokens. + if "_" in tool_name: + parts = tool_name.split("_") + # Common verb prefixes that tools start with — these are NOT + # the entity class. + verb_prefixes = {"sense", "use", "do", "get", "set", "make", "go", "look", "examine"} + if parts and parts[0].lower() in verb_prefixes: + remainder = "_".join(parts[1:]) + if remainder: + # ``sense_food_source`` → ``food_source`` after strip; + # further trim trailing role tokens like ``_source`` / + # ``_target`` so the bucket reads as ``food``. + role_suffixes = {"source", "target", "object"} + tail_parts = remainder.split("_") + while tail_parts and tail_parts[-1].lower() in role_suffixes: + tail_parts.pop() + if tail_parts: + return "_".join(tail_parts) + return remainder + return None class InstrumentedExecutor: @@ -33,6 +97,16 @@ def __init__(self, executor: Any, sink: ActionSink) -> None: self._executor = executor self._sink = sink + def _telemetry_fields(self, tool_name: str, params: dict[str, Any]) -> dict[str, Any]: + """Pull Stage 0b telemetry (agent_id, session_id, entity_class) + off the bound RequestContext + tool action.""" + ctx = current_context() + return { + "agent_id": ctx.agent_id if ctx is not None else None, + "session_id": ctx.session_id if ctx is not None else None, + "entity_class": _derive_entity_class(tool_name, params), + } + def execute(self, action: dict[str, Any]) -> ToolOutput: """Execute a tool action and record the result.""" tool_name = action.get("tool_name", "unknown") @@ -54,6 +128,7 @@ def execute(self, action: dict[str, Any]) -> ToolOutput: result_error=result.error, blocked=is_blocked, block_reason=result.error if is_blocked else None, + **self._telemetry_fields(tool_name, params), ) ) @@ -61,13 +136,15 @@ def execute(self, action: dict[str, Any]) -> ToolOutput: def record_block(self, tool_name: str, reason: str, params: dict[str, Any] | None = None) -> None: """Record that an action was blocked (e.g., by FearAgent or autonomy).""" + params = params or {} self._sink.record( ActionRecord( timestamp=time.time(), tool_name=tool_name, - tool_args=params or {}, + tool_args=params, blocked=True, block_reason=reason, + **self._telemetry_fields(tool_name, params), ) ) diff --git a/src/maxim/simulation/orchestrator.py b/src/maxim/simulation/orchestrator.py index 91a4eff9..eea235e2 100644 --- a/src/maxim/simulation/orchestrator.py +++ b/src/maxim/simulation/orchestrator.py @@ -1494,6 +1494,18 @@ def _get_component_integrity(name: str) -> float: aut_error: list[Exception] = [] def _aut_worker() -> None: + # Stage 0b (release_0_9_1.md): bind RequestContext on the AUT + # thread so InstrumentedExecutor.execute(), recommend_action's + # sim_recommend_action emitter, and any other downstream code + # reading utils/http.py::current_context() see the right + # agent_id + session_id pair. ContextVars are per-thread; the + # main-thread binding doesn't reach here without copy_context. + # Bound BEFORE sim_agent_context so the typed RequestContext + # and the sim_logger contextvar agree on agent identity. + from maxim.utils.http import new_request_context, reset_context, set_context + + _aut_request_ctx = new_request_context(agent_id="sim_aut", session_id=session_id) + _aut_request_token = set_context(_aut_request_ctx) try: with sim_agent_context("sim_aut"): run_agentic_loop( @@ -1523,6 +1535,8 @@ def _aut_worker() -> None: except Exception as e: aut_error.append(e) logger.error("AUT loop failed: %s", e) + finally: + reset_context(_aut_request_token) aut_thread = threading.Thread(target=_aut_worker, name="sim.aut", daemon=True) aut_thread.start() @@ -2371,6 +2385,18 @@ def _orch_action_count() -> int: # so there's only one spinner managing the terminal line. bridge._spinner.start("Orchestrator planning first probe...") + # Stage 0b: bind RequestContext on the orchestrator thread so + # orch-side action records (rare — most actions land on the AUT + # sink, but orchestrator tools still execute) carry agent_id + + # session_id. Symmetric with the AUT thread binding above. Runs + # on the main thread; the reset in `finally` keeps the bind + # scoped to the sim run. + from maxim.utils.http import new_request_context as _new_ctx + from maxim.utils.http import reset_context as _reset_ctx + from maxim.utils.http import set_context as _set_ctx + + _orch_request_ctx = _new_ctx(agent_id="sim_orchestrator", session_id=session_id) + _orch_request_token = _set_ctx(_orch_request_ctx) try: with sim_agent_context("sim_orchestrator"): run_agentic_loop( @@ -2398,6 +2424,7 @@ def _orch_action_count() -> int: orch_error.append(e) logger.error("Orchestrator loop failed: %s", e) finally: + _reset_ctx(_orch_request_token) # Always clean up, even on interrupt bridge._spinner.stop() diff --git a/src/maxim/simulation/report.py b/src/maxim/simulation/report.py index 0ebede77..ebdad595 100644 --- a/src/maxim/simulation/report.py +++ b/src/maxim/simulation/report.py @@ -351,13 +351,35 @@ def save_report(report: SimulationReport, base_dir: str | None = None) -> Path: def save_action_log(bridge: Any, base_dir: str, session_id: str) -> Path | None: - """Save all action records as JSONL for post-hoc analysis.""" + """Save all action records as JSONL for post-hoc analysis. + + The first line is a header record carrying ``_format_version`` per + CLAUDE.md CC1 (Stage 0b, release_0_9_1.md). Per-action records + follow, one per line. Each carries Stage 0b telemetry fields + (``agent_id``, ``session_id``, ``entity_class``) populated by + ``InstrumentedExecutor`` from the bound ``RequestContext`` — + ``None`` when the context was unbound at execution time (e.g., + pre-0b sims, headless API runs). + + Format-version evolution rule: appending optional fields to the + per-action record is back-compat (existing parsers ignore unknown + keys); removing or renaming fields requires a major bump. + """ session_dir = Path(base_dir) / session_id session_dir.mkdir(parents=True, exist_ok=True) log_path = session_dir / "actions.jsonl" try: with open(str(log_path), "w", encoding="utf-8") as f: + # Stage 0b: format-version header (one-line schema marker + # at the top of the JSONL — existing per-record parsers + # ignore unknown top-level keys, so this is back-compat). + header = { + "_format_version": "1.0", + "_record_kind": "header", + "session_id": session_id, + } + f.write(json.dumps(header) + "\n") for a in bridge.get_all_actions(): entry = { "timestamp": a.timestamp, @@ -368,6 +390,12 @@ def save_action_log(bridge: Any, base_dir: str, session_id: str) -> Path | None: "error": a.result_error, "blocked": a.blocked, "block_reason": a.block_reason, + # Stage 0b telemetry — None when RequestContext was unbound + # (pre-0b sims, headless API) or entity_class couldn't be + # derived from tool_args. + "agent_id": a.agent_id, + "session_id": a.session_id, + "entity_class": a.entity_class, } f.write(json.dumps(entry, default=str) + "\n") logger.info("Action log saved: %s (%d records)", log_path, len(bridge.get_all_actions())) diff --git a/src/maxim/simulation/sim_logger.py b/src/maxim/simulation/sim_logger.py index 580d5ed7..594301e0 100644 --- a/src/maxim/simulation/sim_logger.py +++ b/src/maxim/simulation/sim_logger.py @@ -934,12 +934,35 @@ def sim_fear(tool: str, allowed: bool, reason: str = "", *, agent_id: str | None sim_log("BLOCKED", f"🚫 BLOCKED: {tool} — {reason}", agent_id=agent_id) -def sim_action(tool: str, success: bool, summary: str = "", *, agent_id: str | None = None) -> None: - """Log a tool execution.""" +def sim_action( + tool: str, + success: bool, + summary: str = "", + *, + agent_id: str | None = None, + entity_class: str | None = None, + **kwargs: Any, +) -> None: + """Log a tool execution. + + Stage 0b (release_0_9_1.md): accepts ``entity_class`` for + exposure-count normalization in Roy-3 analysis. Falls into the + structured ``data`` dict alongside any other kwargs. None → field + omitted from the persisted record. The plain ``sim_action(tool, + success)`` call shape from earlier callers continues to work + unchanged (entity_class defaults to None and is dropped before + the dict is passed to sim_log). + """ icon = "⚔️" if success else "❌" status = "OK" if success else "FAIL" + data: dict[str, Any] = dict(kwargs) + if entity_class is not None: + data["entity_class"] = entity_class sim_log( - "MOTOR", f"{icon} [{status}] {tool}: {summary}" if summary else f"{icon} [{status}] {tool}", agent_id=agent_id + "MOTOR", + f"{icon} [{status}] {tool}: {summary}" if summary else f"{icon} [{status}] {tool}", + data if data else None, + agent_id=agent_id, ) diff --git a/src/maxim/simulation/sinks.py b/src/maxim/simulation/sinks.py index d9d000db..542b8297 100644 --- a/src/maxim/simulation/sinks.py +++ b/src/maxim/simulation/sinks.py @@ -38,7 +38,23 @@ @dataclass(frozen=True) class ActionRecord: - """Captured output action from the agent pipeline.""" + """Captured output action from the agent pipeline. + + SHAPE-FROZEN at 1.0 (CC3) — appending optional fields at the end + with sensible defaults is the only allowed evolution. Required + fields, type changes, and field reorderings are major-version + bumps. The fields are observability-focused; no isolation-hygiene + rule applies, so an ``extra`` escape hatch is unnecessary — + add purpose-specific fields here as new telemetry needs surface. + + Stage 0b additions (release_0_9_1.md): ``agent_id`` / + ``session_id`` thread through from the ``RequestContext`` ContextVar + bound at the sim orchestrator entry; ``entity_class`` carries the + target entity classification (food, weapon, body-part, etc.) for + pain-aversion exposure normalization in Roy-3 analysis. All three + are optional ``None`` defaults so existing test fixtures and + third-party ``ActionSink`` implementations keep working. + """ timestamp: float tool_name: str @@ -48,6 +64,13 @@ class ActionRecord: result_error: str | None = None blocked: bool = False block_reason: str | None = None + # Stage 0b (release_0_9_1.md): per-record agent + session attribution. + # Populated by InstrumentedExecutor from utils/http.py::current_context(). + agent_id: str | None = None + session_id: str | None = None + # Stage 0b: entity classification for exposure-count normalization. + # Best-effort — derived from tool_args target where present. + entity_class: str | None = None @runtime_checkable @@ -105,6 +128,13 @@ def _compress_oldest(self) -> None: result_error=rec.result_error[:100] if rec.result_error else None, blocked=rec.blocked, block_reason=rec.block_reason, + # Stage 0b telemetry fields are kept through compression — + # they're tiny and the whole point of 0b is post-hoc + # attribution analysis across the full action stream, + # which would break if compressed records dropped them. + agent_id=rec.agent_id, + session_id=rec.session_id, + entity_class=rec.entity_class, ) ) self._actions = compressed + self._actions[half:] diff --git a/tests/unit/test_stage_0b_0c_telemetry.py b/tests/unit/test_stage_0b_0c_telemetry.py new file mode 100644 index 00000000..88a7f3b2 --- /dev/null +++ b/tests/unit/test_stage_0b_0c_telemetry.py @@ -0,0 +1,536 @@ +"""Tests for Stages 0b + 0c of release_0_9_1.md (telemetry instrumentation). + +Stage 0b layers: +- ``ActionRecord`` gains optional ``agent_id`` / ``session_id`` / + ``entity_class`` fields (CC3-additive on the frozen dataclass). +- ``InstrumentedExecutor`` populates the fields from + ``utils/http.py::current_context()`` + tool params. +- ``RecordingSink._compress_oldest`` preserves the new fields. +- ``save_action_log`` writes the new fields + a ``_format_version`` + header at the JSONL head. +- ``sim_action`` accepts an ``entity_class`` kwarg routed through + ``sim_log``'s data dict. + +Stage 0c layer: +- ``NAc.recommend_action`` emits a ``sim_log("NAc_RECOMMEND", ...)`` + event on every call (no-scores, sub-threshold, success — three + early-exit paths), with the fields per the plan. +""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any +from unittest.mock import MagicMock + +import pytest + +from maxim.decisions.nac import NAc, NACConfig +from maxim.simulation.instrumented_executor import InstrumentedExecutor, _derive_entity_class +from maxim.simulation.sinks import ActionRecord, RecordingSink +from maxim.tools.base import ToolOutput +from maxim.utils.http import new_request_context, reset_context, set_context + + +# ───────────────────────────────────────────────────────────────────── +# Layer 1: ActionRecord field additions (CC3-additive) +# ───────────────────────────────────────────────────────────────────── + + +class TestActionRecordFields: + """The new optional fields ship with ``None`` defaults so existing + callers and third-party ActionSink consumers keep working without + modification. The CC3 audit rule for shape-frozen dataclasses is: + optional fields appended at the end with sensible defaults are + non-breaking; renames or required-field additions are major bumps.""" + + def test_record_constructs_without_new_fields(self) -> None: + """Back-compat shape: pre-0b callers construct ActionRecord + with only the original 8 fields and the new 3 default to None.""" + rec = ActionRecord( + timestamp=1.0, + tool_name="respond", + ) + assert rec.agent_id is None + assert rec.session_id is None + assert rec.entity_class is None + + def test_record_carries_new_fields(self) -> None: + rec = ActionRecord( + timestamp=1.0, + tool_name="sense_food_source", + agent_id="sim_aut", + session_id="20260515_120000", + entity_class="food", + ) + assert rec.agent_id == "sim_aut" + assert rec.session_id == "20260515_120000" + assert rec.entity_class == "food" + + +# ───────────────────────────────────────────────────────────────────── +# Layer 2: entity_class derivation heuristic +# ───────────────────────────────────────────────────────────────────── + + +class TestEntityClassDerivation: + """Best-effort heuristic — Roy-3 analysis aggregates with ``None`` + skipped from exposure-count normalization, so the heuristic only + needs to produce sensible classes when the tool is entity-bound. + Verb-only tools (``respond``, ``examine``) should return None.""" + + def test_explicit_param_wins(self) -> None: + assert _derive_entity_class("any_tool", {"entity_class": "food"}) == "food" + + def test_target_param_fallback(self) -> None: + assert _derive_entity_class("any_tool", {"target": "weapon"}) == "weapon" + + def test_entity_param_fallback(self) -> None: + assert _derive_entity_class("any_tool", {"entity": "infant_humanoid"}) == "infant_humanoid" + + def test_object_param_fallback(self) -> None: + assert _derive_entity_class("any_tool", {"object": "fire"}) == "fire" + + def test_param_priority_explicit_over_target(self) -> None: + """params["entity_class"] beats params["target"] when both present.""" + assert _derive_entity_class("any_tool", {"entity_class": "food", "target": "drink"}) == "food" + + def test_tool_name_verb_prefix_stripped(self) -> None: + """``sense_food_source`` → strip 'sense' verb + 'source' role suffix → 'food'.""" + assert _derive_entity_class("sense_food_source", {}) == "food" + + def test_tool_name_no_role_suffix(self) -> None: + """``use_weapon`` → strip 'use' verb → 'weapon'.""" + assert _derive_entity_class("use_weapon", {}) == "weapon" + + def test_verb_only_tools_return_none(self) -> None: + """``respond`` / ``examine`` / no underscore → None (not entity-bound).""" + assert _derive_entity_class("respond", {}) is None + assert _derive_entity_class("examine", {}) is None + assert _derive_entity_class("examine", {"target": ""}) is None + + def test_non_dict_params_returns_none(self) -> None: + """Defensive — params might be None or something weird in some paths.""" + assert _derive_entity_class("any_tool", None) is None # type: ignore[arg-type] + + +# ───────────────────────────────────────────────────────────────────── +# Layer 3: InstrumentedExecutor reads RequestContext +# ───────────────────────────────────────────────────────────────────── + + +class _StubExecutor: + """Minimal stand-in for the real Executor.""" + + def execute(self, action: dict[str, Any]) -> ToolOutput: + return ToolOutput(success=True, output={"ok": True}) + + +class TestInstrumentedExecutorTelemetry: + def test_record_populated_from_request_context(self) -> None: + sink = RecordingSink() + executor = InstrumentedExecutor(_StubExecutor(), sink) + ctx = new_request_context(agent_id="sim_aut", session_id="20260515_120000") + token = set_context(ctx) + try: + executor.execute({"tool_name": "sense_food_source", "params": {}}) + finally: + reset_context(token) + rec = sink.actions[-1] + assert rec.agent_id == "sim_aut" + assert rec.session_id == "20260515_120000" + # entity_class derived from tool name via verb-prefix strip. + assert rec.entity_class == "food" + + def test_no_context_bound_yields_none_fields(self) -> None: + """Unit tests / non-sim runtime paths don't bind RequestContext. + InstrumentedExecutor must not raise — record gets None fields.""" + sink = RecordingSink() + executor = InstrumentedExecutor(_StubExecutor(), sink) + # No set_context call — current_context() returns None. + executor.execute({"tool_name": "respond", "params": {}}) + rec = sink.actions[-1] + assert rec.agent_id is None + assert rec.session_id is None + assert rec.entity_class is None # respond is verb-only + + def test_record_block_populates_telemetry(self) -> None: + """Blocked actions also carry telemetry — useful for the + normalization analysis (blocked-but-attempted is meaningful).""" + sink = RecordingSink() + executor = InstrumentedExecutor(_StubExecutor(), sink) + ctx = new_request_context(agent_id="sim_aut", session_id="sid") + token = set_context(ctx) + try: + executor.record_block("infant_humanoid_pick_up", reason="too_heavy") + finally: + reset_context(token) + rec = sink.actions[-1] + assert rec.blocked is True + assert rec.block_reason == "too_heavy" + assert rec.agent_id == "sim_aut" + assert rec.session_id == "sid" + + +# ───────────────────────────────────────────────────────────────────── +# Layer 4: RecordingSink compression preserves new fields +# ───────────────────────────────────────────────────────────────────── + + +class TestCompressionPreservesTelemetry: + """Per the plan: actions.jsonl is meant for POST-HOC analysis. If + compression dropped agent_id/session_id/entity_class, normalization + counts for long-running sims would silently skew (compressed half + looks anonymous; uncompressed half attributes correctly). Pin + the preservation contract.""" + + def test_compression_keeps_new_fields(self) -> None: + """``_compress_oldest`` strips heavy fields (tool_args, + result_output) but keeps a lightweight summary. The telemetry + fields are tiny and MUST be preserved so post-hoc attribution + analysis doesn't get an anonymous half of the record stream.""" + sink = RecordingSink(max_actions=4) + for i in range(6): + sink.record( + ActionRecord( + timestamp=float(i), + tool_name=f"tool_{i}", + tool_args={"large": "args"}, # heavy field that compression drops + result_output={"big": "output"}, # ditto + agent_id=f"agent_{i}", + session_id="shared_session", + entity_class=f"class_{i}", + ) + ) + # Compression fires when len > max — verified by checking + # the oldest half had heavy fields stripped. + compressed_count = sum(1 for r in sink.actions if r.tool_args == {}) + assert compressed_count > 0, "compression should have stripped at least some records" + # Telemetry fields survive compression in every record + # (compressed AND uncompressed). + for rec in sink.actions: + assert rec.agent_id is not None + assert rec.session_id == "shared_session" + assert rec.entity_class is not None + + +# ───────────────────────────────────────────────────────────────────── +# Layer 5: save_action_log writes new fields + format-version header +# ───────────────────────────────────────────────────────────────────── + + +class TestSaveActionLog: + """Per the plan: actions.jsonl now carries a ``_format_version`` + header line + telemetry fields per record. The header pattern is + CC1 contract — every persisted JSONL Maxim writes carries + ``_format_version`` at the head so future parsers can branch on + schema evolution.""" + + def test_writes_format_version_header(self, tmp_path: Path) -> None: + from maxim.simulation.report import save_action_log + + bridge = MagicMock() + bridge.get_all_actions.return_value = [ + ActionRecord(timestamp=1.0, tool_name="respond", agent_id="sim_aut", session_id="sid"), + ] + log_path = save_action_log(bridge, base_dir=str(tmp_path), session_id="sid") + assert log_path is not None + lines = log_path.read_text().splitlines() + header = json.loads(lines[0]) + assert header["_format_version"] == "1.0" + assert header["_record_kind"] == "header" + assert header["session_id"] == "sid" + + def test_writes_telemetry_fields_per_record(self, tmp_path: Path) -> None: + from maxim.simulation.report import save_action_log + + bridge = MagicMock() + bridge.get_all_actions.return_value = [ + ActionRecord( + timestamp=1.0, + tool_name="sense_food_source", + agent_id="sim_aut", + session_id="20260515_120000", + entity_class="food", + ), + ] + log_path = save_action_log(bridge, base_dir=str(tmp_path), session_id="20260515_120000") + assert log_path is not None + lines = log_path.read_text().splitlines() + # Skip the header line. + record = json.loads(lines[1]) + assert record["agent_id"] == "sim_aut" + assert record["session_id"] == "20260515_120000" + assert record["entity_class"] == "food" + + def test_header_appears_even_with_zero_records(self, tmp_path: Path) -> None: + """Empty action log still has the header — schema discovery + shouldn't depend on a record existing.""" + from maxim.simulation.report import save_action_log + + bridge = MagicMock() + bridge.get_all_actions.return_value = [] + log_path = save_action_log(bridge, base_dir=str(tmp_path), session_id="sid") + assert log_path is not None + lines = log_path.read_text().splitlines() + assert len(lines) == 1 + assert json.loads(lines[0])["_format_version"] == "1.0" + + +# ───────────────────────────────────────────────────────────────────── +# Layer 6: sim_action entity_class kwarg +# ───────────────────────────────────────────────────────────────────── + + +class TestSimActionEntityClass: + """sim_action grows an ``entity_class`` kwarg that routes through + sim_log's data dict for post-hoc Roy-3 analysis. Existing + callers using the positional shape continue to work.""" + + def test_legacy_call_shape_unchanged(self) -> None: + """The plain ``sim_action(tool, success)`` call must still + work — most existing callers don't pass entity_class yet.""" + from maxim.simulation.sim_logger import sim_action + + # Should not raise; nothing else to assert because sim_log + # silently no-ops when sim logging isn't enabled. + sim_action("respond", True) + + def test_entity_class_threaded_through(self, tmp_path: Path) -> None: + """When sim_action is called with entity_class, it shows up + in the JSONL record's data dict.""" + from maxim.simulation.sim_logger import ( + disable_sim_logging, + enable_sim_logging, + sim_action, + ) + + log_path = tmp_path / "sim_log.jsonl" + enable_sim_logging(log_path=str(log_path)) + try: + sim_action("sense_food_source", True, summary="found", entity_class="food") + finally: + disable_sim_logging() + + records = [json.loads(line) for line in log_path.read_text().splitlines()] + # Find the MOTOR record. + motor_records = [r for r in records if r.get("subsystem") == "MOTOR"] + assert len(motor_records) >= 1 + assert motor_records[0]["data"]["entity_class"] == "food" + + def test_entity_class_none_omits_field(self, tmp_path: Path) -> None: + """When entity_class is None (the default), it should NOT + appear as a key in the data dict — cluttering Roy-3 records + with ``entity_class: null`` for every verb-only tool is noise.""" + from maxim.simulation.sim_logger import ( + disable_sim_logging, + enable_sim_logging, + sim_action, + ) + + log_path = tmp_path / "sim_log.jsonl" + enable_sim_logging(log_path=str(log_path)) + try: + sim_action("respond", True, summary="hi") + finally: + disable_sim_logging() + + records = [json.loads(line) for line in log_path.read_text().splitlines()] + motor_records = [r for r in records if r.get("subsystem") == "MOTOR"] + assert len(motor_records) >= 1 + # The `data` field is either absent or doesn't contain entity_class. + data = motor_records[0].get("data") or {} + assert "entity_class" not in data + + +# ───────────────────────────────────────────────────────────────────── +# Layer 7: Stage 0c — sim_recommend_action emission +# ───────────────────────────────────────────────────────────────────── + + +class TestRecommendActionEmission: + """Per the plan: EVERY recommend_action call emits exactly one + sim_recommend_action event, including the early-return paths. + Roy-3 needs to distinguish "gate fired, consumer did nothing" + from "consumer didn't run at all".""" + + def _fresh_nac(self) -> NAc: + return NAc(config=NACConfig()) + + def _read_recommend_records(self, log_path: Path) -> list[dict[str, Any]]: + records = [json.loads(line) for line in log_path.read_text().splitlines()] + return [r for r in records if r.get("subsystem") == "NAc_RECOMMEND"] + + def test_emission_on_success_path(self, tmp_path: Path) -> None: + from maxim.simulation.sim_logger import disable_sim_logging, enable_sim_logging + + log_path = tmp_path / "sim_log.jsonl" + enable_sim_logging(log_path=str(log_path)) + try: + nac = self._fresh_nac() + # Seed cluster bias so a tool wins. + nac.update_cluster_reward( + agent_id="sim_aut", + cluster_id="cluster-1", + tool_signature="tool:sense_food_source", + reward=10.0, + ) + result = nac.recommend_action( + agent_id="sim_aut", + available_tools=["sense_food_source"], + current_cluster_id="cluster-1", + ) + finally: + disable_sim_logging() + + assert result is not None + assert result["tool_name"] == "sense_food_source" + recs = self._read_recommend_records(log_path) + assert len(recs) == 1 + data = recs[0]["data"] + assert data["passed_gate"] is True + assert data["best_tool"] == "sense_food_source" + assert data["best_score"] > 0.0 + assert data["current_cluster_id"] == "cluster-1" + assert data["cluster_reward_bias_consulted"] is not None + + def test_emission_on_no_scores_path(self, tmp_path: Path) -> None: + """When recommend_action returns None because no tool scored, + the event MUST still emit — passed_gate=False, best_tool=None.""" + from maxim.simulation.sim_logger import disable_sim_logging, enable_sim_logging + + log_path = tmp_path / "sim_log.jsonl" + enable_sim_logging(log_path=str(log_path)) + try: + nac = self._fresh_nac() + # No bias seeded; no drives; no causal links → empty scores. + result = nac.recommend_action( + agent_id="sim_aut", + available_tools=["sense_food_source"], + ) + finally: + disable_sim_logging() + + assert result is None + recs = self._read_recommend_records(log_path) + assert len(recs) == 1 + data = recs[0]["data"] + assert data["passed_gate"] is False + assert data["best_tool"] is None + assert data["best_score"] == 0.0 + + def test_emission_on_sub_threshold_path(self, tmp_path: Path) -> None: + """When scores exist but best_score < min_confidence, the event + emits with passed_gate=False AND best_tool populated.""" + from maxim.simulation.sim_logger import disable_sim_logging, enable_sim_logging + + log_path = tmp_path / "sim_log.jsonl" + enable_sim_logging(log_path=str(log_path)) + try: + nac = self._fresh_nac() + # Seed a tiny bias well under min_confidence (0.3). + nac.update_cluster_reward( + agent_id="sim_aut", + cluster_id="cluster-1", + tool_signature="tool:sense_food_source", + reward=0.5, # alpha=0.15 → +0.075, below the 0.3 default gate + ) + result = nac.recommend_action( + agent_id="sim_aut", + available_tools=["sense_food_source"], + current_cluster_id="cluster-1", + min_confidence=0.3, + ) + finally: + disable_sim_logging() + + assert result is None + recs = self._read_recommend_records(log_path) + assert len(recs) == 1 + data = recs[0]["data"] + assert data["passed_gate"] is False + assert data["best_tool"] == "sense_food_source" + assert 0.0 < data["best_score"] < 0.3 + + def test_fail_soft_when_sim_logging_disabled(self) -> None: + """Non-sim runtime path: sim_log is a no-op when sim logging + isn't enabled, so recommend_action returns normally without + raising or even leaving a partial state.""" + nac = self._fresh_nac() + nac.update_cluster_reward("sim_aut", "c1", "tool:foo", reward=10.0) + # sim logging NOT enabled here. + result = nac.recommend_action( + agent_id="sim_aut", + available_tools=["foo"], + current_cluster_id="c1", + ) + assert result is not None + assert result["tool_name"] == "foo" + + +# ───────────────────────────────────────────────────────────────────── +# Layer 8: RequestContext binding regression guard +# ───────────────────────────────────────────────────────────────────── + + +class TestRequestContextBindingShape: + """The Stage 0b binding is done in _aut_worker (orchestrator.py) + via new_request_context + set_context + reset_context. The actual + sim-orchestrator binding happens at runtime inside a thread, so + we test the SHAPE here: a fresh context binds, current_context() + reads back, reset restores.""" + + def test_round_trip(self) -> None: + ctx = new_request_context(agent_id="sim_aut", session_id="20260515_120000") + token = set_context(ctx) + try: + from maxim.utils.http import current_context + + current = current_context() + assert current is not None + assert current.agent_id == "sim_aut" + assert current.session_id == "20260515_120000" + finally: + reset_context(token) + # After reset, the binding is gone (or restored to whatever + # was bound before — pytest fixtures generally start with None). + from maxim.utils.http import current_context + + after = current_context() + # Either None or pre-existing; the new binding is gone. + if after is not None: + assert after.agent_id != "sim_aut" or after.session_id != "20260515_120000" + + def test_reset_in_finally_handles_exception(self) -> None: + """If the sim worker raises, the reset_context call in finally + must still fire so the binding doesn't leak to the next test.""" + ctx = new_request_context(agent_id="sim_aut", session_id="sid") + token = set_context(ctx) + try: + try: + raise RuntimeError("simulated worker failure") + except RuntimeError: + pass # The orchestrator catches this; here we just want + # to ensure reset is still reachable. + finally: + reset_context(token) + from maxim.utils.http import current_context + + # Either None or the prior binding — not the failed binding. + after = current_context() + if after is not None: + assert after.agent_id != "sim_aut" or after.session_id != "sid" + + +@pytest.fixture(autouse=True) +def _reset_sim_logger_state() -> Any: + """sim_logger has module-level state (the _sim_active flag, the + _log_file handle, the _log_records deque). Make sure each test + starts with sim logging DISABLED, even if a prior test forgot to + call disable_sim_logging().""" + from maxim.simulation.sim_logger import disable_sim_logging + + disable_sim_logging() + yield + disable_sim_logging() From 011c99571a0b6f3dadc849217bbf804b8dcafbe0 Mon Sep 17 00:00:00 2001 From: Denny Schaedig Date: Fri, 15 May 2026 21:56:55 -0600 Subject: [PATCH 2/2] fix(0.9.1-stage-0b-0c): fold pre-merge review findings (arch + bio) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two-lens pre-merge review of feat/0-9-1-stage-0b-0c-telemetry surfaced 3 Critical architecture findings + 1 cross-confirmed Important finding (both lenses) + 4 Important findings (mixed lens). All folded before opening the PR per feedback_review_before_ship.md. 32 tests passing (up from 27 — 5 new fold-regression guards). Critical (architecture): 1. tick mismatch with Stage 0d. Pre-fold, sim_recommend_action emitted tick=int(time.time()) (raw epoch ~1.7e9); Stage 0d's sim_ec_activation emits tick=int(time.time() - _sim_start) (elapsed seconds). A 1e9 offset would have made Roy-3 left-joins on tick return zero matches every time. Fix imports sim_logger and subtracts _sim_start. Pinned by test_tick_aligned_with_sim_logger_start. 2. _format_version "1.0" → "1.1". Plan's "Cross-cutting: persistence schema" section explicitly pins this at "1.1" (minor bump from pre-0b unversioned per CC1's "0.x" sentinel rule). Pre-fold shipped "1.0" — readers branching on version would have read the wrong dialect. Bumped + extracted to _ACTIONS_JSONL_FORMAT_VERSION module constant so the next bump is a one-line change. 3. Fourth silent-return-None early-exit was missing emission. recommend_action's `if not available_tools: return None` bailed before the emitter — Roy-3 couldn't distinguish "no tools available" from "no tools scored above gate." Pinned by test_emission_on_empty_available_tools_path. Cross-confirmed (architecture I2 + bio I1+I2): entity_class heuristic scoped to strict opt-in. Pre-fold, _derive_entity_class included a tool-name verb-prefix- strip heuristic that produced noise on non-entity tools: get_status → "status", set_entity_sensor → "entity_sensor", do_something_clever → "something_clever". Roy-3 normalization would have silently attributed pain events to fake entity classes. Bio-lens flagged the contamination question; arch-lens flagged the false-positive rate. Fix drops the verb-strip path entirely. Tool authors opt into Roy-3 attribution by passing entity_class through params (entity_class / target / entity / object). The post-fold heuristic is conservative: Roy-3 normalization skips None, so being more conservative is strictly safer than producing wrong buckets — silent miscount is worse than missing data. Docstring adds two bio-fidelity guardrails: - "DO NOT consume this field from any substrate write path" — walls entity_class off from NAc/EC/ATL/Hippocampus/PainBus, making the contamination question structurally unambiguous. - 1.1 TODO pointing to a declared `Tool.entity_class` field as the future shape — tracks the same surface as feedback_two_identity_schemes.md. Tests: test_tool_name_alone_does_not_derive verifies the dropped heuristic; test_non_entity_tools_with_underscores_return_none pins the false-positive regression guard. Bio I3: empty-scores cluster sentinel. On the empty-scores path, cluster_reward_bias_consulted was always None — conflating "agent had no active cluster" with "agent had a cluster but no tools scored." Roy-3 H1 disambiguation needs the distinction (the Wire-A vs recommend_action gap is exactly here). Post-fold: 0.0 sentinel when current_cluster_id is set, None when truly absent. Pinned by test_empty_scores_sentinel_distinguishes_cluster_known_vs_unknown. Architecture I5: use context_scope() helper instead of manual set_context/reset_context. Both AUT and orchestrator thread bindings now use the canonical helper from utils/http.py; future sim entry points cannot forget the reset. Architecture I4: explicit header-skip reader contract in save_action_log docstring. Pinned by test_consumer_can_skip_header_line which simulates the documented "skip _record_kind == 'header'" reader pattern. Bio nice-to-have: comment on the AUT/orch agent_id binding documenting that the current sim-fixed strings are correct for the single-AUT topology but NPCs spawned via AgentFactory in this orchestrator session would need per-spawn context_scope. Architecture nice-to-have: narrowed `except Exception` to `except ImportError` in _emit_recommend_action_event. Non-sim runtime is the only documented swallow case; other exceptions propagate so a real sim_logger bug surfaces. Deferred (architecture I1, I6, N1-N3): sub-second t ordering (documentation, no code), third-thread LLM worker test (existing Plan 4 A.2 inheritance), nice-to-have polish. Tracked in fold review thread. Co-Authored-By: Claude Opus 4.7 (1M context) --- CLAUDE.md | 4 +- src/maxim/decisions/nac.py | 66 +++++-- src/maxim/simulation/instrumented_executor.py | 74 +++---- src/maxim/simulation/orchestrator.py | 138 +++++++------ src/maxim/simulation/report.py | 33 +++- tests/unit/test_stage_0b_0c_telemetry.py | 187 ++++++++++++++++-- 6 files changed, 366 insertions(+), 136 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 5ae202b1..2ef75322 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -399,8 +399,8 @@ MAXIM_NAC_MIN_CONFIDENCE=0.0 # Override propose_via_substrate's min_confiden MAXIM_EC_TRACE_ACTIVATIONS=1 # Gate per-tick `sim_ec_activation` JSONL events from EntorhinalCortex.pattern_complete_or_separate. Fields: agent_id, tick (int second bucket), active_node_id, activation_strength, modality_tag (linguistic/drive/sensor), modality, is_new. Off by default — Roy-4 sets it in the runner environment for the cross-modal binding pre-implementation validation experiment (scripts/analyze_roy_4_coactivation.py is the post-hoc analyzer). Falsy values ("0", "false", "no", "off", empty) disable. The instrumentation fires even on cold-start when active_node_id is freshly allocated, so pattern-separation events are visible in the co-activation matrix. # Action JSONL + recommend_action telemetry (release_0_9_1.md Stages 0b + 0c) — no env var; structural -# Stage 0b: actions.jsonl gains a header line with `_format_version: "1.0"` + per-record `agent_id` / `session_id` / `entity_class` fields populated from utils/http.py::current_context() (bound at the sim orchestrator entry on both AUT + orchestrator threads via set_context/reset_context). InstrumentedExecutor derives entity_class best-effort from tool params (params["entity_class"] → params["target"]/["entity"]/["object"] → tool-name heuristic stripping verb prefixes + role suffixes); None when not derivable. ActionRecord is shape-frozen at 1.0 (CC3) with three optional fields appended at the end — back-compat with existing ActionSink consumers. -# Stage 0c: NAc.recommend_action emits one `sim_log("NAc_RECOMMEND", ...)` event per call (including empty-scores and sub-threshold early-return paths — Roy-3 needs to distinguish "gate fired, consumer did nothing" from "consumer didn't run at all"). Fields: tick, current_cluster_id, cluster_reward_bias_consulted, best_tool, best_score, min_confidence, passed_gate. Routes through the standard sim_log JSONL writer + the MAXIM_LOG_FILE bridge. Fail-soft when sim_logger is unavailable — non-sim runtime calls don't crash on missing telemetry plumbing. +# Stage 0b: actions.jsonl gains a header line with `_format_version: "1.1"` (minor bump from pre-0b unversioned "0.x" per CC1) + per-record `agent_id` / `session_id` / `entity_class` fields populated from utils/http.py::current_context() (bound at the sim orchestrator entry on both AUT + orchestrator threads via `context_scope()`). InstrumentedExecutor derives entity_class strictly opt-in (params["entity_class"] → params["target"]/["entity"]/["object"]); the verb-prefix-strip heuristic was dropped in pre-merge review fold (too noisy on non-entity tools like `get_status` → "status"). Tool authors opt into Roy-3 attribution by passing entity_class through params; 1.1 ships declared `Tool.entity_class` field per the docstring TODO. ActionRecord is shape-frozen at 1.0 (CC3) with three optional fields appended at the end — back-compat with existing ActionSink consumers. **Reader contract:** `_record_kind == "header"` MUST be skipped before interpreting per-action fields. +# Stage 0c: NAc.recommend_action emits one `sim_log("NAc_RECOMMEND", ...)` event per call (including all FOUR early-return paths: empty available_tools, empty scores, sub-threshold, success — Roy-3 needs to distinguish "gate fired, consumer did nothing" from "consumer didn't run at all"). Fields: tick (int(time.time() - sim_logger._sim_start) — ALIGNED with Stage 0d's `sim_ec_activation` tick space so Roy-3 cross-channel joins work), current_cluster_id, cluster_reward_bias_consulted (0.0 sentinel when cluster_id known but no tool scored; None when cluster_id truly absent — distinction is load-bearing for Roy-3 H1 disambiguation), best_tool, best_score, min_confidence, passed_gate. Routes through the standard sim_log JSONL writer + the MAXIM_LOG_FILE bridge. Fail-soft on ImportError only (non-sim runtime); other exceptions propagate so a real sim_logger bug surfaces. # Leader proxy admission control MAXIM_PROXY_MAX_CONCURRENT=4 # Max in-flight requests to upstream (0=unlimited) diff --git a/src/maxim/decisions/nac.py b/src/maxim/decisions/nac.py index 31aef2ea..10486993 100644 --- a/src/maxim/decisions/nac.py +++ b/src/maxim/decisions/nac.py @@ -42,24 +42,38 @@ def _emit_recommend_action_event( """Emit a ``sim_recommend_action`` event for Stage 0c telemetry. Per release_0_9_1.md Stage 0c, every ``recommend_action`` call MUST - emit exactly one event — even the early-return paths (empty scores, - sub-threshold) — so Roy-3 measurement can distinguish "gate fired - but consumer did nothing" from "consumer ran and proposed nothing." + emit exactly one event — even the early-return paths (empty + available_tools, empty scores, sub-threshold) — so Roy-3 measurement + can distinguish "gate fired but consumer did nothing" from + "consumer ran and proposed nothing." The event lands on the ``sim_log("NAc_RECOMMEND", ...)`` channel, which routes through the standard sim_log JSONL writer + the - MAXIM_LOG_FILE bridge. The emission is fail-soft: any import or - sim-log error returns silently so a NAc decision in a non-sim - process never crashes on missing telemetry plumbing. + MAXIM_LOG_FILE bridge. + + **Tick alignment with Stage 0d (CRITICAL):** the ``tick`` field + matches Stage 0d's ``sim_ec_activation`` tick space — + ``int(time.time() - sim_logger._sim_start)``, NOT raw epoch seconds. + Without this alignment Roy-3 cannot left-join the two channels + on tick (a 1e9 offset returns zero matches every time). For + sub-second ordering use the sim_log JSONL's top-level ``t`` field, + which sim_log auto-attaches with millisecond resolution from the + same ``_sim_start`` reference. + + The emission is fail-soft: ``ImportError`` (non-sim runtime where + sim_logger isn't importable at all) is swallowed silently. Any + other exception propagates — a real sim_logger bug should surface + rather than masquerade as silent annotation-off. """ try: - from maxim.simulation.sim_logger import sim_log + from maxim.simulation import sim_logger as _sl - sim_log( + tick = int(time.time() - _sl._sim_start) if _sl._sim_start > 0.0 else 0 + _sl.sim_log( "NAc_RECOMMEND", f"recommend_action: passed_gate={passed_gate}", { - "tick": int(time.time()), + "tick": tick, "current_cluster_id": current_cluster_id, "cluster_reward_bias_consulted": cluster_reward_bias_consulted, "best_tool": best_tool, @@ -69,10 +83,14 @@ def _emit_recommend_action_event( }, agent_id=agent_id, ) - except Exception: - # sim_logger may not be available (non-sim runtime) or sim - # logging may not be active — Stage 0c is observability only, - # not load-bearing for correctness. Swallow silently. + except ImportError: + # Non-sim runtime: sim_logger isn't importable at all (e.g., + # headless API without the simulation extras). Stage 0c is + # observability only, not load-bearing for correctness — + # swallow silently. Any OTHER exception (a real sim_logger + # bug, an attribute error from a broken refactor) propagates + # so we don't silently disable telemetry the Roy-3 measurement + # arm depends on. pass @@ -1265,6 +1283,19 @@ def recommend_action( if not agent_id: raise ValueError("recommend_action requires non-empty agent_id") if not available_tools: + # Stage 0c: empty available_tools is a legitimate early return + # (e.g., the scene_actor filter trimmed the executor's tool set + # to nothing). Still emit so Roy-3 can distinguish "no tools + # available" from "no tools scored above gate." + _emit_recommend_action_event( + agent_id=agent_id, + current_cluster_id=current_cluster_id, + cluster_reward_bias_consulted=None, + best_tool=None, + best_score=0.0, + min_confidence=min_confidence, + passed_gate=False, + ) return None drives = current_drives or {} @@ -1347,10 +1378,17 @@ def recommend_action( # nothing." Per the plan: "the event MUST emit even when # recommend_action returns None." if not scores: + # Bio-fidelity review fold: distinguish "cluster known, no + # tool scored" (0.0 sentinel — agent had context but nothing + # rewarded) from "cluster unknown" (None — no + # current_cluster_id at all). Roy-3 needs this distinction + # to expose the Wire-A vs recommend_action gap; collapsing + # both into None would elide the H1 signal. + _consulted_on_empty: float | None = 0.0 if current_cluster_id else None _emit_recommend_action_event( agent_id=agent_id, current_cluster_id=current_cluster_id, - cluster_reward_bias_consulted=None, + cluster_reward_bias_consulted=_consulted_on_empty, best_tool=None, best_score=0.0, min_confidence=min_confidence, diff --git a/src/maxim/simulation/instrumented_executor.py b/src/maxim/simulation/instrumented_executor.py index aff1a0c0..e4e0ef61 100644 --- a/src/maxim/simulation/instrumented_executor.py +++ b/src/maxim/simulation/instrumented_executor.py @@ -25,25 +25,43 @@ def _derive_entity_class(tool_name: str, params: dict[str, Any]) -> str | None: """Best-effort entity-class extraction for Stage 0b telemetry. - Roy-3 analysis normalizes pain-aversion counts per entity_class — - knowing the agent encountered "food" 50 times but felt pain 3 times - matters very differently from encountering "food" 5 times and feeling - pain 3 times. The exact derivation is fuzzy because Maxim's tool - surface mixes verb-only tools (``respond``, ``examine``) with - entity-bound tools (``infant_humanoid_pick_up``, ``sense_food_source``). - - Heuristics (lowest-cost-to-highest): - 1. ``params["entity_class"]`` — explicit caller override (highest fidelity). + **DO NOT consume this field from any substrate write path** (NAc, + EC, ATL, Hippocampus, PainBus). It exists for Roy-3 post-hoc + exposure-count normalization and the Roy harness's per-class + plotting. Substrate consumers must derive entity identity from + the percept text + EC pattern completion, NEVER from this field. + The bio-fidelity guardrail in the bio-lens review: this field is + walled off from the substrate so it can stay a best-effort + heuristic without contaminating the 1.0 thesis ("substrate carries + cognition; language is I/O"). + + **Strict opt-in derivation:** ships explicit-param-only at 0.9.1 + after the pre-merge review caught the verb-strip heuristic + producing noisy buckets on non-entity tools (``get_status`` → + ``"status"``, ``set_entity_sensor`` → ``"entity_sensor"``, + ``do_something_clever`` → ``"something_clever"``). Roy-3 + normalization explicitly skips ``None``, so being conservative is + strictly safer than producing wrong buckets — silent miscount is + worse than missing data. + + Heuristics in priority order: + 1. ``params["entity_class"]`` — explicit caller override. 2. ``params["target"]`` / ``params["entity"]`` / ``params["object"]`` — the conventional param names entity-binding tools use. - 3. Tool-name prefix split — ``infant_humanoid_pick_up`` → ``infant_humanoid``; - ``sense_food_source`` → ``food`` (heuristic: skip leading verb token). - - Returns ``None`` when nothing in the action surface looks - entity-bound (e.g., ``respond``, ``examine``, sleep tools). The - field is best-effort metadata, never load-bearing for correctness; - Roy-3 analysis aggregates with ``None`` skipped from - exposure-count normalization. + + Returns ``None`` when neither (1) nor (2) is present, including + for tools whose name suggests an entity binding but didn't pass + one through params (``infant_humanoid_pick_up`` with no target → + None). The field is best-effort metadata. + + TODO (1.1): replace this opt-in heuristic with a declared + ``Tool.entity_class: str | None`` field on the Tool ABC, so tool + authors can opt their tools into Roy-3 attribution explicitly + without participating in this derivation logic at all. Tracks + the same surface as ``feedback_two_identity_schemes.md`` — the + substrate already uses tool-name AND EC-cluster identity for one + concept; declared ``entity_class`` would be a third explicit + handle that tooling can rely on. """ if not isinstance(params, dict): return None @@ -56,25 +74,9 @@ def _derive_entity_class(tool_name: str, params: dict[str, Any]) -> str | None: val = params.get(key) if isinstance(val, str) and val: return val - # 3. Tool-name heuristic. Strip leading verb tokens. - if "_" in tool_name: - parts = tool_name.split("_") - # Common verb prefixes that tools start with — these are NOT - # the entity class. - verb_prefixes = {"sense", "use", "do", "get", "set", "make", "go", "look", "examine"} - if parts and parts[0].lower() in verb_prefixes: - remainder = "_".join(parts[1:]) - if remainder: - # ``sense_food_source`` → ``food_source`` after strip; - # further trim trailing role tokens like ``_source`` / - # ``_target`` so the bucket reads as ``food``. - role_suffixes = {"source", "target", "object"} - tail_parts = remainder.split("_") - while tail_parts and tail_parts[-1].lower() in role_suffixes: - tail_parts.pop() - if tail_parts: - return "_".join(tail_parts) - return remainder + # No verb-strip path: pre-merge review showed it produced noise + # on non-entity tools that Roy-3 normalization would silently + # mis-attribute. Future work tracked in the docstring TODO. return None diff --git a/src/maxim/simulation/orchestrator.py b/src/maxim/simulation/orchestrator.py index eea235e2..b8078882 100644 --- a/src/maxim/simulation/orchestrator.py +++ b/src/maxim/simulation/orchestrator.py @@ -1495,48 +1495,59 @@ def _get_component_integrity(name: str) -> float: def _aut_worker() -> None: # Stage 0b (release_0_9_1.md): bind RequestContext on the AUT - # thread so InstrumentedExecutor.execute(), recommend_action's - # sim_recommend_action emitter, and any other downstream code - # reading utils/http.py::current_context() see the right - # agent_id + session_id pair. ContextVars are per-thread; the - # main-thread binding doesn't reach here without copy_context. + # thread via context_scope() so InstrumentedExecutor.execute(), + # recommend_action's sim_recommend_action emitter, and any + # other downstream code reading utils/http.py::current_context() + # see the right agent_id + session_id pair. ContextVars are + # per-thread; the main-thread binding doesn't reach here + # without copy_context. context_scope is a context manager — + # its __exit__ resets the binding even on exception, which is + # what the pre-merge review (architecture lens I5) recommended + # over manual set_context/reset_context in try/finally so + # future sim entry points cannot forget the reset. # Bound BEFORE sim_agent_context so the typed RequestContext # and the sim_logger contextvar agree on agent identity. - from maxim.utils.http import new_request_context, reset_context, set_context + # + # NOTE: this binding is correct for the current sim topology + # (one AUT, one orch, no AgentFactory sub-agents in the sim + # path). If NPCs spawned via AgentFactory start producing + # ActionRecords in this orchestrator session, every record + # will carry agent_id="sim_aut" instead of the NPC's per-agent + # stash id. Bio-lens nice-to-have: per-spawn context_scope + # inside the NPC's tool-dispatch boundary would be the fix + # when that surface ships. + from maxim.utils.http import context_scope, new_request_context - _aut_request_ctx = new_request_context(agent_id="sim_aut", session_id=session_id) - _aut_request_token = set_context(_aut_request_ctx) try: - with sim_agent_context("sim_aut"): - run_agentic_loop( - aut_agent, - aut_env, - aut_state, - aut_memory, - aut_decision_engine, - aut_executor, - autonomy_controller=aut_autonomy, - llm_worker=aut_llm_worker, - default_network=aut_default_network, - hippocampus=aut_hippocampus, - memory_hub=aut_memory_hub, - max_steps=0, # unlimited — AUT stops when bridge.finish() is called - stop_event=stop_event, - target_hz=2.0, - percept_source=bridge.percept_source, - action_sink=bridge.action_sink, - pain_bus=aut_pain_bus, - imagination_trigger=aut_imagination_trigger, - bio_enrichment_pipeline=aut_bio_enrichment_pipeline, - thought_gate=_aut_thought_gate, - aut_mode=aut_mode, - substrate_telemetry=aut_substrate_telemetry, - ) + with context_scope(new_request_context(agent_id="sim_aut", session_id=session_id)): + with sim_agent_context("sim_aut"): + run_agentic_loop( + aut_agent, + aut_env, + aut_state, + aut_memory, + aut_decision_engine, + aut_executor, + autonomy_controller=aut_autonomy, + llm_worker=aut_llm_worker, + default_network=aut_default_network, + hippocampus=aut_hippocampus, + memory_hub=aut_memory_hub, + max_steps=0, # unlimited — AUT stops when bridge.finish() is called + stop_event=stop_event, + target_hz=2.0, + percept_source=bridge.percept_source, + action_sink=bridge.action_sink, + pain_bus=aut_pain_bus, + imagination_trigger=aut_imagination_trigger, + bio_enrichment_pipeline=aut_bio_enrichment_pipeline, + thought_gate=_aut_thought_gate, + aut_mode=aut_mode, + substrate_telemetry=aut_substrate_telemetry, + ) except Exception as e: aut_error.append(e) logger.error("AUT loop failed: %s", e) - finally: - reset_context(_aut_request_token) aut_thread = threading.Thread(target=_aut_worker, name="sim.aut", daemon=True) aut_thread.start() @@ -2389,42 +2400,41 @@ def _orch_action_count() -> int: # orch-side action records (rare — most actions land on the AUT # sink, but orchestrator tools still execute) carry agent_id + # session_id. Symmetric with the AUT thread binding above. Runs - # on the main thread; the reset in `finally` keeps the bind - # scoped to the sim run. - from maxim.utils.http import new_request_context as _new_ctx - from maxim.utils.http import reset_context as _reset_ctx - from maxim.utils.http import set_context as _set_ctx - - _orch_request_ctx = _new_ctx(agent_id="sim_orchestrator", session_id=session_id) - _orch_request_token = _set_ctx(_orch_request_ctx) + # on the main thread; context_scope's __exit__ resets the bind + # on normal return AND on exception, so the bind is scoped to + # exactly the run_agentic_loop window. Per pre-merge review + # architecture lens I5, this replaces a manual set_context / + # reset_context try/finally with the canonical helper. + from maxim.utils.http import context_scope, new_request_context + try: - with sim_agent_context("sim_orchestrator"): - run_agentic_loop( - orch_agent, - orch_env, - orch_state, - orch_memory, - orch_decision_engine, - orch_executor, - autonomy_controller=orch_autonomy, - llm_worker=orch_llm_worker, - # NOTE: orchestrator hippocampus disabled for now — it captures - # every tool call as an episodic memory, which is noisy. - # Re-enable when cross-session learning (Phase 3) is tuned. - # hippocampus=orch_hippocampus, - # memory_hub=orch_memory_hub, - max_steps=0, # unlimited — stops via FinishSimulationTool or /cancel - stop_event=stop_event, - target_hz=2.0, - percept_source=orchestrator_source, - ) + with context_scope(new_request_context(agent_id="sim_orchestrator", session_id=session_id)): + with sim_agent_context("sim_orchestrator"): + run_agentic_loop( + orch_agent, + orch_env, + orch_state, + orch_memory, + orch_decision_engine, + orch_executor, + autonomy_controller=orch_autonomy, + llm_worker=orch_llm_worker, + # NOTE: orchestrator hippocampus disabled for now — it captures + # every tool call as an episodic memory, which is noisy. + # Re-enable when cross-session learning (Phase 3) is tuned. + # hippocampus=orch_hippocampus, + # memory_hub=orch_memory_hub, + max_steps=0, # unlimited — stops via FinishSimulationTool or /cancel + stop_event=stop_event, + target_hz=2.0, + percept_source=orchestrator_source, + ) except KeyboardInterrupt: display_summary(["Simulation stopped by user"]) except Exception as e: orch_error.append(e) logger.error("Orchestrator loop failed: %s", e) finally: - _reset_ctx(_orch_request_token) # Always clean up, even on interrupt bridge._spinner.stop() diff --git a/src/maxim/simulation/report.py b/src/maxim/simulation/report.py index ebdad595..b16bc3fb 100644 --- a/src/maxim/simulation/report.py +++ b/src/maxim/simulation/report.py @@ -350,16 +350,32 @@ def save_report(report: SimulationReport, base_dir: str | None = None) -> Path: return report_path +_ACTIONS_JSONL_FORMAT_VERSION = "1.1" +"""actions.jsonl ``_format_version``. Per release_0_9_1.md Stage 0b +("Cross-cutting: persistence schema"), this file ships at "1.1" — the +addition of the header line + ``agent_id`` / ``session_id`` / +``entity_class`` per-record fields is a minor bump from the pre-0b +unversioned ("0.x" per CC1) format. A future change that requires +readers to handle a removed field is a major bump.""" + + def save_action_log(bridge: Any, base_dir: str, session_id: str) -> Path | None: """Save all action records as JSONL for post-hoc analysis. - The first line is a header record carrying ``_format_version`` per - CLAUDE.md CC1 (Stage 0b, release_0_9_1.md). Per-action records - follow, one per line. Each carries Stage 0b telemetry fields - (``agent_id``, ``session_id``, ``entity_class``) populated by + **Reader contract (Stage 0b):** the first line is a header record + carrying ``_format_version`` per CLAUDE.md CC1. Consumers MUST + skip any line where ``_record_kind == "header"`` before + interpreting per-action fields. Roy analyzers shipped in 0.9.1+ + already follow this rule; third-party tooling that iterated the + file assuming every line is a record needs a one-line filter + update. + + Per-action records carry Stage 0b telemetry fields (``agent_id``, + ``session_id``, ``entity_class``) populated by ``InstrumentedExecutor`` from the bound ``RequestContext`` — ``None`` when the context was unbound at execution time (e.g., - pre-0b sims, headless API runs). + pre-0b sims, headless API runs) or when ``entity_class`` couldn't + be derived from tool params. Format-version evolution rule: appending optional fields to the per-action record is back-compat (existing parsers ignore unknown @@ -372,10 +388,11 @@ def save_action_log(bridge: Any, base_dir: str, session_id: str) -> Path | None: try: with open(str(log_path), "w", encoding="utf-8") as f: # Stage 0b: format-version header (one-line schema marker - # at the top of the JSONL — existing per-record parsers - # ignore unknown top-level keys, so this is back-compat). + # at the top of the JSONL). Consumers MUST skip lines + # where _record_kind=="header"; the docstring contract + # above is the single source of truth. header = { - "_format_version": "1.0", + "_format_version": _ACTIONS_JSONL_FORMAT_VERSION, "_record_kind": "header", "session_id": session_id, } diff --git a/tests/unit/test_stage_0b_0c_telemetry.py b/tests/unit/test_stage_0b_0c_telemetry.py index 88a7f3b2..4ce4ece8 100644 --- a/tests/unit/test_stage_0b_0c_telemetry.py +++ b/tests/unit/test_stage_0b_0c_telemetry.py @@ -96,13 +96,20 @@ def test_param_priority_explicit_over_target(self) -> None: """params["entity_class"] beats params["target"] when both present.""" assert _derive_entity_class("any_tool", {"entity_class": "food", "target": "drink"}) == "food" - def test_tool_name_verb_prefix_stripped(self) -> None: - """``sense_food_source`` → strip 'sense' verb + 'source' role suffix → 'food'.""" - assert _derive_entity_class("sense_food_source", {}) == "food" - - def test_tool_name_no_role_suffix(self) -> None: - """``use_weapon`` → strip 'use' verb → 'weapon'.""" - assert _derive_entity_class("use_weapon", {}) == "weapon" + def test_tool_name_alone_does_not_derive(self) -> None: + """Pre-merge review fold: the verb-strip heuristic was dropped + as too noisy. Tools whose name suggests an entity binding + (``sense_food_source``, ``infant_humanoid_pick_up``) but don't + pass an entity param through the call now return None. + + Roy-3 normalization explicitly skips None, so being conservative + is strictly safer than producing wrong buckets — silent miscount + is worse than missing data. The future fix (1.1 TODO in the + derivation docstring) declares ``Tool.entity_class`` on the + Tool ABC so authors opt in explicitly.""" + assert _derive_entity_class("sense_food_source", {}) is None + assert _derive_entity_class("infant_humanoid_pick_up", {}) is None + assert _derive_entity_class("use_weapon", {}) is None def test_verb_only_tools_return_none(self) -> None: """``respond`` / ``examine`` / no underscore → None (not entity-bound).""" @@ -110,10 +117,31 @@ def test_verb_only_tools_return_none(self) -> None: assert _derive_entity_class("examine", {}) is None assert _derive_entity_class("examine", {"target": ""}) is None + def test_non_entity_tools_with_underscores_return_none(self) -> None: + """Architecture lens I3 regression guard. The pre-fold verb-strip + heuristic produced noise on these tools — Roy-3 would have + attributed pain events to ``"status"`` or ``"entity_sensor"`` + as if they were real entity classes. With the heuristic dropped, + these all return None.""" + assert _derive_entity_class("get_status", {}) is None + assert _derive_entity_class("set_entity_sensor", {}) is None + assert _derive_entity_class("do_something_clever", {}) is None + assert _derive_entity_class("make_recommendation", {}) is None + assert _derive_entity_class("look_around", {}) is None + def test_non_dict_params_returns_none(self) -> None: """Defensive — params might be None or something weird in some paths.""" assert _derive_entity_class("any_tool", None) is None # type: ignore[arg-type] + def test_entity_param_with_underscore_tool_name_wins(self) -> None: + """Even when the tool name has underscores, an explicit + entity-class param wins. This is the supported path tool + authors use to opt into Roy-3 attribution today (until 1.1 + ships the ``Tool.entity_class`` declared field).""" + # Old verb-strip would have stripped "sense" → "food" too, + # but the param is the authoritative source either way. + assert _derive_entity_class("sense_food_source", {"target": "apple"}) == "apple" + # ───────────────────────────────────────────────────────────────────── # Layer 3: InstrumentedExecutor reads RequestContext @@ -134,13 +162,20 @@ def test_record_populated_from_request_context(self) -> None: ctx = new_request_context(agent_id="sim_aut", session_id="20260515_120000") token = set_context(ctx) try: - executor.execute({"tool_name": "sense_food_source", "params": {}}) + # Tool author opts into Roy-3 attribution via the + # explicit ``entity_class`` param (the only path the + # post-fold heuristic accepts). + executor.execute( + { + "tool_name": "sense_food_source", + "params": {"entity_class": "food"}, + } + ) finally: reset_context(token) rec = sink.actions[-1] assert rec.agent_id == "sim_aut" assert rec.session_id == "20260515_120000" - # entity_class derived from tool name via verb-prefix strip. assert rec.entity_class == "food" def test_no_context_bound_yields_none_fields(self) -> None: @@ -228,7 +263,7 @@ class TestSaveActionLog: schema evolution.""" def test_writes_format_version_header(self, tmp_path: Path) -> None: - from maxim.simulation.report import save_action_log + from maxim.simulation.report import _ACTIONS_JSONL_FORMAT_VERSION, save_action_log bridge = MagicMock() bridge.get_all_actions.return_value = [ @@ -238,10 +273,46 @@ def test_writes_format_version_header(self, tmp_path: Path) -> None: assert log_path is not None lines = log_path.read_text().splitlines() header = json.loads(lines[0]) - assert header["_format_version"] == "1.0" + # Plan release_0_9_1.md § "Cross-cutting: persistence schema" + # pins this at "1.1" — minor bump from pre-0b unversioned ("0.x"). + assert _ACTIONS_JSONL_FORMAT_VERSION == "1.1" + assert header["_format_version"] == "1.1" assert header["_record_kind"] == "header" assert header["session_id"] == "sid" + def test_consumer_can_skip_header_line(self, tmp_path: Path) -> None: + """Architecture lens I4 regression guard. The header-line is a + schema change for actions.jsonl consumers — third-party tooling + iterating the file as "every line is a record" needs to skip + ``_record_kind == "header"``. The docstring contract is + load-bearing; this test pins it as a real reader pattern.""" + from maxim.simulation.report import save_action_log + + bridge = MagicMock() + bridge.get_all_actions.return_value = [ + ActionRecord( + timestamp=1.0, + tool_name="sense_food_source", + agent_id="sim_aut", + session_id="sid", + entity_class="food", + ), + ActionRecord(timestamp=2.0, tool_name="respond", agent_id="sim_aut", session_id="sid"), + ] + log_path = save_action_log(bridge, base_dir=str(tmp_path), session_id="sid") + assert log_path is not None + # Simulate the documented reader pattern: + records = [] + with log_path.open() as f: + for line in f: + obj = json.loads(line) + if obj.get("_record_kind") == "header": + continue + records.append(obj) + assert len(records) == 2 + assert records[0]["tool"] == "sense_food_source" + assert records[1]["tool"] == "respond" + def test_writes_telemetry_fields_per_record(self, tmp_path: Path) -> None: from maxim.simulation.report import save_action_log @@ -275,7 +346,7 @@ def test_header_appears_even_with_zero_records(self, tmp_path: Path) -> None: assert log_path is not None lines = log_path.read_text().splitlines() assert len(lines) == 1 - assert json.loads(lines[0])["_format_version"] == "1.0" + assert json.loads(lines[0])["_format_version"] == "1.1" # ───────────────────────────────────────────────────────────────────── @@ -468,6 +539,98 @@ def test_fail_soft_when_sim_logging_disabled(self) -> None: assert result is not None assert result["tool_name"] == "foo" + def test_emission_on_empty_available_tools_path(self, tmp_path: Path) -> None: + """Architecture lens C3 regression guard. Pre-fold, + ``available_tools=[]`` short-circuited before emitting, + leaving Roy-3 unable to distinguish "no tools available" + from "no tools scored above gate." Post-fold, this path + emits with best_tool=None, best_score=0.0, passed_gate=False.""" + from maxim.simulation.sim_logger import disable_sim_logging, enable_sim_logging + + log_path = tmp_path / "sim_log.jsonl" + enable_sim_logging(log_path=str(log_path)) + try: + nac = self._fresh_nac() + result = nac.recommend_action(agent_id="sim_aut", available_tools=[]) + finally: + disable_sim_logging() + + assert result is None + recs = self._read_recommend_records(log_path) + assert len(recs) == 1 + data = recs[0]["data"] + assert data["passed_gate"] is False + assert data["best_tool"] is None + assert data["best_score"] == 0.0 + + def test_tick_aligned_with_sim_logger_start(self, tmp_path: Path) -> None: + """Architecture lens C1 regression guard. Pre-fold, tick was + ``int(time.time())`` (raw epoch ~1.7e9), while Stage 0d's + ``sim_ec_activation`` uses ``int(time.time() - _sim_start)`` + (elapsed seconds, 0..N). A 1e9 offset means Roy-3 left-joins + on tick return zero matches every time. Post-fold, both + channels emit comparable tick values from the same _sim_start + reference — a tick within ~1s of sim start is a small int, + not an epoch.""" + from maxim.simulation.sim_logger import disable_sim_logging, enable_sim_logging + + log_path = tmp_path / "sim_log.jsonl" + enable_sim_logging(log_path=str(log_path)) + try: + nac = self._fresh_nac() + nac.update_cluster_reward("sim_aut", "c1", "tool:foo", reward=10.0) + nac.recommend_action( + agent_id="sim_aut", + available_tools=["foo"], + current_cluster_id="c1", + ) + finally: + disable_sim_logging() + + recs = self._read_recommend_records(log_path) + assert len(recs) == 1 + tick = recs[0]["data"]["tick"] + # Elapsed-seconds tick within a fresh sim should be tiny. + # If we accidentally regress to raw epoch, this would be ~1.7e9. + assert 0 <= tick < 60, f"tick={tick} looks like raw epoch (regression to int(time.time()))" + + def test_empty_scores_sentinel_distinguishes_cluster_known_vs_unknown(self, tmp_path: Path) -> None: + """Bio-fidelity I3 regression guard. On the empty-scores path: + - cluster_id known but no tool scored → 0.0 sentinel + - cluster_id absent → None + Roy-3 disambiguation depends on this — otherwise "agent had no + active cluster" and "agent had a cluster but no tools scored" + collapse into the same record.""" + from maxim.simulation.sim_logger import disable_sim_logging, enable_sim_logging + + log_path = tmp_path / "sim_log.jsonl" + enable_sim_logging(log_path=str(log_path)) + try: + nac = self._fresh_nac() + # Path A: cluster known, no tool scored (no bias seeded). + nac.recommend_action( + agent_id="sim_aut", + available_tools=["foo"], + current_cluster_id="c1", + ) + # Path B: cluster unknown. + nac.recommend_action( + agent_id="sim_aut", + available_tools=["foo"], + current_cluster_id=None, + ) + finally: + disable_sim_logging() + + recs = self._read_recommend_records(log_path) + assert len(recs) == 2 + # Order matches call order. + path_a, path_b = recs[0]["data"], recs[1]["data"] + assert path_a["current_cluster_id"] == "c1" + assert path_a["cluster_reward_bias_consulted"] == 0.0 # known but no signal + assert path_b["current_cluster_id"] is None + assert path_b["cluster_reward_bias_consulted"] is None # truly absent + # ───────────────────────────────────────────────────────────────────── # Layer 8: RequestContext binding regression guard