|
1 | | -"""Drain-state routing bridge for the LLM router (Plan 4 Stages C4 + C4.5). |
| 1 | +"""Drain-state routing bridge for the LLM router (Plan 4 Stages C4 + C4.5 + C4.6). |
2 | 2 |
|
3 | 3 | Bridges the gap between the mesh drain-state layer (``peer/drain_state.py``, |
4 | 4 | which operates on node names) and the LLM router (``models/language/router.py``, |
5 | 5 | which operates on provider keys). |
6 | 6 |
|
7 | | -Two components: |
| 7 | +Three components: |
8 | 8 |
|
9 | 9 | - :class:`DrainConstraint` (C4, read-only): injected as ``drain_constraint`` |
10 | 10 | callback. Checks whether a provider maps to a drained mesh node. |
11 | 11 | - :class:`AutoDrainWriter` (C4.5, write): injected as ``auto_drain_callback``. |
12 | 12 | Writes auto-drain entries when a provider crosses the failure threshold. |
13 | | - Entries are tagged ``# auto:<timestamp> reason:<type>`` so C4.6's |
14 | | - auto-undrain can distinguish them from sticky operator drains. |
| 13 | + Entries are tagged ``# auto:<timestamp> reason:<type>`` so the prober can |
| 14 | + distinguish them from sticky operator drains. |
| 15 | +- :class:`AutoUndrainProber` (C4.6, background): daemon thread that |
| 16 | + periodically probes auto-drained nodes and clears their drain entries |
| 17 | + when healthy. **NEVER touches operator drains** (entries without |
| 18 | + ``# auto:`` tag are sticky by design). |
15 | 19 |
|
16 | 20 | The URL lookup table is built once at construction from ``mesh.yml`` topology. |
17 | 21 | Topology changes require a router restart to take effect — consistent with the |
|
20 | 24 |
|
21 | 25 | from __future__ import annotations |
22 | 26 |
|
| 27 | +import atexit |
23 | 28 | import logging |
24 | 29 | import os |
| 30 | +import threading |
25 | 31 | import time |
26 | 32 | from pathlib import Path |
27 | 33 | from typing import Any |
@@ -404,3 +410,276 @@ def build_auto_drain_writer( |
404 | 410 | if drain_path is None: |
405 | 411 | drain_path = constraint._drain_path |
406 | 412 | return AutoDrainWriter(dict(constraint._provider_to_node), drain_path) |
| 413 | + |
| 414 | + |
| 415 | +# ─── auto-undrain probe interval (C4.6) ────────────────────────────────── |
| 416 | + |
_DEFAULT_PROBE_INTERVAL_S = 90.0
_MIN_PROBE_INTERVAL_S = 30.0
_MAX_PROBE_INTERVAL_S = 600.0


def _probe_interval() -> float:
    """Read ``MAXIM_AUTO_UNDRAIN_PROBE_INTERVAL_S``, clamped [30, 600].

    Returns
    -------
    float
        The probe interval in seconds. Falls back to
        ``_DEFAULT_PROBE_INTERVAL_S`` when the variable is unset, blank,
        not parseable as a float, or parses to NaN. Any other value
        (including ``inf`` / ``-inf``) is clamped into
        ``[_MIN_PROBE_INTERVAL_S, _MAX_PROBE_INTERVAL_S]``.
    """
    import math

    raw = os.environ.get("MAXIM_AUTO_UNDRAIN_PROBE_INTERVAL_S", "").strip()
    if not raw:
        return _DEFAULT_PROBE_INTERVAL_S
    try:
        val = float(raw)
    except (ValueError, TypeError):
        return _DEFAULT_PROBE_INTERVAL_S
    # float("nan") parses successfully, and every comparison against NaN is
    # False, so the min/max clamp below would silently turn it into the
    # maximum interval. Treat it as the malformed value it is.
    if math.isnan(val):
        return _DEFAULT_PROBE_INTERVAL_S
    return max(_MIN_PROBE_INTERVAL_S, min(_MAX_PROBE_INTERVAL_S, val))
| 432 | + |
| 433 | + |
| 434 | +# ─── AutoUndrainProber (C4.6) ──────────────────────────────────────────── |
| 435 | + |
| 436 | + |
class AutoUndrainProber:
    """Background daemon that probes auto-drained nodes and restores them.

    Plan 4 C4.6. Completes the self-healing loop started by
    :class:`AutoDrainWriter` (C4.5): the writer auto-drains nodes after
    persistent failure, and this prober auto-undrains them when a health
    check passes.

    **Critical invariant:** NEVER clears operator drains. Only entries
    tagged with ``# auto:`` in the drain state file are candidates.
    Entries without a tag (operator drains, C3.3 install drains) are
    sticky and untouched.

    Lifecycle:

    - Constructed by :func:`build_auto_undrain_prober` at router
      construction time in ``lane_backends.py``.
    - ``start()`` spawns a daemon thread that runs until the process
      exits (``atexit`` handler calls ``stop()``).
    - The thread sleeps for ``interval_s`` between probe cycles.
    - Each cycle reads the drain file, filters for ``# auto:`` entries,
      probes each via ``_MaximPeerBackend.for_url(...).health_check()``,
      and clears successful entries under filelock.
    """

    def __init__(
        self,
        node_to_url: dict[str, str],
        cluster_key: str,
        drain_path: Path,
        *,
        interval_s: float | None = None,
    ) -> None:
        """Store the probe configuration; no thread is started here.

        Parameters
        ----------
        node_to_url
            Maps mesh node name to the URL used for health-check probes.
        cluster_key
            Passed as ``api_key`` to the peer backend for each probe.
        drain_path
            Path of the drain state file to read and rewrite.
        interval_s
            Seconds to sleep between probe cycles. ``None`` defers to
            :func:`_probe_interval`. An explicit value is used as given
            (it is not clamped).
        """
        self._node_to_url = node_to_url
        self._cluster_key = cluster_key
        self._drain_path = drain_path
        self._interval_s = interval_s if interval_s is not None else _probe_interval()
        self._stop_event = threading.Event()
        self._thread: threading.Thread | None = None
        self._atexit_registered = False

    def start(self) -> None:
        """Start the background probe thread.

        Idempotent — calling ``start()`` when already running is a no-op.
        ``atexit.register`` is guarded by ``_atexit_registered`` so
        start/stop/restart cycles don't accumulate handlers.
        """
        if self._thread is not None and self._thread.is_alive():
            return
        # Fresh event per thread rather than clear(): if a previous stop()
        # timed out while the old thread was mid-probe, that thread still
        # holds its own (set) event and exits after its cycle instead of
        # being revived alongside the new thread.
        self._stop_event = threading.Event()
        self._thread = threading.Thread(
            target=self._run_loop,
            args=(self._stop_event,),
            name="auto-undrain-prober",
            daemon=True,
        )
        self._thread.start()
        if not self._atexit_registered:
            atexit.register(self.stop)
            self._atexit_registered = True
        logger.debug("auto-undrain prober started (interval=%.0fs)", self._interval_s)

    def stop(self) -> None:
        """Signal the probe thread to stop and wait up to 5s for it to exit.

        The thread reference is dropped even if the join times out; the
        thread keeps its own set stop event, so it exits at its next
        check of that event.
        """
        self._stop_event.set()
        worker = self._thread
        if worker is not None and worker.is_alive():
            worker.join(timeout=5.0)
        self._thread = None

    def _run_loop(self, stop_event: threading.Event | None = None) -> None:
        """Main loop: sleep → probe auto-drained nodes → repeat.

        Parameters
        ----------
        stop_event
            The event this loop watches. ``start()`` passes the event
            created for this thread; ``None`` falls back to the prober's
            current ``_stop_event``.
        """
        if stop_event is None:
            stop_event = self._stop_event
        while not stop_event.is_set():
            # Sleep first so the prober doesn't fire immediately at startup
            # (the node was just auto-drained, give it time to recover).
            if stop_event.wait(timeout=self._interval_s):
                break  # stop requested during sleep
            try:
                self._probe_cycle()
            except Exception:
                # WARNING not DEBUG — a persistent cycle failure silently
                # kills the self-healing loop and operators need to see it.
                logger.warning("auto-undrain probe cycle failed", exc_info=True)

    def _probe_cycle(self) -> None:
        """One probe cycle: read auto-drained nodes, probe each, clear healthy ones."""
        entries = _load_tagged_entries(self._drain_path)
        auto_drained = [
            name
            for name, tag in entries.items()
            if tag is not None and tag.startswith("auto:")
        ]

        for name in auto_drained:
            url = self._node_to_url.get(name)
            if url is None:
                # Node not in mesh.yml — orphan auto-drain entry.
                # Don't probe (no URL to probe), don't clear (operator
                # can clean up with `resume`).
                logger.debug(
                    "auto-undrain: skipping orphan %r (not in mesh.yml)",
                    name,
                )
                continue

            if self._probe_node(name, url):
                self._clear_auto_drain(name)

    def _probe_node(self, name: str, url: str) -> bool:
        """Probe a single node. Returns True if healthy.

        Healthy means the backend's ``health_check(enable_stage2=False)``
        result has ``outcome == "ok"``. Import failures, any other
        outcome, and any exception raised by the probe all return False
        (the node stays drained).
        """
        try:
            from maxim.models.language.maxim_peer_backend import (
                _MaximPeerBackend,
            )
        except ImportError:
            logger.debug(
                "auto-undrain: backend import failed, skipping probe for %s",
                name,
            )
            return False

        try:
            backend = _MaximPeerBackend.for_url(url, api_key=self._cluster_key)
            result = backend.health_check(enable_stage2=False)
            outcome = getattr(result, "outcome", "other")
            if outcome == "ok":
                logger.info(
                    "auto-undrain: node %s is healthy, clearing auto-drain",
                    name,
                )
                return True
            logger.debug(
                "auto-undrain: node %s probe returned %s, staying drained",
                name,
                outcome,
            )
            return False
        except Exception:
            logger.debug("auto-undrain: probe failed for %s", name, exc_info=True)
            return False

    @staticmethod
    def _drop_auto_lines(content: str, name: str) -> tuple[str, int]:
        """Return *content* without the ``# auto:``-tagged lines for *name*.

        A line is dropped only when the text before its first ``#`` equals
        *name* (after stripping) AND the text after that ``#`` starts with
        ``auto:``. Untagged lines for the same node are kept, so an
        operator entry that coexists with an auto entry survives.

        Returns
        -------
        tuple[str, int]
            The rebuilt file text and the number of lines removed.
        """
        kept: list[str] = []
        removed = 0
        for line in content.splitlines(keepends=True):
            entry, sep, comment = line.partition("#")
            if sep and entry.strip() == name and comment.strip().startswith("auto:"):
                removed += 1
                continue
            kept.append(line)
        return "".join(kept), removed

    def _clear_auto_drain(self, name: str) -> None:
        """Remove an auto-drain entry from the drain file under filelock.

        Only removes the entry if it still has the ``# auto:`` tag at
        the time we hold the lock (TOCTOU safety: an operator may have
        manually resumed and re-drained the node between our probe and
        this write — if the new entry has no auto tag, it's an operator
        drain and we must not touch it). The rewrite itself is tag-aware
        too: only lines carrying the ``# auto:`` tag are dropped, never
        an untagged line that happens to name the same node.

        Emits the structured ``auto_undrain`` event only when a line was
        actually removed and the file was rewritten. Failures are logged
        at WARNING and swallowed.
        """
        try:
            from filelock import FileLock

            from maxim.utils.atomic_io import atomic_write_text

            lock = FileLock(str(self._drain_path) + ".lock", timeout=10)
            with lock:
                # Re-read under lock.
                entries = _load_tagged_entries(self._drain_path)
                tag = entries.get(name)
                if tag is None or not tag.startswith("auto:"):
                    # Not auto-drained anymore (operator resumed and
                    # possibly re-drained, or already cleared by another
                    # cycle). No-op.
                    return

                try:
                    content = self._drain_path.read_text()
                except FileNotFoundError:
                    return

                remaining, removed = self._drop_auto_lines(content, name)
                if not removed:
                    # The loader reported an auto tag but no line matched;
                    # leave the file alone rather than guess.
                    logger.warning(
                        "auto-undrain: no auto-tagged line found for %s, drain file unchanged",
                        name,
                    )
                    return
                atomic_write_text(self._drain_path, remaining)

        except Exception:
            logger.warning(
                "auto-undrain: failed to clear drain for %s",
                name,
                exc_info=True,
            )
            return

        log_structured(
            logger,
            logging.INFO,
            event="auto_undrain",
            data={"node": name},
        )
| 631 | + |
| 632 | + |
_active_prober: AutoUndrainProber | None = None
"""Module-level singleton. Review fold (cross-confirmed BLOCKING):
``_build_local_backend`` runs once per lane (large/medium/small), so
without a singleton, three probers would spawn three threads all probing
the same drain file. The singleton ensures exactly one prober per process.
"""


def build_auto_undrain_prober(
    mesh_cfg: Any,
    cluster_key: str,
    drain_path: Path | None = None,
    *,
    interval_s: float | None = None,
) -> AutoUndrainProber:
    """Return the live singleton :class:`AutoUndrainProber`, or build a new one.

    When the module-level prober exists and its thread is alive, that
    instance is returned unchanged and the arguments are ignored. In
    every other case a new prober is constructed, stored as the module
    singleton, and returned. The prober is not started here.

    Parameters
    ----------
    mesh_cfg
        A ``MeshConfig`` instance. Its ``nodes`` supply the node names
        and URLs.
    cluster_key
        The cluster key for authenticating health-check probes.
    drain_path
        Override for the drain state file path. Defaults to
        ``drain_state.drain_state_path()``.
    interval_s
        Override probe interval, forwarded to the prober. ``None`` lets
        the prober resolve it from
        ``MAXIM_AUTO_UNDRAIN_PROBE_INTERVAL_S`` or the 90s default.
    """
    global _active_prober  # noqa: PLW0603

    existing = _active_prober
    if existing is not None:
        worker = existing._thread
        if worker is not None and worker.is_alive():
            return existing

    url_by_node: dict[str, str] = {node.name: node.url for node in mesh_cfg.nodes}

    if drain_path is None:
        # Imported lazily, only when no explicit path is given.
        from maxim.peer.drain_state import drain_state_path

        drain_path = drain_state_path()

    prober = AutoUndrainProber(
        url_by_node,
        cluster_key,
        drain_path,
        interval_s=interval_s,
    )
    _active_prober = prober
    return prober
0 commit comments