feat(opencode): add SSE replay for missed session events#401
Conversation
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Plus Run ID: 📒 Files selected for processing (2)
🚧 Files skipped from review as they are similar to previous changes (1)
📝 WalkthroughWalkthroughAdds server-side SSE replay (EventReplayStore, bridge, replay-aware /event stream), a client SSE replay cursor and E2E controls, a blocker-terminal cache to prevent reopening resolved asks/permissions, E2E endpoints/fixtures and tests, SDK wiring to expose/start/stop the stream for E2E, and an improved e2e timeout error that concurrently fetches questions and session status. ChangesSSE Event Replay, Client Cursor, and Blocker Terminal Cache
Sequence Diagram(s)sequenceDiagram
participant Client
participant SDK as Global SDK
participant Server as /event SSE
participant Replay as EventReplayStore
participant Bus as GlobalBus
Client->>SDK: connect (GET /event with Last-Event-ID?)
SDK->>SDK: replayCursor.headers() -> attach header
SDK->>Server: GET /event (with Last-Event-ID)
Server->>Replay: snapshot(lastEventID)
Replay-->>Server: replay records + fence/gap info
Server-->>Client: replayed SSE packets (with ids)
Bus->>Server: emit live envelope events
Server->>Replay: append(envelope)
Server-->>Client: buffered live SSE packets (post-replay)
Client->>SDK: onSseEvent(event)
SDK->>SDK: replayCursor.update(event.id)
sequenceDiagram
participant EventStream
participant Reducer
participant Cache
participant Store
EventStream->>Reducer: question.replied(session, req)
Reducer->>Cache: mark("question", dir, session, req)
Reducer->>Store: remove pending question
Note over EventStream,Reducer: Later (stale) ask arrives
EventStream->>Reducer: question.asked(session, req)
Reducer->>Cache: has("question", dir, session, req)?
Cache-->>Reducer: true
Reducer->>Reducer: short-circuit (skip creating pending question)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related issues
Possibly related PRs
Suggested labels
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Review rate limit: 9/10 reviews remaining, refill in 6 minutes. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces an SSE event replay system to enhance client-server synchronization, featuring a server-side EventReplayStore and a client-side sse-cursor. It also adds a blocker-terminal-cache to prevent stale events from reopening resolved requests. Feedback highlights that the server.connected event is sent unconditionally, which may trigger redundant client-side refreshes and bypass the replay optimization.
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
packages/app/src/context/global-sync/event-reducer.test.ts (1)
497-555: ⚡ Quick winAdd a
question.rejected-before-question.askedvariant test.
question.rejectedshares the same terminal path asquestion.replied; adding the mirrored stale-order case would close the last branch-level gap.Proposed test addition
+ test("question.rejected before question.asked prevents stale ask from reopening", () => { + const blockerTerminals = createBlockerTerminalCache({ now: () => 1000 }) + const [store, setStore] = createStore(baseState()) + + applyDirectoryEvent({ + event: { type: "question.rejected", properties: { sessionID: "ses_1", requestID: "q1" } }, + directory: "/repo", + store, + setStore, + push() {}, + loadLsp() {}, + blockerTerminals, + }) + + applyDirectoryEvent({ + event: { + type: "question.asked", + properties: questionRequest("q1", "ses_1"), + }, + directory: "/repo", + store, + setStore, + push() {}, + loadLsp() {}, + blockerTerminals, + }) + + expect(store.question.ses_1).toBeUndefined() + })🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/app/src/context/global-sync/event-reducer.test.ts` around lines 497 - 555, Add a new unit test mirroring "question.replied before question.asked prevents stale ask from reopening" but use the "question.rejected" event type: create blockerTerminals with now:() => 1000, create a fresh store via createStore(baseState()), call applyDirectoryEvent first with event.type "question.rejected" and properties { sessionID: "ses_1", requestID: "q1" }, then call applyDirectoryEvent with event.type "question.asked" and properties from questionRequest("q1","ses_1"), and finally assert expect(store.question.ses_1).toBeUndefined(); use the same helpers (createBlockerTerminalCache, createStore, applyDirectoryEvent, questionRequest) and same args for directory, setStore, push, loadLsp as the existing tests.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/opencode/src/server/event-replay.ts`:
- Around line 105-129: open() computes replay slices from this.records without
pruning by age, so stale records survive quiet periods; before computing
fenceSeq/earliestSeq/replay, invoke the same expiration logic used in append
(respecting maxAgeMs) to remove aged-out entries from this.records (or call the
existing prune/expire helper if present) so replay only includes non-expired
records; ensure this affects earliestSeq and replay calculation and preserve
listener behavior.
---
Nitpick comments:
In `@packages/app/src/context/global-sync/event-reducer.test.ts`:
- Around line 497-555: Add a new unit test mirroring "question.replied before
question.asked prevents stale ask from reopening" but use the
"question.rejected" event type: create blockerTerminals with now:() => 1000,
create a fresh store via createStore(baseState()), call applyDirectoryEvent
first with event.type "question.rejected" and properties { sessionID: "ses_1",
requestID: "q1" }, then call applyDirectoryEvent with event.type
"question.asked" and properties from questionRequest("q1","ses_1"), and finally
assert expect(store.question.ses_1).toBeUndefined(); use the same helpers
(createBlockerTerminalCache, createStore, applyDirectoryEvent, questionRequest)
and same args for directory, setStore, push, loadLsp as the existing tests.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro Plus
Run ID: 1cfa4684-d3c6-4285-8c94-977dbc736a3e
📒 Files selected for processing (13)
packages/app/e2e/actions.tspackages/app/src/context/global-sdk.tsxpackages/app/src/context/global-sdk/sse-cursor.test.tspackages/app/src/context/global-sdk/sse-cursor.tspackages/app/src/context/global-sync.tsxpackages/app/src/context/global-sync/blocker-terminal-cache.test.tspackages/app/src/context/global-sync/blocker-terminal-cache.tspackages/app/src/context/global-sync/event-reducer.test.tspackages/app/src/context/global-sync/event-reducer.tspackages/opencode/src/server/event-replay.tspackages/opencode/src/server/instance/global.tspackages/opencode/test/server/event-replay.test.tspackages/opencode/test/server/global-event-replay.test.ts
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/opencode/src/server/event-replay.ts`:
- Around line 75-79: Validate retention configuration in the EventReplay
constructor (and the other initialization path around lines 149-153) to prevent
negative retention values: check that maxRecords and maxAgeMs are not negative
(e.g., if input?.maxRecords < 0 or input?.maxAgeMs < 0) and either throw a clear
ArgumentError or clamp to a safe non-negative value; this ensures prune()'s loop
condition cannot be satisfied indefinitely and prevents append()/snapshot() from
hanging. Include the checks where bootID, maxRecords, maxAgeMs, and now are
assigned so invalid inputs are rejected early.
In `@packages/opencode/test/server/global-event-replay.test.ts`:
- Around line 14-25: The helper readSseUntil currently can hang indefinitely if
the predicate never matches; modify readSseUntil to fail fast by adding a
timeout (e.g., timeoutMs param with a sensible default) and use an
AbortController or a timeout Promise to cancel the response.body.getReader()
(call reader.cancel()) and reject with a clear timeout error if the predicate
isn’t satisfied within that window; keep the existing loop and TextDecoder logic
but wrap the read loop in a Promise.race (or respect AbortSignal) so the
function throws on timeout instead of waiting for the test framework to time
out.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro Plus
Run ID: 96d4a219-8ef3-467d-bc3f-9f559820b261
📒 Files selected for processing (6)
packages/app/src/context/global-sdk/sse-cursor.tspackages/app/src/context/global-sync/blocker-terminal-cache.tspackages/opencode/src/server/event-replay.tspackages/opencode/src/server/instance/global.tspackages/opencode/test/server/event-replay.test.tspackages/opencode/test/server/global-event-replay.test.ts
✅ Files skipped from review due to trivial changes (1)
- packages/app/src/context/global-sdk/sse-cursor.ts
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/opencode/src/server/instance/global.ts
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
packages/opencode/src/server/event-replay.ts (1)
164-164: ⚡ Quick winAdd module namespace self-reexport at file end.
Please add the canonical module-as-namespace export for this new
packages/opencode/srcmodule so consumers can import it consistently.Suggested diff
export class EventReplayStore { // ... } + +export * as EventReplay from "./event-replay"Based on learnings: In
packages/opencode/srcmodule files, the canonical pattern is a bottom self-reexport (export * as <ModuleName> from "./<module-file>") and consumers depend on that namespace import style.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/opencode/src/server/event-replay.ts` at line 164, Add a canonical module namespace self-reexport at the end of the file so consumers can import the module as a namespace; specifically append an export statement that re-exports everything from this file's module (e.g., export * as EventReplay from "./event-replay") at the bottom of packages/opencode/src/event-replay.ts so consumers can import the EventReplay namespace consistently.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/opencode/test/server/global-event-replay.test.ts`:
- Around line 168-173: Add an explicit assertion that the SSE route returned a
successful HTTP status before consuming the stream: after calling
app.request(...) and before calling readSseUntil(...), check the response status
(e.g., ensure response.status is 200 or within the expected range) so failures
in route handling are reported immediately; update the test around the response,
controller, and readSseUntil usage to include this status assertion.
---
Nitpick comments:
In `@packages/opencode/src/server/event-replay.ts`:
- Line 164: Add a canonical module namespace self-reexport at the end of the
file so consumers can import the module as a namespace; specifically append an
export statement that re-exports everything from this file's module (e.g.,
export * as EventReplay from "./event-replay") at the bottom of
packages/opencode/src/event-replay.ts so consumers can import the EventReplay
namespace consistently.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro Plus
Run ID: 4cf77321-1a9a-4135-9a00-97950b6b91a9
📒 Files selected for processing (3)
packages/opencode/src/server/event-replay.tspackages/opencode/test/server/event-replay.test.tspackages/opencode/test/server/global-event-replay.test.ts
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
packages/app/e2e/session/session-composer-dock.spec.ts (1)
470-481: 🏗️ Heavy liftMake this assertion distinguish replay from fallback recovery.
After
stream.start(), this only checks that the dock becomes blocked again. That same end state is also valid for the normal bootstrap/refresh fallback described in the PR, so this test can stay green even if reconnect replay is broken. Please add a replay-specific assertion here instead of only asserting eventual dock recovery.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/app/e2e/session/session-composer-dock.spec.ts` around lines 470 - 481, The test currently only asserts dock recovery after stream.start() which can't distinguish replay from a generic fallback; modify the test to assert a replay-specific signal emitted by the reconnection flow before/while the dock unblocks. After calling stream.start() (and before or immediately after expectQuestionBlocked()), poll or inspect globalEventStream(stream.cursor) for a replay-specific event or marker (for example a "replay"/'replayed_events' log entry or a replay-related payload property) and assert it appears within the timeout, and/or assert that the restored question(s) include replay metadata (e.g. a replay flag on the restored question object) rather than relying solely on expectQuestionBlocked() or questionDockSelector count to prove recovery.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/opencode/src/server/instance/question.ts`:
- Around line 30-37: The current detached call to
AppRuntime.runPromise(Question.Service.use(... svc.ask(...))) swallows errors
via .catch(() => undefined); change this so startup/persistence failures are
surfaced or at least logged before the request is enqueued: in the block that
calls AppRuntime.runPromise and Question.Service.use with svc.ask, remove the
empty catch and instead handle rejections by logging the error (using your
system logger/processLogger) or rethrowing for upstream handling; ensure
failures that occur inside svc.ask are not converted into a silent 204 and that
waitForQuestionSeed can observe the error.
---
Nitpick comments:
In `@packages/app/e2e/session/session-composer-dock.spec.ts`:
- Around line 470-481: The test currently only asserts dock recovery after
stream.start() which can't distinguish replay from a generic fallback; modify
the test to assert a replay-specific signal emitted by the reconnection flow
before/while the dock unblocks. After calling stream.start() (and before or
immediately after expectQuestionBlocked()), poll or inspect
globalEventStream(stream.cursor) for a replay-specific event or marker (for
example a "replay"/'replayed_events' log entry or a replay-related payload
property) and assert it appears within the timeout, and/or assert that the
restored question(s) include replay metadata (e.g. a replay flag on the restored
question object) rather than relying solely on expectQuestionBlocked() or
questionDockSelector count to prove recovery.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro Plus
Run ID: 39678a08-a771-4db2-8723-8653a14e0e2f
📒 Files selected for processing (5)
packages/app/e2e/fixtures.tspackages/app/e2e/session/session-composer-dock.spec.tspackages/app/src/context/global-sdk.tsxpackages/app/src/testing/terminal.tspackages/opencode/src/server/instance/question.ts
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/app/src/context/global-sdk.tsx
4adebd1 to
7edb06b
Compare
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/opencode/src/server/instance/question.ts`:
- Around line 30-31: Replace the ad-hoc check of OPENCODE_E2E_LLM_URL with a
strict dedicated E2E enable guard: add or use a single environment flag like
OPENCODE_ENABLE_E2E_ROUTES (or helper function isE2EEnabled()) and change the
early-return gate in the question route (the line with if
(!Env.get("OPENCODE_E2E_LLM_URL")) return c.notFound()) to check that flag
instead; update the other E2E route guards referenced (the checks at the other
route locations mentioned) to reuse the same flag/helper so both routes are
protected by the explicit test-mode guard.
- Around line 24-27: The route schema currently allows any number of questions;
restrict the questions array to the same 1..4 bounds as Question.Request to
avoid invalid seeds. Update the zod schema (the object containing SessionID.zod
and questions: z.array(Question.Info.zod)) to enforce .min(1).max(4) on the
questions array (or replace it with the validated
Question.Request/Question.Request.zod schema) so the route matches
Question.Request constraints.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro Plus
Run ID: 0bf3191c-a7eb-4b5e-b414-5b787d8b8a50
📒 Files selected for processing (2)
packages/app/src/context/global-sdk.tsxpackages/opencode/src/server/instance/question.ts
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/app/src/context/global-sdk.tsx
Summary
/eventreconnect replay withLast-Event-IDcursors while keeping non-replayable live events on the existing bus path.server.connectedrefresh on valid reconnects with full replay, while still advancing invalid/gapped cursors.question.askedreplay and stalequestion.askedafter reply.Why
Issue #396 needs short-window recovery for missed
question,permission, andsession/statusupdates after SSE reconnects. This keeps recovery at the transport layer around the existing global BusEvent stream, without moving blocker/session updates into persistent sync-event storage.Related Issue
Refs #396. This PR implements the backend replay path, app cursor persistence, reducer stale-blocker guard, route-level SSE replay coverage, and focused UI e2e coverage for question replay/stale-ask behavior. Full UI e2e coverage for replay-unavailable fallback remains a follow-up before #396 should be considered completely closed.
Human Review Status
Pending. A human should make the final merge decision after reviewing the final diff and verification evidence.
Review Focus
packages/opencode/src/server/event-replay.tsandpackages/opencode/src/server/instance/global.ts.server.connected, and invalid/gap reconnects send oneserver.connectedwith the fence id.packages/app/src/context/global-sdk.tsxand terminal blocker cache behavior inglobal-sync.packages/opencode/src/server/instance/question.ts, which are gated byOPENCODE_E2E_LLM_URLand used only for deterministic UI transport tests.Risk Notes
/eventstream now assigns ids for replayable events, so reconnect behavior changes for clients usingLast-Event-ID.server.connected, so replay ordering and the terminal blocker guard now carry the short-gap recovery path instead of a refresh masking it.server.connected(fenceID); bootstrap owns recovery when the replay window cannot prove continuity.global.disposedresets the replay generation and clears retained records instead of being stored as a replay record; online clients receive the live packet, while offline clients refresh after bootID mismatch.Question.ask()/question.askedevents without depending on model seeding.How To Verify
Screenshots or Recordings
Not required. This PR changes transport/reducer behavior and tests, with no visible UI copy or layout change.
Checklist
dev, and my PR title and commit messages use Conventional Commits in EnglishSummary by CodeRabbit
New Features
Bug Fixes
Tests