diff --git a/CLAUDE.md b/CLAUDE.md index 1aec5b6f..2ef75322 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -398,6 +398,10 @@ MAXIM_NAC_MIN_CONFIDENCE=0.0 # Override propose_via_substrate's min_confiden # EC activation instrumentation (release_0_9_1.md Stage 0d, cross_modal_substrate_binding.md Stage 1) MAXIM_EC_TRACE_ACTIVATIONS=1 # Gate per-tick `sim_ec_activation` JSONL events from EntorhinalCortex.pattern_complete_or_separate. Fields: agent_id, tick (int second bucket), active_node_id, activation_strength, modality_tag (linguistic/drive/sensor), modality, is_new. Off by default — Roy-4 sets it in the runner environment for the cross-modal binding pre-implementation validation experiment (scripts/analyze_roy_4_coactivation.py is the post-hoc analyzer). Falsy values ("0", "false", "no", "off", empty) disable. The instrumentation fires even on cold-start when active_node_id is freshly allocated, so pattern-separation events are visible in the co-activation matrix. +# Action JSONL + recommend_action telemetry (release_0_9_1.md Stages 0b + 0c) — no env var; structural +# Stage 0b: actions.jsonl gains a header line with `_format_version: "1.1"` (minor bump from pre-0b unversioned "0.x" per CC1) + per-record `agent_id` / `session_id` / `entity_class` fields populated from utils/http.py::current_context() (bound at the sim orchestrator entry on both AUT + orchestrator threads via `context_scope()`). InstrumentedExecutor derives entity_class strictly opt-in (params["entity_class"] → params["target"]/["entity"]/["object"]); the verb-prefix-strip heuristic was dropped in pre-merge review fold (too noisy on non-entity tools like `get_status` → "status"). Tool authors opt into Roy-3 attribution by passing entity_class through params; 1.1 ships declared `Tool.entity_class` field per the docstring TODO. ActionRecord is shape-frozen at 1.0 (CC3) with three optional fields appended at the end — back-compat with existing ActionSink consumers. **Reader contract:** `_record_kind == "header"` MUST be skipped before interpreting per-action fields. +# Stage 0c: NAc.recommend_action emits one `sim_log("NAc_RECOMMEND", ...)` event per call (including all FOUR early-return paths: empty available_tools, empty scores, sub-threshold, success — Roy-3 needs to distinguish "gate fired, consumer did nothing" from "consumer didn't run at all"). Fields: tick (int(time.time() - sim_logger._sim_start) — ALIGNED with Stage 0d's `sim_ec_activation` tick space so Roy-3 cross-channel joins work), current_cluster_id, cluster_reward_bias_consulted (0.0 sentinel when cluster_id known but no tool scored; None when cluster_id truly absent — distinction is load-bearing for Roy-3 H1 disambiguation), best_tool, best_score, min_confidence, passed_gate. Routes through the standard sim_log JSONL writer + the MAXIM_LOG_FILE bridge. Fail-soft on ImportError only (non-sim runtime); other exceptions propagate so a real sim_logger bug surfaces. + # Leader proxy admission control MAXIM_PROXY_MAX_CONCURRENT=4 # Max in-flight requests to upstream (0=unlimited) MAXIM_PROXY_RATE_LIMIT_RPM=0 # Per-peer requests/minute (0=unlimited) diff --git a/src/maxim/decisions/nac.py b/src/maxim/decisions/nac.py index b01d229e..10486993 100644 --- a/src/maxim/decisions/nac.py +++ b/src/maxim/decisions/nac.py @@ -29,6 +29,71 @@ logger = logging.getLogger(__name__) +def _emit_recommend_action_event( + *, + agent_id: str, + current_cluster_id: str | None, + cluster_reward_bias_consulted: float | None, + best_tool: str | None, + best_score: float, + min_confidence: float, + passed_gate: bool, +) -> None: + """Emit a ``sim_recommend_action`` event for Stage 0c telemetry. + + Per release_0_9_1.md Stage 0c, every ``recommend_action`` call MUST + emit exactly one event — even the early-return paths (empty + available_tools, empty scores, sub-threshold) — so Roy-3 measurement + can distinguish "gate fired but consumer did nothing" from + "consumer ran and proposed nothing." + + The event lands on the ``sim_log("NAc_RECOMMEND", ...)`` channel, + which routes through the standard sim_log JSONL writer + the + MAXIM_LOG_FILE bridge. + + **Tick alignment with Stage 0d (CRITICAL):** the ``tick`` field + matches Stage 0d's ``sim_ec_activation`` tick space — + ``int(time.time() - sim_logger._sim_start)``, NOT raw epoch seconds. + Without this alignment Roy-3 cannot left-join the two channels + on tick (a 1e9 offset returns zero matches every time). For + sub-second ordering use the sim_log JSONL's top-level ``t`` field, + which sim_log auto-attaches with millisecond resolution from the + same ``_sim_start`` reference. + + The emission is fail-soft: ``ImportError`` (non-sim runtime where + sim_logger isn't importable at all) is swallowed silently. Any + other exception propagates — a real sim_logger bug should surface + rather than masquerade as silent annotation-off. + """ + try: + from maxim.simulation import sim_logger as _sl + + tick = int(time.time() - _sl._sim_start) if _sl._sim_start > 0.0 else 0 + _sl.sim_log( + "NAc_RECOMMEND", + f"recommend_action: passed_gate={passed_gate}", + { + "tick": tick, + "current_cluster_id": current_cluster_id, + "cluster_reward_bias_consulted": cluster_reward_bias_consulted, + "best_tool": best_tool, + "best_score": round(best_score, 4), + "min_confidence": min_confidence, + "passed_gate": passed_gate, + }, + agent_id=agent_id, + ) + except ImportError: + # Non-sim runtime: sim_logger isn't importable at all (e.g., + # headless API without the simulation extras). Stage 0c is + # observability only, not load-bearing for correctness — + # swallow silently. Any OTHER exception (a real sim_logger + # bug, an attribute error from a broken refactor) propagates + # so we don't silently disable telemetry the Roy-3 measurement + # arm depends on. + pass + + @dataclass(frozen=True) class NACConfig: """Configuration for Nucleus Accumbens.""" @@ -1218,6 +1283,19 @@ def recommend_action( if not agent_id: raise ValueError("recommend_action requires non-empty agent_id") if not available_tools: + # Stage 0c: empty available_tools is a legitimate early return + # (e.g., the scene_actor filter trimmed the executor's tool set + # to nothing). Still emit so Roy-3 can distinguish "no tools + # available" from "no tools scored above gate." + _emit_recommend_action_event( + agent_id=agent_id, + current_cluster_id=current_cluster_id, + cluster_reward_bias_consulted=None, + best_tool=None, + best_score=0.0, + min_confidence=min_confidence, + passed_gate=False, + ) return None drives = current_drives or {} @@ -1292,14 +1370,67 @@ def recommend_action( scores[tool_name] = score reasoning_parts[tool_name] = parts + # Stage 0c (release_0_9_1.md): emit `sim_recommend_action` for + # post-hoc Roy-3 measurement. Every recommend_action call emits + # exactly one event — even on the early-return paths (no scores, + # sub-threshold) — so Roy iterations can distinguish "gate fired + # but consumer didn't run" from "consumer ran and proposed + # nothing." Per the plan: "the event MUST emit even when + # recommend_action returns None." if not scores: + # Bio-fidelity review fold: distinguish "cluster known, no + # tool scored" (0.0 sentinel — agent had context but nothing + # rewarded) from "cluster unknown" (None — no + # current_cluster_id at all). Roy-3 needs this distinction + # to expose the Wire-A vs recommend_action gap; collapsing + # both into None would elide the H1 signal. + _consulted_on_empty: float | None = 0.0 if current_cluster_id else None + _emit_recommend_action_event( + agent_id=agent_id, + current_cluster_id=current_cluster_id, + cluster_reward_bias_consulted=_consulted_on_empty, + best_tool=None, + best_score=0.0, + min_confidence=min_confidence, + passed_gate=False, + ) return None best_tool = max(scores, key=lambda t: (scores[t], t)) best_score = scores[best_tool] + + # Record the cluster_reward_bias consulted for the best tool — + # informative for Roy-3 because Wire-A renders aggregate biases + # across all clusters, but recommend_action only consults the + # active-cluster value. Mismatch between rendered Wire-A signal + # and consulted recommend_action signal is the failure mode the + # H1 sub-hypothesis branches (cross_modal_substrate_binding.md / + # jepa_cross_modal_alignment.md) eventually address. + consulted_bias: float | None = None + if current_cluster_id: + consulted_bias = self.cluster_reward_bias(agent_id, current_cluster_id, f"tool:{best_tool}") + if best_score < min_confidence: + _emit_recommend_action_event( + agent_id=agent_id, + current_cluster_id=current_cluster_id, + cluster_reward_bias_consulted=consulted_bias, + best_tool=best_tool, + best_score=best_score, + min_confidence=min_confidence, + passed_gate=False, + ) return None + _emit_recommend_action_event( + agent_id=agent_id, + current_cluster_id=current_cluster_id, + cluster_reward_bias_consulted=consulted_bias, + best_tool=best_tool, + best_score=best_score, + min_confidence=min_confidence, + passed_gate=True, + ) return { "tool_name": best_tool, "params": {}, diff --git a/src/maxim/simulation/instrumented_executor.py b/src/maxim/simulation/instrumented_executor.py index 65334a7d..e4e0ef61 100644 --- a/src/maxim/simulation/instrumented_executor.py +++ b/src/maxim/simulation/instrumented_executor.py @@ -3,6 +3,13 @@ Captures every tool execution (success, failure, and autonomy rejections) as ActionRecords in a RecordingSink. Transparently wraps an existing Executor without changing its interface. + +Stage 0b (release_0_9_1.md) telemetry: each record carries +``agent_id`` / ``session_id`` from the ``utils/http.py::current_context`` +ContextVar (bound at the sim orchestrator entry) and a best-effort +``entity_class`` derived from the action's params. The fields default +to ``None`` when context isn't bound (e.g., unit tests, headless API), +so the producer never raises. """ from __future__ import annotations @@ -12,6 +19,65 @@ from maxim.simulation.sinks import ActionRecord, ActionSink from maxim.tools.base import ToolOutput +from maxim.utils.http import current_context + + +def _derive_entity_class(tool_name: str, params: dict[str, Any]) -> str | None: + """Best-effort entity-class extraction for Stage 0b telemetry. + + **DO NOT consume this field from any substrate write path** (NAc, + EC, ATL, Hippocampus, PainBus). It exists for Roy-3 post-hoc + exposure-count normalization and the Roy harness's per-class + plotting. Substrate consumers must derive entity identity from + the percept text + EC pattern completion, NEVER from this field. + The bio-fidelity guardrail in the bio-lens review: this field is + walled off from the substrate so it can stay a best-effort + heuristic without contaminating the 1.0 thesis ("substrate carries + cognition; language is I/O"). + + **Strict opt-in derivation:** ships explicit-param-only at 0.9.1 + after the pre-merge review caught the verb-strip heuristic + producing noisy buckets on non-entity tools (``get_status`` → + ``"status"``, ``set_entity_sensor`` → ``"entity_sensor"``, + ``do_something_clever`` → ``"something_clever"``). Roy-3 + normalization explicitly skips ``None``, so being conservative is + strictly safer than producing wrong buckets — silent miscount is + worse than missing data. + + Heuristics in priority order: + 1. ``params["entity_class"]`` — explicit caller override. + 2. ``params["target"]`` / ``params["entity"]`` / ``params["object"]`` — + the conventional param names entity-binding tools use. + + Returns ``None`` when neither (1) nor (2) is present, including + for tools whose name suggests an entity binding but didn't pass + one through params (``infant_humanoid_pick_up`` with no target → + None). The field is best-effort metadata. + + TODO (1.1): replace this opt-in heuristic with a declared + ``Tool.entity_class: str | None`` field on the Tool ABC, so tool + authors can opt their tools into Roy-3 attribution explicitly + without participating in this derivation logic at all. Tracks + the same surface as ``feedback_two_identity_schemes.md`` — the + substrate already uses tool-name AND EC-cluster identity for one + concept; declared ``entity_class`` would be a third explicit + handle that tooling can rely on. + """ + if not isinstance(params, dict): + return None + # 1. Explicit caller override. + explicit = params.get("entity_class") + if isinstance(explicit, str) and explicit: + return explicit + # 2. Conventional param names. + for key in ("target", "entity", "object"): + val = params.get(key) + if isinstance(val, str) and val: + return val + # No verb-strip path: pre-merge review showed it produced noise + # on non-entity tools that Roy-3 normalization would silently + # mis-attribute. Future work tracked in the docstring TODO. + return None class InstrumentedExecutor: @@ -33,6 +99,16 @@ def __init__(self, executor: Any, sink: ActionSink) -> None: self._executor = executor self._sink = sink + def _telemetry_fields(self, tool_name: str, params: dict[str, Any]) -> dict[str, Any]: + """Pull Stage 0b telemetry (agent_id, session_id, entity_class) + off the bound RequestContext + tool action.""" + ctx = current_context() + return { + "agent_id": ctx.agent_id if ctx is not None else None, + "session_id": ctx.session_id if ctx is not None else None, + "entity_class": _derive_entity_class(tool_name, params), + } + def execute(self, action: dict[str, Any]) -> ToolOutput: """Execute a tool action and record the result.""" tool_name = action.get("tool_name", "unknown") @@ -54,6 +130,7 @@ def execute(self, action: dict[str, Any]) -> ToolOutput: result_error=result.error, blocked=is_blocked, block_reason=result.error if is_blocked else None, + **self._telemetry_fields(tool_name, params), ) ) @@ -61,13 +138,15 @@ def execute(self, action: dict[str, Any]) -> ToolOutput: def record_block(self, tool_name: str, reason: str, params: dict[str, Any] | None = None) -> None: """Record that an action was blocked (e.g., by FearAgent or autonomy).""" + params = params or {} self._sink.record( ActionRecord( timestamp=time.time(), tool_name=tool_name, - tool_args=params or {}, + tool_args=params, blocked=True, block_reason=reason, + **self._telemetry_fields(tool_name, params), ) ) diff --git a/src/maxim/simulation/orchestrator.py b/src/maxim/simulation/orchestrator.py index 91a4eff9..b8078882 100644 --- a/src/maxim/simulation/orchestrator.py +++ b/src/maxim/simulation/orchestrator.py @@ -1494,32 +1494,57 @@ def _get_component_integrity(name: str) -> float: aut_error: list[Exception] = [] def _aut_worker() -> None: + # Stage 0b (release_0_9_1.md): bind RequestContext on the AUT + # thread via context_scope() so InstrumentedExecutor.execute(), + # recommend_action's sim_recommend_action emitter, and any + # other downstream code reading utils/http.py::current_context() + # see the right agent_id + session_id pair. ContextVars are + # per-thread; the main-thread binding doesn't reach here + # without copy_context. context_scope is a context manager — + # its __exit__ resets the binding even on exception, which is + # what the pre-merge review (architecture lens I5) recommended + # over manual set_context/reset_context in try/finally so + # future sim entry points cannot forget the reset. + # Bound BEFORE sim_agent_context so the typed RequestContext + # and the sim_logger contextvar agree on agent identity. + # + # NOTE: this binding is correct for the current sim topology + # (one AUT, one orch, no AgentFactory sub-agents in the sim + # path). If NPCs spawned via AgentFactory start producing + # ActionRecords in this orchestrator session, every record + # will carry agent_id="sim_aut" instead of the NPC's per-agent + # stash id. Bio-lens nice-to-have: per-spawn context_scope + # inside the NPC's tool-dispatch boundary would be the fix + # when that surface ships. + from maxim.utils.http import context_scope, new_request_context + try: - with sim_agent_context("sim_aut"): - run_agentic_loop( - aut_agent, - aut_env, - aut_state, - aut_memory, - aut_decision_engine, - aut_executor, - autonomy_controller=aut_autonomy, - llm_worker=aut_llm_worker, - default_network=aut_default_network, - hippocampus=aut_hippocampus, - memory_hub=aut_memory_hub, - max_steps=0, # unlimited — AUT stops when bridge.finish() is called - stop_event=stop_event, - target_hz=2.0, - percept_source=bridge.percept_source, - action_sink=bridge.action_sink, - pain_bus=aut_pain_bus, - imagination_trigger=aut_imagination_trigger, - bio_enrichment_pipeline=aut_bio_enrichment_pipeline, - thought_gate=_aut_thought_gate, - aut_mode=aut_mode, - substrate_telemetry=aut_substrate_telemetry, - ) + with context_scope(new_request_context(agent_id="sim_aut", session_id=session_id)): + with sim_agent_context("sim_aut"): + run_agentic_loop( + aut_agent, + aut_env, + aut_state, + aut_memory, + aut_decision_engine, + aut_executor, + autonomy_controller=aut_autonomy, + llm_worker=aut_llm_worker, + default_network=aut_default_network, + hippocampus=aut_hippocampus, + memory_hub=aut_memory_hub, + max_steps=0, # unlimited — AUT stops when bridge.finish() is called + stop_event=stop_event, + target_hz=2.0, + percept_source=bridge.percept_source, + action_sink=bridge.action_sink, + pain_bus=aut_pain_bus, + imagination_trigger=aut_imagination_trigger, + bio_enrichment_pipeline=aut_bio_enrichment_pipeline, + thought_gate=_aut_thought_gate, + aut_mode=aut_mode, + substrate_telemetry=aut_substrate_telemetry, + ) except Exception as e: aut_error.append(e) logger.error("AUT loop failed: %s", e) @@ -2371,27 +2396,39 @@ def _orch_action_count() -> int: # so there's only one spinner managing the terminal line. bridge._spinner.start("Orchestrator planning first probe...") + # Stage 0b: bind RequestContext on the orchestrator thread so + # orch-side action records (rare — most actions land on the AUT + # sink, but orchestrator tools still execute) carry agent_id + + # session_id. Symmetric with the AUT thread binding above. Runs + # on the main thread; context_scope's __exit__ resets the bind + # on normal return AND on exception, so the bind is scoped to + # exactly the run_agentic_loop window. Per pre-merge review + # architecture lens I5, this replaces a manual set_context / + # reset_context try/finally with the canonical helper. + from maxim.utils.http import context_scope, new_request_context + try: - with sim_agent_context("sim_orchestrator"): - run_agentic_loop( - orch_agent, - orch_env, - orch_state, - orch_memory, - orch_decision_engine, - orch_executor, - autonomy_controller=orch_autonomy, - llm_worker=orch_llm_worker, - # NOTE: orchestrator hippocampus disabled for now — it captures - # every tool call as an episodic memory, which is noisy. - # Re-enable when cross-session learning (Phase 3) is tuned. - # hippocampus=orch_hippocampus, - # memory_hub=orch_memory_hub, - max_steps=0, # unlimited — stops via FinishSimulationTool or /cancel - stop_event=stop_event, - target_hz=2.0, - percept_source=orchestrator_source, - ) + with context_scope(new_request_context(agent_id="sim_orchestrator", session_id=session_id)): + with sim_agent_context("sim_orchestrator"): + run_agentic_loop( + orch_agent, + orch_env, + orch_state, + orch_memory, + orch_decision_engine, + orch_executor, + autonomy_controller=orch_autonomy, + llm_worker=orch_llm_worker, + # NOTE: orchestrator hippocampus disabled for now — it captures + # every tool call as an episodic memory, which is noisy. + # Re-enable when cross-session learning (Phase 3) is tuned. + # hippocampus=orch_hippocampus, + # memory_hub=orch_memory_hub, + max_steps=0, # unlimited — stops via FinishSimulationTool or /cancel + stop_event=stop_event, + target_hz=2.0, + percept_source=orchestrator_source, + ) except KeyboardInterrupt: display_summary(["Simulation stopped by user"]) except Exception as e: diff --git a/src/maxim/simulation/report.py b/src/maxim/simulation/report.py index 0ebede77..b16bc3fb 100644 --- a/src/maxim/simulation/report.py +++ b/src/maxim/simulation/report.py @@ -350,14 +350,53 @@ def save_report(report: SimulationReport, base_dir: str | None = None) -> Path: return report_path +_ACTIONS_JSONL_FORMAT_VERSION = "1.1" +"""actions.jsonl ``_format_version``. Per release_0_9_1.md Stage 0b +("Cross-cutting: persistence schema"), this file ships at "1.1" — the +addition of the header line + ``agent_id`` / ``session_id`` / +``entity_class`` per-record fields is a minor bump from the pre-0b +unversioned ("0.x" per CC1) format. A future change that requires +readers to handle a removed field is a major bump.""" + + def save_action_log(bridge: Any, base_dir: str, session_id: str) -> Path | None: - """Save all action records as JSONL for post-hoc analysis.""" + """Save all action records as JSONL for post-hoc analysis. + + **Reader contract (Stage 0b):** the first line is a header record + carrying ``_format_version`` per CLAUDE.md CC1. Consumers MUST + skip any line where ``_record_kind == "header"`` before + interpreting per-action fields. Roy analyzers shipped in 0.9.1+ + already follow this rule; third-party tooling that iterated the + file assuming every line is a record needs a one-line filter + update. + + Per-action records carry Stage 0b telemetry fields (``agent_id``, + ``session_id``, ``entity_class``) populated by + ``InstrumentedExecutor`` from the bound ``RequestContext`` — + ``None`` when the context was unbound at execution time (e.g., + pre-0b sims, headless API runs) or when ``entity_class`` couldn't + be derived from tool params. + + Format-version evolution rule: appending optional fields to the + per-action record is back-compat (existing parsers ignore unknown + keys); removing or renaming fields requires a major bump. + """ session_dir = Path(base_dir) / session_id session_dir.mkdir(parents=True, exist_ok=True) log_path = session_dir / "actions.jsonl" try: with open(str(log_path), "w", encoding="utf-8") as f: + # Stage 0b: format-version header (one-line schema marker + # at the top of the JSONL). Consumers MUST skip lines + # where _record_kind=="header"; the docstring contract + # above is the single source of truth. + header = { + "_format_version": _ACTIONS_JSONL_FORMAT_VERSION, + "_record_kind": "header", + "session_id": session_id, + } + f.write(json.dumps(header) + "\n") for a in bridge.get_all_actions(): entry = { "timestamp": a.timestamp, @@ -368,6 +407,12 @@ def save_action_log(bridge: Any, base_dir: str, session_id: str) -> Path | None: "error": a.result_error, "blocked": a.blocked, "block_reason": a.block_reason, + # Stage 0b telemetry — None when RequestContext was unbound + # (pre-0b sims, headless API) or entity_class couldn't be + # derived from tool_args. + "agent_id": a.agent_id, + "session_id": a.session_id, + "entity_class": a.entity_class, } f.write(json.dumps(entry, default=str) + "\n") logger.info("Action log saved: %s (%d records)", log_path, len(bridge.get_all_actions())) diff --git a/src/maxim/simulation/sim_logger.py b/src/maxim/simulation/sim_logger.py index 580d5ed7..594301e0 100644 --- a/src/maxim/simulation/sim_logger.py +++ b/src/maxim/simulation/sim_logger.py @@ -934,12 +934,35 @@ def sim_fear(tool: str, allowed: bool, reason: str = "", *, agent_id: str | None sim_log("BLOCKED", f"🚫 BLOCKED: {tool} — {reason}", agent_id=agent_id) -def sim_action(tool: str, success: bool, summary: str = "", *, agent_id: str | None = None) -> None: - """Log a tool execution.""" +def sim_action( + tool: str, + success: bool, + summary: str = "", + *, + agent_id: str | None = None, + entity_class: str | None = None, + **kwargs: Any, +) -> None: + """Log a tool execution. + + Stage 0b (release_0_9_1.md): accepts ``entity_class`` for + exposure-count normalization in Roy-3 analysis. Falls into the + structured ``data`` dict alongside any other kwargs. None → field + omitted from the persisted record. The plain ``sim_action(tool, + success)`` call shape from earlier callers continues to work + unchanged (entity_class defaults to None and is dropped before + the dict is passed to sim_log). + """ icon = "⚔️" if success else "❌" status = "OK" if success else "FAIL" + data: dict[str, Any] = dict(kwargs) + if entity_class is not None: + data["entity_class"] = entity_class sim_log( - "MOTOR", f"{icon} [{status}] {tool}: {summary}" if summary else f"{icon} [{status}] {tool}", agent_id=agent_id + "MOTOR", + f"{icon} [{status}] {tool}: {summary}" if summary else f"{icon} [{status}] {tool}", + data if data else None, + agent_id=agent_id, ) diff --git a/src/maxim/simulation/sinks.py b/src/maxim/simulation/sinks.py index d9d000db..542b8297 100644 --- a/src/maxim/simulation/sinks.py +++ b/src/maxim/simulation/sinks.py @@ -38,7 +38,23 @@ @dataclass(frozen=True) class ActionRecord: - """Captured output action from the agent pipeline.""" + """Captured output action from the agent pipeline. + + SHAPE-FROZEN at 1.0 (CC3) — appending optional fields at the end + with sensible defaults is the only allowed evolution. Required + fields, type changes, and field reorderings are major-version + bumps. The fields are observability-focused; no isolation-hygiene + rule applies, so an ``extra`` escape hatch is unnecessary — + add purpose-specific fields here as new telemetry needs surface. + + Stage 0b additions (release_0_9_1.md): ``agent_id`` / + ``session_id`` thread through from the ``RequestContext`` ContextVar + bound at the sim orchestrator entry; ``entity_class`` carries the + target entity classification (food, weapon, body-part, etc.) for + pain-aversion exposure normalization in Roy-3 analysis. All three + are optional ``None`` defaults so existing test fixtures and + third-party ``ActionSink`` implementations keep working. + """ timestamp: float tool_name: str @@ -48,6 +64,13 @@ class ActionRecord: result_error: str | None = None blocked: bool = False block_reason: str | None = None + # Stage 0b (release_0_9_1.md): per-record agent + session attribution. + # Populated by InstrumentedExecutor from utils/http.py::current_context(). + agent_id: str | None = None + session_id: str | None = None + # Stage 0b: entity classification for exposure-count normalization. + # Best-effort — derived from tool_args target where present. + entity_class: str | None = None @runtime_checkable @@ -105,6 +128,13 @@ def _compress_oldest(self) -> None: result_error=rec.result_error[:100] if rec.result_error else None, blocked=rec.blocked, block_reason=rec.block_reason, + # Stage 0b telemetry fields are kept through compression — + # they're tiny and the whole point of 0b is post-hoc + # attribution analysis across the full action stream, + # which would break if compressed records dropped them. + agent_id=rec.agent_id, + session_id=rec.session_id, + entity_class=rec.entity_class, ) ) self._actions = compressed + self._actions[half:] diff --git a/tests/unit/test_stage_0b_0c_telemetry.py b/tests/unit/test_stage_0b_0c_telemetry.py new file mode 100644 index 00000000..4ce4ece8 --- /dev/null +++ b/tests/unit/test_stage_0b_0c_telemetry.py @@ -0,0 +1,699 @@ +"""Tests for Stages 0b + 0c of release_0_9_1.md (telemetry instrumentation). + +Stage 0b layers: +- ``ActionRecord`` gains optional ``agent_id`` / ``session_id`` / + ``entity_class`` fields (CC3-additive on the frozen dataclass). +- ``InstrumentedExecutor`` populates the fields from + ``utils/http.py::current_context()`` + tool params. +- ``RecordingSink._compress_oldest`` preserves the new fields. +- ``save_action_log`` writes the new fields + a ``_format_version`` + header at the JSONL head. +- ``sim_action`` accepts an ``entity_class`` kwarg routed through + ``sim_log``'s data dict. + +Stage 0c layer: +- ``NAc.recommend_action`` emits a ``sim_log("NAc_RECOMMEND", ...)`` + event on every call (no-scores, sub-threshold, success — three + early-exit paths), with the fields per the plan. +""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any +from unittest.mock import MagicMock + +import pytest + +from maxim.decisions.nac import NAc, NACConfig +from maxim.simulation.instrumented_executor import InstrumentedExecutor, _derive_entity_class +from maxim.simulation.sinks import ActionRecord, RecordingSink +from maxim.tools.base import ToolOutput +from maxim.utils.http import new_request_context, reset_context, set_context + + +# ───────────────────────────────────────────────────────────────────── +# Layer 1: ActionRecord field additions (CC3-additive) +# ───────────────────────────────────────────────────────────────────── + + +class TestActionRecordFields: + """The new optional fields ship with ``None`` defaults so existing + callers and third-party ActionSink consumers keep working without + modification. The CC3 audit rule for shape-frozen dataclasses is: + optional fields appended at the end with sensible defaults are + non-breaking; renames or required-field additions are major bumps.""" + + def test_record_constructs_without_new_fields(self) -> None: + """Back-compat shape: pre-0b callers construct ActionRecord + with only the original 8 fields and the new 3 default to None.""" + rec = ActionRecord( + timestamp=1.0, + tool_name="respond", + ) + assert rec.agent_id is None + assert rec.session_id is None + assert rec.entity_class is None + + def test_record_carries_new_fields(self) -> None: + rec = ActionRecord( + timestamp=1.0, + tool_name="sense_food_source", + agent_id="sim_aut", + session_id="20260515_120000", + entity_class="food", + ) + assert rec.agent_id == "sim_aut" + assert rec.session_id == "20260515_120000" + assert rec.entity_class == "food" + + +# ───────────────────────────────────────────────────────────────────── +# Layer 2: entity_class derivation heuristic +# ───────────────────────────────────────────────────────────────────── + + +class TestEntityClassDerivation: + """Best-effort heuristic — Roy-3 analysis aggregates with ``None`` + skipped from exposure-count normalization, so the heuristic only + needs to produce sensible classes when the tool is entity-bound. + Verb-only tools (``respond``, ``examine``) should return None.""" + + def test_explicit_param_wins(self) -> None: + assert _derive_entity_class("any_tool", {"entity_class": "food"}) == "food" + + def test_target_param_fallback(self) -> None: + assert _derive_entity_class("any_tool", {"target": "weapon"}) == "weapon" + + def test_entity_param_fallback(self) -> None: + assert _derive_entity_class("any_tool", {"entity": "infant_humanoid"}) == "infant_humanoid" + + def test_object_param_fallback(self) -> None: + assert _derive_entity_class("any_tool", {"object": "fire"}) == "fire" + + def test_param_priority_explicit_over_target(self) -> None: + """params["entity_class"] beats params["target"] when both present.""" + assert _derive_entity_class("any_tool", {"entity_class": "food", "target": "drink"}) == "food" + + def test_tool_name_alone_does_not_derive(self) -> None: + """Pre-merge review fold: the verb-strip heuristic was dropped + as too noisy. Tools whose name suggests an entity binding + (``sense_food_source``, ``infant_humanoid_pick_up``) but don't + pass an entity param through the call now return None. + + Roy-3 normalization explicitly skips None, so being conservative + is strictly safer than producing wrong buckets — silent miscount + is worse than missing data. The future fix (1.1 TODO in the + derivation docstring) declares ``Tool.entity_class`` on the + Tool ABC so authors opt in explicitly.""" + assert _derive_entity_class("sense_food_source", {}) is None + assert _derive_entity_class("infant_humanoid_pick_up", {}) is None + assert _derive_entity_class("use_weapon", {}) is None + + def test_verb_only_tools_return_none(self) -> None: + """``respond`` / ``examine`` / no underscore → None (not entity-bound).""" + assert _derive_entity_class("respond", {}) is None + assert _derive_entity_class("examine", {}) is None + assert _derive_entity_class("examine", {"target": ""}) is None + + def test_non_entity_tools_with_underscores_return_none(self) -> None: + """Architecture lens I3 regression guard. The pre-fold verb-strip + heuristic produced noise on these tools — Roy-3 would have + attributed pain events to ``"status"`` or ``"entity_sensor"`` + as if they were real entity classes. With the heuristic dropped, + these all return None.""" + assert _derive_entity_class("get_status", {}) is None + assert _derive_entity_class("set_entity_sensor", {}) is None + assert _derive_entity_class("do_something_clever", {}) is None + assert _derive_entity_class("make_recommendation", {}) is None + assert _derive_entity_class("look_around", {}) is None + + def test_non_dict_params_returns_none(self) -> None: + """Defensive — params might be None or something weird in some paths.""" + assert _derive_entity_class("any_tool", None) is None # type: ignore[arg-type] + + def test_entity_param_with_underscore_tool_name_wins(self) -> None: + """Even when the tool name has underscores, an explicit + entity-class param wins. This is the supported path tool + authors use to opt into Roy-3 attribution today (until 1.1 + ships the ``Tool.entity_class`` declared field).""" + # Old verb-strip would have stripped "sense" → "food" too, + # but the param is the authoritative source either way. + assert _derive_entity_class("sense_food_source", {"target": "apple"}) == "apple" + + +# ───────────────────────────────────────────────────────────────────── +# Layer 3: InstrumentedExecutor reads RequestContext +# ───────────────────────────────────────────────────────────────────── + + +class _StubExecutor: + """Minimal stand-in for the real Executor.""" + + def execute(self, action: dict[str, Any]) -> ToolOutput: + return ToolOutput(success=True, output={"ok": True}) + + +class TestInstrumentedExecutorTelemetry: + def test_record_populated_from_request_context(self) -> None: + sink = RecordingSink() + executor = InstrumentedExecutor(_StubExecutor(), sink) + ctx = new_request_context(agent_id="sim_aut", session_id="20260515_120000") + token = set_context(ctx) + try: + # Tool author opts into Roy-3 attribution via the + # explicit ``entity_class`` param (the only path the + # post-fold heuristic accepts). + executor.execute( + { + "tool_name": "sense_food_source", + "params": {"entity_class": "food"}, + } + ) + finally: + reset_context(token) + rec = sink.actions[-1] + assert rec.agent_id == "sim_aut" + assert rec.session_id == "20260515_120000" + assert rec.entity_class == "food" + + def test_no_context_bound_yields_none_fields(self) -> None: + """Unit tests / non-sim runtime paths don't bind RequestContext. + InstrumentedExecutor must not raise — record gets None fields.""" + sink = RecordingSink() + executor = InstrumentedExecutor(_StubExecutor(), sink) + # No set_context call — current_context() returns None. + executor.execute({"tool_name": "respond", "params": {}}) + rec = sink.actions[-1] + assert rec.agent_id is None + assert rec.session_id is None + assert rec.entity_class is None # respond is verb-only + + def test_record_block_populates_telemetry(self) -> None: + """Blocked actions also carry telemetry — useful for the + normalization analysis (blocked-but-attempted is meaningful).""" + sink = RecordingSink() + executor = InstrumentedExecutor(_StubExecutor(), sink) + ctx = new_request_context(agent_id="sim_aut", session_id="sid") + token = set_context(ctx) + try: + executor.record_block("infant_humanoid_pick_up", reason="too_heavy") + finally: + reset_context(token) + rec = sink.actions[-1] + assert rec.blocked is True + assert rec.block_reason == "too_heavy" + assert rec.agent_id == "sim_aut" + assert rec.session_id == "sid" + + +# ───────────────────────────────────────────────────────────────────── +# Layer 4: RecordingSink compression preserves new fields +# ───────────────────────────────────────────────────────────────────── + + +class TestCompressionPreservesTelemetry: + """Per the plan: actions.jsonl is meant for POST-HOC analysis. If + compression dropped agent_id/session_id/entity_class, normalization + counts for long-running sims would silently skew (compressed half + looks anonymous; uncompressed half attributes correctly). Pin + the preservation contract.""" + + def test_compression_keeps_new_fields(self) -> None: + """``_compress_oldest`` strips heavy fields (tool_args, + result_output) but keeps a lightweight summary. The telemetry + fields are tiny and MUST be preserved so post-hoc attribution + analysis doesn't get an anonymous half of the record stream.""" + sink = RecordingSink(max_actions=4) + for i in range(6): + sink.record( + ActionRecord( + timestamp=float(i), + tool_name=f"tool_{i}", + tool_args={"large": "args"}, # heavy field that compression drops + result_output={"big": "output"}, # ditto + agent_id=f"agent_{i}", + session_id="shared_session", + entity_class=f"class_{i}", + ) + ) + # Compression fires when len > max — verified by checking + # the oldest half had heavy fields stripped. + compressed_count = sum(1 for r in sink.actions if r.tool_args == {}) + assert compressed_count > 0, "compression should have stripped at least some records" + # Telemetry fields survive compression in every record + # (compressed AND uncompressed). + for rec in sink.actions: + assert rec.agent_id is not None + assert rec.session_id == "shared_session" + assert rec.entity_class is not None + + +# ───────────────────────────────────────────────────────────────────── +# Layer 5: save_action_log writes new fields + format-version header +# ───────────────────────────────────────────────────────────────────── + + +class TestSaveActionLog: + """Per the plan: actions.jsonl now carries a ``_format_version`` + header line + telemetry fields per record. The header pattern is + CC1 contract — every persisted JSONL Maxim writes carries + ``_format_version`` at the head so future parsers can branch on + schema evolution.""" + + def test_writes_format_version_header(self, tmp_path: Path) -> None: + from maxim.simulation.report import _ACTIONS_JSONL_FORMAT_VERSION, save_action_log + + bridge = MagicMock() + bridge.get_all_actions.return_value = [ + ActionRecord(timestamp=1.0, tool_name="respond", agent_id="sim_aut", session_id="sid"), + ] + log_path = save_action_log(bridge, base_dir=str(tmp_path), session_id="sid") + assert log_path is not None + lines = log_path.read_text().splitlines() + header = json.loads(lines[0]) + # Plan release_0_9_1.md § "Cross-cutting: persistence schema" + # pins this at "1.1" — minor bump from pre-0b unversioned ("0.x"). + assert _ACTIONS_JSONL_FORMAT_VERSION == "1.1" + assert header["_format_version"] == "1.1" + assert header["_record_kind"] == "header" + assert header["session_id"] == "sid" + + def test_consumer_can_skip_header_line(self, tmp_path: Path) -> None: + """Architecture lens I4 regression guard. The header-line is a + schema change for actions.jsonl consumers — third-party tooling + iterating the file as "every line is a record" needs to skip + ``_record_kind == "header"``. The docstring contract is + load-bearing; this test pins it as a real reader pattern.""" + from maxim.simulation.report import save_action_log + + bridge = MagicMock() + bridge.get_all_actions.return_value = [ + ActionRecord( + timestamp=1.0, + tool_name="sense_food_source", + agent_id="sim_aut", + session_id="sid", + entity_class="food", + ), + ActionRecord(timestamp=2.0, tool_name="respond", agent_id="sim_aut", session_id="sid"), + ] + log_path = save_action_log(bridge, base_dir=str(tmp_path), session_id="sid") + assert log_path is not None + # Simulate the documented reader pattern: + records = [] + with log_path.open() as f: + for line in f: + obj = json.loads(line) + if obj.get("_record_kind") == "header": + continue + records.append(obj) + assert len(records) == 2 + assert records[0]["tool"] == "sense_food_source" + assert records[1]["tool"] == "respond" + + def test_writes_telemetry_fields_per_record(self, tmp_path: Path) -> None: + from maxim.simulation.report import save_action_log + + bridge = MagicMock() + bridge.get_all_actions.return_value = [ + ActionRecord( + timestamp=1.0, + tool_name="sense_food_source", + agent_id="sim_aut", + session_id="20260515_120000", + entity_class="food", + ), + ] + log_path = save_action_log(bridge, base_dir=str(tmp_path), session_id="20260515_120000") + assert log_path is not None + lines = log_path.read_text().splitlines() + # Skip the header line. + record = json.loads(lines[1]) + assert record["agent_id"] == "sim_aut" + assert record["session_id"] == "20260515_120000" + assert record["entity_class"] == "food" + + def test_header_appears_even_with_zero_records(self, tmp_path: Path) -> None: + """Empty action log still has the header — schema discovery + shouldn't depend on a record existing.""" + from maxim.simulation.report import save_action_log + + bridge = MagicMock() + bridge.get_all_actions.return_value = [] + log_path = save_action_log(bridge, base_dir=str(tmp_path), session_id="sid") + assert log_path is not None + lines = log_path.read_text().splitlines() + assert len(lines) == 1 + assert json.loads(lines[0])["_format_version"] == "1.1" + + +# ───────────────────────────────────────────────────────────────────── +# Layer 6: sim_action entity_class kwarg +# ───────────────────────────────────────────────────────────────────── + + +class TestSimActionEntityClass: + """sim_action grows an ``entity_class`` kwarg that routes through + sim_log's data dict for post-hoc Roy-3 analysis. Existing + callers using the positional shape continue to work.""" + + def test_legacy_call_shape_unchanged(self) -> None: + """The plain ``sim_action(tool, success)`` call must still + work — most existing callers don't pass entity_class yet.""" + from maxim.simulation.sim_logger import sim_action + + # Should not raise; nothing else to assert because sim_log + # silently no-ops when sim logging isn't enabled. + sim_action("respond", True) + + def test_entity_class_threaded_through(self, tmp_path: Path) -> None: + """When sim_action is called with entity_class, it shows up + in the JSONL record's data dict.""" + from maxim.simulation.sim_logger import ( + disable_sim_logging, + enable_sim_logging, + sim_action, + ) + + log_path = tmp_path / "sim_log.jsonl" + enable_sim_logging(log_path=str(log_path)) + try: + sim_action("sense_food_source", True, summary="found", entity_class="food") + finally: + disable_sim_logging() + + records = [json.loads(line) for line in log_path.read_text().splitlines()] + # Find the MOTOR record. + motor_records = [r for r in records if r.get("subsystem") == "MOTOR"] + assert len(motor_records) >= 1 + assert motor_records[0]["data"]["entity_class"] == "food" + + def test_entity_class_none_omits_field(self, tmp_path: Path) -> None: + """When entity_class is None (the default), it should NOT + appear as a key in the data dict — cluttering Roy-3 records + with ``entity_class: null`` for every verb-only tool is noise.""" + from maxim.simulation.sim_logger import ( + disable_sim_logging, + enable_sim_logging, + sim_action, + ) + + log_path = tmp_path / "sim_log.jsonl" + enable_sim_logging(log_path=str(log_path)) + try: + sim_action("respond", True, summary="hi") + finally: + disable_sim_logging() + + records = [json.loads(line) for line in log_path.read_text().splitlines()] + motor_records = [r for r in records if r.get("subsystem") == "MOTOR"] + assert len(motor_records) >= 1 + # The `data` field is either absent or doesn't contain entity_class. + data = motor_records[0].get("data") or {} + assert "entity_class" not in data + + +# ───────────────────────────────────────────────────────────────────── +# Layer 7: Stage 0c — sim_recommend_action emission +# ───────────────────────────────────────────────────────────────────── + + +class TestRecommendActionEmission: + """Per the plan: EVERY recommend_action call emits exactly one + sim_recommend_action event, including the early-return paths. + Roy-3 needs to distinguish "gate fired, consumer did nothing" + from "consumer didn't run at all".""" + + def _fresh_nac(self) -> NAc: + return NAc(config=NACConfig()) + + def _read_recommend_records(self, log_path: Path) -> list[dict[str, Any]]: + records = [json.loads(line) for line in log_path.read_text().splitlines()] + return [r for r in records if r.get("subsystem") == "NAc_RECOMMEND"] + + def test_emission_on_success_path(self, tmp_path: Path) -> None: + from maxim.simulation.sim_logger import disable_sim_logging, enable_sim_logging + + log_path = tmp_path / "sim_log.jsonl" + enable_sim_logging(log_path=str(log_path)) + try: + nac = self._fresh_nac() + # Seed cluster bias so a tool wins. + nac.update_cluster_reward( + agent_id="sim_aut", + cluster_id="cluster-1", + tool_signature="tool:sense_food_source", + reward=10.0, + ) + result = nac.recommend_action( + agent_id="sim_aut", + available_tools=["sense_food_source"], + current_cluster_id="cluster-1", + ) + finally: + disable_sim_logging() + + assert result is not None + assert result["tool_name"] == "sense_food_source" + recs = self._read_recommend_records(log_path) + assert len(recs) == 1 + data = recs[0]["data"] + assert data["passed_gate"] is True + assert data["best_tool"] == "sense_food_source" + assert data["best_score"] > 0.0 + assert data["current_cluster_id"] == "cluster-1" + assert data["cluster_reward_bias_consulted"] is not None + + def test_emission_on_no_scores_path(self, tmp_path: Path) -> None: + """When recommend_action returns None because no tool scored, + the event MUST still emit — passed_gate=False, best_tool=None.""" + from maxim.simulation.sim_logger import disable_sim_logging, enable_sim_logging + + log_path = tmp_path / "sim_log.jsonl" + enable_sim_logging(log_path=str(log_path)) + try: + nac = self._fresh_nac() + # No bias seeded; no drives; no causal links → empty scores. + result = nac.recommend_action( + agent_id="sim_aut", + available_tools=["sense_food_source"], + ) + finally: + disable_sim_logging() + + assert result is None + recs = self._read_recommend_records(log_path) + assert len(recs) == 1 + data = recs[0]["data"] + assert data["passed_gate"] is False + assert data["best_tool"] is None + assert data["best_score"] == 0.0 + + def test_emission_on_sub_threshold_path(self, tmp_path: Path) -> None: + """When scores exist but best_score < min_confidence, the event + emits with passed_gate=False AND best_tool populated.""" + from maxim.simulation.sim_logger import disable_sim_logging, enable_sim_logging + + log_path = tmp_path / "sim_log.jsonl" + enable_sim_logging(log_path=str(log_path)) + try: + nac = self._fresh_nac() + # Seed a tiny bias well under min_confidence (0.3). + nac.update_cluster_reward( + agent_id="sim_aut", + cluster_id="cluster-1", + tool_signature="tool:sense_food_source", + reward=0.5, # alpha=0.15 → +0.075, below the 0.3 default gate + ) + result = nac.recommend_action( + agent_id="sim_aut", + available_tools=["sense_food_source"], + current_cluster_id="cluster-1", + min_confidence=0.3, + ) + finally: + disable_sim_logging() + + assert result is None + recs = self._read_recommend_records(log_path) + assert len(recs) == 1 + data = recs[0]["data"] + assert data["passed_gate"] is False + assert data["best_tool"] == "sense_food_source" + assert 0.0 < data["best_score"] < 0.3 + + def test_fail_soft_when_sim_logging_disabled(self) -> None: + """Non-sim runtime path: sim_log is a no-op when sim logging + isn't enabled, so recommend_action returns normally without + raising or even leaving a partial state.""" + nac = self._fresh_nac() + nac.update_cluster_reward("sim_aut", "c1", "tool:foo", reward=10.0) + # sim logging NOT enabled here. + result = nac.recommend_action( + agent_id="sim_aut", + available_tools=["foo"], + current_cluster_id="c1", + ) + assert result is not None + assert result["tool_name"] == "foo" + + def test_emission_on_empty_available_tools_path(self, tmp_path: Path) -> None: + """Architecture lens C3 regression guard. Pre-fold, + ``available_tools=[]`` short-circuited before emitting, + leaving Roy-3 unable to distinguish "no tools available" + from "no tools scored above gate." Post-fold, this path + emits with best_tool=None, best_score=0.0, passed_gate=False.""" + from maxim.simulation.sim_logger import disable_sim_logging, enable_sim_logging + + log_path = tmp_path / "sim_log.jsonl" + enable_sim_logging(log_path=str(log_path)) + try: + nac = self._fresh_nac() + result = nac.recommend_action(agent_id="sim_aut", available_tools=[]) + finally: + disable_sim_logging() + + assert result is None + recs = self._read_recommend_records(log_path) + assert len(recs) == 1 + data = recs[0]["data"] + assert data["passed_gate"] is False + assert data["best_tool"] is None + assert data["best_score"] == 0.0 + + def test_tick_aligned_with_sim_logger_start(self, tmp_path: Path) -> None: + """Architecture lens C1 regression guard. Pre-fold, tick was + ``int(time.time())`` (raw epoch ~1.7e9), while Stage 0d's + ``sim_ec_activation`` uses ``int(time.time() - _sim_start)`` + (elapsed seconds, 0..N). A 1e9 offset means Roy-3 left-joins + on tick return zero matches every time. Post-fold, both + channels emit comparable tick values from the same _sim_start + reference — a tick within ~1s of sim start is a small int, + not an epoch.""" + from maxim.simulation.sim_logger import disable_sim_logging, enable_sim_logging + + log_path = tmp_path / "sim_log.jsonl" + enable_sim_logging(log_path=str(log_path)) + try: + nac = self._fresh_nac() + nac.update_cluster_reward("sim_aut", "c1", "tool:foo", reward=10.0) + nac.recommend_action( + agent_id="sim_aut", + available_tools=["foo"], + current_cluster_id="c1", + ) + finally: + disable_sim_logging() + + recs = self._read_recommend_records(log_path) + assert len(recs) == 1 + tick = recs[0]["data"]["tick"] + # Elapsed-seconds tick within a fresh sim should be tiny. + # If we accidentally regress to raw epoch, this would be ~1.7e9. + assert 0 <= tick < 60, f"tick={tick} looks like raw epoch (regression to int(time.time()))" + + def test_empty_scores_sentinel_distinguishes_cluster_known_vs_unknown(self, tmp_path: Path) -> None: + """Bio-fidelity I3 regression guard. On the empty-scores path: + - cluster_id known but no tool scored → 0.0 sentinel + - cluster_id absent → None + Roy-3 disambiguation depends on this — otherwise "agent had no + active cluster" and "agent had a cluster but no tools scored" + collapse into the same record.""" + from maxim.simulation.sim_logger import disable_sim_logging, enable_sim_logging + + log_path = tmp_path / "sim_log.jsonl" + enable_sim_logging(log_path=str(log_path)) + try: + nac = self._fresh_nac() + # Path A: cluster known, no tool scored (no bias seeded). + nac.recommend_action( + agent_id="sim_aut", + available_tools=["foo"], + current_cluster_id="c1", + ) + # Path B: cluster unknown. + nac.recommend_action( + agent_id="sim_aut", + available_tools=["foo"], + current_cluster_id=None, + ) + finally: + disable_sim_logging() + + recs = self._read_recommend_records(log_path) + assert len(recs) == 2 + # Order matches call order. + path_a, path_b = recs[0]["data"], recs[1]["data"] + assert path_a["current_cluster_id"] == "c1" + assert path_a["cluster_reward_bias_consulted"] == 0.0 # known but no signal + assert path_b["current_cluster_id"] is None + assert path_b["cluster_reward_bias_consulted"] is None # truly absent + + +# ───────────────────────────────────────────────────────────────────── +# Layer 8: RequestContext binding regression guard +# ───────────────────────────────────────────────────────────────────── + + +class TestRequestContextBindingShape: + """The Stage 0b binding is done in _aut_worker (orchestrator.py) + via new_request_context + set_context + reset_context. The actual + sim-orchestrator binding happens at runtime inside a thread, so + we test the SHAPE here: a fresh context binds, current_context() + reads back, reset restores.""" + + def test_round_trip(self) -> None: + ctx = new_request_context(agent_id="sim_aut", session_id="20260515_120000") + token = set_context(ctx) + try: + from maxim.utils.http import current_context + + current = current_context() + assert current is not None + assert current.agent_id == "sim_aut" + assert current.session_id == "20260515_120000" + finally: + reset_context(token) + # After reset, the binding is gone (or restored to whatever + # was bound before — pytest fixtures generally start with None). + from maxim.utils.http import current_context + + after = current_context() + # Either None or pre-existing; the new binding is gone. + if after is not None: + assert after.agent_id != "sim_aut" or after.session_id != "20260515_120000" + + def test_reset_in_finally_handles_exception(self) -> None: + """If the sim worker raises, the reset_context call in finally + must still fire so the binding doesn't leak to the next test.""" + ctx = new_request_context(agent_id="sim_aut", session_id="sid") + token = set_context(ctx) + try: + try: + raise RuntimeError("simulated worker failure") + except RuntimeError: + pass # The orchestrator catches this; here we just want + # to ensure reset is still reachable. + finally: + reset_context(token) + from maxim.utils.http import current_context + + # Either None or the prior binding — not the failed binding. + after = current_context() + if after is not None: + assert after.agent_id != "sim_aut" or after.session_id != "sid" + + +@pytest.fixture(autouse=True) +def _reset_sim_logger_state() -> Any: + """sim_logger has module-level state (the _sim_active flag, the + _log_file handle, the _log_records deque). Make sure each test + starts with sim logging DISABLED, even if a prior test forgot to + call disable_sim_logging().""" + from maxim.simulation.sim_logger import disable_sim_logging + + disable_sim_logging() + yield + disable_sim_logging()