diff --git a/packages/browseros-agent/apps/agent/entrypoints/app/agent-command/AgentCommandConversation.tsx b/packages/browseros-agent/apps/agent/entrypoints/app/agent-command/AgentCommandConversation.tsx index 24be3ef30..179aca9c5 100644 --- a/packages/browseros-agent/apps/agent/entrypoints/app/agent-command/AgentCommandConversation.tsx +++ b/packages/browseros-agent/apps/agent/entrypoints/app/agent-command/AgentCommandConversation.tsx @@ -2,6 +2,12 @@ import { ArrowLeft, Bot, Home } from 'lucide-react' import { type FC, useEffect, useMemo, useRef } from 'react' import { Navigate, useNavigate, useParams, useSearchParams } from 'react-router' import { Button } from '@/components/ui/button' +import { + cancelHarnessTurn, + useEnqueueHarnessMessage, + useHarnessAgents, + useRemoveHarnessQueuedMessage, +} from '@/entrypoints/app/agents/useAgents' import { type AgentEntry, getModelDisplayName, @@ -15,6 +21,7 @@ import { filterTurnsPersistedInHistory, flattenHistoryPages, } from './claw-chat-types' +import { QueuePanel } from './QueuePanel' import { useAgentConversation } from './useAgentConversation' import { useHarnessChatHistory } from './useHarnessChatHistory' @@ -212,15 +219,33 @@ function AgentConversationController({ [historyMessages], ) + // Listing query feeds queue + active-turn state for this agent. We + // already poll it every 5s for the rail; reusing the same cache + // keeps cross-tab queue state in sync without a second poll. + const { harnessAgents } = useHarnessAgents() + const harnessAgent = harnessAgents.find((entry) => entry.id === agentId) + const queue = harnessAgent?.queue ?? [] + const activeTurnId = harnessAgent?.activeTurnId ?? null + const { turns, streaming, send } = useAgentConversation(agentId, { runtime: 'agent-harness', sessionKey: null, history: chatHistory, + activeTurnId, onComplete: () => { void harnessHistoryQuery.refetch() }, onSessionKeyChange: () => {}, }) + const enqueueMessage = useEnqueueHarnessMessage() + const removeQueuedMessage = useRemoveHarnessQueuedMessage() + + const handleStop = () => { + void cancelHarnessTurn(agentId, { + turnId: activeTurnId ?? undefined, + reason: 'user pressed stop', + }) + } const visibleTurns = useMemo( () => filterTurnsPersistedInHistory(turns, historyMessages), [historyMessages, turns], @@ -281,7 +306,15 @@ function AgentConversationController({ />
-
+
+ {queue.length > 0 ? ( + + removeQueuedMessage.mutate({ agentId, messageId }) + } + /> + ) : null} navigate(createAgentPath)} + onStop={handleStop} streaming={streaming} disabled={disabled} status="running" attachmentsEnabled={true} - placeholder={`Message ${agentName}...`} + placeholder={ + streaming + ? `Type to queue another message for ${agentName}...` + : `Message ${agentName}...` + } />
diff --git a/packages/browseros-agent/apps/agent/entrypoints/app/agent-command/ConversationInput.tsx b/packages/browseros-agent/apps/agent/entrypoints/app/agent-command/ConversationInput.tsx index 702b2149e..a1c5cb89d 100644 --- a/packages/browseros-agent/apps/agent/entrypoints/app/agent-command/ConversationInput.tsx +++ b/packages/browseros-agent/apps/agent/entrypoints/app/agent-command/ConversationInput.tsx @@ -54,25 +54,40 @@ interface ConversationInputProps { placeholder?: string attachmentsEnabled?: boolean variant?: 'home' | 'conversation' + /** + * When set, a Stop button surfaces to the left of the voice mic + * while `streaming === true`. Click cancels the active turn + * server-side via the chat-cancel endpoint. Absent → no Stop + * button (legacy behaviour for the home composer). + */ + onStop?: () => void } function InputActionButton({ disabled, onClick, streaming, + hasContent, }: { disabled: boolean onClick: () => void streaming: boolean + hasContent: boolean }) { + // Show the spinner while streaming only when there's nothing to + // send — once the user types something, the icon flips back to the + // paper-plane so it reads as "queue this message" instead of + // "still working". + const showSpinner = streaming && !hasContent return ( + ) +} + function VoiceButton({ isRecording, isTranscribing, @@ -299,6 +330,7 @@ export const ConversationInput: FC = ({ placeholder, attachmentsEnabled = true, variant = 'conversation', + onStop, }) => { const [input, setInput] = useState('') const [selectedTabs, setSelectedTabs] = useState([]) @@ -379,10 +411,17 @@ export const ConversationInput: FC = ({ } const hasContent = input.trim().length > 0 || attachments.length > 0 + // Queue-aware composers (the conversation panel passes `onStop`) + // accept input while streaming — the parent decides whether the + // submission opens a new turn or enqueues onto the active one. + // Surfaces without a Stop hook (home) keep the legacy behaviour + // and block input until the current turn finishes. + const queueAware = Boolean(onStop) const handleSend = () => { const text = input.trim() - if (disabled || isStaging || streaming) return + if (disabled || isStaging) return + if (streaming && !queueAware) return if (!text && attachments.length === 0) return onSend({ text, attachments }) setInput('') @@ -512,6 +551,7 @@ export const ConversationInput: FC = ({ )} />
+ {streaming && onStop ? : null} = ({ !!disabled || voice.isRecording || voice.isTranscribing || - streaming + (streaming && !queueAware) } onClick={handleSend} // Spinner stays the user-facing "agent is busy" hint; with the // queue active we still spin while a turn is in flight. streaming={streaming} + hasContent={hasContent} /> {voice.error ? ( diff --git a/packages/browseros-agent/apps/agent/entrypoints/app/agent-command/QueuePanel.tsx b/packages/browseros-agent/apps/agent/entrypoints/app/agent-command/QueuePanel.tsx new file mode 100644 index 000000000..570169d4d --- /dev/null +++ b/packages/browseros-agent/apps/agent/entrypoints/app/agent-command/QueuePanel.tsx @@ -0,0 +1,94 @@ +import { ListPlus, X } from 'lucide-react' +import type { FC } from 'react' +import { + Queue, + QueueItem, + QueueItemAction, + QueueItemActions, + QueueItemAttachment, + QueueItemContent, + QueueItemFile, + QueueItemImage, + QueueList, + QueueSection, + QueueSectionContent, + QueueSectionLabel, + QueueSectionTrigger, +} from '@/components/ai-elements/queue' +import type { + HarnessQueuedMessage, + HarnessQueuedMessageAttachment, +} from '@/entrypoints/app/agents/agent-harness-types' +import { firstNonBlankLine } from '@/entrypoints/app/agents/agent-row/agent-row.helpers' + +interface QueuePanelProps { + queue: HarnessQueuedMessage[] + onRemove: (messageId: string) => void +} + +/** + * Renders the agent's pending message queue using the shared AI + * Elements `Queue` primitives. Caller is expected to gate render on + * `queue.length > 0` — when empty, this returns null so the panel + * disappears cleanly between turns. + */ +export const QueuePanel: FC = ({ queue, onRemove }) => { + if (queue.length === 0) return null + return ( + + + + } + /> + + + + {queue.map((entry) => ( + +
+ + {firstNonBlankLine(entry.message)} + + + onRemove(entry.id)} + > + + + +
+ {entry.attachments && entry.attachments.length > 0 ? ( + + {entry.attachments.map((attachment, idx) => + renderAttachment(entry.id, attachment, idx), + )} + + ) : null} +
+ ))} +
+
+
+
+ ) +} + +function renderAttachment( + messageId: string, + attachment: HarnessQueuedMessageAttachment, + idx: number, +) { + if (attachment.mediaType.startsWith('image/')) { + const src = `data:${attachment.mediaType};base64,${attachment.data}` + return + } + return ( + + {attachment.mediaType} + + ) +} diff --git a/packages/browseros-agent/apps/agent/entrypoints/app/agent-command/useAgentConversation.ts b/packages/browseros-agent/apps/agent/entrypoints/app/agent-command/useAgentConversation.ts index 9b780ee5a..d859abcef 100644 --- a/packages/browseros-agent/apps/agent/entrypoints/app/agent-command/useAgentConversation.ts +++ b/packages/browseros-agent/apps/agent/entrypoints/app/agent-command/useAgentConversation.ts @@ -36,6 +36,15 @@ interface UseAgentConversationOptions { history?: OpenClawChatHistoryMessage[] onComplete?: () => void onSessionKeyChange?: (sessionKey: string) => void + /** + * Server-side active turn id, surfaced via the listing query. When + * this changes from null/ to a different non-null id while we + * aren't already streaming (e.g. the server just popped a queued + * message and started a new turn), the hook reattaches via + * /chat/active so the chat panel picks up the live stream without + * waiting for a remount. + */ + activeTurnId?: string | null } export function useAgentConversation( @@ -211,31 +220,46 @@ export function useAgentConversation( } processEventRef.current = processAgentHarnessStreamEvent - // On mount (and whenever the agent changes), check whether the - // server has an in-flight turn for this agent and reattach to it. - // This is what makes the chat resilient across tab close/reopen, - // refresh, and navigation: the runtime call kept running on the - // server while we were away. Effect only depends on `agentId` — - // the event handler is read off a ref so this doesn't re-subscribe - // every render. + const activeTurnIdDep = options.activeTurnId ?? null + + // On mount, on agent change, and whenever the listing reports a + // *new* active turn id, check whether the server has an in-flight + // turn for this agent and reattach to it. This catches three + // cases at once: the chat resilience flow (tab close/reopen), + // navigation between agents, AND queue drain (the server starts a + // new turn from a queued message → activeTurnId flips → attach). useEffect(() => { let cancelled = false const abortController = new AbortController() + // Reference the dep inside the body so biome's exhaustive-deps + // rule sees it consumed; the value is just an "any non-null + // active turn id" trigger — the actual id we attach to comes + // from the fresh fetchActiveHarnessTurn call below. + void activeTurnIdDep const attemptResume = async () => { + // Track whether *we* started a stream in this run. When the + // early-return paths fire (no active turn, or a `send()` / + // earlier resume already owns `streamAbortRef`), the finally + // block must NOT touch streaming/turnIdRef/lastSeqRef — + // otherwise we clobber the in-flight stream's state and the + // Stop button drops out mid-turn while events keep arriving. + let weStartedStream = false try { const active = await fetchActiveHarnessTurn(agentId) if (cancelled || !active || active.status !== 'running') return - if (streamAbortRef.current) return // a fresh send already in flight + if (streamAbortRef.current) return // someone else already owns the stream // Stage a placeholder turn so the streamed events have a row - // to render into. We don't have the user message text on - // resume; the assistant turn is what we're catching up on. + // to render into. The server now persists the kicking-off + // prompt on the active turn, so we render it as the user + // bubble immediately — no empty-bubble flicker when a queued + // message starts running. setTurns((prev) => [ ...prev, { id: crypto.randomUUID(), - userText: '', + userText: active.prompt ?? '', parts: [], done: false, timestamp: active.startedAt, @@ -247,6 +271,7 @@ export function useAgentConversation( lastSeqRef.current = null streamAbortRef.current = abortController setStreaming(true) + weStartedStream = true const response = await attachToHarnessTurn(agentId, { turnId: active.turnId, @@ -265,10 +290,20 @@ export function useAgentConversation( // Resume is best-effort; transient errors fall back to the // user starting a new turn manually. } finally { - if (!cancelled) { - if (streamAbortRef.current === abortController) { - streamAbortRef.current = null - } + // Always release `streamAbortRef` if we owned it — even when + // the effect was cancelled mid-stream (a listing poll + // captured the next queue-drain turn id, for example). If we + // don't, the next effect run hits `if (streamAbortRef.current) + // return` against our now-aborted controller and never + // reattaches, leaving `streaming === true` with no live stream. + if (weStartedStream && streamAbortRef.current === abortController) { + streamAbortRef.current = null + } + // The other state (streaming flag, turn id, lastSeq) is the + // *current run's* lifecycle: only reset it on a clean exit. + // When `cancelled` is true the next run will set these + // itself, so resetting here would only cause a brief flicker. + if (!cancelled && weStartedStream) { turnIdRef.current = null lastSeqRef.current = null setStreaming(false) @@ -281,7 +316,7 @@ export function useAgentConversation( cancelled = true abortController.abort() } - }, [agentId]) + }, [agentId, activeTurnIdDep]) const send = async (input: string | SendInput) => { const normalized: SendInput = diff --git a/packages/browseros-agent/apps/agent/entrypoints/app/agents/agent-harness-types.ts b/packages/browseros-agent/apps/agent/entrypoints/app/agents/agent-harness-types.ts index e257e579a..e0eeeb14e 100644 --- a/packages/browseros-agent/apps/agent/entrypoints/app/agents/agent-harness-types.ts +++ b/packages/browseros-agent/apps/agent/entrypoints/app/agents/agent-harness-types.ts @@ -73,6 +73,20 @@ export interface HarnessAgent { lastErrorAt?: number | null /** When non-null, an in-flight turn this row can be resumed from. */ activeTurnId?: string | null + /** Persistent FIFO queue of messages waiting for this agent. */ + queue?: HarnessQueuedMessage[] +} + +export interface HarnessQueuedMessageAttachment { + mediaType: string + data: string +} + +export interface HarnessQueuedMessage { + id: string + createdAt: number + message: string + attachments?: ReadonlyArray } export interface HarnessAdapterHealth { diff --git a/packages/browseros-agent/apps/agent/entrypoints/app/agents/useAgents.ts b/packages/browseros-agent/apps/agent/entrypoints/app/agents/useAgents.ts index 22686ef57..78de68ab0 100644 --- a/packages/browseros-agent/apps/agent/entrypoints/app/agents/useAgents.ts +++ b/packages/browseros-agent/apps/agent/entrypoints/app/agents/useAgents.ts @@ -8,6 +8,7 @@ import { type HarnessAdapterDescriptor, type HarnessAgent, type HarnessAgentHistoryPage, + type HarnessQueuedMessage, mapHarnessAgentToEntry, } from './agent-harness-types' import type { OpenClawStatus } from './useOpenClaw' @@ -263,6 +264,8 @@ export interface HarnessActiveTurnInfo { lastSeq: number startedAt: number endedAt?: number + /** User message that kicked off the turn; null when not captured. */ + prompt: string | null } /** @@ -317,3 +320,145 @@ export async function fetchHarnessAgentHistory( `/${encodeURIComponent(agentId)}/sessions/main/history`, ) } + +export interface EnqueueMessageInput { + message: string + attachments?: ReadonlyArray +} + +export async function enqueueHarnessMessage( + agentId: string, + input: EnqueueMessageInput, +): Promise { + const baseUrl = await getAgentServerUrl() + const response = await fetch( + `${baseUrl}/agents/${encodeURIComponent(agentId)}/queue`, + { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + message: input.message, + ...(input.attachments && input.attachments.length > 0 + ? { attachments: input.attachments } + : {}), + }), + }, + ) + if (!response.ok) { + let message = `Request failed with status ${response.status}` + try { + const body = (await response.json()) as { error?: string } + if (body.error) message = body.error + } catch {} + throw new Error(message) + } + const body = (await response.json()) as { queued: HarnessQueuedMessage } + return body.queued +} + +export async function removeHarnessQueuedMessage( + agentId: string, + messageId: string, +): Promise<{ removed: boolean }> { + const baseUrl = await getAgentServerUrl() + const response = await fetch( + `${baseUrl}/agents/${encodeURIComponent(agentId)}/queue/${encodeURIComponent( + messageId, + )}`, + { method: 'DELETE' }, + ) + if (!response.ok) return { removed: false } + return (await response.json()) as { removed: boolean } +} + +/** + * Optimistic enqueue: writes the new queued message into the listing + * cache immediately so the queue panel reflects the change without + * waiting for the next poll. Rolls back if the server rejects. + */ +export function useEnqueueHarnessMessage() { + const { baseUrl } = useAgentServerUrl() + const queryClient = useQueryClient() + + return useMutation({ + mutationFn: async (input: { agentId: string } & EnqueueMessageInput) => + enqueueHarnessMessage(input.agentId, input), + onMutate: async (input) => { + const queryKey = [AGENT_QUERY_KEYS.agents, baseUrl] + await queryClient.cancelQueries({ queryKey }) + const previous = queryClient.getQueryData(queryKey) + if (!previous) return { previous: undefined } + const optimistic: HarnessQueuedMessage = { + id: `optimistic-${Math.random().toString(36).slice(2, 10)}`, + createdAt: Date.now(), + message: input.message, + } + queryClient.setQueryData(queryKey, { + ...previous, + agents: previous.agents.map((agent) => + agent.id === input.agentId + ? { ...agent, queue: [...(agent.queue ?? []), optimistic] } + : agent, + ), + }) + return { previous } + }, + onError: (_err, _vars, context) => { + if (!context?.previous) return + queryClient.setQueryData( + [AGENT_QUERY_KEYS.agents, baseUrl], + context.previous, + ) + }, + onSettled: async () => { + await queryClient.invalidateQueries({ + queryKey: [AGENT_QUERY_KEYS.agents], + }) + }, + }) +} + +/** + * Optimistic queue removal mirror of `useEnqueueHarnessMessage`. + */ +export function useRemoveHarnessQueuedMessage() { + const { baseUrl } = useAgentServerUrl() + const queryClient = useQueryClient() + + return useMutation({ + mutationFn: async (input: { agentId: string; messageId: string }) => + removeHarnessQueuedMessage(input.agentId, input.messageId), + onMutate: async (input) => { + const queryKey = [AGENT_QUERY_KEYS.agents, baseUrl] + await queryClient.cancelQueries({ queryKey }) + const previous = queryClient.getQueryData(queryKey) + if (!previous) return { previous: undefined } + queryClient.setQueryData(queryKey, { + ...previous, + agents: previous.agents.map((agent) => + agent.id === input.agentId + ? { + ...agent, + queue: (agent.queue ?? []).filter( + (entry) => entry.id !== input.messageId, + ), + } + : agent, + ), + }) + return { previous } + }, + onError: (_err, _vars, context) => { + if (!context?.previous) return + queryClient.setQueryData( + [AGENT_QUERY_KEYS.agents, baseUrl], + context.previous, + ) + }, + onSettled: async () => { + await queryClient.invalidateQueries({ + queryKey: [AGENT_QUERY_KEYS.agents], + }) + }, + }) +} diff --git a/packages/browseros-agent/apps/server/src/api/routes/agents.ts b/packages/browseros-agent/apps/server/src/api/routes/agents.ts index ba734ca57..2d8f225e4 100644 --- a/packages/browseros-agent/apps/server/src/api/routes/agents.ts +++ b/packages/browseros-agent/apps/server/src/api/routes/agents.ts @@ -36,8 +36,10 @@ import { AgentHarnessService, type GatewayStatusSnapshot, InvalidAgentUpdateError, + MessageQueueFullError, type OpenClawProvisioner, OpenClawProvisionerUnavailableError, + type QueuedMessage, TurnAlreadyActiveError, UnknownAgentError, } from '../services/agents/agent-harness-service' @@ -83,6 +85,16 @@ type AgentRouteService = { turnId?: string reason?: string }): boolean + enqueueMessage(input: { + agentId: string + message: string + attachments?: ReadonlyArray<{ mediaType: string; data: string }> + }): Promise + removeQueuedMessage(input: { + agentId: string + messageId: string + }): Promise + listQueuedMessages(agentId: string): Promise } type AgentRouteDeps = { @@ -353,6 +365,40 @@ export function createAgentRoutes(deps: AgentRouteDeps = {}) { const cancelled = service.cancelTurn({ agentId, turnId, reason }) return c.json({ cancelled }) }) + .get('/:agentId/queue', async (c) => { + try { + const queue = await service.listQueuedMessages(c.req.param('agentId')) + return c.json({ queue }) + } catch (err) { + return handleAgentRouteError(c, err) + } + }) + .post('/:agentId/queue', async (c) => { + const parsed = await parseEnqueueBody(c) + if ('error' in parsed) return c.json({ error: parsed.error }, 400) + try { + const queued = await service.enqueueMessage({ + agentId: c.req.param('agentId'), + message: parsed.message, + attachments: parsed.attachments, + }) + return c.json({ queued }) + } catch (err) { + return handleAgentRouteError(c, err) + } + }) + .delete('/:agentId/queue/:messageId', async (c) => { + try { + const removed = await service.removeQueuedMessage({ + agentId: c.req.param('agentId'), + messageId: c.req.param('messageId'), + }) + if (!removed) return c.json({ error: 'Queued message not found' }, 404) + return c.json({ removed }) + } catch (err) { + return handleAgentRouteError(c, err) + } + }) } function turnFramesToAgentEvents( @@ -551,6 +597,27 @@ const ALLOWED_IMAGE_MEDIA_TYPES = new Set([ 'image/gif', ]) +/** + * Body parser for `POST /agents/:id/queue`. Mirrors `parseChatBody`'s + * shape (message + attachments) but adds an upper bound on the + * message text size so a runaway client can't fill the queue file + * with multi-megabyte payloads. + */ +async function parseEnqueueBody( + c: Context, +): Promise< + { message: string; attachments: InboundImageAttachment[] } | { error: string } +> { + const parsed = await parseChatBody(c) + if ('error' in parsed) return parsed + if (parsed.message.length > AGENT_HARNESS_LIMITS.QUEUE_MESSAGE_MAX_BYTES) { + return { + error: `Message exceeds ${AGENT_HARNESS_LIMITS.QUEUE_MESSAGE_MAX_BYTES} bytes`, + } + } + return parsed +} + async function parseChatBody( c: Context, ): Promise< @@ -706,6 +773,9 @@ function handleAgentRouteError(c: Context, err: unknown) { if (err instanceof InvalidAgentUpdateError) { return c.json({ error: err.message }, 400) } + if (err instanceof MessageQueueFullError) { + return c.json({ error: err.message }, 429) + } if (err instanceof OpenClawProvisionerUnavailableError) { return c.json({ error: err.message }, 503) } diff --git a/packages/browseros-agent/apps/server/src/api/services/agents/agent-harness-service.ts b/packages/browseros-agent/apps/server/src/api/services/agents/agent-harness-service.ts index e438702d1..95222e922 100644 --- a/packages/browseros-agent/apps/server/src/api/services/agents/agent-harness-service.ts +++ b/packages/browseros-agent/apps/server/src/api/services/agents/agent-harness-service.ts @@ -18,6 +18,18 @@ import { type CreateAgentInput, FileAgentStore, } from '../../../lib/agents/file-agent-store' +import { + FileMessageQueue, + type QueuedMessage, + type QueuedMessageAttachment, +} from '../../../lib/agents/message-queue' + +export { + MessageQueueFullError, + type QueuedMessage, + type QueuedMessageAttachment, +} from '../../../lib/agents/message-queue' + import type { AgentHistoryPage, AgentRowSnapshot, @@ -56,6 +68,8 @@ export interface AgentDefinitionWithActivity extends AgentDefinition { lastErrorAt: number | null /** When non-null, an in-flight turn this row can be resumed from. */ activeTurnId: string | null + /** Persistent FIFO queue of messages waiting to run for this agent. */ + queue: QueuedMessage[] } const SPARKLINE_DAYS = 14 @@ -142,6 +156,7 @@ export class AgentHarnessService { private readonly runtime: AgentRuntime private readonly openclawProvisioner: OpenClawProvisioner | null private readonly turnRegistry: TurnRegistry + private readonly messageQueue: FileMessageQueue private inFlightReconcile: Promise | null = null // In-memory liveness tracker. Lost on server restart (acceptable — // `lastUsedAt` survives via the acpx session record's `lastUsedAt`, @@ -161,6 +176,7 @@ export class AgentHarnessService { openclawGatewayChat?: OpenClawGatewayChatClient openclawProvisioner?: OpenClawProvisioner turnRegistry?: TurnRegistry + messageQueue?: FileMessageQueue } = {}, ) { this.agentStore = deps.agentStore ?? new FileAgentStore() @@ -173,6 +189,25 @@ export class AgentHarnessService { }) this.openclawProvisioner = deps.openclawProvisioner ?? null this.turnRegistry = deps.turnRegistry ?? new TurnRegistry() + this.messageQueue = deps.messageQueue ?? new FileMessageQueue() + // Drain any agents whose queue file survived a restart. The check + // for `getActiveFor` inside `maybeStartNextFromQueue` guards + // against double-firing if the in-memory turn registry happens to + // have something (it won't post-restart, but the guard is cheap). + void this.drainOnBoot() + } + + private async drainOnBoot(): Promise { + try { + const pending = await this.messageQueue.agentsWithPendingMessages() + for (const agentId of pending) { + void this.maybeStartNextFromQueue(agentId) + } + } catch (err) { + logger.warn('Message queue boot drain failed', { + error: err instanceof Error ? err.message : String(err), + }) + } } async listAgents(): Promise { @@ -189,7 +224,10 @@ export class AgentHarnessService { */ async listAgentsWithActivity(): Promise { const agents = await this.listAgents() - const snapshots = await this.collectRowSnapshots(agents) + const [snapshots, queueSnapshot] = await Promise.all([ + this.collectRowSnapshots(agents), + this.messageQueue.snapshotAll(), + ]) const now = Date.now() return agents.map((agent) => { const live = this.activity.get(agent.id) @@ -210,6 +248,7 @@ export class AgentHarnessService { lastErrorAt: live?.status === 'error' ? (live.lastEventAt ?? null) : null, activeTurnId: activeTurn?.turnId ?? null, + queue: queueSnapshot[agent.id] ?? [], } }) } @@ -290,11 +329,101 @@ export class AgentHarnessService { lastEventAt: Date.now(), lastError: outcome.error, }) + } else { + // Successful turn — drop the in-memory entry. Liveness will be + // derived from the session record's `lastUsedAt` on next read. + this.activity.delete(agentId) + } + // The queue drain runs on every turn-end (success or failure) so + // a queued message is the next thing to run. Fire-and-forget; any + // failure inside `maybeStartNextFromQueue` requeues the message + // and logs. + void this.maybeStartNextFromQueue(agentId) + } + + /** + * Pop the oldest queued message for `agentId` and start a turn from + * it. Fires from `notifyTurnEnded` (covers natural completion + + * cancel) and on server boot to drain queue files that survived a + * restart. No-ops when the queue is empty or another turn is + * already running for the agent. + */ + private async maybeStartNextFromQueue(agentId: string): Promise { + const next = await this.messageQueue.popOldest(agentId) + if (!next) return + // Race guard: a turn may have started between `popOldest` and now + // (e.g. the user typed and clicked Send directly between cancel + // and the drain). Put the message back at the head and let the + // next turn-end retry. + if (this.turnRegistry.getActiveFor(agentId, 'main')) { + await this.messageQueue.pushFront(agentId, next) return } - // Successful turn — drop the in-memory entry. Liveness will be - // derived from the session record's `lastUsedAt` on next read. - this.activity.delete(agentId) + try { + await this.startTurn({ + agentId, + message: next.message, + attachments: next.attachments, + }) + } catch (err) { + logger.warn('Queue drain failed; requeued message', { + agentId, + queuedId: next.id, + error: err instanceof Error ? err.message : String(err), + }) + try { + await this.messageQueue.pushFront(agentId, next) + } catch (requeueErr) { + logger.error('Queue requeue after drain failure also failed', { + agentId, + queuedId: next.id, + error: + requeueErr instanceof Error + ? requeueErr.message + : String(requeueErr), + }) + } + } + } + + /** + * Append a message to the agent's queue. Returns the new queued + * record. Throws `UnknownAgentError` for unknown agents and + * `MessageQueueFullError` when the per-agent cap is reached. + */ + async enqueueMessage(input: { + agentId: string + message: string + attachments?: ReadonlyArray + }): Promise { + const agent = await this.requireAgent(input.agentId) + const queued = await this.messageQueue.append(agent.id, { + message: input.message, + attachments: input.attachments, + }) + // Defensive drain: if the agent has no active turn at enqueue + // time (e.g. the user enqueued during the brief window between + // turns), pop it back off and start it directly. Avoids the + // queue sitting idle while the agent is also idle. + if (!this.turnRegistry.getActiveFor(agent.id, 'main')) { + void this.maybeStartNextFromQueue(agent.id) + } + return queued + } + + /** + * Remove a queued message. Returns true if the message was + * removed, false if the agent or message was unknown. + */ + async removeQueuedMessage(input: { + agentId: string + messageId: string + }): Promise { + return this.messageQueue.remove(input.agentId, input.messageId) + } + + async listQueuedMessages(agentId: string): Promise { + return this.messageQueue.list(agentId) } private ensureGatewayReconciled(): Promise { @@ -493,7 +622,9 @@ export class AgentHarnessService { throw new TurnAlreadyActiveError(agent.id, existing.turnId) } - const turn = this.turnRegistry.register(agent.id, 'main') + const turn = this.turnRegistry.register(agent.id, 'main', { + prompt: input.message, + }) this.notifyTurnStarted(agent.id) // Kick off the runtime call in the background. The per-turn diff --git a/packages/browseros-agent/apps/server/src/lib/agents/active-turn-registry.ts b/packages/browseros-agent/apps/server/src/lib/agents/active-turn-registry.ts index 25904889c..e18516ff8 100644 --- a/packages/browseros-agent/apps/server/src/lib/agents/active-turn-registry.ts +++ b/packages/browseros-agent/apps/server/src/lib/agents/active-turn-registry.ts @@ -24,6 +24,8 @@ export interface ActiveTurnInfo { lastSeq: number startedAt: number endedAt?: number + /** User message that kicked off the turn; null when not captured. */ + prompt: string | null } interface Subscriber { @@ -43,6 +45,8 @@ interface ActiveTurn { startedAt: number endedAt?: number retainUntil?: number + /** User message that kicked off the turn (when known). */ + prompt: string | null } const DEFAULT_BUFFER_CAPACITY = 5000 @@ -136,7 +140,11 @@ export class TurnRegistry { * Register a new turn. The caller is responsible for kicking off the * runtime call and pumping its events into `pushEvent` until done. */ - register(agentId: string, sessionId: 'main' = 'main'): ActiveTurn { + register( + agentId: string, + sessionId: 'main' = 'main', + options: { prompt?: string | null } = {}, + ): ActiveTurn { const turn: ActiveTurn = { turnId: randomUUID(), agentId, @@ -146,6 +154,7 @@ export class TurnRegistry { subscribers: new Set(), abortController: new AbortController(), startedAt: Date.now(), + prompt: options.prompt ?? null, } this.turns.set(turn.turnId, turn) this.ensureSweeper() @@ -187,6 +196,7 @@ export class TurnRegistry { lastSeq: turn.buffer.lastSeq, startedAt: turn.startedAt, endedAt: turn.endedAt, + prompt: turn.prompt, } } diff --git a/packages/browseros-agent/apps/server/src/lib/agents/message-queue.ts b/packages/browseros-agent/apps/server/src/lib/agents/message-queue.ts new file mode 100644 index 000000000..ede478a7c --- /dev/null +++ b/packages/browseros-agent/apps/server/src/lib/agents/message-queue.ts @@ -0,0 +1,226 @@ +/** + * @license + * Copyright 2025 BrowserOS + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +import { randomUUID } from 'node:crypto' +import { mkdir, readFile, rename, writeFile } from 'node:fs/promises' +import { dirname, join } from 'node:path' +import { AGENT_HARNESS_LIMITS } from '@browseros/shared/constants/limits' +import { getBrowserosDir } from '../browseros-dir' +import { logger } from '../logger' + +export interface QueuedMessageAttachment { + mediaType: string + data: string +} + +export interface QueuedMessage { + id: string + createdAt: number + message: string + attachments?: ReadonlyArray +} + +interface MessageQueueFile { + version: 1 + queues: Record +} + +export class MessageQueueFullError extends Error { + constructor( + readonly agentId: string, + readonly limit: number, + ) { + super(`Queue for agent ${agentId} is full (limit ${limit})`) + this.name = 'MessageQueueFullError' + } +} + +/** + * Per-agent durable FIFO of messages waiting to run. Persists at + * `~/.browseros/agent-harness/message-queues.json` so queues survive + * server restarts. Atomic temp+rename writes serialised through a + * write lock so concurrent enqueues from different request contexts + * don't race. + * + * Reads and writes always touch the whole file. The file is small in + * practice (one short JSON record per agent, capped at 50 messages + * each), so this keeps the implementation honest and removes any need + * for partial-update semantics. + */ +export class FileMessageQueue { + private readonly filePath: string + private writeQueue: Promise = Promise.resolve() + private readonly maxLength: number + + constructor(options: { filePath?: string; maxLength?: number } = {}) { + this.filePath = + options.filePath ?? + join(getBrowserosDir(), 'agents', 'harness', 'message-queues.json') + this.maxLength = options.maxLength ?? AGENT_HARNESS_LIMITS.QUEUE_MAX_LENGTH + } + + async list(agentId: string): Promise { + const file = await this.read() + return file.queues[agentId] ?? [] + } + + async snapshotAll(): Promise> { + const file = await this.read() + return Object.fromEntries( + Object.entries(file.queues).map(([agentId, queue]) => [ + agentId, + queue.slice(), + ]), + ) + } + + async append( + agentId: string, + input: { + message: string + attachments?: ReadonlyArray + }, + ): Promise { + return this.withWriteLock(async () => { + const file = await this.read() + const queue = file.queues[agentId] ?? [] + if (queue.length >= this.maxLength) { + throw new MessageQueueFullError(agentId, this.maxLength) + } + const queued: QueuedMessage = { + id: randomUUID(), + createdAt: Date.now(), + message: input.message, + attachments: input.attachments, + } + const next = [...queue, queued] + await this.write({ + ...file, + queues: { ...file.queues, [agentId]: next }, + }) + logger.info('Message queue appended', { + agentId, + queuedId: queued.id, + depth: next.length, + }) + return queued + }) + } + + async popOldest(agentId: string): Promise { + return this.withWriteLock(async () => { + const file = await this.read() + const queue = file.queues[agentId] ?? [] + if (queue.length === 0) return null + const [head, ...rest] = queue + const nextQueues = { ...file.queues } + if (rest.length === 0) { + delete nextQueues[agentId] + } else { + nextQueues[agentId] = rest + } + await this.write({ ...file, queues: nextQueues }) + logger.info('Message queue popped', { + agentId, + queuedId: head.id, + remaining: rest.length, + }) + return head + }) + } + + /** + * Re-attach a message to the head of an agent's queue. Used by the + * drain pump when starting a turn fails so the message isn't + * silently dropped. Bypasses the length cap — `pushFront` is a + * recovery primitive, not a regular append. + */ + async pushFront(agentId: string, message: QueuedMessage): Promise { + await this.withWriteLock(async () => { + const file = await this.read() + const queue = file.queues[agentId] ?? [] + const next = [message, ...queue] + await this.write({ + ...file, + queues: { ...file.queues, [agentId]: next }, + }) + logger.info('Message queue requeued at head', { + agentId, + queuedId: message.id, + depth: next.length, + }) + }) + } + + async remove(agentId: string, messageId: string): Promise { + return this.withWriteLock(async () => { + const file = await this.read() + const queue = file.queues[agentId] ?? [] + const next = queue.filter((entry) => entry.id !== messageId) + if (next.length === queue.length) return false + const nextQueues = { ...file.queues } + if (next.length === 0) { + delete nextQueues[agentId] + } else { + nextQueues[agentId] = next + } + await this.write({ ...file, queues: nextQueues }) + logger.info('Message queue removed', { agentId, messageId }) + return true + }) + } + + /** Agent ids that have at least one queued message. */ + async agentsWithPendingMessages(): Promise { + const file = await this.read() + return Object.entries(file.queues) + .filter(([, queue]) => queue.length > 0) + .map(([agentId]) => agentId) + } + + private async read(): Promise { + try { + const raw = await readFile(this.filePath, 'utf8') + const parsed = JSON.parse(raw) as MessageQueueFile + if (parsed.version !== 1 || typeof parsed.queues !== 'object') { + return emptyQueueFile() + } + return parsed + } catch (err) { + if (isNotFoundError(err)) return emptyQueueFile() + throw err + } + } + + private async write(file: MessageQueueFile): Promise { + await mkdir(dirname(this.filePath), { recursive: true }) + const tmpPath = `${this.filePath}.${process.pid}.${Date.now()}.tmp` + await writeFile(tmpPath, `${JSON.stringify(file, null, 2)}\n`, 'utf8') + await rename(tmpPath, this.filePath) + } + + private withWriteLock(fn: () => Promise): Promise { + const result = this.writeQueue.then(fn, fn) + this.writeQueue = result.then( + () => undefined, + () => undefined, + ) + return result + } +} + +function emptyQueueFile(): MessageQueueFile { + return { version: 1, queues: {} } +} + +function isNotFoundError(err: unknown): boolean { + return ( + typeof err === 'object' && + err !== null && + 'code' in err && + err.code === 'ENOENT' + ) +} diff --git a/packages/browseros-agent/apps/server/tests/api/routes/agents.test.ts b/packages/browseros-agent/apps/server/tests/api/routes/agents.test.ts index b2ac07a2f..6ec34dd91 100644 --- a/packages/browseros-agent/apps/server/tests/api/routes/agents.test.ts +++ b/packages/browseros-agent/apps/server/tests/api/routes/agents.test.ts @@ -501,6 +501,93 @@ describe('createAgentRoutes', () => { expect(unknown.status).toBe(404) }) + it('queues + lists + removes messages for an agent', async () => { + const agent: AgentDefinition = { + id: 'agent-1', + name: 'Review bot', + adapter: 'codex', + modelId: 'gpt-5.5', + reasoningEffort: 'medium', + permissionMode: 'approve-all', + sessionKey: 'agent:agent-1:main', + createdAt: 1000, + updatedAt: 1000, + } + const route = createMountedRoutes([agent]) + + const enqueueA = await route.request('/agents/agent-1/queue', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ message: 'first', attachments: [] }), + }) + expect(enqueueA.status).toBe(200) + const enqueuedA = await enqueueA.json() + expect(enqueuedA.queued).toMatchObject({ message: 'first' }) + + const enqueueB = await route.request('/agents/agent-1/queue', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ message: 'second' }), + }) + expect(enqueueB.status).toBe(200) + const enqueuedB = await enqueueB.json() + + const listed = await route.request('/agents/agent-1/queue') + expect(listed.status).toBe(200) + const listedBody = await listed.json() + expect(listedBody.queue.map((q: { message: string }) => q.message)).toEqual( + ['first', 'second'], + ) + + const removed = await route.request( + `/agents/agent-1/queue/${enqueuedA.queued.id}`, + { method: 'DELETE' }, + ) + expect(removed.status).toBe(200) + expect(await removed.json()).toEqual({ removed: true }) + + const afterRemove = await route.request('/agents/agent-1/queue') + expect((await afterRemove.json()).queue).toEqual([ + expect.objectContaining({ id: enqueuedB.queued.id, message: 'second' }), + ]) + + const removeMissing = await route.request( + '/agents/agent-1/queue/does-not-exist', + { method: 'DELETE' }, + ) + expect(removeMissing.status).toBe(404) + }) + + it('rejects empty queue messages and unknown agents', async () => { + const route = createMountedRoutes([ + { + id: 'agent-1', + name: 'Review bot', + adapter: 'codex', + modelId: 'gpt-5.5', + reasoningEffort: 'medium', + permissionMode: 'approve-all', + sessionKey: 'agent:agent-1:main', + createdAt: 1000, + updatedAt: 1000, + }, + ]) + + const empty = await route.request('/agents/agent-1/queue', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ message: ' ' }), + }) + expect(empty.status).toBe(400) + + const unknown = await route.request('/agents/missing/queue', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ message: 'hi' }), + }) + expect(unknown.status).toBe(404) + }) + it('rejects overlong agent names', async () => { const route = createMountedRoutes([]) const response = await route.request('/agents', { @@ -548,6 +635,15 @@ function createFakeService(agents: AgentDefinition[]) { let lastStartTurnInput: | { agentId: string; message?: string; cwd?: string } | undefined + const queues = new Map< + string, + Array<{ + id: string + createdAt: number + message: string + attachments?: ReadonlyArray<{ mediaType: string; data: string }> + }> + >() return { get _lastStartTurnInput() { @@ -655,8 +751,43 @@ function createFakeService(agents: AgentDefinition[]) { if (!turnId) return false return registry.cancel(turnId, input.reason) }, + async enqueueMessage(input: { + agentId: string + message: string + attachments?: ReadonlyArray<{ mediaType: string; data: string }> + }) { + if (!agents.some((a) => a.id === input.agentId)) { + const { UnknownAgentError } = await import( + '../../../src/api/services/agents/agent-harness-service' + ) + throw new UnknownAgentError(input.agentId) + } + const queued = { + id: `q-${Math.random().toString(36).slice(2, 10)}`, + createdAt: Date.now(), + message: input.message, + attachments: input.attachments, + } + const list = queues.get(input.agentId) ?? [] + list.push(queued) + queues.set(input.agentId, list) + return queued + }, + async removeQueuedMessage(input: { agentId: string; messageId: string }) { + const list = queues.get(input.agentId) + if (!list) return false + const next = list.filter((entry) => entry.id !== input.messageId) + if (next.length === list.length) return false + if (next.length === 0) queues.delete(input.agentId) + else queues.set(input.agentId, next) + return true + }, + async listQueuedMessages(agentId: string) { + return queues.get(agentId)?.slice() ?? [] + }, /** Test-only: lets tests await turn completion deterministically. */ _registry: registry, + _queues: queues, } } @@ -746,6 +877,15 @@ function createBlockingFakeService(agents: AgentDefinition[]) { if (!turnId) return false return registry.cancel(turnId, input.reason) }, + async enqueueMessage() { + throw new Error('not used in this test') + }, + async removeQueuedMessage() { + return false + }, + async listQueuedMessages() { + return [] + }, _unblock: () => unblock(), _cancelCalls: cancelCalls, } diff --git a/packages/browseros-agent/apps/server/tests/lib/agents/message-queue.test.ts b/packages/browseros-agent/apps/server/tests/lib/agents/message-queue.test.ts new file mode 100644 index 000000000..5e6aed097 --- /dev/null +++ b/packages/browseros-agent/apps/server/tests/lib/agents/message-queue.test.ts @@ -0,0 +1,109 @@ +/** + * @license + * Copyright 2025 BrowserOS + */ + +import { afterEach, beforeEach, describe, expect, it } from 'bun:test' +import { mkdtemp, readFile, rm } from 'node:fs/promises' +import { tmpdir } from 'node:os' +import { join } from 'node:path' +import { + FileMessageQueue, + MessageQueueFullError, +} from '../../../src/lib/agents/message-queue' + +let tmp: string +let queue: FileMessageQueue + +beforeEach(async () => { + tmp = await mkdtemp(join(tmpdir(), 'browseros-queue-')) + queue = new FileMessageQueue({ + filePath: join(tmp, 'queues.json'), + maxLength: 3, + }) +}) + +afterEach(async () => { + await rm(tmp, { recursive: true, force: true }) +}) + +describe('FileMessageQueue', () => { + it('appends in FIFO order and pops oldest first', async () => { + await queue.append('a', { message: 'one' }) + await queue.append('a', { message: 'two' }) + const popped = await queue.popOldest('a') + expect(popped?.message).toBe('one') + expect(await queue.list('a')).toEqual([ + expect.objectContaining({ message: 'two' }), + ]) + }) + + it('returns null when popping an empty queue', async () => { + expect(await queue.popOldest('a')).toBeNull() + }) + + it('removes a single message by id', async () => { + const first = await queue.append('a', { message: 'one' }) + await queue.append('a', { message: 'two' }) + const removed = await queue.remove('a', first.id) + expect(removed).toBe(true) + expect(await queue.list('a')).toEqual([ + expect.objectContaining({ message: 'two' }), + ]) + }) + + it('returns false when removing an unknown message id', async () => { + await queue.append('a', { message: 'one' }) + expect(await queue.remove('a', 'nope')).toBe(false) + }) + + it('throws MessageQueueFullError when capacity is reached', async () => { + await queue.append('a', { message: 'one' }) + await queue.append('a', { message: 'two' }) + await queue.append('a', { message: 'three' }) + await expect(queue.append('a', { message: 'four' })).rejects.toBeInstanceOf( + MessageQueueFullError, + ) + }) + + it('pushFront bypasses the cap (recovery primitive)', async () => { + await queue.append('a', { message: 'one' }) + await queue.append('a', { message: 'two' }) + await queue.append('a', { message: 'three' }) + await queue.pushFront('a', { + id: 'recovered', + createdAt: Date.now(), + message: 'recovered', + }) + expect((await queue.list('a')).map((q) => q.message)).toEqual([ + 'recovered', + 'one', + 'two', + 'three', + ]) + }) + + it('persists across instances on the same file path', async () => { + await queue.append('a', { message: 'survives' }) + const other = new FileMessageQueue({ + filePath: join(tmp, 'queues.json'), + maxLength: 3, + }) + expect((await other.list('a')).map((q) => q.message)).toEqual(['survives']) + }) + + it('agentsWithPendingMessages lists agents with non-empty queues', async () => { + await queue.append('a', { message: 'x' }) + await queue.append('b', { message: 'y' }) + const pending = await queue.agentsWithPendingMessages() + expect(pending.sort()).toEqual(['a', 'b']) + }) + + it('writes are atomic (temp file rename leaves no stray files)', async () => { + await queue.append('a', { message: 'one' }) + const raw = await readFile(join(tmp, 'queues.json'), 'utf8') + const parsed = JSON.parse(raw) + expect(parsed.version).toBe(1) + expect(parsed.queues.a[0].message).toBe('one') + }) +}) diff --git a/packages/browseros-agent/packages/shared/src/constants/limits.ts b/packages/browseros-agent/packages/shared/src/constants/limits.ts index 76fb567dc..70d533689 100644 --- a/packages/browseros-agent/packages/shared/src/constants/limits.ts +++ b/packages/browseros-agent/packages/shared/src/constants/limits.ts @@ -83,4 +83,8 @@ export const CONTENT_LIMITS = { export const AGENT_HARNESS_LIMITS = { AGENT_NAME_MAX_CHARS: 80, + /** Maximum number of messages allowed in an agent's pending queue. */ + QUEUE_MAX_LENGTH: 50, + /** Maximum size in bytes for a single queued message's text. */ + QUEUE_MESSAGE_MAX_BYTES: 64 * 1024, } as const