Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion areal/experimental/agent_service/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,18 @@
- ``protocol`` — WebSocket frame types and helpers
"""

from .types import AgentRequest, AgentResponse, AgentRunnable, EventEmitter
from .types import (
AgentRequest,
AgentResponse,
AgentRunnable,
EventEmitter,
TrainingContext,
)

__all__ = [
"AgentRequest",
"AgentResponse",
"AgentRunnable",
"EventEmitter",
"TrainingContext",
]
74 changes: 73 additions & 1 deletion areal/experimental/agent_service/data_proxy/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from typing import Any

import httpx
from fastapi import FastAPI
from fastapi import FastAPI, HTTPException

from areal.utils import logging

Expand All @@ -24,6 +24,8 @@ class _SessionData:
history: list[dict[str, Any]] = field(default_factory=list)
metadata: dict[str, Any] = field(default_factory=dict)
last_active: float = field(default_factory=time.monotonic)
reward: float | None = None
training_ctx: dict[str, Any] | None = None


def create_data_proxy_app(config: DataProxyConfig) -> FastAPI:
Expand Down Expand Up @@ -145,6 +147,76 @@ async def close_session(session_key: str):
await _close_worker_session(session_key)
return {"status": "ok"}

@app.post("/session/{session_key}/episode/start")
async def episode_start(session_key: str, body: dict[str, Any]):
    """Begin a training episode on ``session_key``.

    Resets the session's buffered state, remembers the request payload as
    the session's training context, and relays the same payload to the
    worker's ``episode/start`` endpoint so the agent can route internal
    LLM calls through the training proxy.

    Args:
        session_key: Key of the session; a session record is created if
            none exists yet.
        body: JSON payload, forwarded to the worker unchanged (expected to
            carry :class:`TrainingContext` fields).

    Returns:
        The worker's decoded JSON response.

    Note:
        ``raise_for_status()`` propagates an error when the worker answers
        with an error status; the session state has already been reset at
        that point.
    """
    if (state := sessions.get(session_key)) is None:
        state = sessions[session_key] = _SessionData()

    # Every episode begins from a clean slate: the worker respawns a fresh
    # subprocess, so history or reward left over from an earlier episode on
    # this key would corrupt the new trajectory.
    state.history.clear()
    state.reward = None
    state.training_ctx = dict(body)
    state.last_active = time.monotonic()

    worker_url = f"{config.worker_addr}/session/{session_key}/episode/start"
    worker_resp = await http_client.post(worker_url, json=body)
    worker_resp.raise_for_status()
    return worker_resp.json()

@app.post("/session/{session_key}/episode/end")
async def episode_end(session_key: str, body: dict[str, Any]):
    """Close a training episode and forward the final reward to the worker.

    Args:
        session_key: Key of the session whose episode is ending.
        body: ``{"reward": <number|None>}``. When ``reward`` is missing or
            ``null`` it defaults to the last value buffered via
            ``/session/{key}/reward`` (or ``None`` if nothing was buffered
            or the session is unknown).

    Returns:
        The worker's decoded JSON response.

    Raises:
        HTTPException: 400 if an explicit ``reward`` is not a finite number.
            The same rule is applied by ``/session/{key}/reward``, so the
            worker never receives a string, boolean, NaN or infinity.
    """
    import math  # local import: keeps this handler independent of file-level imports

    session = sessions.get(session_key)
    reward = body.get("reward")
    if reward is None:
        if session is not None:
            reward = session.reward
    else:
        # bool is a subclass of int, so it has to be rejected explicitly.
        if isinstance(reward, bool) or not isinstance(reward, (int, float)):
            raise HTTPException(status_code=400, detail="reward must be a number")
        try:
            reward = float(reward)
        except OverflowError:
            # An int too large for a float is treated like infinity.
            reward = math.inf
        if not math.isfinite(reward):
            raise HTTPException(
                status_code=400, detail="reward must be a finite number"
            )

    resp = await http_client.post(
        f"{config.worker_addr}/session/{session_key}/episode/end",
        json={"reward": reward},
    )
    resp.raise_for_status()

    if session is not None:
        # The reward has been consumed by this episode; clear it so a
        # subsequent episode on the same key does not inherit a stale value.
        # This only runs after the worker accepted the call, so a failed
        # request can be retried with the buffered reward intact.
        session.reward = None
        session.last_active = time.monotonic()
    return resp.json()

@app.post("/session/{session_key}/reward")
async def set_reward(session_key: str, body: dict[str, Any]):
    """Record a scalar reward for the session.

    The reward is buffered here and forwarded to the worker on the next
    ``episode/end`` call. Layer 2 will additionally relay it to the
    ProxyGateway's ``/rl/set_reward`` endpoint for training.

    Args:
        session_key: Key of the session; a session record is created if
            none exists yet (only after the payload has been validated).
        body: ``{"reward": <number>}``.

    Returns:
        ``{"status": "ok", "reward": <float>}`` echoing the stored value.

    Raises:
        HTTPException: 400 if ``reward`` is missing, not a number, a
            boolean, or not finite (NaN / infinity / an int too large to
            represent as a float).
    """
    import math  # local import: keeps this handler independent of file-level imports

    reward = body.get("reward")
    # Validate before touching ``sessions`` so that a rejected request does
    # not leave an empty session record behind.
    # bool is a subclass of int, so it has to be rejected explicitly.
    if isinstance(reward, bool) or not isinstance(reward, (int, float)):
        raise HTTPException(status_code=400, detail="reward must be a number")
    try:
        value = float(reward)
    except OverflowError:
        # An int too large for a float is treated like infinity.
        value = math.inf
    # NaN / infinity cannot be encoded as strict JSON in the response and
    # are not meaningful scalar rewards.
    if not math.isfinite(value):
        raise HTTPException(status_code=400, detail="reward must be a finite number")

    session = sessions.get(session_key)
    if session is None:
        session = _SessionData()
        sessions[session_key] = session
    session.reward = value
    session.last_active = time.monotonic()
    return {"status": "ok", "reward": session.reward}

@app.get("/session/{session_key}/history")
async def get_history(session_key: str):
session = sessions.get(session_key)
Expand Down
47 changes: 43 additions & 4 deletions areal/experimental/agent_service/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,33 @@ class AgentResponse:
metadata: dict[str, Any] = field(default_factory=dict)


@dataclass
class TrainingContext:
    """Training-side handle injected into an agent at episode start.

    Carries everything an agent needs to make its internal LLM calls flow
    through AReaL's training pipeline (token + logprob capture). Pass to
    an agent's optional ``on_episode_start`` hook (see ``AgentRunnable``).

    Attributes:
        session_id: Opaque episode identifier for trajectory binding.
        llm_base_url: OpenAI-compatible base URL of the proxy gateway
            that captures training data. Agents should configure their
            internal LLM client to use this as the upstream endpoint.
        llm_api_key: Per-episode key bound to ``session_id``. Typically a
            ``sk-sess-*`` value returned by ``/rl/start_session``. It is a
            credential, so it is left out of ``repr()`` to keep it from
            leaking into logs and tracebacks.
        llm_model: Model identifier to use for inference. Empty string
            means "agent decides".
        extras: Implementation-specific overrides (e.g. tool_call_parser).
    """

    session_id: str
    llm_base_url: str
    # repr=False only affects the generated __repr__; the field is still a
    # regular positional constructor argument and takes part in equality.
    llm_api_key: str = field(repr=False)
    llm_model: str = ""
    extras: dict[str, Any] = field(default_factory=dict)


class EventEmitter(Protocol):
"""Callback interface for streaming events from agent to caller."""

Expand All @@ -52,10 +79,22 @@ class AgentRunnable(Protocol):
interaction — the Agent Service only provides session lifecycle and
event streaming.

Reward computation is **not** part of this interface. Rewards are
calculated externally by the training pipeline (via reward functions
applied to exported trajectories), following AReaL's standard RLVR
pattern.
Only ``run`` is required. The following methods are optional and
discovered via ``getattr`` at runtime — implement them to participate
in training-related lifecycle:

- ``async close_session(session_key)`` — release per-session state
when a session is closed by the DataProxy.
- ``async close_all_sessions()`` — clean up everything on worker
shutdown.
- ``async on_episode_start(session_key, training_ctx)`` — receive a
:class:`TrainingContext` so the agent can route its internal LLM
calls through the proxy gateway. Called once per RL episode,
before the first ``run`` of that episode.
- ``async on_episode_end(session_key, reward)`` — called once when
an episode terminates (with the final scalar reward, if any).
Agents wiring trajectories to the training pipeline should flush
/ finalize here.
"""

async def run(
Expand Down
40 changes: 39 additions & 1 deletion areal/experimental/agent_service/worker/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from areal.utils.dynamic_import import import_from_string

from ..protocol import QueueMode
from ..types import AgentRequest, AgentResponse, AgentRunnable
from ..types import AgentRequest, AgentResponse, AgentRunnable, TrainingContext

logger = logging.getLogger("AgentWorker")

Expand Down Expand Up @@ -52,6 +52,44 @@ def create_worker_app(
async def health():
return {"status": "ok"}

@app.post("/session/{session_key}/episode/start")
async def episode_start(session_key: str, body: dict[str, Any]):
    """Deliver a :class:`TrainingContext` to the agent's ``on_episode_start`` hook.

    Args:
        session_key: Key of the session the episode belongs to; also used
            as ``session_id`` when the payload does not provide one.
        body: JSON payload with optional ``session_id``, ``llm_base_url``,
            ``llm_api_key``, ``llm_model`` and ``extras`` entries. A
            missing key and an explicit ``null`` are treated the same and
            fall back to the field's default.

    Returns:
        ``{"status": "noop"}`` when the agent has no ``on_episode_start``
        hook, ``{"status": "ok"}`` when the hook completed, or a 500
        ``JSONResponse`` describing the exception when the hook raised.
    """
    hook = getattr(agent, "on_episode_start", None)
    if hook is None:
        return {"status": "noop"}

    def _value(name: str, default: Any) -> Any:
        # ``dict.get(name, default)`` only covers a missing key; a JSON
        # ``null`` would otherwise reach the dataclass as ``None``.
        found = body.get(name)
        return default if found is None else found

    ctx = TrainingContext(
        session_id=_value("session_id", session_key),
        llm_base_url=_value("llm_base_url", ""),
        llm_api_key=_value("llm_api_key", ""),
        llm_model=_value("llm_model", ""),
        extras=_value("extras", {}),
    )
    try:
        await hook(session_key, ctx)
    except Exception as exc:
        # Boundary handler: log the traceback and report the failure to the
        # caller instead of letting the exception escape the endpoint.
        logger.exception("on_episode_start failed (session=%s)", session_key)
        return JSONResponse(
            {"error": {"message": str(exc), "type": type(exc).__name__}},
            status_code=500,
        )
    return {"status": "ok"}

@app.post("/session/{session_key}/episode/end")
async def episode_end(session_key: str, body: dict[str, Any]):
    """Notify the agent's ``on_episode_end`` hook that an episode finished.

    Args:
        session_key: Key of the session whose episode ended.
        body: JSON payload; its ``reward`` entry (``None`` when absent) is
            handed to the hook as-is.

    Returns:
        ``{"status": "noop"}`` when the agent has no ``on_episode_end``
        hook, ``{"status": "ok"}`` when the hook completed, or a 500
        ``JSONResponse`` describing the exception when the hook raised.
    """
    on_end = getattr(agent, "on_episode_end", None)
    if on_end is None:
        return {"status": "noop"}

    final_reward = body.get("reward")
    try:
        await on_end(session_key, final_reward)
    except Exception as exc:
        # Boundary handler: log the traceback and answer with an error body.
        logger.exception("on_episode_end failed (session=%s)", session_key)
        error_payload = {
            "error": {"message": str(exc), "type": type(exc).__name__},
        }
        return JSONResponse(error_payload, status_code=500)
    else:
        return {"status": "ok"}

@app.post("/session/{session_key}/close")
async def close_session(session_key: str):
close_fn = getattr(agent, "close_session", None)
Expand Down
162 changes: 161 additions & 1 deletion examples/openclaw/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,167 @@ the OpenAI chat-completions protocol.
**Disclaimer**: RL-finetuned models may exhibit unexpected behaviors. Please ensure
strict permission rules and an isolated execution environment for your agent runtime.

## Prerequisites
## Two ways to use this example

This directory ships two complementary entry points:

| Mode | Entry point | What it does |
| ----------------- | ---------------------- | -------------------------------------------------------------------------------------------------------------------------------------- |
| **Agent Service** | `run_agent_service.py` | Hosts OpenClaw behind AReaL's Agent Service so external clients call it over an OpenAI-compatible HTTP API. Start here. |
| **RL training** | `train.py` | Drives end-to-end RL (PPO) where every agent LLM call flows through the training proxy gateway. Documented from *Prerequisites* below. |

The two share the same building block — the `OpenClawAgent` ([`agent.py`](agent.py)) —
and the same lifecycle hooks, so a setup that serves traffic today can be wired into RL
training tomorrow.

## Serving OpenClaw via the Agent Service

### Design

AReaL's Agent Service is a small fleet of cooperating processes; OpenClaw plugs in as
the per-worker *agent runtime*:

```
client ──HTTP /v1/responses──▶ Gateway ──▶ Router ──▶ DataProxy ──▶ Worker
                               (auth)      (session   (per-session  (hosts OpenClawAgent)
                                            routing)   history)       │
                                                                      │ spawns + drives
                                                                      ▼
                                                         OpenClaw gateway subprocess
                                                              (one per session)
                                                                      │
                                                                      ▼
                                                          upstream LLM (your model)
```

- **Gateway** — public OpenAI-compatible surface (`/v1/responses`); enforces the admin
API key.
- **Router** — maps each `session_key` to a worker; owns health state.
- **DataProxy** — keeps the conversation history for a session and replays it to the
worker each turn.
- **Worker** — loads the class named by `AgentConfig.agent_cls_path` (here
`examples.openclaw.agent.OpenClawAgent`) and exposes it as an `AgentRunnable`.

**Why one OpenClaw subprocess per session?** OpenClaw's configuration (provider,
upstream key, model) is *process-global*. RL requires each session's turns to be
attributable to a distinct per-episode upstream key (`sk-sess-*`), so logical isolation
inside a single process is not enough — each session gets its own OpenClaw process bound
to its own upstream. `OpenClawAgent` manages this:

- `on_episode_start(session_key, training_ctx)` — pick a free port, render a temporary
`openclaw.json`, spawn `openclaw gateway`, and wait until `/v1/models` is healthy. The
upstream is taken from the injected `TrainingContext` (RL) or the
`OPENCLAW_UPSTREAM_*` environment (serving).
- `run(request)` — one turn = one `POST /v1/chat/completions` to the session's own
subprocess, with `model: "openclaw/default"` and header `x-openclaw-session-key`; the
SSE stream is forwarded as deltas / tool calls. If no episode was opened (plain
serving), the subprocess is spawned lazily from the env upstream.
- `on_episode_end` / `close_session` / `close_all_sessions` — terminate the subprocess
(SIGTERM → SIGKILL fallback), close clients, and remove the temp config directory.

### Prerequisites (Agent Service)

1. **AReaL** installed (see *Install AReaL* below).
1. **OpenClaw CLI** on `PATH`:
```bash
npm install -g openclaw
openclaw --version
```
1. An **upstream LLM** reachable over an OpenAI-compatible (or Anthropic) API, with a
base URL, API key, and model id.

### Launch the service

```bash
export OPENCLAW_UPSTREAM_BASE_URL="https://your-llm/v1" # OpenAI-compatible endpoint
export OPENCLAW_UPSTREAM_API_KEY="sk-..." # upstream model key
export OPENCLAW_UPSTREAM_MODEL="claude-sonnet-4-6" # upstream model id

python examples/openclaw/run_agent_service.py
```

This boots one Worker + DataProxy pair behind a Router and Gateway, then drops into an
interactive prompt. Expected output:

```
Initializing with 1 pair ...
Router: http://x.x.x.x:xxxxx
Gateway: http://x.x.x.x:xxxxx
Pairs: 1
All services ready.

You: Reply with exactly: hello
Agent: hello
```

The launcher generates a random admin API key by default, because the Gateway refuses
to start with the built-in default key when it binds to a non-loopback interface. Pass
`--admin-api-key <secret>` to set your own key, `--upstream-url` / `--upstream-model`
to override the environment variables, and `--fileroot <dir>` to relocate logs and
name-resolve records (defaults to a temp directory, created automatically).

### Call the agent service from a client

Once running, any OpenAI-`/v1/responses`-style client can talk to the Gateway. Use the
admin API key as the bearer token and a stable `user` to pin a session:

```bash
curl -s http://<gateway>/v1/responses \
-H "Authorization: Bearer <admin-api-key>" \
-H "Content-Type: application/json" \
-d '{
"model": "openclaw-agent",
"user": "session-123",
"input": [{"type": "message", "content": "Hello!"}]
}'
```

Reusing the same `user` across calls keeps the conversation history (held by the
DataProxy) and reuses the same OpenClaw subprocess.

### Wiring into RL training

The same agent participates in RL through three DataProxy endpoints (the controller
calls these once it can mint per-session keys):

| Endpoint | Effect |
| ----------------------------------- | ---------------------------------------------------------------------------------- |
| `POST /session/{key}/episode/start` | Forwards a `TrainingContext` → `on_episode_start` spawns a subprocess bound to it. |
| `POST /session/{key}/reward` | Buffers a scalar reward for the session. |
| `POST /session/{key}/episode/end` | Forwards the final reward → `on_episode_end` tears the subprocess down. |

The `TrainingContext` carries `llm_base_url` / `llm_api_key` / `llm_model`, pointing
OpenClaw's upstream at AReaL's proxy gateway so tokens and log-probabilities are
captured for training.

### Environment variables

| Variable | Purpose | Default |
| ------------------------------ | ---------------------------------------------------- | --------------------- |
| `OPENCLAW_UPSTREAM_BASE_URL` | Upstream LLM base URL (serving fallback) | — (required to serve) |
| `OPENCLAW_UPSTREAM_API_KEY` | Upstream LLM API key | — (required to serve) |
| `OPENCLAW_UPSTREAM_MODEL` | Upstream model id | `default` |
| `OPENCLAW_UPSTREAM_API` | `openai-completions` or `anthropic-messages` | `openai-completions` |
| `OPENCLAW_BIN` | OpenClaw executable | `openclaw` |
| `OPENCLAW_TIMEOUT` | Per-request timeout (seconds) | `120` |
| `OPENCLAW_STARTUP_TIMEOUT` | Subprocess health-wait (seconds) | `60` |
| `OPENCLAW_NODE_EXTRA_CA_CERTS` | CA bundle for the upstream TLS cert (preferred) | unset |
| `OPENCLAW_TLS_INSECURE` | `1` sets `NODE_TLS_REJECT_UNAUTHORIZED=0` (dev only) | unset |

Legacy `OPENCLAW_GATEWAY_URL` / `OPENCLAW_GATEWAY_TOKEN` / `OPENCLAW_MODEL` are still
accepted as fallbacks for the upstream URL / key / model.

### Troubleshooting

- **`upstream provider timeout` / `UNABLE_TO_GET_ISSUER_CERT_LOCALLY`** — Node cannot
verify the upstream's TLS certificate (common with corporate CAs). Point
`OPENCLAW_NODE_EXTRA_CA_CERTS` at the CA bundle, or, for local dev only, set
`OPENCLAW_TLS_INSECURE=1`.
- **`Refusing to start server ... default admin API key`** — the Gateway binds to a
routable interface. Pass a unique `--admin-api-key` (the launcher already generates
one by default).
- **`name_resolve... does not exist` / `fileroot ... is None`** — pass
`--fileroot <dir>` (auto-created); the default temp directory normally avoids this.

## Prerequisites (RL training)

1. A GPU machine with at least **2 NVIDIA GPUs** (compute capability 8.0 or higher, i.e.
Ampere / Hopper).
Expand Down
Loading
Loading