Skip to content

feat(agents): durable per-agent chat message queue + composer Stop#880

Merged
Dani Akash (DaniAkash) merged 6 commits intodevfrom
feat/agent-chat-message-queue
Apr 30, 2026
Merged

feat(agents): durable per-agent chat message queue + composer Stop#880
Dani Akash (DaniAkash) merged 6 commits intodevfrom
feat/agent-chat-message-queue

Conversation

@DaniAkash
Copy link
Copy Markdown
Contributor

Summary

The chat composer at `/agents/:agentId` now accepts new messages while the agent is actively streaming. Sent-while-active messages append to a durable FIFO queue rendered with the AI Elements `Queue` primitives between the chat history and the composer. The queue drains automatically when the active turn finishes (success, error, or user cancel) — the next queued message starts immediately. Queues persist across server restarts via an atomic JSON file at `~/.browseros/agent-harness/message-queues.json`.

Two control loops added:

  • Stop button — surfaces in the composer to the left of the voice mic only while the agent is streaming. Click hits the existing chat-cancel endpoint, which fires the turn-end lifecycle hook → drain → next queued message starts. This is how the user fast-tracks a queued message.
  • Remove queued message — `X` on each queue item deletes it before it runs, with optimistic UI.

Server

  • New `FileMessageQueue` store with atomic temp+rename writes serialised through a write lock. Methods: `list`, `snapshotAll`, `append`, `popOldest`, `pushFront` (recovery), `remove`, `agentsWithPendingMessages`. Per-agent cap (50) → `MessageQueueFullError` → HTTP 429.
  • `AgentHarnessService` wires `maybeStartNextFromQueue` into the turn-end lifecycle hook. Also drains on boot for queues that survived a restart. Race guard: if a turn started between pop and start, the message is pushed back to the head and retried on the next turn-end. If `startTurn` itself throws, the message requeues at the head.
  • Listing payload (`/agents`) exposes `queue: QueuedMessage[]` per agent so the rail and chat panel pick up state on the same 5s poll.
  • Three new endpoints:
    • `POST /agents/:id/queue`
    • `DELETE /agents/:id/queue/:messageId`
    • `GET /agents/:id/queue` (debug)
  • `ActiveTurnRegistry.register` now accepts an optional `prompt`; `describe()` / `getActiveTurn` / `/chat/active` surface it. This is how the chat panel knows which message kicked off a queued turn (otherwise the user bubble was empty when a drain started a new turn).

Client

  • `HarnessAgent` type extended with `queue?: HarnessQueuedMessage[]`.
  • `useEnqueueHarnessMessage` and `useRemoveHarnessQueuedMessage` mutations with optimistic listing-cache updates and rollback on failure.
  • New `` wrapping the AI Elements Queue primitives. Empty queue → renders nothing.
  • `ConversationInput` accepts `onStop`; renders a small destructive-tinted Stop button while `streaming` is true. Composer accepts input while streaming for queue-aware surfaces (parent decides routing); legacy home composer is unchanged.
  • `AgentCommandConversation` reads queue + `activeTurnId` from `useHarnessAgents`, renders `` between chat and composer when non-empty, routes send through enqueue when streaming or there's an active turn, wires `handleStop` → cancel.
  • `useAgentConversation` accepts an `activeTurnId` option; the resume effect re-runs when it flips, so a queued message that just started running attaches into the chat without a remount. The active-turn's `prompt` is used as the user-bubble placeholder, so both sides of the conversation render immediately.

Bug fixes that landed in the same branch

  • The Stop button was disappearing mid-stream when a separate listing refetch caused the resume effect to re-fire. Root cause: the effect's `finally` block was unconditionally calling `setStreaming(false)` even on its no-op early-return paths, clobbering the live state set by `send()`. Fix: track `weStartedStream` locally; the finally only resets state when this run actually started a stream. Early-return / no-active-turn paths leave streaming/turnIdRef/lastSeqRef alone for whoever does own them.
  • User bubble was empty when a queued message kicked off a new turn — fixed by persisting the prompt on the turn at register time and surfacing it via `/chat/active`.

Test plan

  • Server typecheck clean. 149 api tests pass; 131 lib tests pass (includes 9 new `FileMessageQueue` unit tests covering FIFO, bounds, requeue at head, atomic persistence, multi-instance, `agentsWithPendingMessages`).
  • Two new route tests: enqueue + list + remove FIFO behaviour, plus 400/404 paths.
  • Agent app typecheck clean. 62 tests pass. Lint clean (4 pre-existing warnings).
  • Manual: send a long message, queue 2–3 more during the stream, verify the panel shows them; click Stop → current turn cancels and the first queued message starts immediately, with both the user bubble and the streaming response visible without leaving the chat.
  • Manual: remove a queued message via the X — disappears optimistically, doesn't reappear after the next listing poll.
  • Manual: refresh the tab while messages are queued — queue persists; resume reattaches to the active turn.
  • Manual: restart the dev server with messages queued — the boot drain picks them up automatically.

Out of scope

  • Sidepanel chat (stateless, no per-agent queue).
  • Reordering queued messages (FIFO only).
  • Per-agent pause / resume controls.
  • SSE for instant cross-tab queue updates (5s polling is the v1 behaviour).
  • In-place editing of queued messages.

…ive drain attach

User feedback round 1 on the message-queue UX:

1) The Stop button matched the send/voice mics at h-10 w-10 with a
   solid destructive fill, which read as alarming. Shrunk to h-8 w-8,
   ghost variant with a soft destructive/10 background, smaller
   filled square glyph. Reads as a calm 'stop' affordance instead of
   a panic button.

2) The QueueItem's leading <QueueItemIndicator> dot was decorative
   only — no state, no interaction. Dropped it from QueuePanel along
   with the import; queue items now render as a clean preview line
   with the trailing X remove action.

3) When the server drained the queue and started the next turn, the
   chat panel didn't pick up the live stream until the user
   navigated away and back. The hook's resume effect previously
   only fired on agent change, not on listing-observed activeTurnId
   change. Surface activeTurnId from useHarnessAgents into
   useAgentConversation; effect now re-runs when the id changes,
   calls /chat/active, and attaches to the new turn — so a queued
   message starts streaming the moment the server drain pops it.
…op paths

The Stop button was disappearing while the agent was actively
streaming, even though events were still flowing into the chat. Root
cause: the resume effect's `finally` block reset `streaming`,
`turnIdRef`, and `lastSeqRef` unconditionally — including on the
early-return paths (no active turn, or another mechanism already
owns the stream).

Sequence that triggered it:
  1) User sends a message → send() sets streamAbortRef + streaming=true
     and starts consuming the SSE.
  2) User enqueues another message → enqueue mutation invalidates the
     listing query.
  3) Listing refetches with the live activeTurnId → the resume
     effect re-fires (deps include activeTurnIdDep).
  4) attemptResume hits `if (streamAbortRef.current) return` because
     send() owns it.
  5) The finally clause fires anyway and calls setStreaming(false),
     clobbering the live state set by send(). The SSE consumer keeps
     running (refs are intact) so text keeps streaming, but the React
     flag is wrong, so the Stop button gates off.

Fix: track whether *this* run actually started a stream
(`weStartedStream`). The finally only resets state when it does.
Early-return / no-active-turn paths now leave streaming/turnIdRef/
lastSeqRef alone for whoever does own them.

Also widens the Stop button's visibility (`canStop` prop on
ConversationInput) so it stays steady across the brief gap between
turns when a queue drain is mid-flight; the parent computes
`streaming || activeTurnId !== null || queue.length > 0`. The
visibility widening is independent of the streaming-state fix above
— both are now in place.
Reverts the canStop prop on ConversationInput and the OR-with-queue
visibility from AgentCommandConversation. Stop is gated solely on
`streaming` again. Between turns (queue draining) the button stays
hidden — only the actively-streaming turn is interruptible from the
composer, which matches what the user actually expects.
…sume placeholder isn't empty

When a queued message drained and started a new turn, the chat
panel's resume effect staged a placeholder turn with userText: ''
because the hook had no way to know what message kicked off the
turn — only the agent-side stream was visible, and the user bubble
above it was blank until the user navigated away and back (at which
point the session record's history loaded normally).

Fix: ActiveTurnRegistry.register now accepts an optional `prompt`
that's stashed on the turn and surfaced via describe() / the
ActiveTurnInfo response. AgentHarnessService.startTurn passes the
incoming message into register. /chat/active returns it. The chat
hook's resume effect uses active.prompt as the placeholder
turn's userText, so the user bubble shows the queued message text
the moment streaming begins. Falls back to '' for older clients
that haven't been refetched yet.
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Apr 30, 2026

✅ Tests passed — 1011/1015

Suite Passed Failed Skipped
agent 62/62 0 0
build 7/7 0 0
eval 65/65 0 0
server-agent 261/261 0 0
server-api 149/149 0 0
server-browser 3/3 0 0
server-integration 9/10 0 1
server-lib 131/131 0 0
server-root 62/65 0 3
server-skills 31/31 0 0
server-tools 231/231 0 0

View workflow run

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Apr 30, 2026

Greptile Summary

This PR adds a durable per-agent FIFO message queue backed by an atomic JSON file, so messages sent while an agent is streaming are enqueued and drained automatically at turn boundaries rather than dropped. It also surfaces a Stop button in the composer and fixes a pre-existing bug where the stop button would disappear mid-stream when listing refetches caused the resume effect to re-fire.

  • P1 – streamAbortRef.current left stale on cleanup: When activeTurnIdDep changes while a resume stream is in flight (e.g., the 5s listing poll captures a new queue-drain turn ID before the SSE for the prior turn has finished flushing), React runs cleanup (cancelled = true, abort). The finally guard if (!cancelled && weStartedStream) correctly skips resetting streaming, but it also skips clearing streamAbortRef.current. Every subsequent effect run hits if (streamAbortRef.current) return and bails out, leaving streaming === true with no live stream until the user navigates away. The fix is to check streamAbortRef.current?.signal.aborted before the early-return guard and clear the ref if the controller was already aborted.

Confidence Score: 3/5

Hold for the streamAbortRef stale-ref fix; all server-side queue logic is solid.

One P1 in the resume hook: a narrow but real timing race introduced by the new activeTurnIdDep dependency can leave the client permanently stuck in streaming=true with no live stream until navigation. The server-side FileMessageQueue, drain logic, and route layer are well-implemented with good test coverage.

packages/browseros-agent/apps/agent/entrypoints/app/agent-command/useAgentConversation.ts — the streamAbortRef.current stale-on-cancel issue at the guard check around line 251.

Important Files Changed

Filename Overview
packages/browseros-agent/apps/agent/entrypoints/app/agent-command/useAgentConversation.ts Adds activeTurnId dep to the resume effect and introduces weStartedStream guard — the guard fixes the original stop-button flicker but leaves streamAbortRef.current stale when cleanup fires mid-stream, which can permanently block reattachment (P1).
packages/browseros-agent/apps/server/src/lib/agents/message-queue.ts New FileMessageQueue: atomic temp+rename writes, single-promise write lock, FIFO append/pop/pushFront/remove — well-structured with good error handling and recovery semantics.
packages/browseros-agent/apps/server/src/api/services/agents/agent-harness-service.ts Wires FileMessageQueue into the service: maybeStartNextFromQueue on turn-end, boot drain, enqueue/remove/list surface methods, defensive drain on enqueue when agent is idle. Race guard via pushFront is solid.
packages/browseros-agent/apps/agent/entrypoints/app/agents/useAgents.ts Adds useEnqueueHarnessMessage and useRemoveHarnessQueuedMessage with optimistic cache updates and rollback — attachments omitted from the optimistic entry (P2 UX inconsistency).
packages/browseros-agent/apps/server/src/api/routes/agents.ts Three new queue endpoints with proper error handling and MessageQueueFullError→429; POST returns 200 instead of 201 (P2 REST convention).
packages/browseros-agent/apps/agent/entrypoints/app/agent-command/AgentCommandConversation.tsx Integrates QueuePanel, enqueue-on-send routing when streaming
packages/browseros-agent/apps/agent/entrypoints/app/agent-command/ConversationInput.tsx Queue-aware mode gated cleanly on onStop presence; StopButton component is isolated; InputActionButton spinner/send icon logic updated correctly.
packages/browseros-agent/apps/agent/entrypoints/app/agent-command/QueuePanel.tsx New component wrapping AI Elements Queue primitives; handles text and image/file attachments; returns null on empty queue so caller doesn't need its own gate.
packages/browseros-agent/apps/server/src/lib/agents/active-turn-registry.ts Adds optional prompt field to ActiveTurn/ActiveTurnInfo and the register() options bag; minimal, backwards-compatible change.
packages/browseros-agent/packages/shared/src/constants/limits.ts Adds QUEUE_MAX_LENGTH (50) and QUEUE_MESSAGE_MAX_BYTES (64 KiB) constants — well-chosen values with JSDoc.

Sequence Diagram

sequenceDiagram
    participant U as User
    participant CC as AgentCommandConversation
    participant UE as useEnqueueHarnessMessage
    participant S as Server /agents/:id/queue
    participant HS as AgentHarnessService
    participant FMQ as FileMessageQueue
    participant TR as TurnRegistry

    U->>CC: Send message while streaming=true
    CC->>UE: mutate({agentId, message})
    UE-->>CC: optimistic cache update
    UE->>S: POST /agents/:id/queue
    S->>HS: enqueueMessage()
    HS->>FMQ: append(agentId, msg)
    FMQ-->>HS: QueuedMessage
    HS->>TR: getActiveFor(agentId)?
    alt no active turn (idle window)
        HS->>HS: maybeStartNextFromQueue()
        HS->>FMQ: popOldest()
        HS->>HS: startTurn()
    end
    S-->>UE: {queued}
    UE-->>CC: cache invalidate + refetch

    note over HS,TR: Turn A ends (success/cancel/error)
    HS->>HS: notifyTurnEnded()
    HS->>HS: maybeStartNextFromQueue(agentId)
    HS->>FMQ: popOldest(agentId)
    FMQ-->>HS: next QueuedMessage
    HS->>TR: getActiveFor? (race guard)
    alt race: another turn started
        HS->>FMQ: pushFront(agentId, msg)
    else idle
        HS->>HS: startTurn(next.message)
        TR-->>HS: new ActiveTurn(prompt=next.message)
    end

    note over CC: Listing poll (5s) - activeTurnId flips
    CC->>CC: useAgentConversation (activeTurnIdDep changes)
    CC->>CC: fetchActiveHarnessTurn - attach SSE
Loading

Comments Outside Diff (2)

  1. packages/browseros-agent/apps/agent/entrypoints/app/agent-command/useAgentConversation.ts, line 289-309 (link)

    P1 Stale streamAbortRef.current after cleanup leaves UI stuck in streaming state

    When activeTurnIdDep changes (e.g., the listing poll captures the new queue-drain turn ID) while the resume effect is mid-stream, React runs the cleanup (cancelled = true, abortController.abort()). The finally block's guard if (!cancelled && weStartedStream) correctly skips resetting streaming, but it also skips clearing streamAbortRef.current. The next effect run (for the new turn ID) hits if (streamAbortRef.current) return at line 251, sees the now-aborted but non-null controller, and exits early without attaching. The result: streaming === true forever with no live stream, and no subsequent re-runs can recover it until the user navigates away.

    This is most likely to trigger in the new queue-drain flow — turn A ends and turn B starts sub-second, so a 5s listing poll could see B's ID while the SSE for A's final done event is still being read by the current run.

    A targeted fix is to clear the ref before the early-return check when the owned controller has already been aborted:

    // Clear stale ref so the next run can attach even if cleanup
    // aborted our controller before our finally ran.
    if (streamAbortRef.current?.signal.aborted) {
      streamAbortRef.current = null
    }
    if (streamAbortRef.current) return // someone else already owns the stream
    Prompt To Fix With AI
    This is a comment left during a code review.
    Path: packages/browseros-agent/apps/agent/entrypoints/app/agent-command/useAgentConversation.ts
    Line: 289-309
    
    Comment:
    **Stale `streamAbortRef.current` after cleanup leaves UI stuck in streaming state**
    
    When `activeTurnIdDep` changes (e.g., the listing poll captures the new queue-drain turn ID) while the resume effect is mid-stream, React runs the cleanup (`cancelled = true`, `abortController.abort()`). The `finally` block's guard `if (!cancelled && weStartedStream)` correctly skips resetting `streaming`, but it also skips clearing `streamAbortRef.current`. The next effect run (for the new turn ID) hits `if (streamAbortRef.current) return` at line 251, sees the now-aborted but non-null controller, and exits early without attaching. The result: `streaming === true` forever with no live stream, and no subsequent re-runs can recover it until the user navigates away.
    
    This is most likely to trigger in the new queue-drain flow — turn A ends and turn B starts sub-second, so a 5s listing poll could see B's ID while the SSE for A's final `done` event is still being read by the current run.
    
    A targeted fix is to clear the ref before the early-return check when the owned controller has already been aborted:
    
    ```typescript
    // Clear stale ref so the next run can attach even if cleanup
    // aborted our controller before our finally ran.
    if (streamAbortRef.current?.signal.aborted) {
      streamAbortRef.current = null
    }
    if (streamAbortRef.current) return // someone else already owns the stream
    ```
    
    How can I resolve this? If you propose a fix, please make it concise.
  2. packages/browseros-agent/apps/agent/entrypoints/app/agents/useAgents.ts, line 551-556 (link)

    P2 Optimistic entry omits attachments

    The optimistic HarnessQueuedMessage built during onMutate doesn't include attachments, so messages queued with images or files will render without their attachments in QueuePanel until onSettled invalidates the cache and the next poll returns the real record (up to 5 seconds later).

    const optimistic: HarnessQueuedMessage = {
      id: `optimistic-${Math.random().toString(36).slice(2, 10)}`,
      createdAt: Date.now(),
      message: input.message,
      attachments: input.attachments as HarnessQueuedMessage['attachments'],
    }
    Prompt To Fix With AI
    This is a comment left during a code review.
    Path: packages/browseros-agent/apps/agent/entrypoints/app/agents/useAgents.ts
    Line: 551-556
    
    Comment:
    **Optimistic entry omits attachments**
    
    The optimistic `HarnessQueuedMessage` built during `onMutate` doesn't include `attachments`, so messages queued with images or files will render without their attachments in `QueuePanel` until `onSettled` invalidates the cache and the next poll returns the real record (up to 5 seconds later).
    
    ```typescript
    const optimistic: HarnessQueuedMessage = {
      id: `optimistic-${Math.random().toString(36).slice(2, 10)}`,
      createdAt: Date.now(),
      message: input.message,
      attachments: input.attachments as HarnessQueuedMessage['attachments'],
    }
    ```
    
    How can I resolve this? If you propose a fix, please make it concise.
Prompt To Fix All With AI
Fix the following 3 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 3
packages/browseros-agent/apps/agent/entrypoints/app/agent-command/useAgentConversation.ts:289-309
**Stale `streamAbortRef.current` after cleanup leaves UI stuck in streaming state**

When `activeTurnIdDep` changes (e.g., the listing poll captures the new queue-drain turn ID) while the resume effect is mid-stream, React runs the cleanup (`cancelled = true`, `abortController.abort()`). The `finally` block's guard `if (!cancelled && weStartedStream)` correctly skips resetting `streaming`, but it also skips clearing `streamAbortRef.current`. The next effect run (for the new turn ID) hits `if (streamAbortRef.current) return` at line 251, sees the now-aborted but non-null controller, and exits early without attaching. The result: `streaming === true` forever with no live stream, and no subsequent re-runs can recover it until the user navigates away.

This is most likely to trigger in the new queue-drain flow — turn A ends and turn B starts sub-second, so a 5s listing poll could see B's ID while the SSE for A's final `done` event is still being read by the current run.

A targeted fix is to clear the ref before the early-return check when the owned controller has already been aborted:

```typescript
// Clear stale ref so the next run can attach even if cleanup
// aborted our controller before our finally ran.
if (streamAbortRef.current?.signal.aborted) {
  streamAbortRef.current = null
}
if (streamAbortRef.current) return // someone else already owns the stream
```

### Issue 2 of 3
packages/browseros-agent/apps/agent/entrypoints/app/agents/useAgents.ts:551-556
**Optimistic entry omits attachments**

The optimistic `HarnessQueuedMessage` built during `onMutate` doesn't include `attachments`, so messages queued with images or files will render without their attachments in `QueuePanel` until `onSettled` invalidates the cache and the next poll returns the real record (up to 5 seconds later).

```typescript
const optimistic: HarnessQueuedMessage = {
  id: `optimistic-${Math.random().toString(36).slice(2, 10)}`,
  createdAt: Date.now(),
  message: input.message,
  attachments: input.attachments as HarnessQueuedMessage['attachments'],
}
```

### Issue 3 of 3
packages/browseros-agent/apps/server/src/api/routes/agents.ts:669-682
**`POST /agents/:id/queue` returns 200 instead of 201**

REST convention returns `201 Created` when a new resource is created. The sister route `POST /agents` presumably returns 201; this endpoint should match.

```typescript
return c.json({ queued }, 201)
```

Reviews (1): Last reviewed commit: "fix(agents): persist the kicking-off pro..." | Re-trigger Greptile

…en cancelled

Greptile P1 follow-up. The previous `weStartedStream` guard correctly
stopped the resume effect's no-op early-returns from clobbering an
in-flight `send()` stream — but it also stopped a *cancelled*
mid-stream resume from clearing its own `streamAbortRef`. When the
cleanup fires (e.g. the 5s listing poll captures a new queue-drain
turn id while the SSE for the prior turn is still finishing), the
next effect run hits the `if (streamAbortRef.current) return` guard
against the now-aborted controller and never reattaches, leaving
`streaming === true` with no live stream until the user navigates
away.

Split the finally block: always release `streamAbortRef` when we
owned the controller (so the next run can take over), but only
reset the streaming flag / turn id / lastSeq on a clean exit (the
new run will set those itself, so resetting on cancel would just
flicker).
@DaniAkash Dani Akash (DaniAkash) merged commit 8712f89 into dev Apr 30, 2026
17 checks passed
@DaniAkash Dani Akash (DaniAkash) deleted the feat/agent-chat-message-queue branch April 30, 2026 12:57
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant