Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions areal/experimental/cli/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# SPDX-License-Identifier: Apache-2.0

"""AReaL operator CLI — companion to the v2 microservice control plane.

This package exposes a single ``areal`` console-script that drives the v2
service gateways (inference / agent / training / weight-update) from a
shell, rather than from a Python script that has to instantiate the
matching controller. It is intentionally light at import time so that
adding a verb in a follow-up PR does not pull torch / ray / megatron /
sglang / vllm into the parser-construction path.

The full per-verb design surface is tracked in the upstream design
discussion issue.
"""
109 changes: 109 additions & 0 deletions areal/experimental/cli/_exec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# SPDX-License-Identifier: Apache-2.0

"""Driver execution wrapper.

``run_with_wrapper`` runs a driver in the current process with a heartbeat
thread, SIGTERM trap, and final-state write. Used by both:

- ``areal run`` (attached / foreground): CLI process is the wrapper
- ``areal start`` (detached / background): wrapper is a child process
spawned via ``python -m areal.experimental.cli._exec`` (entry: ``main``)
"""

from __future__ import annotations

import argparse
import importlib
import os
import signal
import sys
import threading
import time
import traceback
from pathlib import Path

from areal.experimental.cli.state import RunState


# Seconds the heartbeat loop waits between successive heartbeat writes.
HEARTBEAT_INTERVAL_S = 5.0


def _heartbeat_loop(name: str, stop_event: threading.Event) -> None:
    """Keep refreshing the run's ``last_heartbeat`` until *stop_event* is set.

    Each iteration loads the ``RunState`` for *name*, stamps it with the
    current time and saves it back, then sleeps on the event for
    ``HEARTBEAT_INTERVAL_S`` seconds (waking early if the event is set).
    ``FileNotFoundError``, ``ValueError`` and ``OSError`` raised by a beat
    are ignored and the loop simply tries again on the next tick.
    """
    ignorable = (FileNotFoundError, ValueError, OSError)
    while True:
        if stop_event.is_set():
            return
        try:
            state = RunState.load(name)
            state.last_heartbeat = time.time()
            state.save()
        except ignorable:
            # Best effort: a missed beat is not fatal.
            pass
        stop_event.wait(HEARTBEAT_INTERVAL_S)


def _install_sigterm_handler() -> None:
    """Install a SIGTERM handler that raises ``SystemExit(143)``.

    Turning the signal into an exception lets ``try``/``finally`` blocks in
    the caller run before the process exits. 143 is 128 + 15 (SIGTERM).
    """

    def _on_sigterm(signum, frame):
        sys.exit(143)

    signal.signal(signal.SIGTERM, _on_sigterm)


def run_with_wrapper(
    name: str,
    driver_spec: str,
    config_path: str | Path,
    overrides: list[str],
) -> int:
    """Run a driver in this process with heartbeat, SIGTERM trap and final-state write.

    Args:
        name: Run name used to load and save the ``RunState`` record.
        driver_spec: ``"module.path:callable"``; the module is imported and
            the callable is invoked with a single argv-style list.
        config_path: Forwarded to the driver as ``--config <config_path>``.
        overrides: Extra argv items appended after the ``--config`` pair.

    Returns:
        The exit code of the run: the driver's return value when it is an
        ``int`` (otherwise 0), the code carried by a ``SystemExit``, or 1
        when the driver raised any other exception. The same code is
        recorded in the run state, with status ``"completed"`` for 0 and
        ``"failed"`` for anything else.
    """
    _install_sigterm_handler()

    stop = threading.Event()
    hb = threading.Thread(target=_heartbeat_loop, args=(name, stop), daemon=True)
    hb.start()

    rc = 0
    try:
        mod_path, func_name = driver_spec.split(":", 1)
        mod = importlib.import_module(mod_path)
        fn = getattr(mod, func_name)
        argv = ["--config", str(config_path), *overrides]
        result = fn(argv)
        if isinstance(result, int):
            rc = result
    except SystemExit as e:
        if isinstance(e.code, int):
            rc = e.code
        elif e.code:
            # Mirror the interpreter: a non-integer exit code is a message
            # that is printed to stderr and mapped to exit status 1.
            print(e.code, file=sys.stderr)
            rc = 1
        else:
            rc = 0
    except BaseException:
        traceback.print_exc(file=sys.stderr)
        rc = 1
    finally:
        stop.set()
        # Wait for the heartbeat thread to finish before writing the final
        # state. Otherwise a beat that loaded the state just before the stop
        # flag was set could save its stale copy after the write below and
        # overwrite the final status / exit code. The timeout keeps shutdown
        # bounded if state I/O is stuck.
        hb.join(timeout=HEARTBEAT_INTERVAL_S)
        try:
            final = RunState.load(name)
            final.status = "completed" if rc == 0 else "failed"
            final.exit_code = rc
            final.last_heartbeat = time.time()
            final.save()
        except Exception as exc:
            # Still best effort, but leave a trace instead of failing silently.
            print(
                f"warning: could not record final state for run {name!r}: {exc}",
                file=sys.stderr,
            )

    return rc


def main() -> int:
    """Entry point for ``python -m areal.experimental.cli._exec``.

    Parses ``--name``, ``--driver``, ``--config`` and any trailing
    overrides (an optional leading ``--`` separator is dropped), records
    this process's PID and a fresh heartbeat in the run state, then hands
    over to ``run_with_wrapper`` and returns its exit code.
    """
    parser = argparse.ArgumentParser(prog="areal-exec", add_help=False)
    for required_flag in ("--name", "--driver", "--config"):
        parser.add_argument(required_flag, required=True)
    parser.add_argument("overrides", nargs=argparse.REMAINDER)
    ns = parser.parse_args()

    extra = ns.overrides or []
    if extra[:1] == ["--"]:
        extra = extra[1:]

    record = RunState.load(ns.name)
    record.pid = os.getpid()
    record.last_heartbeat = time.time()
    record.save()

    return run_with_wrapper(ns.name, ns.driver, ns.config, extra)


# Script entry: propagate main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
1 change: 1 addition & 0 deletions areal/experimental/cli/commands/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# SPDX-License-Identifier: Apache-2.0
49 changes: 49 additions & 0 deletions areal/experimental/cli/commands/agent/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# SPDX-License-Identifier: Apache-2.0

"""``areal agent`` — agent service operator console (scaffold).

Drives an agent service (gateway + router + N (worker, data-proxy) pairs)
for session-centric operator and debugging work. No verbs are implemented
in this scaffold release; this module only reserves the ``areal agent``
command name and tells the user what is coming.

The agent CLI is session-centric (not model-centric like ``areal inf``).
Sessions can negotiate an RL session key with a configured inference
service when they start, enabling online RL trajectory tracking.
"""

from __future__ import annotations

import argparse

# Long-form help text for ``areal agent``; passed to argparse as the
# subcommand description and rendered with RawDescriptionHelpFormatter.
_DESCRIPTION = """\
Operate an agent service: gateway + router + (worker, data-proxy) pairs.
Session-centric: the primary unit of interaction is an agent session,
not a model.

NO VERBS IMPLEMENTED YET. This namespace currently only reserves the
`areal agent ...` command surface.

Planned verb surface (flag matrices live in the design discussion issue):
run launch router + N pairs + gateway
stop tear them down
status health for one service
ps list locally known services
logs show gateway / router / worker / data-proxy logs

State lives under ~/.areal/agent/.
"""


def add_parser(subparsers: argparse._SubParsersAction) -> None:
    """Register the ``agent`` subcommand on *subparsers*.

    The subcommand takes no arguments of its own; its parsed namespace
    carries ``func=_handle`` so the caller can dispatch to it.
    """
    agent_parser = subparsers.add_parser(
        "agent",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=_DESCRIPTION,
        help="Operate an agent service (scaffold — no verbs yet).",
    )
    agent_parser.set_defaults(func=_handle)


def _handle(_: argparse.Namespace) -> int:
    """Placeholder handler for ``areal agent``: ignore the namespace, return 0."""
    exit_status = 0  # scaffold: there is nothing to execute yet
    return exit_status
34 changes: 34 additions & 0 deletions areal/experimental/cli/commands/inf/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# SPDX-License-Identifier: Apache-2.0

"""``areal inf`` — inference service operator console."""

from __future__ import annotations

import argparse


# Long-form help text for ``areal inf``; passed to argparse as the
# subcommand description and rendered with RawDescriptionHelpFormatter.
_DESCRIPTION = """\
Operate an inference service: gateway + router + optional model backends.

Implemented verbs:
run Launch the gateway+router stack (detached).

Planned (not yet implemented):
stop / status / ps / register / deregister / models / logs

State lives under ~/.areal/inf/.
"""


def add_parser(subparsers: argparse._SubParsersAction) -> None:
    """Register the ``inf`` subcommand and its verb sub-parsers on *subparsers*.

    A verb is mandatory (``required=True``); the chosen verb name is stored
    on the namespace as ``verb``. Currently only ``run`` is registered.
    """
    parser_options = {
        "help": "Operate an inference service.",
        "description": _DESCRIPTION,
        "formatter_class": argparse.RawDescriptionHelpFormatter,
    }
    inf_parser = subparsers.add_parser("inf", **parser_options)
    verbs = inf_parser.add_subparsers(dest="verb", required=True, metavar="VERB")

    # Imported inside the function rather than at module import time
    # (presumably to keep importing this package cheap — confirm).
    from areal.experimental.cli.commands.inf import run as cmd_run

    cmd_run.add_parser(verbs)
86 changes: 86 additions & 0 deletions areal/experimental/cli/commands/inf/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# SPDX-License-Identifier: Apache-2.0

"""``areal inf run`` — launch the v2 inference service (detached)."""

from __future__ import annotations

import argparse
import time


# Long-form help text for ``areal inf run``; passed to argparse as the
# subcommand description and rendered with RawDescriptionHelpFormatter.
_DESCRIPTION = """\
Spawn the v2 inference gateway + router as detached subprocesses, wait
for HTTP /health, persist state under ~/.areal/inf/, and exit.

The CLI process exits after the service is healthy; the gateway and
router keep running. Later commands (stop / status / ps / logs / ...)
reconcile via state + PID + HTTP.
"""


def add_parser(subparsers: argparse._SubParsersAction) -> None:
    """Register ``areal inf run`` and all of its flags on *subparsers*.

    Flags are declared as (name, options) pairs and added in declaration
    order, so the generated ``--help`` listing keeps a stable ordering.
    The parsed namespace carries ``func=_handle`` for dispatch.
    """
    run_parser = subparsers.add_parser(
        "run",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=_DESCRIPTION,
        help="Launch the inference service (detached).",
    )

    flag_specs = (
        ("--service", {"default": "default", "help": "Service instance name."}),
        ("--gateway-host", {"default": "127.0.0.1"}),
        ("--gateway-port", {"type": int, "default": 8080}),
        ("--router-host", {"default": "127.0.0.1"}),
        ("--router-port", {"type": int, "default": 8081}),
        ("--admin-api-key", {"default": "areal-admin-key"}),
        (
            "--routing-strategy",
            {"default": "round_robin", "choices": ["round_robin", "least_busy"]},
        ),
        ("--poll-interval", {"type": float, "default": 5.0}),
        ("--router-timeout", {"type": float, "default": 2.0}),
        ("--forward-timeout", {"type": float, "default": 120.0}),
        (
            "--log-level",
            {"default": "info", "choices": ["debug", "info", "warning", "error"]},
        ),
        ("--launch-timeout", {"type": float, "default": 30.0}),
        (
            "--force",
            {
                "action": "store_true",
                "help": "Replace an existing healthy instance with the same name.",
            },
        ),
    )
    for flag, options in flag_specs:
        run_parser.add_argument(flag, **options)

    run_parser.set_defaults(func=_handle)


def _handle(args: argparse.Namespace) -> int:
    """Start the inference service described by *args* and print a summary.

    Forwards the parsed flags to ``start_service``, makes the new service
    the current one if no current service is set, prints the gateway and
    router endpoints, the log directory and the start time, and returns 0.
    """
    from areal.experimental.cli.inf_launcher import start_service
    from areal.experimental.cli.inf_state import (
        get_current_service,
        service_logs_dir,
        set_current_service,
    )

    launch_kwargs = {
        "name": args.service,
        "gateway_host": args.gateway_host,
        "gateway_port": args.gateway_port,
        "router_host": args.router_host,
        "router_port": args.router_port,
        "admin_api_key": args.admin_api_key,
        "routing_strategy": args.routing_strategy,
        "poll_interval": args.poll_interval,
        "router_timeout": args.router_timeout,
        "forward_timeout": args.forward_timeout,
        "log_level": args.log_level,
        "force": args.force,
        "launch_timeout": args.launch_timeout,
    }
    state = start_service(**launch_kwargs)

    # Only claim the "current service" slot when nothing holds it yet.
    current = get_current_service()
    if current is None:
        set_current_service(state.name)

    logs = service_logs_dir(state.name)
    print(f"Started service {state.name!r}.")
    print(f" gateway: {state.gateway_url} (pid {state.gateway_pid})")
    print(f" router: {state.router_url} (pid {state.router_pid})")
    print(f" logs: {logs}")
    started_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(state.created_at))
    print(f" started: {started_at}")
    return 0
34 changes: 34 additions & 0 deletions areal/experimental/cli/commands/logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# SPDX-License-Identifier: Apache-2.0

"""``areal logs`` — tail a training job's stdout/stderr (scaffold)."""

from __future__ import annotations

import argparse

# Long-form help text for ``areal logs``; passed to argparse as the
# subcommand description and rendered with RawDescriptionHelpFormatter.
_DESCRIPTION = """\
Tail a training job's combined stdout/stderr.

NO BEHAVIOR YET. Reserves the `areal logs` command name.

Planned flags:
run_name Name of the run (positional, required).
-f, --follow Stream new log lines as they arrive.
-n, --lines N Number of recent lines to print initially (default: 200).

Log files live under ~/.areal/runs/<name>/.
"""


def add_parser(subparsers: argparse._SubParsersAction) -> None:
    """Register the ``logs`` subcommand on *subparsers*.

    No flags are defined yet; the parsed namespace carries ``func=_handle``
    so the caller can dispatch to it.
    """
    logs_parser = subparsers.add_parser(
        "logs",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=_DESCRIPTION,
        help="Tail a training job's stdout/stderr (scaffold).",
    )
    logs_parser.set_defaults(func=_handle)


def _handle(_: argparse.Namespace) -> int:
    """Placeholder handler for ``areal logs``: ignore the namespace, return 0."""
    exit_status = 0  # scaffold: there is nothing to execute yet
    return exit_status
33 changes: 33 additions & 0 deletions areal/experimental/cli/commands/ps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# SPDX-License-Identifier: Apache-2.0

"""``areal ps`` — list locally tracked training jobs (scaffold)."""

from __future__ import annotations

import argparse

# Long-form help text for ``areal ps``; passed to argparse as the
# subcommand description and rendered with RawDescriptionHelpFormatter.
_DESCRIPTION = """\
List locally tracked training jobs.

NO BEHAVIOR YET. Reserves the `areal ps` command name.

Planned flags:
--json Emit machine-readable JSON.
--all Include completed / failed runs (default: running only).

State lives under ~/.areal/runs/.
"""


def add_parser(subparsers: argparse._SubParsersAction) -> None:
    """Register the ``ps`` subcommand on *subparsers*.

    No flags are defined yet; the parsed namespace carries ``func=_handle``
    so the caller can dispatch to it.
    """
    ps_parser = subparsers.add_parser(
        "ps",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=_DESCRIPTION,
        help="List locally tracked training jobs (scaffold).",
    )
    ps_parser.set_defaults(func=_handle)


def _handle(_: argparse.Namespace) -> int:
    """Placeholder handler for ``areal ps``: ignore the namespace, return 0."""
    exit_status = 0  # scaffold: there is nothing to execute yet
    return exit_status
Loading
Loading