diff --git a/packages/browseros-agent/apps/server/src/api/routes/openclaw.ts b/packages/browseros-agent/apps/server/src/api/routes/openclaw.ts index ab3bbd0a7..091acb42b 100644 --- a/packages/browseros-agent/apps/server/src/api/routes/openclaw.ts +++ b/packages/browseros-agent/apps/server/src/api/routes/openclaw.ts @@ -581,7 +581,7 @@ export function createOpenClawRoutes() { sessionKey, composedMessage, history, - { messageParts }, + { messageParts, signal: c.req.raw.signal }, ) c.header('Content-Type', 'text/event-stream') @@ -658,6 +658,31 @@ export function createOpenClawRoutes() { } }) + .post('/agents/:id/chat/stop', async (c) => { + const { id } = c.req.param() + let body: { sessionKey?: string; runId?: string } + try { + body = await c.req.json<{ sessionKey?: string; runId?: string }>() + } catch { + return c.json({ error: 'Invalid JSON body' }, 400) + } + if (!body.sessionKey?.trim()) { + return c.json({ error: 'sessionKey is required' }, 400) + } + try { + const result = await getOpenClawService().abortChat( + id, + body.sessionKey, + body.runId, + ) + return c.json({ ok: true, ...result }) + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + logger.warn('Chat abort failed', { agentId: id, error: message }) + return c.json({ error: message }, 500) + } + }) + .post('/agents/:id/queue', async (c) => { const { id } = c.req.param() const body = await c.req.json<{ diff --git a/packages/browseros-agent/apps/server/src/api/services/openclaw/container-runtime.ts b/packages/browseros-agent/apps/server/src/api/services/openclaw/container-runtime.ts index b36cb9ed2..29a94d946 100644 --- a/packages/browseros-agent/apps/server/src/api/services/openclaw/container-runtime.ts +++ b/packages/browseros-agent/apps/server/src/api/services/openclaw/container-runtime.ts @@ -176,6 +176,52 @@ export class ContainerRuntime { ]) } + /** + * Call a gateway WS RPC method via OpenClaw's CLI from inside the gateway + * container. We do this (instead of opening our own WS from outside) so + * the connection appears as `direct_local` to the gateway, which preserves + * full operator scopes for token-mode auth — including `operator.write`, + * which is required by chat.send / chat.abort / sessions.abort. + * + * Throws on non-zero exit, on transport failure, or when stdout cannot be + * parsed as JSON. + */ + async callGatewayRpc(input: { + method: string + params?: Record + token: string + timeoutMs?: number + }): Promise { + const args = [ + 'node', + 'dist/index.js', + 'gateway', + 'call', + input.method, + '--params', + JSON.stringify(input.params ?? {}), + '--token', + input.token, + '--json', + ] + if (input.timeoutMs !== undefined) { + args.push('--timeout', String(input.timeoutMs)) + } + const result = await this.runInContainer(args) + if (result.exitCode !== 0) { + throw new Error( + `gateway call ${input.method} exit=${result.exitCode}: ${result.stderr.trim() || result.stdout.trim()}`, + ) + } + const parsed = parseGatewayCallStdout(result.stdout) + if (parsed === null) { + throw new Error( + `gateway call ${input.method}: failed to parse stdout: ${result.stdout.slice(0, 200)}`, + ) + } + return parsed + } + async runGatewaySetupCommand( command: string[], spec: GatewayContainerSpec, @@ -315,3 +361,64 @@ export class ContainerRuntime { return hostPathToGuest(path) } } + +/** + * Parse the stdout of `openclaw gateway call --json`. Tries the trimmed + * stdout as a single JSON object first; on failure, scans for the largest + * balanced JSON object (handles cases where logs land on stdout above the + * payload). + */ +function parseGatewayCallStdout(stdout: string): T | null { + const trimmed = stdout.trim() + if (!trimmed) return null + try { + return JSON.parse(trimmed) as T + } catch {} + // Fallback: scan for largest balanced JSON substring. + let bestStart = -1 + let bestEnd = -1 + for (let i = 0; i < trimmed.length; i++) { + const ch = trimmed[i] + if (ch !== '{' && ch !== '[') continue + const end = endOfJsonAt(trimmed, i) + if (end > i && end - i > bestEnd - bestStart) { + bestStart = i + bestEnd = end + } + } + if (bestStart < 0) return null + try { + return JSON.parse(trimmed.slice(bestStart, bestEnd + 1)) as T + } catch { + return null + } +} + +function endOfJsonAt(s: string, start: number): number { + const stack: string[] = [s[start] === '{' ? '}' : ']'] + let inString = false + let escaped = false + for (let i = start + 1; i < s.length; i++) { + const ch = s[i] + if (inString) { + if (escaped) { + escaped = false + continue + } + if (ch === '\\') escaped = true + else if (ch === '"') inString = false + continue + } + if (ch === '"') { + inString = true + continue + } + if (ch === '{') stack.push('}') + else if (ch === '[') stack.push(']') + else if (ch === stack[stack.length - 1]) { + stack.pop() + if (stack.length === 0) return i + } + } + return -1 +} diff --git a/packages/browseros-agent/apps/server/src/api/services/openclaw/openclaw-http-client.ts b/packages/browseros-agent/apps/server/src/api/services/openclaw/openclaw-http-client.ts index 3e67be651..fee8fc75b 100644 --- a/packages/browseros-agent/apps/server/src/api/services/openclaw/openclaw-http-client.ts +++ b/packages/browseros-agent/apps/server/src/api/services/openclaw/openclaw-http-client.ts @@ -2,22 +2,26 @@ * @license * Copyright 2025 BrowserOS * SPDX-License-Identifier: AGPL-3.0-or-later + * + * HTTP-side client for OpenClaw operations that still go over /v1 HTTP: + * - `isAuthenticated()` — health probe used by isGatewayAvailable. + * - `getSessionHistory(...)` / `streamSessionHistory(...)` — read history + * by session key. (Chat sending moved to WS chat.send via the CLI; see + * OpenClawService.chatStream.) + * + * The legacy `streamChat()` over /v1/chat/completions was removed when chat + * migrated to WS — that path doesn't register runs in OpenClaw's abort + * registry, so chat.abort can't stop them. WS chat.send does, which is what + * makes the Stop button work end-to-end. */ import { createParser, type EventSourceMessage } from 'eventsource-parser' import { OpenClawSessionNotFoundError } from './errors' -import type { OpenClawStreamEvent } from './openclaw-types' - -export interface OpenClawChatHistoryMessage { - role: 'user' | 'assistant' - content: string -} /** - * OpenAI-compatible content parts for multimodal user messages. OpenClaw's - * gateway accepts the standard `content: [{ type: 'text', ... }, { type: - * 'image_url', image_url: { url } }]` shape on /v1/chat/completions and - * routes it to whichever upstream provider the agent's model points at. + * OpenAI-compatible content parts for multimodal user messages. Used by the + * route's attachment validator and translated to OpenClaw chat.send + * `attachments` shape inside OpenClawService.chatStream. */ export type OpenClawChatContentPart = | { type: 'text'; text: string } @@ -26,18 +30,6 @@ export type OpenClawChatContentPart = image_url: { url: string; detail?: 'auto' | 'low' | 'high' } } -export interface OpenClawChatRequest { - agentId: string - sessionKey: string - message: string - // When present, sent as the user message's `content` array verbatim. The - // legacy string `message` is folded into a leading text part if no text - // part is present in `messageParts`. - messageParts?: OpenClawChatContentPart[] - history?: OpenClawChatHistoryMessage[] - signal?: AbortSignal -} - export interface OpenClawSessionHistoryMessage { role: 'user' | 'assistant' | 'system' | 'tool' content: string @@ -79,19 +71,6 @@ export class OpenClawHttpClient { private readonly getToken: () => Promise, ) {} - async streamChat( - input: OpenClawChatRequest, - ): Promise> { - const response = await this.fetchChat(input) - const body = response.body - - if (!body) { - throw new Error('OpenClaw chat response had no body') - } - - return createEventStream(body, input.signal) - } - async getSessionHistory( sessionKey: string, input: OpenClawSessionHistoryInput = {}, @@ -132,40 +111,6 @@ export class OpenClawHttpClient { } } - private async fetchChat(input: OpenClawChatRequest): Promise { - const token = await this.getToken() - const userContent = buildUserContent(input) - const response = await fetch( - `http://127.0.0.1:${this.hostPort}/v1/chat/completions`, - { - method: 'POST', - headers: { - Authorization: `Bearer ${token}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - model: resolveAgentModel(input.agentId), - stream: true, - messages: [ - ...(input.history ?? []), - { role: 'user', content: userContent }, - ], - user: `browseros:${input.agentId}:${input.sessionKey}`, - }), - signal: input.signal, - }, - ) - - if (response.ok) { - return response - } - - const detail = await response.text() - throw new Error( - detail || `OpenClaw chat failed with status ${response.status}`, - ) - } - private async fetchSessionHistory( sessionKey: string, input: OpenClawSessionHistoryInput, @@ -211,214 +156,6 @@ function buildHistoryPath( }` } -function resolveAgentModel(agentId: string): string { - return agentId === 'main' ? 'openclaw' : `openclaw/${agentId}` -} - -/** - * Build the OpenAI-compatible `content` payload for the trailing user - * message. When the caller supplies multimodal parts via `messageParts`, - * use them as-is, ensuring at least one text part is present (we fold the - * legacy `message` string in as a leading text part if not). Otherwise, - * fall back to a plain string `content` so simple text-only sends keep - * the same wire shape we've always sent. - */ -function buildUserContent( - input: OpenClawChatRequest, -): string | OpenClawChatContentPart[] { - if (!input.messageParts || input.messageParts.length === 0) { - return input.message - } - - const hasText = input.messageParts.some((p) => p.type === 'text') - if (hasText) return input.messageParts - - const trimmed = input.message.trim() - if (!trimmed) return input.messageParts - - return [{ type: 'text', text: input.message }, ...input.messageParts] -} - -function createEventStream( - body: ReadableStream, - signal?: AbortSignal, -): ReadableStream { - return new ReadableStream({ - start(controller) { - void pumpChatEvents(body, controller, signal) - }, - }) -} - -async function pumpChatEvents( - body: ReadableStream, - controller: ReadableStreamDefaultController, - signal?: AbortSignal, -): Promise { - const reader = body.getReader() - const decoder = new TextDecoder() - let text = '' - let done = false - const parser = createParser({ - onEvent(message) { - if (done) return - const nextText = updateAccumulatedText(message, text) - done = handleMessage(message, controller, nextText, done) - if (!done) { - text = nextText - } - }, - }) - - try { - while (true) { - if (signal?.aborted) { - await reader.cancel() - done = true - controller.close() - return - } - - const { done: streamDone, value } = await reader.read() - if (streamDone) break - parser.feed(decoder.decode(value, { stream: true })) - } - } catch (error) { - if (!done) { - controller.enqueue({ - type: 'error', - data: { - message: error instanceof Error ? error.message : String(error), - }, - }) - done = true - controller.close() - } - } finally { - if (!done) { - controller.close() - } - reader.releaseLock() - } -} - -function handleMessage( - message: EventSourceMessage, - controller: ReadableStreamDefaultController, - text: string, - done: boolean, -): boolean { - if (message.data === '[DONE]') { - return finishStream(controller, text, done) - } - - const chunk = parseChunk(message.data) - if (!chunk) { - controller.enqueue({ - type: 'error', - data: { message: 'Failed to parse OpenClaw chat stream chunk' }, - }) - controller.close() - return true - } - - for (const event of mapChunkToEvents(chunk)) { - controller.enqueue(event) - } - - return hasFinishReason(chunk) ? finishStream(controller, text, done) : false -} - -function updateAccumulatedText( - message: EventSourceMessage, - text: string, -): string { - const chunk = parseChunk(message.data) - if (!chunk) return text - - let next = text - for (const choice of readChoices(chunk)) { - const delta = readDeltaText(choice) - if (delta) { - next += delta - } - } - return next -} - -function finishStream( - controller: ReadableStreamDefaultController, - text: string, - done: boolean, -): boolean { - if (!done) { - if (!text.trim()) { - controller.enqueue({ - type: 'error', - data: { - message: "Agent couldn't generate a response. Please try again.", - }, - }) - controller.close() - return true - } - controller.enqueue({ - type: 'done', - data: { text }, - }) - controller.close() - } - - return true -} - -function mapChunkToEvents( - chunk: Record, -): OpenClawStreamEvent[] { - const events: OpenClawStreamEvent[] = [] - - for (const choice of readChoices(chunk)) { - const delta = readDeltaText(choice) - if (delta) { - events.push({ - type: 'text-delta', - data: { text: delta }, - }) - } - } - - return events -} - -function hasFinishReason(chunk: Record): boolean { - return readChoices(chunk).some((choice) => !!readFinishReason(choice)) -} - -function readChoices( - chunk: Record, -): Array> { - const choices = chunk.choices - return Array.isArray(choices) - ? choices.filter( - (choice): choice is Record => - !!choice && typeof choice === 'object', - ) - : [] -} - -function readDeltaText(choice: Record): string { - const delta = choice.delta - if (!delta || typeof delta !== 'object') return '' - - const content = (delta as Record).content - return typeof content === 'string' ? content : '' -} - -function readFinishReason(choice: Record): string | null { - const reason = choice.finish_reason - return typeof reason === 'string' && reason ? reason : null -} - function parseChunk(data: string): Record | null { try { return JSON.parse(data) as Record diff --git a/packages/browseros-agent/apps/server/src/api/services/openclaw/openclaw-observer.ts b/packages/browseros-agent/apps/server/src/api/services/openclaw/openclaw-observer.ts index a77f50734..fbf838cc1 100644 --- a/packages/browseros-agent/apps/server/src/api/services/openclaw/openclaw-observer.ts +++ b/packages/browseros-agent/apps/server/src/api/services/openclaw/openclaw-observer.ts @@ -40,6 +40,27 @@ type IncomingFrame = } | { type: 'event'; event: string; payload?: unknown } +type EventListener = (payload: unknown) => void + +interface PendingRequest { + resolve: (payload: unknown) => void + reject: (err: Error) => void + timer: ReturnType + method: string +} + +const DEFAULT_RPC_TIMEOUT_MS = 10_000 + +export class OpenClawRpcError extends Error { + constructor( + public readonly code: string, + message: string, + ) { + super(message) + this.name = 'OpenClawRpcError' + } +} + // --------------------------------------------------------------------------- // Observer // --------------------------------------------------------------------------- @@ -51,6 +72,9 @@ export class OpenClawObserver { private closed = false private gatewayUrl: string | null = null private gatewayToken: string | null = null + private nextRequestId = 1 + private readonly pendingRequests = new Map() + private readonly eventListeners = new Map>() constructor(private readonly session: ClawSession) {} @@ -66,6 +90,7 @@ export class OpenClawObserver { disconnect(): void { this.closed = true this.clearReconnect() + this.failAllPendingRequests('OpenClaw observer disconnected') if (this.ws) { try { this.ws.close() @@ -80,6 +105,65 @@ export class OpenClawObserver { return this.connected } + /** + * Send an RPC request over the WS. The observer must already be connected + * (and past handshake). Returns the response payload, or rejects with an + * OpenClawRpcError on a non-ok response, or with a plain Error on transport + * failure / timeout. + */ + async request( + method: string, + params: Record = {}, + timeoutMs = DEFAULT_RPC_TIMEOUT_MS, + ): Promise { + if (!this.connected || !this.ws) { + throw new Error(`OpenClaw observer not connected; cannot call ${method}`) + } + const id = `bos-${this.nextRequestId++}` + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + if (this.pendingRequests.delete(id)) { + reject(new Error(`OpenClaw RPC timed out: ${method}`)) + } + }, timeoutMs) + this.pendingRequests.set(id, { + resolve: (payload) => resolve(payload as T), + reject, + timer, + method, + }) + try { + this.ws!.send(JSON.stringify({ type: 'req', id, method, params })) + } catch (err) { + this.pendingRequests.delete(id) + clearTimeout(timer) + reject( + err instanceof Error ? err : new Error(`failed to send ${method}`), + ) + } + }) + } + + /** + * Subscribe to broadcast events with a given name (e.g. 'chat', 'agent'). + * Returns an unsubscribe function. Listeners are invoked AFTER the + * observer's own state-machine routing for the same event. + */ + on(eventName: string, listener: EventListener): () => void { + let set = this.eventListeners.get(eventName) + if (!set) { + set = new Set() + this.eventListeners.set(eventName, set) + } + set.add(listener) + return () => { + const current = this.eventListeners.get(eventName) + if (!current) return + current.delete(listener) + if (current.size === 0) this.eventListeners.delete(eventName) + } + } + // ── Private ───────────────────────────────────────────────────────── private doConnect(): void { @@ -124,15 +208,21 @@ export class OpenClawObserver { params: { minProtocol: PROTOCOL_VERSION, maxProtocol: PROTOCOL_VERSION, + // gateway-client + backend mode is OpenClaw's documented "trusted + // backend self-connect" identity. Avoids the CONTROL_UI_DEVICE_IDENTITY_REQUIRED + // gate that 'openclaw-tui' triggers (TUI/control-ui clients are + // required to present a paired device identity, which we don't + // want for our local single-client backend integration). + // We omit `scopes` so the gateway grants the default operator + // scope set for shared-secret auth. client: { - id: 'openclaw-tui', + id: 'gateway-client', displayName: 'browseros-observer', version: '1.0.0', platform: 'node', - mode: 'ui', + mode: 'backend', }, role: 'operator', - scopes: ['operator.read'], auth: { token: this.gatewayToken }, }, } @@ -155,9 +245,28 @@ export class OpenClawObserver { return } + // RPC response routing for caller-issued requests. + if (frame.type === 'res' && this.pendingRequests.has(frame.id)) { + const pending = this.pendingRequests.get(frame.id)! + this.pendingRequests.delete(frame.id) + clearTimeout(pending.timer) + if (frame.ok) { + pending.resolve(frame.payload) + } else { + pending.reject( + new OpenClawRpcError( + frame.error?.code ?? 'unknown', + frame.error?.message ?? `${pending.method} failed`, + ), + ) + } + return + } + // Broadcast events (only process after handshake completes) if (frame.type === 'event' && this.connected) { this.handleEvent(frame.event, frame.payload) + this.dispatchToListeners(frame.event, frame.payload) } }) @@ -165,6 +274,10 @@ export class OpenClawObserver { clearTimeout(connectTimeout) this.connected = false this.ws = null + // Anything we sent and never got a response for must reject — the + // socket is gone, the response will never come. Listeners (event + // subscriptions) are kept across reconnects. + this.failAllPendingRequests('OpenClaw observer connection closed') // Reset any agents stuck in "working" to "unknown" — we missed // the final/end event because the WS closed mid-task. The @@ -244,6 +357,33 @@ export class OpenClawObserver { this.reconnectTimer = null } } + + private dispatchToListeners(eventName: string, payload: unknown): void { + const set = this.eventListeners.get(eventName) + if (!set || set.size === 0) return + // Snapshot so a listener can call its own unsubscribe inside the loop + // without mutating the iteration. + for (const listener of [...set]) { + try { + listener(payload) + } catch (err) { + logger.warn('OpenClaw observer listener threw', { + event: eventName, + error: err instanceof Error ? err.message : String(err), + }) + } + } + } + + private failAllPendingRequests(reason: string): void { + if (this.pendingRequests.size === 0) return + const pendings = [...this.pendingRequests.values()] + this.pendingRequests.clear() + for (const pending of pendings) { + clearTimeout(pending.timer) + pending.reject(new Error(reason)) + } + } } // --------------------------------------------------------------------------- diff --git a/packages/browseros-agent/apps/server/src/api/services/openclaw/openclaw-service.ts b/packages/browseros-agent/apps/server/src/api/services/openclaw/openclaw-service.ts index cb81bf7fd..5e730ecb2 100644 --- a/packages/browseros-agent/apps/server/src/api/services/openclaw/openclaw-service.ts +++ b/packages/browseros-agent/apps/server/src/api/services/openclaw/openclaw-service.ts @@ -1199,38 +1199,338 @@ export class OpenClawService { // ── Chat Stream (HTTP) ─────────────────────────────────────────────── + /** + * Stream a chat with the agent. + * + * Sends the prompt via OpenClaw's WS `chat.send` RPC (called through the + * OpenClaw CLI from inside the gateway container so it appears as + * direct_local and gets full operator scope). Tokens and tool events + * stream back as `chat`/`agent` broadcasts on the WS observer. + * + * Runs initiated this way are registered in OpenClaw's chatAbortControllers + * registry and can be aborted via `abortChat()` — that's what makes the + * Stop button actually work. + * + * The returned stream emits richer events than the legacy HTTP path: + * text deltas, reasoning, tool start/end with arguments and durations, + * lifecycle phases, usage, and typed errors. + */ async chatStream( agentId: string, sessionKey: string, message: string, - history: MonitoringChatTurn[] = [], + _history: MonitoringChatTurn[] = [], options: { messageParts?: OpenClawChatContentPart[] signal?: AbortSignal + idempotencyKey?: string } = {}, ): Promise> { await this.assertGatewayReady() - const normalizedSessionKey = normalizeBrowserOSChatSessionKey( - agentId, - sessionKey, - ) + const innerKey = normalizeBrowserOSChatSessionKey(agentId, sessionKey) + const fullKey = toOpenClawBrowserOSSessionKey(agentId, innerKey) + + // Translate our internal multimodal `messageParts` (OpenAI shape) to + // OpenClaw chat.send `attachments` (the shape its normalizer expects). + const attachments = messagePartsToOpenClawAttachments(options.messageParts) + logger.info('Starting OpenClaw chat stream', { agentId, - sessionKey: normalizedSessionKey, + sessionKey: innerKey, messageLength: message.length, - historyLength: history.length, - contentPartCount: options.messageParts?.length ?? 0, + attachmentCount: attachments.length, }) - return this.runControlPlaneCall(() => - this.httpClient.streamChat({ - agentId, - sessionKey: normalizedSessionKey, - message, - messageParts: options.messageParts, - history, - signal: options.signal, - }), + + // Ensure the observer is connected before we send — we need its + // broadcast subscription to receive tokens for the run we're about + // to start. runControlPlaneCall's side effect (ensureObserverConnected) + // handles this. + await this.runControlPlaneCall(async () => undefined) + if (!this.observer.isConnected()) { + throw new Error('OpenClaw observer is not connected; cannot stream chat') + } + + const { runId } = await this.sendChatViaCli({ + sessionKey: fullKey, + message, + attachments, + idempotencyKey: options.idempotencyKey, + }) + + const observer = this.observer + const abortChat = (rid: string) => + this.abortChat(agentId, sessionKey, rid).catch(() => undefined) + return new ReadableStream({ + start: (controller) => { + let closed = false + let assistantTextLen = 0 + let reasoningTextLen = 0 + const toolStartedAt = new Map() + let lastEmittedText = '' + + const close = (final?: OpenClawStreamEvent) => { + if (closed) return + closed = true + if (final) controller.enqueue(final) + offChat() + offAgent() + if (signalListener) { + options.signal?.removeEventListener('abort', signalListener) + } + controller.close() + } + + const onChat = (rawPayload: unknown) => { + if (closed) return + if (!isPlainObject(rawPayload)) return + if (rawPayload.runId !== runId) return + + const state = rawPayload.state + if (state === 'aborted') { + close({ type: 'lifecycle', data: { phase: 'aborted', runId } }) + return + } + if (state === 'error') { + close({ + type: 'error', + data: { + message: + typeof rawPayload.errorMessage === 'string' + ? rawPayload.errorMessage + : 'OpenClaw chat run failed', + errorKind: + typeof rawPayload.errorKind === 'string' + ? rawPayload.errorKind + : undefined, + }, + }) + return + } + + // Walk message.content blocks. Text comes as cumulative; we + // compute true deltas. Reasoning is similar. + const message = isPlainObject(rawPayload.message) + ? rawPayload.message + : null + const content = message + ? isArray(message.content) + ? message.content + : [] + : [] + + let cumulativeText = '' + let cumulativeReasoning = '' + for (const block of content) { + if (!isPlainObject(block)) continue + if (block.type === 'text' && typeof block.text === 'string') { + cumulativeText += block.text + } else if (block.type === 'thinking') { + const thinking = + (typeof block.thinking === 'string' && block.thinking) || + (typeof block.text === 'string' && block.text) || + '' + cumulativeReasoning += thinking + } + // toolCall blocks are surfaced via the `agent` event with stream:'tool' + } + + // Filter BrowserOS-specific sentinels (HEARTBEAT etc). + if (cumulativeText.startsWith('HEARTBEAT')) return + + if (cumulativeText.length > assistantTextLen) { + const delta = cumulativeText.slice(assistantTextLen) + assistantTextLen = cumulativeText.length + lastEmittedText = cumulativeText + controller.enqueue({ type: 'text-delta', data: { text: delta } }) + } + if (cumulativeReasoning.length > reasoningTextLen) { + const delta = cumulativeReasoning.slice(reasoningTextLen) + reasoningTextLen = cumulativeReasoning.length + controller.enqueue({ + type: 'reasoning-delta', + data: { text: delta }, + }) + } + + if (state === 'final') { + // Surface usage if present + const usage = isPlainObject(message?.usage) + ? (message.usage as Record) + : null + if (usage) { + const data: Record = {} + if (typeof usage.input === 'number') data.tokensIn = usage.input + if (typeof usage.output === 'number') + data.tokensOut = usage.output + const cost = isPlainObject(usage.cost) ? usage.cost : null + if (cost && typeof cost.total === 'number') + data.costUsd = cost.total + if (Object.keys(data).length > 0) { + controller.enqueue({ type: 'usage', data }) + } + } + close({ type: 'done', data: { text: lastEmittedText } }) + } + } + + const onAgent = (rawPayload: unknown) => { + if (closed) return + if (!isPlainObject(rawPayload)) return + if (rawPayload.runId !== runId) return + + const stream = rawPayload.stream + const data = isPlainObject(rawPayload.data) ? rawPayload.data : {} + + if (stream === 'tool') { + const phase = data.phase + const toolCallId = + typeof data.toolCallId === 'string' ? data.toolCallId : null + if (!toolCallId) return + if (phase === 'start') { + toolStartedAt.set(toolCallId, Date.now()) + const { label, subject } = buildToolLabel( + typeof data.name === 'string' ? data.name : 'tool', + isPlainObject(data.arguments) ? data.arguments : undefined, + ) + controller.enqueue({ + type: 'tool-start', + data: { + toolCallId, + toolName: typeof data.name === 'string' ? data.name : '', + label, + subject, + arguments: data.arguments, + }, + }) + } else if (phase === 'end') { + const startedAt = toolStartedAt.get(toolCallId) + if (startedAt) toolStartedAt.delete(toolCallId) + controller.enqueue({ + type: 'tool-end', + data: { + toolCallId, + output: + typeof data.output === 'string' ? data.output : undefined, + isError: data.isError === true, + durationMs: startedAt ? Date.now() - startedAt : undefined, + }, + }) + } + return + } + + if (stream === 'lifecycle') { + const phase = data.phase + if (phase === 'start' || phase === 'end' || phase === 'error') { + controller.enqueue({ + type: 'lifecycle', + data: { phase, runId }, + }) + } + } + } + + const offChat = observer.on('chat', onChat) + const offAgent = observer.on('agent', onAgent) + + let signalListener: (() => void) | null = null + if (options.signal) { + if (options.signal.aborted) { + // Already aborted before we attached. Best-effort fire abort + // and close, but don't throw — the run might already be done. + void abortChat(runId) + close({ type: 'lifecycle', data: { phase: 'aborted', runId } }) + return + } + signalListener = () => { + void abortChat(runId) + } + options.signal.addEventListener('abort', signalListener, { + once: true, + }) + } + }, + }) + } + + /** + * Send a chat via OpenClaw's WS `chat.send` RPC, invoked through the + * OpenClaw CLI from inside the gateway container. Returns the assigned + * runId immediately; tokens stream as broadcast events on the observer. + */ + private async sendChatViaCli(input: { + sessionKey: string + message: string + attachments?: unknown[] + idempotencyKey?: string + timeoutMs?: number + }): Promise<{ runId: string }> { + await this.ensureTokenLoaded() + if (!this.tokenLoaded) { + throw new Error( + 'OpenClaw gateway token not loaded; setup may not have completed', + ) + } + const idempotencyKey = input.idempotencyKey ?? crypto.randomUUID() + const params: Record = { + sessionKey: input.sessionKey, + message: input.message, + idempotencyKey, + } + if (input.attachments && input.attachments.length > 0) { + params.attachments = input.attachments + } + const result = await this.runtime.callGatewayRpc<{ + runId: string + status?: string + }>({ + method: 'chat.send', + params, + token: this.token, + timeoutMs: input.timeoutMs ?? 5000, + }) + if (typeof result.runId !== 'string' || !result.runId) { + throw new Error('OpenClaw chat.send did not return a runId') + } + return { runId: result.runId } + } + + /** + * Abort any in-flight runs for an agent's session. Looks up runs by + * sessionKey in OpenClaw's chatAbortControllers registry. + */ + async abortChat( + agentId: string, + sessionKey: string, + runId?: string, + ): Promise<{ aborted: boolean; runIds: string[] }> { + await this.assertGatewayReady() + await this.ensureTokenLoaded() + if (!this.tokenLoaded) { + throw new Error( + 'OpenClaw gateway token not loaded; setup may not have completed', + ) + } + const fullSessionKey = toOpenClawBrowserOSSessionKey( + agentId, + normalizeBrowserOSChatSessionKey(agentId, sessionKey), ) + const params: Record = { sessionKey: fullSessionKey } + if (runId) params.runId = runId + const result = await this.runtime.callGatewayRpc<{ + ok?: boolean + aborted?: boolean + runIds?: string[] + }>({ + method: 'chat.abort', + params, + token: this.token, + timeoutMs: 5000, + }) + return { + aborted: Boolean(result.aborted), + runIds: Array.isArray(result.runIds) ? result.runIds : [], + } } private resolveSpecificAgentSession( @@ -2042,3 +2342,38 @@ function sameVmCacheRuntimeConfig( left?.ensureSynced === right?.ensureSynced ) } + +function isPlainObject(value: unknown): value is Record { + return typeof value === 'object' && value !== null && !Array.isArray(value) +} + +function isArray(value: unknown): value is unknown[] { + return Array.isArray(value) +} + +/** + * Translate our internal `messageParts` (OpenAI multimodal shape, used by + * the route's attachment validator) to OpenClaw chat.send `attachments` + * (the shape its `normalizeRpcAttachmentsToChatAttachments` expects). + * + * Today we only have image data: URLs. The data URL carries mime + base64 + * inline, which OpenClaw's normalizer accepts as `{type, mimeType, content}`. + */ +function messagePartsToOpenClawAttachments( + parts: OpenClawChatContentPart[] | undefined, +): unknown[] { + if (!parts || parts.length === 0) return [] + const out: unknown[] = [] + for (const p of parts) { + if (p.type !== 'image_url') continue + const url = p.image_url?.url + if (typeof url !== 'string' || !url.startsWith('data:')) continue + const semi = url.indexOf(';') + const comma = url.indexOf(',') + if (semi < 0 || comma < 0) continue + const mimeType = url.slice(5, semi) + const content = url.slice(comma + 1) + out.push({ type: 'image', mimeType, content }) + } + return out +} diff --git a/packages/browseros-agent/apps/server/src/api/services/openclaw/openclaw-types.ts b/packages/browseros-agent/apps/server/src/api/services/openclaw/openclaw-types.ts index a9419b4fc..bb88f004d 100644 --- a/packages/browseros-agent/apps/server/src/api/services/openclaw/openclaw-types.ts +++ b/packages/browseros-agent/apps/server/src/api/services/openclaw/openclaw-types.ts @@ -7,11 +7,13 @@ export interface OpenClawStreamEvent { type: | 'text-delta' + | 'reasoning-delta' | 'thinking' | 'tool-start' | 'tool-end' | 'tool-output' | 'lifecycle' + | 'usage' | 'done' | 'error' data: Record diff --git a/packages/browseros-agent/apps/server/tests/api/services/openclaw/openclaw-http-client.test.ts b/packages/browseros-agent/apps/server/tests/api/services/openclaw/openclaw-http-client.test.ts index 24240691b..810fd5d0c 100644 --- a/packages/browseros-agent/apps/server/tests/api/services/openclaw/openclaw-http-client.test.ts +++ b/packages/browseros-agent/apps/server/tests/api/services/openclaw/openclaw-http-client.test.ts @@ -14,122 +14,6 @@ describe('OpenClawHttpClient', () => { globalThis.fetch = originalFetch }) - it('maps chat completion deltas into BrowserOS stream events', async () => { - const fetchMock = mock((_url: string | URL, _init?: RequestInit) => - Promise.resolve( - new Response( - new ReadableStream({ - start(controller) { - const encoder = new TextEncoder() - controller.enqueue( - encoder.encode( - 'data: {"choices":[{"delta":{"content":"Hello"}}]}\n\n', - ), - ) - controller.enqueue( - encoder.encode( - 'data: {"choices":[{"delta":{"content":" world"}}]}\n\n', - ), - ) - controller.enqueue( - encoder.encode( - 'data: {"choices":[{"delta":{},"finish_reason":"stop"}]}\n\n', - ), - ) - controller.enqueue(encoder.encode('data: [DONE]\n\n')) - controller.close() - }, - }), - { - status: 200, - headers: { 'Content-Type': 'text/event-stream' }, - }, - ), - ), - ) - globalThis.fetch = fetchMock as typeof globalThis.fetch - const client = new OpenClawHttpClient(18789, async () => 'gateway-token') - - const stream = await client.streamChat({ - agentId: 'research', - sessionKey: 'session-123', - message: 'hi', - history: [{ role: 'assistant', content: 'Earlier reply' }], - }) - - const events = await readEvents(stream) - const call = fetchMock.mock.calls[0] - - expect(call?.[0]).toBe('http://127.0.0.1:18789/v1/chat/completions') - expect(call?.[1]).toMatchObject({ - method: 'POST', - headers: { - Authorization: 'Bearer gateway-token', - 'Content-Type': 'application/json', - }, - }) - expect(JSON.parse(String(call?.[1]?.body))).toEqual({ - model: 'openclaw/research', - stream: true, - messages: [ - { role: 'assistant', content: 'Earlier reply' }, - { role: 'user', content: 'hi' }, - ], - user: 'browseros:research:session-123', - }) - expect(events).toEqual([ - { type: 'text-delta', data: { text: 'Hello' } }, - { type: 'text-delta', data: { text: ' world' } }, - { type: 'done', data: { text: 'Hello world' } }, - ]) - }) - - it('uses openclaw for the main agent', async () => { - const fetchMock = mock(() => - Promise.resolve( - new Response( - new ReadableStream({ - start(controller) { - controller.close() - }, - }), - { - status: 200, - headers: { 'Content-Type': 'text/event-stream' }, - }, - ), - ), - ) - globalThis.fetch = fetchMock as typeof globalThis.fetch - const client = new OpenClawHttpClient(18789, async () => 'gateway-token') - - await client.streamChat({ - agentId: 'main', - sessionKey: 'session-123', - message: 'hi', - }) - - const body = JSON.parse(String(fetchMock.mock.calls[0]?.[1]?.body)) as { - model: string - } - expect(body.model).toBe('openclaw') - }) - - it('throws on non-success HTTP responses', async () => { - globalThis.fetch = mock(() => - Promise.resolve(new Response('Unauthorized', { status: 401 })), - ) as typeof globalThis.fetch - const client = new OpenClawHttpClient(18789, async () => 'gateway-token') - - await expect( - client.streamChat({ - agentId: 'research', - sessionKey: 'session-123', - message: 'hi', - }), - ).rejects.toThrow('Unauthorized') - }) - it('checks gateway authentication with the current bearer token', async () => { const fetchMock = mock(() => Promise.resolve(new Response('{}'))) globalThis.fetch = fetchMock as typeof globalThis.fetch @@ -166,127 +50,6 @@ describe('OpenClawHttpClient', () => { await expect(client.isAuthenticated()).resolves.toBe(false) }) - it('surfaces an error when OpenClaw finishes without assistant text', async () => { - globalThis.fetch = mock(() => - Promise.resolve( - new Response( - new ReadableStream({ - start(controller) { - const encoder = new TextEncoder() - controller.enqueue( - encoder.encode( - 'data: {"choices":[{"delta":{},"finish_reason":"stop"}]}\n\n', - ), - ) - controller.enqueue(encoder.encode('data: [DONE]\n\n')) - controller.close() - }, - }), - { - status: 200, - headers: { 'Content-Type': 'text/event-stream' }, - }, - ), - ), - ) as typeof globalThis.fetch - const client = new OpenClawHttpClient(18789, async () => 'gateway-token') - - const stream = await client.streamChat({ - agentId: 'main', - sessionKey: 'session-123', - message: 'hi', - }) - - await expect(readEvents(stream)).resolves.toEqual([ - { - type: 'error', - data: { - message: "Agent couldn't generate a response. Please try again.", - }, - }, - ]) - }) - - it('stops processing batched SSE events after a malformed chunk closes the stream', async () => { - const fetchMock = mock(() => - Promise.resolve( - new Response( - new ReadableStream({ - start(controller) { - const encoder = new TextEncoder() - controller.enqueue( - encoder.encode( - 'data: {"choices":[{"delta":{"content":"Hello"}}]}\n\n' + - 'data: not-json\n\n' + - 'data: {"choices":[{"delta":{"content":" world"}}]}\n\n', - ), - ) - controller.close() - }, - }), - { - status: 200, - headers: { 'Content-Type': 'text/event-stream' }, - }, - ), - ), - ) - globalThis.fetch = fetchMock as typeof globalThis.fetch - const client = new OpenClawHttpClient(18789, async () => 'gateway-token') - - const stream = await client.streamChat({ - agentId: 'research', - sessionKey: 'session-123', - message: 'hi', - }) - - await expect(readEvents(stream)).resolves.toEqual([ - { type: 'text-delta', data: { text: 'Hello' } }, - { - type: 'error', - data: { message: 'Failed to parse OpenClaw chat stream chunk' }, - }, - ]) - }) - - it('does not double-close the stream controller when the request is aborted', async () => { - globalThis.fetch = mock(() => - Promise.resolve( - new Response( - new ReadableStream({ - start(controller) { - const encoder = new TextEncoder() - controller.enqueue( - encoder.encode( - 'data: {"choices":[{"delta":{"content":"Hello"}}]}\n\n', - ), - ) - }, - cancel() { - return Promise.resolve() - }, - }), - { - status: 200, - headers: { 'Content-Type': 'text/event-stream' }, - }, - ), - ), - ) as typeof globalThis.fetch - const client = new OpenClawHttpClient(18789, async () => 'gateway-token') - const abortController = new AbortController() - abortController.abort() - - const stream = await client.streamChat({ - agentId: 'research', - sessionKey: 'session-123', - message: 'hi', - signal: abortController.signal, - }) - - await expect(readEvents(stream)).resolves.toEqual([]) - }) - describe('getSessionHistory', () => { it('sends GET with bearer auth and forwards limit/cursor as query params', async () => { const fetchMock = mock(() => diff --git a/packages/browseros-agent/apps/server/tests/api/services/openclaw/openclaw-service.test.ts b/packages/browseros-agent/apps/server/tests/api/services/openclaw/openclaw-service.test.ts index e64824dfe..b472327fd 100644 --- a/packages/browseros-agent/apps/server/tests/api/services/openclaw/openclaw-service.test.ts +++ b/packages/browseros-agent/apps/server/tests/api/services/openclaw/openclaw-service.test.ts @@ -408,31 +408,47 @@ describe('OpenClawService', () => { expect(typeof page.page.cursor).toBe('string') }) - it('normalizes recursive session keys before streaming chat', async () => { + it('normalizes recursive session keys and wraps them as the OpenClaw chat.send sessionKey', async () => { const service = new OpenClawService() as MutableOpenClawService - const stream = new ReadableStream() - const streamChat = mock(async () => stream) + const callGatewayRpc = mock(async () => ({ + runId: 'run-1', + status: 'started', + })) service.runtime = { isReady: async () => true, + callGatewayRpc, } - service.httpClient = { - streamChat, + service.token = 'cli-token' + service.tokenLoaded = true + // Pretend the observer is connected so chatStream proceeds. + service.observer = { + isConnected: () => true, + // chatStream registers chat/agent listeners; we don't need to fire any + // events for this test since we only care about the call into + // runtime.callGatewayRpc. + on: () => () => undefined, } + service.runControlPlaneCall = async (fn: () => Promise) => fn() - await expect( - service.chatStream( - 'main', - 'agent:main:openai-user:browseros:main:agent:main:openai-user:browseros:main:e1ee8e17-4fdb-4072-99ce-8f680853ec00', - 'hello', - ), - ).resolves.toBe(stream) - expect(streamChat).toHaveBeenCalledWith({ - agentId: 'main', - sessionKey: 'e1ee8e17-4fdb-4072-99ce-8f680853ec00', - message: 'hello', - history: [], - }) + await service.chatStream( + 'main', + 'agent:main:openai-user:browseros:main:agent:main:openai-user:browseros:main:e1ee8e17-4fdb-4072-99ce-8f680853ec00', + 'hello', + ) + + expect(callGatewayRpc).toHaveBeenCalledWith( + expect.objectContaining({ + method: 'chat.send', + params: expect.objectContaining({ + sessionKey: + 'agent:main:openai-user:browseros:main:e1ee8e17-4fdb-4072-99ce-8f680853ec00', + message: 'hello', + idempotencyKey: expect.any(String), + }), + token: 'cli-token', + }), + ) }) it('maps successful cli client probes into connected status', async () => {