Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions tests/test_taosctl_observatory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
"""Tests for the taosctl observatory command group."""
from __future__ import annotations

import pytest

from tinyagentos.cli.taosctl import client as cli_client
from tinyagentos.cli.taosctl import __main__ as cli_main


class _FakeClient:
def __init__(self, *a, **k):
self.calls = []
self.base_url = "http://x"
self.token = "t"
self._raise = None

def get(self, path, params=None):
self.calls.append(("GET", path, params))
if self._raise:
raise self._raise
return {"global": False, "lanes": {}}

def post(self, path, body=None, params=None, json=None):
self.calls.append(("POST", path, body))
if self._raise:
raise self._raise
return {"ok": True}


def _run(monkeypatch, argv, fake):
monkeypatch.setattr(cli_main, "TaosClient", lambda **k: fake)
return cli_main.main(argv)


def test_fleet_calls_endpoint(monkeypatch):
fake = _FakeClient()
rc = _run(monkeypatch, ["observatory", "fleet"], fake)
assert rc == 0
assert ("GET", "/api/observatory/fleet", None) in fake.calls


def test_pause_status_calls_endpoint(monkeypatch):
fake = _FakeClient()
rc = _run(monkeypatch, ["observatory", "pause-status"], fake)
assert rc == 0
assert ("GET", "/api/observatory/pause", None) in fake.calls


def test_pause_global_by_default(monkeypatch):
fake = _FakeClient()
rc = _run(monkeypatch, ["observatory", "pause"], fake)
assert rc == 0
assert ("POST", "/api/observatory/pause",
{"scope": "global", "paused": True}) in fake.calls


def test_pause_specific_lane(monkeypatch):
fake = _FakeClient()
rc = _run(monkeypatch, ["observatory", "pause", "owl-lane-1"], fake)
assert rc == 0
assert ("POST", "/api/observatory/pause",
{"scope": "owl-lane-1", "paused": True}) in fake.calls


def test_resume_sends_paused_false(monkeypatch):
fake = _FakeClient()
rc = _run(monkeypatch, ["observatory", "resume", "owl-lane-1"], fake)
assert rc == 0
assert ("POST", "/api/observatory/pause",
{"scope": "owl-lane-1", "paused": False}) in fake.calls


def test_throttle_status_calls_endpoint(monkeypatch):
fake = _FakeClient()
rc = _run(monkeypatch, ["observatory", "throttle-status"], fake)
assert rc == 0
assert ("GET", "/api/observatory/throttle", None) in fake.calls


def test_throttle_set_max(monkeypatch):
fake = _FakeClient()
rc = _run(monkeypatch, ["observatory", "throttle", "owl-lane-1", "--max", "3"], fake)
assert rc == 0
assert ("POST", "/api/observatory/throttle",
{"scope": "owl-lane-1", "max_concurrent": 3}) in fake.calls


def test_throttle_clear_global_sends_null(monkeypatch):
fake = _FakeClient()
rc = _run(monkeypatch, ["observatory", "throttle", "--clear"], fake)
assert rc == 0
assert ("POST", "/api/observatory/throttle",
{"scope": "global", "max_concurrent": None}) in fake.calls


def test_throttle_clear_specific_lane_sends_null(monkeypatch):
fake = _FakeClient()
rc = _run(monkeypatch, ["observatory", "throttle", "owl-lane-1", "--clear"], fake)
assert rc == 0
assert ("POST", "/api/observatory/throttle",
{"scope": "owl-lane-1", "max_concurrent": None}) in fake.calls


def test_throttle_requires_max_or_clear(monkeypatch):
fake = _FakeClient()
with pytest.raises(SystemExit):
_run(monkeypatch, ["observatory", "throttle", "owl-lane-1"], fake)


def test_throttle_rejects_nonpositive_max(monkeypatch):
fake = _FakeClient()
with pytest.raises(SystemExit):
_run(monkeypatch, ["observatory", "throttle", "--max", "0"], fake)


def test_api_error_maps_to_exit_2(monkeypatch, capsys):
fake = _FakeClient()
fake._raise = cli_client.ApiError(403, "forbidden")
rc = _run(monkeypatch, ["observatory", "pause"], fake)
assert rc == 2
assert "forbidden" in capsys.readouterr().err


def test_transport_error_maps_to_exit_1(monkeypatch, capsys):
fake = _FakeClient()
fake._raise = cli_client.TransportError("cannot reach http://x: refused")
rc = _run(monkeypatch, ["observatory", "fleet"], fake)
assert rc == 1
assert "cannot reach" in capsys.readouterr().err
79 changes: 79 additions & 0 deletions tinyagentos/cli/taosctl/commands/observatory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""taosctl observatory -- watch the agent fleet and steer the work queue.

The Observatory backend exposes the fleet view plus the pause and concurrency
dials the dispatch loop polls each iteration. This group is the terminal/script
control surface for them, so steering the queue is a command rather than a hand
edit of a local dispatch script. Pause/throttle changes are admin-only server
side; reads are open to any authenticated caller.
"""
from __future__ import annotations

from tinyagentos.cli.taosctl.argtypes import positive_int

NOUN = "observatory"

_GLOBAL = "global"


def register(subparsers) -> None:
p = subparsers.add_parser(NOUN, help="Watch the agent fleet and steer the queue")
verbs = p.add_subparsers(dest="verb", required=True, metavar="<verb>")

fp = verbs.add_parser("fleet", help="Fleet view: which agents are working and what they hold")
fp.set_defaults(func=_fleet)

psp = verbs.add_parser("pause-status", help="Show the current pause state")
psp.set_defaults(func=_pause_status)

pp = verbs.add_parser("pause", help="Pause the queue globally or for one lane")
pp.add_argument("scope", nargs="?", default=_GLOBAL,
help="Lane handle, or 'global' (default) for the whole fleet")
pp.set_defaults(func=_pause)

rp = verbs.add_parser("resume", help="Resume the queue globally or for one lane")
rp.add_argument("scope", nargs="?", default=_GLOBAL,
help="Lane handle, or 'global' (default)")
rp.set_defaults(func=_resume)

tsp = verbs.add_parser("throttle-status", help="Show the current concurrency caps")
tsp.set_defaults(func=_throttle_status)

tp = verbs.add_parser("throttle", help="Set or clear a concurrency cap")
tp.add_argument("scope", nargs="?", default=_GLOBAL,
help="Lane handle, or 'global' (default)")
cap = tp.add_mutually_exclusive_group(required=True)
cap.add_argument("--max", dest="max_concurrent", type=positive_int,
help="Max cards a lane may hold in flight at once")
cap.add_argument("--clear", action="store_true",
help="Clear the cap (fall back to the loop default)")
tp.set_defaults(func=_throttle)


def _fleet(args, client):
return client.get("/api/observatory/fleet")


def _pause_status(args, client):
return client.get("/api/observatory/pause")


def _pause(args, client):
return client.post("/api/observatory/pause", body={"scope": args.scope, "paused": True})


def _resume(args, client):
return client.post("/api/observatory/pause", body={"scope": args.scope, "paused": False})


def _throttle_status(args, client):
return client.get("/api/observatory/throttle")


def _throttle(args, client):
# --clear and --max are mutually exclusive and one is required; a clear sends
# an explicit null so the server drops the override.
limit = None if args.clear else args.max_concurrent
return client.post(
"/api/observatory/throttle",
body={"scope": args.scope, "max_concurrent": limit},
)
Loading