From a770b958969efa5762496954feb1ba93aa0c71c7 Mon Sep 17 00:00:00 2001 From: IF007 <565985999@qq.com> Date: Wed, 3 Jun 2026 00:09:07 +0800 Subject: [PATCH] feat(agent_service): add OpenClaw per-session agent runtime Integrate OpenClaw as an agent_service runtime that spawns one OpenClaw gateway subprocess per RL session, each bound to its own upstream LLM key, so a session's turns are attributable to a distinct per-episode key. - examples/openclaw/agent.py: OpenClawAgent (AgentRunnable) with per-session subprocess lifecycle; drives each turn via the OpenAI chat-completions endpoint using a unique per-turn session key so DataProxy-replayed history is the single source of truth (avoids double-feeding OpenClaw's own memory). - examples/openclaw/run_agent_service.py: runnable launcher with a random admin key and auto-created fileroot. - data_proxy: reset history and reward on episode/start, and clear reward after episode/end, to prevent cross-episode state leakage. - agent_service types/worker/__init__: backward-compatible lifecycle hooks (on_episode_start/end, TrainingContext) for episode-aware runtimes. - examples/openclaw/README: document the OpenClaw agent_service design and usage. Co-Authored-By: Claude Opus 4.7 --- areal/experimental/agent_service/__init__.py | 9 +- .../agent_service/data_proxy/app.py | 74 ++- areal/experimental/agent_service/types.py | 47 +- .../experimental/agent_service/worker/app.py | 40 +- examples/openclaw/README.md | 162 ++++++- examples/openclaw/agent.py | 430 ++++++++++++++++++ examples/openclaw/run_agent_service.py | 186 ++++++++ 7 files changed, 940 insertions(+), 8 deletions(-) create mode 100644 examples/openclaw/agent.py create mode 100644 examples/openclaw/run_agent_service.py diff --git a/areal/experimental/agent_service/__init__.py b/areal/experimental/agent_service/__init__.py index 2c964e550f..6dd35aa9d8 100644 --- a/areal/experimental/agent_service/__init__.py +++ b/areal/experimental/agent_service/__init__.py @@ -16,11 +16,18 @@ - ``protocol`` — WebSocket frame types and helpers """ -from .types import AgentRequest, AgentResponse, AgentRunnable, EventEmitter +from .types import ( + AgentRequest, + AgentResponse, + AgentRunnable, + EventEmitter, + TrainingContext, +) __all__ = [ "AgentRequest", "AgentResponse", "AgentRunnable", "EventEmitter", + "TrainingContext", ] diff --git a/areal/experimental/agent_service/data_proxy/app.py b/areal/experimental/agent_service/data_proxy/app.py index fbf51646ba..2c6fb8d5c6 100644 --- a/areal/experimental/agent_service/data_proxy/app.py +++ b/areal/experimental/agent_service/data_proxy/app.py @@ -10,7 +10,7 @@ from typing import Any import httpx -from fastapi import FastAPI +from fastapi import FastAPI, HTTPException from areal.utils import logging @@ -24,6 +24,8 @@ class _SessionData: history: list[dict[str, Any]] = field(default_factory=list) metadata: dict[str, Any] = field(default_factory=dict) last_active: float = field(default_factory=time.monotonic) + reward: float | None = None + training_ctx: dict[str, Any] | None = None def create_data_proxy_app(config: DataProxyConfig) -> FastAPI: @@ -145,6 +147,76 @@ async def close_session(session_key: str): await _close_worker_session(session_key) return {"status": "ok"} + @app.post("/session/{session_key}/episode/start") + async def episode_start(session_key: str, body: dict[str, Any]): + """Open a training episode for a session. + + Forwards a :class:`TrainingContext` payload to the worker so the + agent can route internal LLM calls through the training proxy. + """ + session = sessions.get(session_key) + if session is None: + session = _SessionData() + sessions[session_key] = session + # A new episode starts from a clean slate: the worker respawns a + # fresh subprocess, so any history/reward carried over from a prior + # episode on the same key would corrupt the new trajectory. + session.history.clear() + session.reward = None + session.training_ctx = dict(body) + session.last_active = time.monotonic() + + resp = await http_client.post( + f"{config.worker_addr}/session/{session_key}/episode/start", + json=body, + ) + resp.raise_for_status() + return resp.json() + + @app.post("/session/{session_key}/episode/end") + async def episode_end(session_key: str, body: dict[str, Any]): + """Close a training episode and forward final reward to the worker. + + ``body``: ``{"reward": }``. Reward defaults to the + last value set via ``/session/{key}/reward``. + """ + session = sessions.get(session_key) + reward = body.get("reward") + if reward is None and session is not None: + reward = session.reward + + resp = await http_client.post( + f"{config.worker_addr}/session/{session_key}/episode/end", + json={"reward": reward}, + ) + resp.raise_for_status() + + if session is not None: + # The reward has been consumed by this episode; clear it so a + # subsequent episode on the same key does not inherit a stale value. + session.reward = None + session.last_active = time.monotonic() + return resp.json() + + @app.post("/session/{session_key}/reward") + async def set_reward(session_key: str, body: dict[str, Any]): + """Record a scalar reward for the session. + + The reward is buffered here and forwarded to the worker on the + next ``episode/end`` call. Layer 2 will additionally relay it + to the ProxyGateway's ``/rl/set_reward`` endpoint for training. + """ + session = sessions.get(session_key) + if session is None: + session = _SessionData() + sessions[session_key] = session + reward = body.get("reward") + if isinstance(reward, bool) or not isinstance(reward, (int, float)): + raise HTTPException(status_code=400, detail="reward must be a number") + session.reward = float(reward) + session.last_active = time.monotonic() + return {"status": "ok", "reward": session.reward} + @app.get("/session/{session_key}/history") async def get_history(session_key: str): session = sessions.get(session_key) diff --git a/areal/experimental/agent_service/types.py b/areal/experimental/agent_service/types.py index 7290634e67..e12f5f14dc 100644 --- a/areal/experimental/agent_service/types.py +++ b/areal/experimental/agent_service/types.py @@ -34,6 +34,33 @@ class AgentResponse: metadata: dict[str, Any] = field(default_factory=dict) +@dataclass +class TrainingContext: + """Training-side handle injected into an agent at episode start. + + Carries everything an agent needs to make its internal LLM calls flow + through AReaL's training pipeline (token + logprob capture). Pass to + an agent's optional ``on_episode_start`` hook (see ``AgentRunnable``). + + Attributes: + session_id: Opaque episode identifier for trajectory binding. + llm_base_url: OpenAI-compatible base URL of the proxy gateway + that captures training data. Agents should configure their + internal LLM client to use this as the upstream endpoint. + llm_api_key: Per-episode key bound to ``session_id``. Typically a + ``sk-sess-*`` value returned by ``/rl/start_session``. + llm_model: Model identifier to use for inference. Empty string + means "agent decides". + extras: Implementation-specific overrides (e.g. tool_call_parser). + """ + + session_id: str + llm_base_url: str + llm_api_key: str + llm_model: str = "" + extras: dict[str, Any] = field(default_factory=dict) + + class EventEmitter(Protocol): """Callback interface for streaming events from agent to caller.""" @@ -52,10 +79,22 @@ class AgentRunnable(Protocol): interaction — the Agent Service only provides session lifecycle and event streaming. - Reward computation is **not** part of this interface. Rewards are - calculated externally by the training pipeline (via reward functions - applied to exported trajectories), following AReaL's standard RLVR - pattern. + Only ``run`` is required. The following methods are optional and + discovered via ``getattr`` at runtime — implement them to participate + in training-related lifecycle: + + - ``async close_session(session_key)`` — release per-session state + when a session is closed by the DataProxy. + - ``async close_all_sessions()`` — clean up everything on worker + shutdown. + - ``async on_episode_start(session_key, training_ctx)`` — receive a + :class:`TrainingContext` so the agent can route its internal LLM + calls through the proxy gateway. Called once per RL episode, + before the first ``run`` of that episode. + - ``async on_episode_end(session_key, reward)`` — called once when + an episode terminates (with the final scalar reward, if any). + Agents wiring trajectories to the training pipeline should flush + / finalize here. """ async def run( diff --git a/areal/experimental/agent_service/worker/app.py b/areal/experimental/agent_service/worker/app.py index 55507bc558..7a3b35223e 100644 --- a/areal/experimental/agent_service/worker/app.py +++ b/areal/experimental/agent_service/worker/app.py @@ -14,7 +14,7 @@ from areal.utils.dynamic_import import import_from_string from ..protocol import QueueMode -from ..types import AgentRequest, AgentResponse, AgentRunnable +from ..types import AgentRequest, AgentResponse, AgentRunnable, TrainingContext logger = logging.getLogger("AgentWorker") @@ -52,6 +52,44 @@ def create_worker_app( async def health(): return {"status": "ok"} + @app.post("/session/{session_key}/episode/start") + async def episode_start(session_key: str, body: dict[str, Any]): + hook = getattr(agent, "on_episode_start", None) + if hook is None: + return {"status": "noop"} + ctx = TrainingContext( + session_id=body.get("session_id", session_key), + llm_base_url=body.get("llm_base_url", ""), + llm_api_key=body.get("llm_api_key", ""), + llm_model=body.get("llm_model", ""), + extras=body.get("extras", {}), + ) + try: + await hook(session_key, ctx) + except Exception as exc: + logger.exception("on_episode_start failed (session=%s)", session_key) + return JSONResponse( + {"error": {"message": str(exc), "type": type(exc).__name__}}, + status_code=500, + ) + return {"status": "ok"} + + @app.post("/session/{session_key}/episode/end") + async def episode_end(session_key: str, body: dict[str, Any]): + hook = getattr(agent, "on_episode_end", None) + if hook is None: + return {"status": "noop"} + reward = body.get("reward") + try: + await hook(session_key, reward) + except Exception as exc: + logger.exception("on_episode_end failed (session=%s)", session_key) + return JSONResponse( + {"error": {"message": str(exc), "type": type(exc).__name__}}, + status_code=500, + ) + return {"status": "ok"} + @app.post("/session/{session_key}/close") async def close_session(session_key: str): close_fn = getattr(agent, "close_session", None) diff --git a/examples/openclaw/README.md b/examples/openclaw/README.md index d501ba3fe4..610112f6e1 100644 --- a/examples/openclaw/README.md +++ b/examples/openclaw/README.md @@ -17,7 +17,167 @@ the OpenAI chat-completions protocol. **Disclaimer**: RL-finetuned models may exhibit unexpected behaviors. Please ensure strict permission rules and an isolated execution environment for your agent runtime. -## Prerequisites +## Two ways to use this example + +This directory ships two complementary entry points: + +| Mode | Entry point | What it does | +| ----------------- | ---------------------- | -------------------------------------------------------------------------------------------------------------------------------------- | +| **Agent Service** | `run_agent_service.py` | Hosts OpenClaw behind AReaL's Agent Service so external clients call it over an OpenAI-compatible HTTP API. Start here. | +| **RL training** | `train.py` | Drives end-to-end RL (PPO) where every agent LLM call flows through the training proxy gateway. Documented from *Prerequisites* below. | + +The two share the same building block — the `OpenClawAgent` ([`agent.py`](agent.py)) — +and the same lifecycle hooks, so a setup that serves traffic today can be wired into RL +training tomorrow. + +## Serving OpenClaw via the Agent Service + +### Design + +AReaL's Agent Service is a small fleet of cooperating processes; OpenClaw plugs in as +the per-worker *agent runtime*: + +``` +client ──HTTP /v1/responses──▶ Gateway ──▶ Router ──▶ DataProxy ──▶ Worker + (auth) (session (per-session (hosts OpenClawAgent) + routing) history) │ + │ spawns + drives + ▼ + OpenClaw gateway subprocess + (one per session) + │ + ▼ + upstream LLM (your model) +``` + +- **Gateway** — public OpenAI-compatible surface (`/v1/responses`); enforces the admin + API key. +- **Router** — maps each `session_key` to a worker; owns health state. +- **DataProxy** — keeps the conversation history for a session and replays it to the + worker each turn. +- **Worker** — loads the class named by `AgentConfig.agent_cls_path` (here + `examples.openclaw.agent.OpenClawAgent`) and exposes it as an `AgentRunnable`. + +**Why one OpenClaw subprocess per session?** OpenClaw's configuration (provider, +upstream key, model) is *process-global*. RL requires each session's turns to be +attributable to a distinct per-episode upstream key (`sk-sess-*`), so logical isolation +inside a single process is not enough — each session gets its own OpenClaw process bound +to its own upstream. `OpenClawAgent` manages this: + +- `on_episode_start(session_key, training_ctx)` — pick a free port, render a temporary + `openclaw.json`, spawn `openclaw gateway`, and wait until `/v1/models` is healthy. The + upstream is taken from the injected `TrainingContext` (RL) or the + `OPENCLAW_UPSTREAM_*` environment (serving). +- `run(request)` — one turn = one `POST /v1/chat/completions` to the session's own + subprocess, with `model: "openclaw/default"` and header `x-openclaw-session-key`; the + SSE stream is forwarded as deltas / tool calls. If no episode was opened (plain + serving), the subprocess is spawned lazily from the env upstream. +- `on_episode_end` / `close_session` / `close_all_sessions` — terminate the subprocess + (SIGTERM → SIGKILL fallback), close clients, and remove the temp config directory. + +### Prerequisites (Agent Service) + +1. **AReaL** installed (see *Install AReaL* below). +1. **OpenClaw CLI** on `PATH`: + ```bash + npm install -g openclaw + openclaw --version + ``` +1. An **upstream LLM** reachable over an OpenAI-compatible (or Anthropic) API, with a + base URL, API key, and model id. + +### Launch the service + +```bash +export OPENCLAW_UPSTREAM_BASE_URL="https://your-llm/v1" # OpenAI-compatible endpoint +export OPENCLAW_UPSTREAM_API_KEY="sk-..." # upstream model key +export OPENCLAW_UPSTREAM_MODEL="claude-sonnet-4-6" # upstream model id + +python examples/openclaw/run_agent_service.py +``` + +This boots one Worker + DataProxy pair behind a Router and Gateway, then drops into an +interactive prompt. Expected output: + +``` +Initializing with 1 pair ... + Router: http://x.x.x.x:xxxxx + Gateway: http://x.x.x.x:xxxxx + Pairs: 1 +All services ready. + +You: Reply with exactly: hello +Agent: hello +``` + +The launcher generates a random admin API key by default (a fixed unique key is required +when the Gateway binds to a non-loopback interface). Pass `--admin-api-key ` to +set your own, `--upstream-url` / `--upstream-model` to override the env, and +`--fileroot ` to relocate logs and name-resolve records (defaults to a temp +directory, created automatically). + +### Provide the agent service (call it from a client) + +Once running, any OpenAI-`/v1/responses`-style client can talk to the Gateway. Use the +admin API key as the bearer token and a stable `user` to pin a session: + +```bash +curl -s http:///v1/responses \ + -H "Authorization: Bearer " \ + -H "Content-Type: application/json" \ + -d '{ + "model": "openclaw-agent", + "user": "session-123", + "input": [{"type": "message", "content": "Hello!"}] + }' +``` + +Reusing the same `user` across calls keeps the conversation history (held by the +DataProxy) and reuses the same OpenClaw subprocess. + +### Wiring into RL training + +The same agent participates in RL through three DataProxy endpoints (the controller +calls these once it can mint per-session keys): + +| Endpoint | Effect | +| ----------------------------------- | ---------------------------------------------------------------------------------- | +| `POST /session/{key}/episode/start` | Forwards a `TrainingContext` → `on_episode_start` spawns a subprocess bound to it. | +| `POST /session/{key}/reward` | Buffers a scalar reward for the session. | +| `POST /session/{key}/episode/end` | Forwards the final reward → `on_episode_end` tears the subprocess down. | + +The `TrainingContext` carries `llm_base_url` / `llm_api_key` / `llm_model`, pointing +OpenClaw's upstream at AReaL's proxy gateway so tokens and log-probabilities are +captured for training. + +### Environment variables + +| Variable | Purpose | Default | +| ------------------------------ | ---------------------------------------------------- | --------------------- | +| `OPENCLAW_UPSTREAM_BASE_URL` | Upstream LLM base URL (serving fallback) | — (required to serve) | +| `OPENCLAW_UPSTREAM_API_KEY` | Upstream LLM API key | — (required to serve) | +| `OPENCLAW_UPSTREAM_MODEL` | Upstream model id | `default` | +| `OPENCLAW_UPSTREAM_API` | `openai-completions` or `anthropic-messages` | `openai-completions` | +| `OPENCLAW_BIN` | OpenClaw executable | `openclaw` | +| `OPENCLAW_TIMEOUT` | Per-request timeout (seconds) | `120` | +| `OPENCLAW_STARTUP_TIMEOUT` | Subprocess health-wait (seconds) | `60` | +| `OPENCLAW_NODE_EXTRA_CA_CERTS` | CA bundle for the upstream TLS cert (preferred) | unset | +| `OPENCLAW_TLS_INSECURE` | `1` sets `NODE_TLS_REJECT_UNAUTHORIZED=0` (dev only) | unset | + +Legacy `OPENCLAW_GATEWAY_URL` / `OPENCLAW_GATEWAY_TOKEN` / `OPENCLAW_MODEL` are still +accepted as fallbacks for the upstream URL / key / model. + +### Troubleshooting + +- **`upstream provider timeout` / `UNABLE_TO_GET_ISSUER_CERT_LOCALLY`** — Node cannot + verify the upstream's TLS certificate (common with corporate CAs). Point + `OPENCLAW_NODE_EXTRA_CA_CERTS` at the CA bundle, or, for local dev only, set + `OPENCLAW_TLS_INSECURE=1`. +- **`Refusing to start server ... default admin API key`** — the Gateway binds to a + routable interface. Pass a unique `--admin-api-key` (the launcher already generates + one by default). +- **`name_resolve... does not exist` / `fileroot ... is None`** — pass + `--fileroot ` (auto-created); the default temp directory normally avoids this. 1. A GPU machine with at least **2 NVIDIA GPUs** (compute capability 8.0 or higher, i.e. Ampere / Hopper). diff --git a/examples/openclaw/agent.py b/examples/openclaw/agent.py new file mode 100644 index 0000000000..47e8673bb3 --- /dev/null +++ b/examples/openclaw/agent.py @@ -0,0 +1,430 @@ +"""OpenClaw Agent for AReaL Agent Service (per-session subprocess). + +Implements :class:`AgentRunnable` by spawning **one OpenClaw Gateway +subprocess per RL session**. Each subprocess is bound to its own +upstream LLM (base URL + API key + model) so that, during training, a +session's turns can be attributed to a distinct per-episode key +(``sk-sess-*``). OpenClaw config is process-global, so per-session +isolation requires one process per session. + +Per turn the agent issues a single OpenAI-compatible +``POST /v1/chat/completions`` to the session's own subprocess, replaying +the conversation history that the DataProxy maintains. + +Requires the ``openclaw`` CLI on ``PATH`` (``npm i -g openclaw``). + +Upstream selection +------------------ +At episode start the agent prefers the :class:`TrainingContext` injected +by the controller (``llm_base_url`` / ``llm_api_key`` / ``llm_model``). +Outside training (e.g. the interactive demo) it falls back to the +``OPENCLAW_UPSTREAM_*`` environment variables, then to the legacy +``OPENCLAW_GATEWAY_*`` names. + +Environment variables +--------------------- + OPENCLAW_BIN — openclaw executable (default ``openclaw``). + OPENCLAW_UPSTREAM_BASE_URL — upstream LLM base URL (fallback when no + TrainingContext); legacy fallback ``OPENCLAW_GATEWAY_URL``. + OPENCLAW_UPSTREAM_API_KEY — upstream API key; legacy fallback + ``OPENCLAW_GATEWAY_TOKEN``. + OPENCLAW_UPSTREAM_MODEL — upstream model id; legacy fallback + ``OPENCLAW_MODEL``. + OPENCLAW_UPSTREAM_API — ``openai-completions`` (default) or + ``anthropic-messages``. + OPENCLAW_TIMEOUT — per-request timeout in seconds (default 120). + OPENCLAW_STARTUP_TIMEOUT — subprocess health-wait seconds (default 60). + OPENCLAW_NODE_EXTRA_CA_CERTS— path to a CA bundle for the upstream TLS + cert (preferred over disabling verification). + OPENCLAW_TLS_INSECURE — ``1`` to set ``NODE_TLS_REJECT_UNAUTHORIZED=0`` + (dev only; needed for upstreams whose CA Node cannot verify). +""" + +from __future__ import annotations + +import asyncio +import contextlib +import json +import os +import secrets +import shutil +import socket +import tempfile +import uuid +from dataclasses import dataclass, field +from typing import IO, Any + +import httpx + +from areal.experimental.agent_service.types import ( + AgentRequest, + AgentResponse, + EventEmitter, + TrainingContext, +) +from areal.utils import logging + +logger = logging.getLogger("OpenClawAgent") + +_PROVIDER = "areal" + + +def _truthy(value: str) -> bool: + return value.strip().lower() in ("1", "true", "yes", "on") + + +def _free_port() -> int: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.bind(("127.0.0.1", 0)) + return sock.getsockname()[1] + + +@dataclass +class _Upstream: + """Upstream LLM that an OpenClaw subprocess routes its calls to.""" + + base_url: str + api_key: str + model: str + api: str = "openai-completions" + + @classmethod + def from_training_ctx(cls, ctx: TrainingContext) -> _Upstream | None: + if not ctx.llm_base_url or not ctx.llm_api_key: + return None + api = str(ctx.extras.get("openclaw_api", "openai-completions")) + model = ctx.llm_model or "default" + return cls( + base_url=ctx.llm_base_url, + api_key=ctx.llm_api_key, + model=model, + api=api, + ) + + @classmethod + def from_env(cls) -> _Upstream | None: + base_url = os.environ.get("OPENCLAW_UPSTREAM_BASE_URL") or os.environ.get( + "OPENCLAW_GATEWAY_URL", "" + ) + api_key = os.environ.get("OPENCLAW_UPSTREAM_API_KEY") or os.environ.get( + "OPENCLAW_GATEWAY_TOKEN", "" + ) + if not base_url or not api_key: + return None + model = os.environ.get("OPENCLAW_UPSTREAM_MODEL") or os.environ.get( + "OPENCLAW_MODEL", "default" + ) + api = os.environ.get("OPENCLAW_UPSTREAM_API", "openai-completions") + return cls(base_url=base_url.rstrip("/"), api_key=api_key, model=model, api=api) + + +@dataclass +class _SessionState: + port: int + gateway_token: str + config_dir: str + process: asyncio.subprocess.Process + client: httpx.AsyncClient + log_file: IO[str] + training_ctx: TrainingContext | None = None + metadata: dict[str, Any] = field(default_factory=dict) + + +class OpenClawAgent: + """AgentRunnable that runs one OpenClaw subprocess per session.""" + + def __init__(self, **_: Any) -> None: + self._bin = os.environ.get("OPENCLAW_BIN", "openclaw") + self._timeout = float(os.environ.get("OPENCLAW_TIMEOUT", "120")) + self._startup_timeout = float(os.environ.get("OPENCLAW_STARTUP_TIMEOUT", "60")) + self._node_extra_ca_certs = os.environ.get("OPENCLAW_NODE_EXTRA_CA_CERTS", "") + self._tls_insecure = _truthy(os.environ.get("OPENCLAW_TLS_INSECURE", "")) + self._env_upstream = _Upstream.from_env() + + self._sessions: dict[str, _SessionState] = {} + self._locks: dict[str, asyncio.Lock] = {} + self._locks_guard = asyncio.Lock() + + logger.info( + "OpenClawAgent initialized (bin=%s, env_upstream=%s, tls_insecure=%s)", + self._bin, + self._env_upstream is not None, + self._tls_insecure, + ) + + # ------------------------------------------------------------------ + # Subprocess lifecycle + # ------------------------------------------------------------------ + + def _render_config( + self, port: int, token: str, upstream: _Upstream + ) -> dict[str, Any]: + return { + "gateway": { + "mode": "local", + "port": port, + "auth": {"mode": "token", "token": token}, + "http": {"endpoints": {"chatCompletions": {"enabled": True}}}, + }, + "models": { + "providers": { + _PROVIDER: { + "baseUrl": upstream.base_url, + "apiKey": upstream.api_key, + "api": upstream.api, + "models": [{"id": upstream.model, "name": upstream.model}], + } + } + }, + "agents": {"defaults": {"model": f"{_PROVIDER}/{upstream.model}"}}, + } + + async def _session_lock(self, session_key: str) -> asyncio.Lock: + async with self._locks_guard: + lock = self._locks.get(session_key) + if lock is None: + lock = asyncio.Lock() + self._locks[session_key] = lock + return lock + + async def _spawn(self, session_key: str, upstream: _Upstream) -> _SessionState: + port = _free_port() + token = secrets.token_hex(16) + config_dir = tempfile.mkdtemp(prefix="openclaw-") + config_path = os.path.join(config_dir, "openclaw.json") + state_dir = os.path.join(config_dir, "state") + os.makedirs(state_dir, exist_ok=True) + with open(config_path, "w") as fh: + json.dump(self._render_config(port, token, upstream), fh) + + env = dict(os.environ) + env["OPENCLAW_CONFIG_PATH"] = config_path + env["OPENCLAW_STATE_DIR"] = state_dir + if self._node_extra_ca_certs: + env["NODE_EXTRA_CA_CERTS"] = self._node_extra_ca_certs + if self._tls_insecure: + env["NODE_TLS_REJECT_UNAUTHORIZED"] = "0" + + log_file = open(os.path.join(config_dir, "gateway.log"), "w") + proc = await asyncio.create_subprocess_exec( + self._bin, + "gateway", + "--port", + str(port), + "--auth", + "token", + "--token", + token, + "--force", + "--allow-unconfigured", + env=env, + stdout=log_file, + stderr=asyncio.subprocess.STDOUT, + ) + + client = httpx.AsyncClient( + base_url=f"http://127.0.0.1:{port}", + timeout=self._timeout, + headers={"Authorization": f"Bearer {token}"}, + ) + state = _SessionState( + port=port, + gateway_token=token, + config_dir=config_dir, + process=proc, + client=client, + log_file=log_file, + ) + try: + await self._wait_healthy(state) + except Exception: + await self._teardown_state(state) + raise + + logger.info( + "Spawned OpenClaw subprocess (session=%s, port=%d, pid=%s, model=%s)", + session_key, + port, + proc.pid, + upstream.model, + ) + return state + + async def _wait_healthy(self, state: _SessionState) -> None: + loop = asyncio.get_running_loop() + deadline = loop.time() + self._startup_timeout + while loop.time() < deadline: + if state.process.returncode is not None: + raise RuntimeError( + f"openclaw gateway exited early " + f"(rc={state.process.returncode}); see {state.config_dir}/gateway.log" + ) + try: + resp = await state.client.get("/v1/models") + if resp.status_code == 200: + return + except httpx.HTTPError: + pass + await asyncio.sleep(0.5) + raise TimeoutError( + f"openclaw gateway on port {state.port} did not become healthy " + f"within {self._startup_timeout}s" + ) + + async def _teardown_state(self, state: _SessionState) -> None: + with contextlib.suppress(Exception): + await state.client.aclose() + proc = state.process + if proc.returncode is None: + with contextlib.suppress(ProcessLookupError): + proc.terminate() + try: + await asyncio.wait_for(proc.wait(), timeout=10) + except TimeoutError: + with contextlib.suppress(ProcessLookupError): + proc.kill() + await proc.wait() + with contextlib.suppress(Exception): + state.log_file.close() + shutil.rmtree(state.config_dir, ignore_errors=True) + + async def _ensure_session( + self, + session_key: str, + upstream: _Upstream, + training_ctx: TrainingContext | None = None, + *, + respawn: bool = False, + ) -> _SessionState: + lock = await self._session_lock(session_key) + async with lock: + existing = self._sessions.get(session_key) + if existing is not None: + if not respawn: + return existing + await self._teardown_state(self._sessions.pop(session_key)) + state = await self._spawn(session_key, upstream) + state.training_ctx = training_ctx + self._sessions[session_key] = state + return state + + # ------------------------------------------------------------------ + # Lifecycle hooks (optional members of the AgentRunnable protocol) + # ------------------------------------------------------------------ + + async def on_episode_start( + self, session_key: str, training_ctx: TrainingContext + ) -> None: + """Spawn (or rebind) a per-session subprocess for the episode. + + Prefers the upstream carried by ``training_ctx`` so the session's + LLM calls flow through AReaL's proxy gateway under a per-episode + key; falls back to the env upstream otherwise. + """ + upstream = _Upstream.from_training_ctx(training_ctx) or self._env_upstream + if upstream is None: + raise RuntimeError( + "No upstream available: TrainingContext lacks llm_base_url/" + "llm_api_key and no OPENCLAW_UPSTREAM_* env is set." + ) + await self._ensure_session(session_key, upstream, training_ctx, respawn=True) + logger.info( + "Episode start (session=%s, ctx=%s)", session_key, training_ctx.session_id + ) + + async def on_episode_end(self, session_key: str, reward: float | None) -> None: + logger.info("Episode end (session=%s, reward=%s)", session_key, reward) + await self.close_session(session_key) + + async def close_session(self, session_key: str) -> None: + state = self._sessions.pop(session_key, None) + if state is not None: + await self._teardown_state(state) + + async def close_all_sessions(self) -> None: + for key in list(self._sessions.keys()): + await self.close_session(key) + + # ------------------------------------------------------------------ + # Per-turn execution + # ------------------------------------------------------------------ + + async def run( + self, + request: AgentRequest, + *, + emitter: EventEmitter, + ) -> AgentResponse: + state = self._sessions.get(request.session_key) + if state is None: + # No episode opened (e.g. interactive demo): lazily spawn from env. + if self._env_upstream is None: + raise RuntimeError( + "No subprocess for session and no OPENCLAW_UPSTREAM_* env " + "configured; call /episode/start first or set the env." + ) + state = await self._ensure_session(request.session_key, self._env_upstream) + + messages = list(request.history) + [ + {"role": "user", "content": request.message} + ] + + text_parts: list[str] = [] + tool_calls: list[dict[str, Any]] = [] + + # Use a fresh OpenClaw session per turn: OpenClaw keeps its own + # per-session-key memory, but AReaL's DataProxy already replays the + # full history below. Reusing a stable key would feed the prior turns + # twice (OpenClaw memory + replayed history) and corrupt the upstream + # prompt that the training proxy captures. A unique key makes the + # replayed history the single source of truth. + turn_key = f"{request.session_key}:{uuid.uuid4().hex}" + + async with state.client.stream( + "POST", + "/v1/chat/completions", + json={ + "model": "openclaw/default", + "messages": messages, + "stream": True, + }, + headers={"x-openclaw-session-key": turn_key}, + ) as resp: + resp.raise_for_status() + async for line in resp.aiter_lines(): + if not line.startswith("data:"): + continue + # The space after "data:" is optional per the SSE spec; + # some OpenAI-compatible gateways omit it. + payload = line[len("data:") :].strip() + if payload == "[DONE]": + break + try: + chunk = json.loads(payload) + except json.JSONDecodeError: + logger.debug("Skipping malformed SSE chunk: %s", payload[:120]) + continue + + choices = chunk.get("choices") or [] + if not choices: + continue + delta = choices[0].get("delta") or {} + + text = delta.get("content") + if text: + await emitter.emit_delta(text) + text_parts.append(text) + + for tc in delta.get("tool_calls") or []: + fn = tc.get("function") or {} + name = fn.get("name") + args = fn.get("arguments", "") + if name: + await emitter.emit_tool_call(name=name, args=args) + tool_calls.append({"name": name, "input": args}) + + summary = "".join(text_parts) + return AgentResponse( + summary=summary[:200], + metadata={"tool_calls": tool_calls}, + ) diff --git a/examples/openclaw/run_agent_service.py b/examples/openclaw/run_agent_service.py new file mode 100644 index 0000000000..71a1fea5a0 --- /dev/null +++ b/examples/openclaw/run_agent_service.py @@ -0,0 +1,186 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""Launch the Agent Service with the OpenClaw Agent. + +Mirrors examples/agent_service/run_agent_service.py but loads the +OpenClaw-backed AgentRunnable. The worker spawns one OpenClaw Gateway +subprocess per session, so only the ``openclaw`` CLI on ``PATH`` is +required (``npm i -g openclaw``); no externally-running gateway. + +Usage:: + + OPENCLAW_UPSTREAM_BASE_URL=https://your-llm/v1 \\ + OPENCLAW_UPSTREAM_API_KEY=sk-... \\ + OPENCLAW_UPSTREAM_MODEL=claude-sonnet-4-6 \\ + python examples/openclaw/run_agent_service.py + +The launcher boots one Worker+DataProxy pair behind a Gateway, then +drops into an interactive prompt. Each user message becomes one turn +of the OpenClaw conversation; the per-session OpenClaw subprocess drives +its configured upstream LLM internally. + +Training integration (Layer 2, not wired here yet) +-------------------------------------------------- +Once the controller learns to mint per-session ``sk-sess-*`` keys via +the AReaL ProxyGateway, this script will additionally call +``/session/{key}/episode/start`` with a :class:`TrainingContext` before +the first turn, ``/session/{key}/reward`` at the end, and +``/session/{key}/episode/end`` to flush the trajectory. +""" + +from __future__ import annotations + +import argparse +import asyncio +import os +import secrets +import tempfile +import time + +import httpx + +from areal.api.cli_args import AgentConfig, SchedulingSpec +from areal.experimental.agent_service.controller import AgentController + + +async def _wait_healthy(url: str, timeout: float = 60.0) -> None: + async with httpx.AsyncClient() as client: + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + try: + resp = await client.get(url) + if resp.status_code == 200: + return + except httpx.ConnectError: + pass + await asyncio.sleep(0.5) + raise TimeoutError(f"Service at {url} did not become healthy") + + +async def interactive_loop(gateway_addr: str, admin_key: str) -> None: + session_key = f"openclaw-{int(time.time())}" + print(f"Session: {session_key}") + print("Type your message (or 'quit' to exit):\n") + + async with httpx.AsyncClient(timeout=120.0) as client: + while True: + try: + user_input = input("You: ") + except (EOFError, KeyboardInterrupt): + break + if user_input.strip().lower() in ("quit", "exit", "q"): + break + if not user_input.strip(): + continue + + resp = await client.post( + f"{gateway_addr}/v1/responses", + json={ + "input": [{"type": "message", "content": user_input}], + "model": "openclaw-agent", + "user": session_key, + }, + headers={"Authorization": f"Bearer {admin_key}"}, + ) + data = resp.json() + + if data.get("status") == "completed": + for item in data.get("output", []): + if item.get("type") == "message": + for block in item.get("content", []): + if block.get("type") == "output_text": + print(f"Agent: {block['text']}") + elif item.get("type") == "function_call": + print(f"[tool] {item.get('name', '')}") + print() + elif data.get("error"): + print(f"Error: {data['error'].get('message', '')[:200]}\n") + + +def main() -> None: + parser = argparse.ArgumentParser(description="Agent Service — OpenClaw") + parser.add_argument( + "--admin-api-key", + default="", + help="Admin API key for inter-service auth (random if omitted)", + ) + parser.add_argument( + "--fileroot", + default=os.path.join(tempfile.gettempdir(), "areal-openclaw"), + help="Working root for logs and name-resolve records", + ) + parser.add_argument( + "--upstream-url", + default=os.environ.get("OPENCLAW_UPSTREAM_BASE_URL", ""), + help="Upstream LLM base URL the OpenClaw subprocess routes to", + ) + parser.add_argument( + "--upstream-model", + default=os.environ.get("OPENCLAW_UPSTREAM_MODEL", "default"), + help="Upstream model id", + ) + args = parser.parse_args() + + # A random admin key keeps the service usable on non-loopback binds + # without the publicly-known default key (see validate_admin_api_key). + admin_key = args.admin_api_key or secrets.token_hex(16) + + upstream_url = args.upstream_url + upstream_key = os.environ.get("OPENCLAW_UPSTREAM_API_KEY", "") + if not upstream_url or not upstream_key: + raise SystemExit( + "OPENCLAW_UPSTREAM_BASE_URL and OPENCLAW_UPSTREAM_API_KEY must be " + "set (export them before running)." + ) + + from areal.infra.scheduler.local import LocalScheduler + + # LocalScheduler validates these paths exist before starting workers. + os.makedirs(os.path.join(args.fileroot, "name_resolve"), exist_ok=True) + scheduler = LocalScheduler( + experiment_name="openclaw-agent-service", + trial_name="run0", + gpu_devices=[], + fileroot=args.fileroot, + ) + + env_vars = { + "OPENCLAW_UPSTREAM_BASE_URL": upstream_url, + "OPENCLAW_UPSTREAM_API_KEY": upstream_key, + "OPENCLAW_UPSTREAM_MODEL": args.upstream_model, + } + for passthrough in ( + "OPENCLAW_BIN", + "OPENCLAW_UPSTREAM_API", + "OPENCLAW_NODE_EXTRA_CA_CERTS", + "OPENCLAW_TLS_INSECURE", + ): + if passthrough in os.environ: + env_vars[passthrough] = os.environ[passthrough] + + ctrl_config = AgentConfig( + agent_cls_path="examples.openclaw.agent.OpenClawAgent", + admin_api_key=admin_key, + scheduling_spec=(SchedulingSpec(env_vars=env_vars),), + ) + ctrl = AgentController(config=ctrl_config, scheduler=scheduler) + + try: + print("Initializing with 1 pair ...") + ctrl.initialize() + print(f" Router: {ctrl.router_addr}") + print(f" Gateway: {ctrl.gateway_addr}") + print(f" Pairs: {len(ctrl.pairs)}") + + asyncio.run(_wait_healthy(f"{ctrl.gateway_addr}/health")) + print("All services ready.\n") + + asyncio.run(interactive_loop(ctrl.gateway_addr, admin_key=admin_key)) + finally: + print("\nShutting down ...") + ctrl.destroy() + print("Done.") + + +if __name__ == "__main__": + main()