diff --git a/packages/app/e2e/backend.ts b/packages/app/e2e/backend.ts index 3575c4c5..72d4fe4e 100644 --- a/packages/app/e2e/backend.ts +++ b/packages/app/e2e/backend.ts @@ -60,6 +60,30 @@ const LOG_CAP = 100 const INTERNAL_SERVER_AUTH_ENV = new Set(["opencode_server_password", "opencode_server_username"]) +// Strip any host-provided AI provider credentials from the spawned backend's +// environment so the test fixture's OPENCODE_E2E_LLM_URL routing always wins. +// Without this, a developer with e.g. GEMINI_API_KEY exported on their host +// gets that provider auto-picked as default model in the worker backend, and +// the test silently makes a real network call (or fails with auth errors) +// instead of routing through the in-process e2e LLM fixture. +// +// Pattern catches `*_API_KEY` / `*_API_TOKEN` (the bulk of provider env names +// in models.dev). Explicit set covers the long tail that doesn't match +// (e.g. `GITHUB_TOKEN` for Copilot, `HF_TOKEN`, `AWS_BEARER_TOKEN_BEDROCK`). +const PROVIDER_ENV_PATTERN = /_API_(KEY|TOKEN)$/ +const PROVIDER_ENV_EXTRA = new Set([ + "AWS_BEARER_TOKEN_BEDROCK", + "AWS_ACCESS_KEY_ID", + "AWS_SECRET_ACCESS_KEY", + "BAILING_API_TOKEN", + "DIGITALOCEAN_ACCESS_TOKEN", + "FRIENDLI_TOKEN", + "GITHUB_TOKEN", + "GITLAB_TOKEN", + "GOOGLE_APPLICATION_CREDENTIALS", + "HF_TOKEN", +]) + function cap(input: string[]) { if (input.length > LOG_CAP) input.splice(0, input.length - LOG_CAP) } @@ -90,6 +114,7 @@ export function createBackendEnv(input: { } for (const key of Object.keys(env)) { if (INTERNAL_SERVER_AUTH_ENV.has(key.toLowerCase())) delete env[key] + else if (PROVIDER_ENV_PATTERN.test(key) || PROVIDER_ENV_EXTRA.has(key)) delete env[key] } return env } diff --git a/packages/app/e2e/session/session-composer-dock.spec.ts b/packages/app/e2e/session/session-composer-dock.spec.ts index 82968e04..c91e6a20 100644 --- a/packages/app/e2e/session/session-composer-dock.spec.ts +++ b/packages/app/e2e/session/session-composer-dock.spec.ts @@ 
-1460,3 +1460,47 @@ test("question text renders source newlines as visible line breaks", async ({ pa { trackSession: project.trackSession }, ) }) + +// Cancelling a session while a question tool is awaiting an answer must clear +// the dock AND surface a friendly hint in the message stream so the user is +// not left staring at a stuck UI. See #419. +test("cancelled question tool surfaces interrupted hint in message stream", async ({ page, llm, project }) => { + await project.open() + await withDockSession( + project.sdk, + "e2e composer dock question cancelled", + async (session) => { + await withDockSeed(project.sdk, session.id, async () => { + await project.gotoSession(session.id) + + await llm.toolMatch(inputMatch({ questions: defaultQuestions }), "question", { questions: defaultQuestions }) + await seedSessionQuestion(project.sdk, { + sessionID: session.id, + questions: defaultQuestions, + }) + + await expectQuestionBlocked(page) + + await project.sdk.session.abort({ sessionID: session.id }) + + // Dock disappears via the live `question.rejected` SSE event published + // by Question.ask's abort handler — no reload needed for this leg. + await expect(page.locator(questionDockSelector)).toHaveCount(0, { timeout: 10_000 }) + + // The message stream isn't subscribed to mid-session message updates + // in this dock-focused test setup (matching the permission-flow tests + // which also reload before asserting on tool-result UI). Reload so the + // initial render walks the full message history and our error tool + // part with `metadata.interrupted = true` lands. + await page.goto(page.url()) + + // Hint string lives in packages/ui/src/i18n/en.ts (not the app dict); + // hardcode it here as the contract anchor for this fix. + await expect( + page.getByText("This question was cancelled before it was answered. 
Ask again below if you want to continue."), + ).toBeVisible({ timeout: 10_000 }) + }) + }, + { trackSession: project.trackSession }, + ) +}) diff --git a/packages/app/src/pages/session.tsx b/packages/app/src/pages/session.tsx index 874f3026..9f460f0f 100644 --- a/packages/app/src/pages/session.tsx +++ b/packages/app/src/pages/session.tsx @@ -114,7 +114,19 @@ export default function Page() { const timelineSessionID = timeline.sessionID const timelineSessionKey = timeline.sessionKey const timelineIsChildSession = timeline.isChildSession - const composer = createSessionComposerState({ sessionID: timelineSessionID, fallbackSessionID: () => params.id }) + const haltAbort = (sessionID: string) => + isSessionRunning(sync.data.session_status[sessionID], sync.data.message[sessionID]) + ? sdk.client.session.abort({ sessionID }) + : Promise.resolve() + // sessionRevert chains halt with .then(), so its existing outer .catch + // already handles abort failures. The auto-heal clock wants to see the + // error so it can structured-warn — pass haltAbort directly there. + const halt = (sessionID: string) => haltAbort(sessionID).catch(() => {}) + const composer = createSessionComposerState({ + sessionID: timelineSessionID, + fallbackSessionID: () => params.id, + halt: haltAbort, + }) const timelineMessages = timeline.messages const timelineMessagesReady = timeline.messagesReady const timelineDiffs = timeline.diffs @@ -449,11 +461,6 @@ export default function Page() { attachmentLabel: () => language.t("common.attachment"), }) - const halt = (sessionID: string) => - isSessionRunning(sync.data.session_status[sessionID], sync.data.message[sessionID]) - ? 
sdk.client.session.abort({ sessionID }).catch(() => {}) - : Promise.resolve() - const sessionRevert = createSessionRevert({ sessionID: timelineSessionID, revertMessageID: timelineRevertMessageID, diff --git a/packages/app/src/pages/session/blockers/question-fallback.test.ts b/packages/app/src/pages/session/blockers/question-fallback.test.ts index 4f0c2f93..550200a8 100644 --- a/packages/app/src/pages/session/blockers/question-fallback.test.ts +++ b/packages/app/src/pages/session/blockers/question-fallback.test.ts @@ -1,5 +1,5 @@ import { describe, expect, test } from "bun:test" -import type { Message, Part, ToolState } from "@opencode-ai/sdk/v2" +import type { Message, Part, QuestionRequest, ToolState } from "@opencode-ai/sdk/v2" import { findRunningQuestionFallbackSession } from "./question-fallback" const message = (id: string): Message => ({ id }) as Message @@ -13,37 +13,69 @@ const toolState = (status: ToolState["status"]): ToolState => time: { start: 0 }, }) as ToolState -const toolPart = (tool: string, status: ToolState["status"] = "running"): Part => +const toolPart = ( + id: string, + tool: string, + status: ToolState["status"] = "running", + attrs?: { messageID?: string; callID?: string }, +): Part => ({ - id: `part-${tool}-${status}`, + id, type: "tool", tool, state: toolState(status), + messageID: attrs?.messageID, + callID: attrs?.callID, }) as Part +const syncQ = (id: string, sessionID: string, tool?: { messageID: string; callID: string }): QuestionRequest => + ({ + id, + sessionID, + questions: [{ header: "h", question: "q", options: [] }], + tool, + }) as QuestionRequest + describe("findRunningQuestionFallbackSession", () => { test("returns undefined without a session", () => { - expect(findRunningQuestionFallbackSession({ hasQuestionRequest: false, partsByMessageID: {} })).toBeUndefined() + expect(findRunningQuestionFallbackSession({ syncQuestions: [], partsByMessageID: {} })).toBeUndefined() }) - test("returns undefined when a question request 
already exists", () => { + test("returns undefined when sync entry matches the running part by (messageID, callID)", () => { expect( findRunningQuestionFallbackSession({ sessionID: "s", - hasQuestionRequest: true, - messages: [message("m")], - partsByMessageID: { m: [toolPart("question")] }, + syncQuestions: [syncQ("q1", "s", { messageID: "m1", callID: "c1" })], + messages: [message("m1")], + partsByMessageID: { m1: [toolPart("p1", "question", "running", { messageID: "m1", callID: "c1" })] }, }), ).toBeUndefined() }) - test("returns the session when a recent running question tool part exists", () => { + test("triggers when running part has no matching sync entry by identity", () => { + expect( + findRunningQuestionFallbackSession({ + sessionID: "s", + // sync has an entry, but its tool identity points to a different call + syncQuestions: [syncQ("q_other", "s", { messageID: "m1", callID: "c_other" })], + messages: [message("m1")], + partsByMessageID: { m1: [toolPart("p1", "question", "running", { messageID: "m1", callID: "c1" })] }, + }), + ).toBe("s") + }) + + test("triggers when running parts outnumber matched sync entries (multi-pending parallel)", () => { expect( findRunningQuestionFallbackSession({ sessionID: "s", - hasQuestionRequest: false, - messages: [message("m")], - partsByMessageID: { m: [toolPart("question")] }, + // only q1 matches; q2 q3 are running but unknown to sync + syncQuestions: [syncQ("q1", "s", { messageID: "m1", callID: "c1" })], + messages: [message("m1"), message("m2"), message("m3")], + partsByMessageID: { + m1: [toolPart("p1", "question", "running", { messageID: "m1", callID: "c1" })], + m2: [toolPart("p2", "question", "running", { messageID: "m2", callID: "c2" })], + m3: [toolPart("p3", "question", "running", { messageID: "m3", callID: "c3" })], + }, }), ).toBe("s") }) @@ -52,21 +84,66 @@ describe("findRunningQuestionFallbackSession", () => { expect( findRunningQuestionFallbackSession({ sessionID: "s", - hasQuestionRequest: false, + 
syncQuestions: [], messages: [message("m1"), message("m2")], - partsByMessageID: { m1: [toolPart("question", "completed")], m2: [toolPart("todowrite", "running")] }, + partsByMessageID: { + m1: [toolPart("p1", "question", "completed", { messageID: "m1", callID: "c1" })], + m2: [toolPart("p2", "todowrite", "running", { messageID: "m2", callID: "c2" })], + }, + }), + ).toBeUndefined() + }) + + test("triggers for a running question part beyond the legacy 5-message window", () => { + const messages = Array.from({ length: 50 }, (_, i) => message(`m${i}`)) + expect( + findRunningQuestionFallbackSession({ + sessionID: "s", + syncQuestions: [], + messages, + partsByMessageID: { m0: [toolPart("p0", "question", "running", { messageID: "m0", callID: "c0" })] }, + }), + ).toBe("s") + }) + + test("falls back to count check when neither side has tool identity", () => { + expect( + findRunningQuestionFallbackSession({ + sessionID: "s", + // sync entry without tool identity, running part also missing identity + syncQuestions: [syncQ("q1", "s")], + messages: [message("m1")], + partsByMessageID: { m1: [toolPart("p1", "question", "running")] }, }), ).toBeUndefined() }) - test("recovers running question parts even when they are older than the lookback window", () => { + test("count fallback triggers when running-without-identity exceeds entries-without-identity", () => { expect( findRunningQuestionFallbackSession({ sessionID: "s", - hasQuestionRequest: false, - messages: [message("old"), message("recent-1"), message("recent-2")], - partsByMessageID: { old: [toolPart("question")] }, + syncQuestions: [], + messages: [message("m1"), message("m2")], + partsByMessageID: { + m1: [toolPart("p1", "question", "running")], + m2: [toolPart("p2", "question", "running")], + }, }), ).toBe("s") }) + + // Mixed-state guard for #419: when a sync entry lacks tool identity but a + // running part has identity, the legacy entry should still cover it. 
Pre-fix + // behavior would treat the running part as missing and trigger fallback, + // even though the sync entry was a legitimate (legacy-shaped) match. + test("legacy sync entry without identity absorbs running part with identity", () => { + expect( + findRunningQuestionFallbackSession({ + sessionID: "s", + syncQuestions: [syncQ("q_legacy", "s")], + messages: [message("m1")], + partsByMessageID: { m1: [toolPart("p1", "question", "running", { messageID: "m1", callID: "c1" })] }, + }), + ).toBeUndefined() + }) }) diff --git a/packages/app/src/pages/session/blockers/question-fallback.ts b/packages/app/src/pages/session/blockers/question-fallback.ts index 92cc2510..9df50685 100644 --- a/packages/app/src/pages/session/blockers/question-fallback.ts +++ b/packages/app/src/pages/session/blockers/question-fallback.ts @@ -1,23 +1,55 @@ import type { Message, Part } from "@opencode-ai/sdk/v2" +// Minimal sync-entry shape this matcher needs. Widened from the full +// QuestionRequest so callers (e.g. reverify) can pass narrower generics +// without `as never` while keeping QuestionRequest[] callers happy via +// structural subtyping. +export interface QuestionFallbackEntry { + tool?: { messageID: string; callID: string } +} + +// Triggers fallback recovery when a running question tool part on this session +// has no matching entry in sync. Identity is (messageID, callID) so a model +// emitting parallel question tool calls is covered correctly even when the +// counts happen to line up but the entries point to different tool calls. +// +// Sync entries that lack tool identity (e.g. legacy / seeded test fixtures) +// can't be matched by key, so they cover any one running part. The +// uncovered-with-identity bucket and the without-identity bucket are pooled +// against the entries-without-identity bucket: a fallback only fires when +// the uncovered total truly exceeds what the legacy entries can absorb. +// See #419. 
export function findRunningQuestionFallbackSession(input: { sessionID?: string - hasQuestionRequest: boolean - messages?: Message[] - partsByMessageID: Record + syncQuestions: ReadonlyArray + messages?: ReadonlyArray + partsByMessageID: Record | undefined> }): string | undefined { if (!input.sessionID) return undefined - if (input.hasQuestionRequest) return undefined const messages = input.messages if (!messages?.length) return undefined - for (let i = messages.length - 1; i >= 0; i--) { - const parts = input.partsByMessageID[messages[i].id] + const coveredKeys = new Set() + let entriesWithoutTool = 0 + for (const q of input.syncQuestions) { + if (q.tool) coveredKeys.add(`${q.tool.messageID}:${q.tool.callID}`) + else entriesWithoutTool++ + } + + let uncoveredRunning = 0 + for (const m of messages) { + const parts = input.partsByMessageID[m.id] if (!parts) continue for (const part of parts) { - if (part.type === "tool" && part.tool === "question" && part.state.status === "running") return input.sessionID + if (part.type !== "tool" || part.tool !== "question" || part.state.status !== "running") continue + const callID = part.callID + const messageID = part.messageID + if (!callID || !messageID || !coveredKeys.has(`${messageID}:${callID}`)) { + uncoveredRunning++ + } } } + if (uncoveredRunning > entriesWithoutTool) return input.sessionID return undefined } diff --git a/packages/app/src/pages/session/blockers/question-recovery-chain.test.ts b/packages/app/src/pages/session/blockers/question-recovery-chain.test.ts new file mode 100644 index 00000000..cc128a69 --- /dev/null +++ b/packages/app/src/pages/session/blockers/question-recovery-chain.test.ts @@ -0,0 +1,308 @@ +import { describe, expect, test } from "bun:test" +import { createRoot, createSignal } from "solid-js" +import type { Message, Part, QuestionRequest, ToolState } from "@opencode-ai/sdk/v2" +import { createQuestionRecoveryClock, HEAL_DELAY_MS } from "./question-recovery-clock" +import { 
resolveQuestionRecoverySnapshot, type QuestionRecoverySnapshot } from "./question-recovery-snapshot" +import { questionRecoveryReverify, type ReverifyDeps } from "./question-recovery-reverify" + +// End-to-end auto-heal chain: snapshot reducer + clock + reverify wired +// against the same mutable harness state. Tests the recovery contract as a +// whole — snapshot edge → clock arm → reverify → halt or hydrate — instead +// of trusting that three correct pieces compose correctly. See #419. + +interface FakeTimer { + cb: () => void + fireAt: number + cancelled: boolean +} + +function fakeClock() { + let nowMs = 0 + const timers: FakeTimer[] = [] + return { + now: () => nowMs, + advance(by: number) { + nowMs += by + for (const t of timers) { + if (t.cancelled) continue + if (t.fireAt <= nowMs) { + t.cancelled = true + t.cb() + } + } + }, + setTimer: (cb: () => void, ms: number) => { + const t: FakeTimer = { cb, fireAt: nowMs + ms, cancelled: false } + timers.push(t) + return t + }, + clearTimer: (handle: unknown) => { + ;(handle as FakeTimer).cancelled = true + }, + pending: () => timers.filter((t) => !t.cancelled).length, + } +} + +const flush = async () => { + for (let i = 0; i < 5; i++) await Promise.resolve() +} + +const message = (id: string): Message => ({ id }) as Message + +const toolState = (status: ToolState["status"], input: Record = {}): ToolState => + ({ + status, + input, + title: "", + metadata: {}, + time: { start: 0 }, + }) as ToolState + +const runningQuestionPart = (callID: string, messageID: string): Part => + ({ + id: callID, + type: "tool", + tool: "question", + state: toolState("running", { id: "q1" }), + callID, + messageID, + }) as Part + +const SID = "ses_chain" +const DIR = "/dir" + +interface Harness { + // Inputs that drive the snapshot reducer + reverify. 
+ setSyncQuestions: (q: ReadonlyArray) => void + setMessages: (m: ReadonlyArray) => void + setParts: (p: Record>) => void + setBusy: (b: boolean) => void + setActiveSid: (s: string | undefined) => void + setDirectory: (d: string) => void + setListImpl: (impl: () => Promise) => void + setHaltImpl: (impl: () => Promise) => void + // Observed effects. + haltCalls: string[] + hydrationCalls: { sid: string; questions: readonly QuestionRequest[] }[] + warnCalls: { message: string; payload: Record }[] + fk: ReturnType + // Drive snapshot recompute on input change (production goes via memo). + recompute: () => void + dispose: () => void +} + +const setupChain = (initial?: { + syncQuestions?: ReadonlyArray + messages?: ReadonlyArray + parts?: Record> + busy?: boolean + activeSid?: string + directory?: string + listImpl?: () => Promise + haltImpl?: () => Promise +}): Harness => { + const fk = fakeClock() + const haltCalls: string[] = [] + const hydrationCalls: { sid: string; questions: readonly QuestionRequest[] }[] = [] + const warnCalls: { message: string; payload: Record }[] = [] + + let setSync!: (q: ReadonlyArray) => void + let setMsgs!: (m: ReadonlyArray) => void + let setPartsSig!: (p: Record>) => void + let setBusySig!: (b: boolean) => void + let setSid!: (s: string | undefined) => void + let setDir!: (d: string) => void + let recompute!: () => void + + let listImpl: () => Promise = initial?.listImpl ?? (async () => []) + let haltImpl: () => Promise = + initial?.haltImpl ?? + (async () => { + // default + }) + + const dispose = createRoot((d) => { + const [sync, sSync] = createSignal>(initial?.syncQuestions ?? []) + const [msgs, sMsgs] = createSignal>(initial?.messages ?? []) + const [parts, sParts] = createSignal>>(initial?.parts ?? {}) + const [busy, sBusy] = createSignal(initial?.busy ?? true) + const [sid, sSid] = createSignal(initial?.activeSid ?? SID) + const [dir, sDir] = createSignal(initial?.directory ?? 
DIR) + const [snap, sSnap] = createSignal({ kind: "none" }) + + setSync = sSync + setMsgs = sMsgs + setPartsSig = sParts + setBusySig = sBusy + setSid = sSid + setDir = sDir + + recompute = () => { + const next = resolveQuestionRecoverySnapshot({ + sessionID: sid(), + sessionTreeQuestionRequest: undefined, + activeSessionSyncQuestions: sync(), + activeSessionMessages: msgs(), + partsByMessageID: parts(), + }) + sSnap(next) + clock.tick() + } + + const reverifyDeps: ReverifyDeps = { + snapshot: snap, + activeSessionID: sid, + activeDirectory: dir, + isSessionBusy: () => busy(), + listQuestions: () => listImpl(), + messagesFor: () => msgs(), + partsByMessageID: () => parts(), + applyHydration: (s, qs) => hydrationCalls.push({ sid: s, questions: qs }), + warn: (m, p) => warnCalls.push({ message: m, payload: p }), + } + + const clock = createQuestionRecoveryClock({ + snapshot: snap, + activeSessionID: sid, + activeDirectory: dir, + halt: async (s) => { + haltCalls.push(s) + return haltImpl() + }, + reverify: (s, ctx) => questionRecoveryReverify(reverifyDeps, s, ctx), + now: fk.now, + setTimer: fk.setTimer, + clearTimer: fk.clearTimer, + warn: (m, p) => warnCalls.push({ message: m, payload: p }), + }) + + return d + }) + + // Initialise snapshot once before tests interact. 
+ recompute() + + return { + setSyncQuestions: (q) => { + setSync(q) + recompute() + }, + setMessages: (m) => { + setMsgs(m) + recompute() + }, + setParts: (p) => { + setPartsSig(p) + recompute() + }, + setBusy: setBusySig, + setActiveSid: (s) => { + setSid(s) + recompute() + }, + setDirectory: (d) => { + setDir(d) + recompute() + }, + setListImpl: (impl) => { + listImpl = impl + }, + setHaltImpl: (impl) => { + haltImpl = impl + }, + haltCalls, + hydrationCalls, + warnCalls, + fk, + recompute, + dispose, + } +} + +describe("question recovery chain", () => { + test("missingRunning edge → reverify (still uncovered) → halt fires", async () => { + const h = setupChain() + + // Drop into missingRunning: assistant message has a running question + // part with no covering sync entry. + h.setMessages([message("m1")]) + h.setParts({ m1: [runningQuestionPart("c1", "m1")] }) + expect(h.fk.pending()).toBe(1) + + h.fk.advance(HEAL_DELAY_MS) + await flush() + expect(h.haltCalls).toEqual([SID]) + expect(h.hydrationCalls).toEqual([]) + h.dispose() + }) + + test("server hydrates the missing question before fire → reverify writes it back, halt skipped", async () => { + const covering = { + id: "q1", + sessionID: SID, + tool: { messageID: "m1", callID: "c1" }, + } as unknown as QuestionRequest + const h = setupChain() + h.setMessages([message("m1")]) + h.setParts({ m1: [runningQuestionPart("c1", "m1")] }) + expect(h.fk.pending()).toBe(1) + + // Server now reports the covering question — reverify will hydrate sync + // and refuse to halt because the running part is no longer uncovered. 
+ h.setListImpl(async () => [covering]) + + h.fk.advance(HEAL_DELAY_MS) + await flush() + expect(h.haltCalls).toEqual([]) + expect(h.hydrationCalls).toHaveLength(1) + expect(h.hydrationCalls[0]).toEqual({ sid: SID, questions: [covering] }) + h.dispose() + }) + + test("transient list() failure → bounded retry → recovery on follow-up halts", async () => { + let attempts = 0 + const h = setupChain() + h.setListImpl(async () => { + attempts++ + if (attempts === 1) throw new Error("server blip") + return [] + }) + h.setMessages([message("m1")]) + h.setParts({ m1: [runningQuestionPart("c1", "m1")] }) + + h.fk.advance(HEAL_DELAY_MS) + await flush() + expect(attempts).toBe(1) + expect(h.haltCalls).toEqual([]) + expect(h.fk.pending()).toBe(1) + + h.fk.advance(HEAL_DELAY_MS) + await flush() + expect(attempts).toBe(2) + expect(h.haltCalls).toEqual([SID]) + h.dispose() + }) + + test("session leaves missingRunning before timer fires → halt skipped, no hydration", async () => { + const covering = { + id: "q1", + sessionID: SID, + tool: { messageID: "m1", callID: "c1" }, + } as unknown as QuestionRequest + const h = setupChain() + h.setMessages([message("m1")]) + h.setParts({ m1: [runningQuestionPart("c1", "m1")] }) + expect(h.fk.pending()).toBe(1) + + // Sync receives the covering entry before the timer expires — snapshot + // flips to "none" and the clock cancels its timer. 
+ h.setSyncQuestions([covering]) + expect(h.fk.pending()).toBe(0) + + h.fk.advance(HEAL_DELAY_MS * 2) + await flush() + expect(h.haltCalls).toEqual([]) + expect(h.hydrationCalls).toEqual([]) + h.dispose() + }) +}) diff --git a/packages/app/src/pages/session/blockers/question-recovery-clock.test.ts b/packages/app/src/pages/session/blockers/question-recovery-clock.test.ts new file mode 100644 index 00000000..e6f9bb3c --- /dev/null +++ b/packages/app/src/pages/session/blockers/question-recovery-clock.test.ts @@ -0,0 +1,318 @@ +import { describe, expect, test } from "bun:test" +import { createRoot, createSignal } from "solid-js" +import { createQuestionRecoveryClock, HEAL_DELAY_MS, MAX_RETRIES } from "./question-recovery-clock" +import type { QuestionRecoverySnapshot } from "./question-recovery-snapshot" + +interface FakeTimer { + cb: () => void + fireAt: number + cancelled: boolean +} + +function fakeClock() { + let nowMs = 0 + const timers: FakeTimer[] = [] + return { + now: () => nowMs, + advance(by: number) { + nowMs += by + for (const t of timers) { + if (t.cancelled) continue + if (t.fireAt <= nowMs) { + t.cancelled = true + t.cb() + } + } + }, + setTimer: (cb: () => void, ms: number) => { + const t: FakeTimer = { cb, fireAt: nowMs + ms, cancelled: false } + timers.push(t) + return t + }, + clearTimer: (handle: unknown) => { + ;(handle as FakeTimer).cancelled = true + }, + pending: () => timers.filter((t) => !t.cancelled).length, + } +} + +const flush = async () => { + for (let i = 0; i < 5; i++) await Promise.resolve() +} + +const ready: QuestionRecoverySnapshot = { kind: "ready" } +const none: QuestionRecoverySnapshot = { kind: "none" } +const missing: QuestionRecoverySnapshot = { kind: "missingRunning" } + +interface Harness { + clock: ReturnType + setSnap: (s: QuestionRecoverySnapshot) => void + setSid: (s: string | undefined) => void + fk: ReturnType + haltCalls: string[] + warnCalls: { message: string; payload: Record }[] + haltImpl?: (s: string) => Promise 
+ reverifyImpl?: () => Promise<{ proceed: true } | { proceed: false; retry?: boolean }> + dispose: () => void +} + +const setupHarness = (overrides?: { + haltImpl?: (s: string) => Promise + reverifyImpl?: (sid: string) => Promise<{ proceed: true } | { proceed: false; retry?: boolean }> + delayMs?: number + initialSid?: string +}): Harness => { + const fk = fakeClock() + const haltCalls: string[] = [] + const warnCalls: { message: string; payload: Record }[] = [] + let setSnap!: (s: QuestionRecoverySnapshot) => void + let setSid!: (s: string | undefined) => void + let clock!: ReturnType + const dispose = createRoot((d) => { + const [snap, setS] = createSignal(none) + const [sid, setSidSignal] = createSignal(overrides?.initialSid ?? "s") + setSnap = (s) => { + setS(s) + clock.tick() + } + setSid = (s) => { + setSidSignal(s) + clock.tick() + } + clock = createQuestionRecoveryClock({ + snapshot: snap, + activeSessionID: sid, + activeDirectory: () => "/dir", + halt: + overrides?.haltImpl ?? + (async (s: string) => { + haltCalls.push(s) + }), + reverify: overrides?.reverifyImpl ?? (async () => ({ proceed: true })), + delayMs: overrides?.delayMs ?? 
HEAL_DELAY_MS, + now: fk.now, + setTimer: fk.setTimer, + clearTimer: fk.clearTimer, + warn: (m, p) => warnCalls.push({ message: m, payload: p }), + }) + return d + }) + return { clock, setSnap, setSid, fk, haltCalls, warnCalls, dispose } +} + +describe("createQuestionRecoveryClock", () => { + test("transition to missingRunning arms a timer; halt called after delay", async () => { + const h = setupHarness() + h.setSnap(missing) + expect(h.fk.pending()).toBe(1) + h.fk.advance(HEAL_DELAY_MS) + await flush() + expect(h.haltCalls).toEqual(["s"]) + h.dispose() + }) + + test("transition back to ready before fire clears the timer; halt not called", async () => { + const h = setupHarness() + h.setSnap(missing) + h.setSnap(ready) + h.fk.advance(HEAL_DELAY_MS + 100) + await flush() + expect(h.haltCalls).toEqual([]) + h.dispose() + }) + + test("transition back to none before fire clears the timer; halt not called", async () => { + const h = setupHarness() + h.setSnap(missing) + h.setSnap(none) + h.fk.advance(HEAL_DELAY_MS + 100) + await flush() + expect(h.haltCalls).toEqual([]) + h.dispose() + }) + + test("reverify proceed=false → halt NOT called", async () => { + const h = setupHarness({ reverifyImpl: async () => ({ proceed: false }) }) + h.setSnap(missing) + h.fk.advance(HEAL_DELAY_MS) + await flush() + expect(h.haltCalls).toEqual([]) + h.dispose() + }) + + // Transient reverify failure: when reverify returns proceed:false with + // retry:true, the clock re-arms exactly one follow-up timer so a sticky + // stuck session does not dead-end on a single list() blip. + test("reverify proceed=false + retry=true re-arms one follow-up timer", async () => { + let attempts = 0 + const h = setupHarness({ + reverifyImpl: async () => { + attempts++ + return attempts === 1 ? 
{ proceed: false, retry: true } : { proceed: true } + }, + }) + h.setSnap(missing) + h.fk.advance(HEAL_DELAY_MS) + await flush() + expect(attempts).toBe(1) + expect(h.haltCalls).toEqual([]) + expect(h.fk.pending()).toBe(1) + + h.fk.advance(HEAL_DELAY_MS) + await flush() + expect(attempts).toBe(2) + expect(h.haltCalls).toEqual(["s"]) + h.dispose() + }) + + // Bounded retry: persistent transient failures retry up to MAX_RETRIES, + // then escalate to halt. Leaving the user stuck on a hidden blocker is + // worse than a conservative halt they could have triggered manually. + test("reverify retry=true exhausts MAX_RETRIES then escalates to halt", async () => { + let attempts = 0 + const h = setupHarness({ + reverifyImpl: async () => { + attempts++ + return { proceed: false, retry: true } + }, + }) + h.setSnap(missing) + + // Drive initial fire + MAX_RETRIES follow-ups. After each tick we should + // see a follow-up timer queued for the next round, until the budget hits. + for (let i = 0; i < MAX_RETRIES; i++) { + h.fk.advance(HEAL_DELAY_MS) + await flush() + expect(attempts).toBe(i + 1) + expect(h.fk.pending()).toBe(1) + expect(h.haltCalls).toEqual([]) + } + + // The (MAX_RETRIES + 1)-th attempt returns retry=true but the budget is + // now exhausted — clock warns and escalates to halt instead of leaving + // the user stuck on a hidden blocker. + h.fk.advance(HEAL_DELAY_MS) + await flush() + expect(attempts).toBe(MAX_RETRIES + 1) + expect(h.haltCalls).toEqual(["s"]) + expect(h.fk.pending()).toBe(0) + expect(h.warnCalls.some((w) => w.message.includes("escalating to halt"))).toBe(true) + h.dispose() + }) + + // Non-retry proceed=false (e.g. guard failed, server says still covered) + // must NOT trigger any retry/halt — only retry=true counts toward escalation. 
+ test("reverify proceed=false retry=false → no retry, no halt", async () => { + let attempts = 0 + const h = setupHarness({ + reverifyImpl: async () => { + attempts++ + return { proceed: false } + }, + }) + h.setSnap(missing) + h.fk.advance(HEAL_DELAY_MS) + await flush() + expect(attempts).toBe(1) + expect(h.haltCalls).toEqual([]) + expect(h.fk.pending()).toBe(0) + h.dispose() + }) + + test("halt() throws → structured warn logged, map entry deleted, no retry", async () => { + let haltCount = 0 + const h = setupHarness({ + haltImpl: async () => { + haltCount++ + throw new Error("halt boom") + }, + }) + h.setSnap(missing) + h.fk.advance(HEAL_DELAY_MS) + await flush() + expect(haltCount).toBe(1) + expect(h.warnCalls.length).toBe(1) + expect(h.warnCalls[0].message).toContain("halt failed") + expect(h.warnCalls[0].payload).toMatchObject({ sessionID: "s", directory: "/dir" }) + expect(typeof h.warnCalls[0].payload.armedAt).toBe("number") + expect(typeof h.warnCalls[0].payload.firedAt).toBe("number") + expect(h.clock.pendingFor("s")).toBe(false) + h.dispose() + }) + + test("snapshot stays missingRunning after fire → no re-arm (v5 P2-2 lock)", async () => { + const h = setupHarness() + h.setSnap(missing) + h.fk.advance(HEAL_DELAY_MS) + await flush() + expect(h.haltCalls).toEqual(["s"]) + + // Same-kind set after fire must not arm a second timer. + h.setSnap(missing) + h.fk.advance(HEAL_DELAY_MS * 2) + await flush() + expect(h.haltCalls).toEqual(["s"]) + + // After leaving and returning, a fresh edge re-arms. 
+ h.setSnap(none) + h.setSnap(missing) + expect(h.fk.pending()).toBe(1) + h.fk.advance(HEAL_DELAY_MS) + await flush() + expect(h.haltCalls).toEqual(["s", "s"]) + h.dispose() + }) + + test("reverify threw → warn logged, halt NOT called", async () => { + const h = setupHarness({ + reverifyImpl: async () => { + throw new Error("reverify boom") + }, + }) + h.setSnap(missing) + h.fk.advance(HEAL_DELAY_MS) + await flush() + expect(h.haltCalls).toEqual([]) + expect(h.warnCalls.length).toBe(1) + expect(h.warnCalls[0].message).toContain("reverify threw") + expect(h.warnCalls[0].payload).toMatchObject({ sessionID: "s" }) + h.dispose() + }) + + test("dispose clears all pending timers", () => { + const h = setupHarness() + h.setSnap(missing) + expect(h.fk.pending()).toBe(1) + h.dispose() + expect(h.fk.pending()).toBe(0) + }) + + // Navigation cleanup: switching active session must drop the previous + // session's pending timer AND its lastSeen entry, so coming back to a + // still-stuck session re-arms cleanly. Locks the bug Codex flagged where + // lastSeen[old] stayed missingRunning forever. + test("session navigation forgets the old session's pending timer and edge state", async () => { + const h = setupHarness({ initialSid: "a" }) + h.setSnap(missing) + expect(h.fk.pending()).toBe(1) + + // Navigate to b. In production b's snapshot is per-session; here we + // simulate by flipping snap to none on the navigation tick. + h.setSid("b") + h.setSnap(none) + expect(h.fk.pending()).toBe(0) + h.fk.advance(HEAL_DELAY_MS * 2) + await flush() + expect(h.haltCalls).toEqual([]) + + // Come back to a with snapshot still missingRunning — must re-arm fresh + // even though lastSeen[a] would otherwise still read missingRunning. 
+ h.setSid("a") + h.setSnap(missing) + expect(h.fk.pending()).toBe(1) + h.fk.advance(HEAL_DELAY_MS) + await flush() + expect(h.haltCalls).toEqual(["a"]) + h.dispose() + }) +}) diff --git a/packages/app/src/pages/session/blockers/question-recovery-clock.ts b/packages/app/src/pages/session/blockers/question-recovery-clock.ts new file mode 100644 index 00000000..8627cd94 --- /dev/null +++ b/packages/app/src/pages/session/blockers/question-recovery-clock.ts @@ -0,0 +1,200 @@ +import { createEffect, onCleanup } from "solid-js" +import type { QuestionRecoverySnapshot } from "./question-recovery-snapshot" + +export const HEAL_DELAY_MS = 3_000 +// Max follow-up attempts after the initial fire on the same arm. A fresh +// snapshot edge or session navigation resets this budget. See spec v6 R16. +export const MAX_RETRIES = 3 + +export interface ReverifyContext { + armedAt: number + armedDirectory: string + firedAt: number +} + +// `retry` lets reverify ask the clock to re-arm a single follow-up timer +// without waiting for a snapshot edge — used for transient failures +// (e.g. server question.list() blip) so a sticky stuck session does not +// dead-end on a single error. +export type ReverifyOutcome = { proceed: true } | { proceed: false; retry?: boolean } + +export interface ClockInput { + snapshot: () => QuestionRecoverySnapshot + activeSessionID: () => string | undefined + activeDirectory: () => string + halt: (sessionID: string) => Promise + reverify: (sessionID: string, ctx: ReverifyContext) => Promise + delayMs?: number + now?: () => number + setTimer?: (cb: () => void, ms: number) => unknown + clearTimer?: (handle: unknown) => void + warn?: (message: string, payload: Record) => void +} + +export interface Clock { + dispose: () => void + pendingFor: (sessionID: string) => boolean + // Test-only: drive a step manually. In production, createEffect calls this. 
+ tick: () => void +} + +interface PendingEntry { + handle: unknown + armedAt: number + armedDirectory: string + retries: number +} + +// Auto-heal clock. Edge-triggered arming (transition INTO missingRunning), +// at-most-once fire per arm (map entry deleted before any await), all guards +// run inside consumer-supplied reverify. See spec v6. +// +// Production wires the clock's tick() into createEffect; tests call tick() +// directly because the SSR build of solid-js used in unit tests does not +// propagate signal updates through effects. +export function createQuestionRecoveryClock(input: ClockInput): Clock { + const delayMs = input.delayMs ?? HEAL_DELAY_MS + const now = input.now ?? (() => Date.now()) + const setTimer = + input.setTimer ?? + ((cb: () => void, ms: number) => setTimeout(cb, ms) as unknown as ReturnType) + const clearTimer = + input.clearTimer ?? ((handle: unknown) => clearTimeout(handle as ReturnType)) + const warn = input.warn ?? ((m, p) => console.warn(m, p)) + + const pending = new Map() + const lastSeen = new Map() + let lastActiveSid: string | undefined + let disposed = false + + const cancelFor = (sessionID: string) => { + const entry = pending.get(sessionID) + if (!entry) return + clearTimer(entry.handle) + pending.delete(sessionID) + } + + const forget = (sessionID: string) => { + cancelFor(sessionID) + lastSeen.delete(sessionID) + } + + const fire = async (sessionID: string) => { + const entry = pending.get(sessionID) + if (!entry) return + pending.delete(sessionID) + if (disposed) return + + const ctx: ReverifyContext = { + armedAt: entry.armedAt, + armedDirectory: entry.armedDirectory, + firedAt: now(), + } + let outcome: ReverifyOutcome + try { + outcome = await input.reverify(sessionID, ctx) + } catch (err) { + warn("question-recovery: reverify threw", { + sessionID, + directory: entry.armedDirectory, + armedAt: entry.armedAt, + firedAt: ctx.firedAt, + err, + }) + return + } + if (disposed) return + if (!outcome.proceed) { + // 
Bounded retry: up to MAX_RETRIES follow-up attempts per arm. After + // the budget is exhausted, escalate to halt — leaving the user stuck + // on a hidden blocker is worse than a conservative halt they could + // have triggered manually anyway. Fresh snapshot edge or session + // navigation still resets the budget for a new arm. + if (outcome.retry && input.activeSessionID() === sessionID) { + if (entry.retries >= MAX_RETRIES) { + warn("question-recovery: retry budget exhausted, escalating to halt", { + sessionID, + directory: entry.armedDirectory, + armedAt: entry.armedAt, + firedAt: ctx.firedAt, + retries: entry.retries, + }) + // fall through to halt below + } else { + const handle = setTimer(() => { + void fire(sessionID) + }, delayMs) + pending.set(sessionID, { + handle, + armedAt: now(), + armedDirectory: entry.armedDirectory, + retries: entry.retries + 1, + }) + return + } + } else { + return + } + } + + try { + await input.halt(sessionID) + } catch (err) { + warn("question-recovery: halt failed", { + sessionID, + directory: entry.armedDirectory, + armedAt: entry.armedAt, + firedAt: ctx.firedAt, + err, + }) + } + } + + const tick = () => { + if (disposed) return + const sid = input.activeSessionID() + const snap = input.snapshot() + + // Session navigation: drop the previous session's pending timer and + // edge state so coming back to a still-stuck session re-arms cleanly + // instead of hitting a stale lastSeen=missingRunning entry. This also + // bounds lastSeen to at most one entry at any time. 
+ if (lastActiveSid && lastActiveSid !== sid) forget(lastActiveSid) + lastActiveSid = sid + + if (!sid) return + + const previousKind = lastSeen.get(sid) + lastSeen.set(sid, snap.kind) + + if (snap.kind === "missingRunning") { + if (previousKind === "missingRunning") return + if (pending.has(sid)) return + const armedAt = now() + const armedDirectory = input.activeDirectory() + const handle = setTimer(() => { + void fire(sid) + }, delayMs) + pending.set(sid, { handle, armedAt, armedDirectory, retries: 0 }) + return + } + + cancelFor(sid) + } + + const disposeAll = () => { + disposed = true + for (const entry of pending.values()) clearTimer(entry.handle) + pending.clear() + lastSeen.clear() + } + + createEffect(tick) + onCleanup(disposeAll) + + return { + dispose: disposeAll, + pendingFor: (sessionID: string) => pending.has(sessionID), + tick, + } +} diff --git a/packages/app/src/pages/session/blockers/question-recovery-reverify.test.ts b/packages/app/src/pages/session/blockers/question-recovery-reverify.test.ts new file mode 100644 index 00000000..9fd67e67 --- /dev/null +++ b/packages/app/src/pages/session/blockers/question-recovery-reverify.test.ts @@ -0,0 +1,190 @@ +import { describe, expect, test } from "bun:test" +import type { Message, Part } from "@opencode-ai/sdk/v2" +import { questionRecoveryReverify, type ReverifyDeps } from "./question-recovery-reverify" +import type { ReverifyContext } from "./question-recovery-clock" +import type { QuestionRecoverySnapshot } from "./question-recovery-snapshot" + +interface FakeQuestion { + id: string + sessionID: string + tool?: { messageID: string; callID: string } +} + +const ctx: ReverifyContext = { + armedAt: 0, + armedDirectory: "/dir", + firedAt: 100, +} + +const missing: QuestionRecoverySnapshot = { kind: "missingRunning" } +const ready: QuestionRecoverySnapshot = { kind: "ready" } + +// Test fixtures are minimal stand-ins; cast to the SDK shapes at the harness +// boundary so tests stay terse but the deps 
signature still enforces the +// real ReadonlyArray / ReadonlyArray contract on production. +interface HarnessOpts { + snapshot?: QuestionRecoverySnapshot + activeSid?: string + directory?: string + busy?: boolean + list?: () => Promise + messages?: ReadonlyArray<{ id: string; role: string }> + parts?: Record | undefined> +} + +const setup = (opts: HarnessOpts = {}) => { + const hydrated: { sid: string; questions: readonly FakeQuestion[] }[] = [] + const warns: { msg: string; payload: Record }[] = [] + const deps: ReverifyDeps = { + snapshot: () => opts.snapshot ?? missing, + activeSessionID: () => opts.activeSid ?? "s", + activeDirectory: () => opts.directory ?? "/dir", + isSessionBusy: () => opts.busy ?? true, + listQuestions: opts.list ?? (async () => []), + messagesFor: () => opts.messages as ReadonlyArray | undefined, + partsByMessageID: () => (opts.parts ?? {}) as Record | undefined>, + applyHydration: (sid, questions) => { + hydrated.push({ sid, questions }) + }, + warn: (msg, payload) => warns.push({ msg, payload }), + } + return { deps, hydrated, warns } +} + +describe("questionRecoveryReverify", () => { + test("guard 1: snapshot moved off missingRunning → proceed:false, no list", async () => { + let called = false + const { deps } = setup({ + snapshot: ready, + list: async () => { + called = true + return [] + }, + }) + const result = await questionRecoveryReverify(deps, "s", ctx) + expect(result).toEqual({ proceed: false }) + expect(called).toBe(false) + }) + + test("guard 2: active session changed → proceed:false", async () => { + const { deps } = setup({ activeSid: "other" }) + const result = await questionRecoveryReverify(deps, "s", ctx) + expect(result).toEqual({ proceed: false }) + }) + + test("guard 2: directory drifted from armedDirectory → proceed:false", async () => { + const { deps } = setup({ directory: "/other" }) + const result = await questionRecoveryReverify(deps, "s", ctx) + expect(result).toEqual({ proceed: false }) + }) + + test("guard 
3: session no longer busy → proceed:false", async () => { + const { deps } = setup({ busy: false }) + const result = await questionRecoveryReverify(deps, "s", ctx) + expect(result).toEqual({ proceed: false }) + }) + + test("list() throws → proceed:false + retry:true (one bounded follow-up)", async () => { + const { deps, warns } = setup({ + list: async () => { + throw new Error("network down") + }, + }) + const result = await questionRecoveryReverify(deps, "s", ctx) + expect(result).toEqual({ proceed: false, retry: true }) + expect(warns).toHaveLength(1) + expect(warns[0].msg).toContain("question.list() failed") + }) + + test("post-await guard re-check: snapshot flipped during list → proceed:false", async () => { + let snap: QuestionRecoverySnapshot = missing + const { deps } = setup({ + list: async () => { + snap = ready + return [] + }, + }) + deps.snapshot = () => snap + const result = await questionRecoveryReverify(deps, "s", ctx) + expect(result).toEqual({ proceed: false }) + }) + + // Running parts use top-level `callID/messageID` to match the real ToolPart + // SDK shape — fallback reads them at part level, not from `state`. + const runningQuestionPart = (callID: string, messageID: string) => ({ + type: "tool", + tool: "question", + state: { status: "running", input: { id: "q1" } }, + callID, + messageID, + }) + + test("server returns no covering question → proceed:true (halt is licensed)", async () => { + // syncQuestions empty + a running question part means fallback finds the + // session as "still uncovered", so the clock is allowed to halt. + const { deps, hydrated } = setup({ + list: async () => [], + messages: [{ id: "m1", role: "assistant" }], + parts: { m1: [runningQuestionPart("c1", "m1")] }, + }) + const result = await questionRecoveryReverify(deps, "s", ctx) + expect(result).toEqual({ proceed: true }) + // No hydration when server has nothing to write back. 
+ expect(hydrated).toEqual([]) + }) + + test("server now covers the question → hydrate sync + proceed:false", async () => { + // Server returns a question whose tool.(messageID, callID) matches the + // running part — fallback now finds it covered. + const covering: FakeQuestion = { + id: "q1", + sessionID: "s", + tool: { messageID: "m1", callID: "c1" }, + } + const { deps, hydrated } = setup({ + list: async () => [covering], + messages: [{ id: "m1", role: "assistant" }], + parts: { m1: [runningQuestionPart("c1", "m1")] }, + }) + const result = await questionRecoveryReverify(deps, "s", ctx) + expect(result).toEqual({ proceed: false }) + expect(hydrated).toHaveLength(1) + expect(hydrated[0]).toEqual({ sid: "s", questions: [covering] }) + }) + + test("list returns questions for other sessions only → still uncovered → proceed:true", async () => { + const other: FakeQuestion = { + id: "q2", + sessionID: "other-sid", + tool: { messageID: "m1", callID: "c1" }, + } + const { deps, hydrated } = setup({ + list: async () => [other], + messages: [{ id: "m1", role: "assistant" }], + parts: { m1: [runningQuestionPart("c1", "m1")] }, + }) + const result = await questionRecoveryReverify(deps, "s", ctx) + expect(result).toEqual({ proceed: true }) + expect(hydrated).toEqual([]) + }) + + // Identity match means the *exact* (messageID, callID) pair, not just the + // session. Server may legitimately have other questions for the same + // session that point at a different tool call; halt must still be licensed + // because the running part remains uncovered. 
+ test("server returns same-session question with mismatched callID → proceed:true", async () => { + const sameSessionWrongCall: FakeQuestion = { + id: "q-other", + sessionID: "s", + tool: { messageID: "m1", callID: "different-call" }, + } + const { deps, hydrated } = setup({ + list: async () => [sameSessionWrongCall], + messages: [{ id: "m1", role: "assistant" }], + parts: { m1: [runningQuestionPart("c1", "m1")] }, + }) + const result = await questionRecoveryReverify(deps, "s", ctx) + expect(result).toEqual({ proceed: true }) + expect(hydrated).toEqual([]) + }) +}) diff --git a/packages/app/src/pages/session/blockers/question-recovery-reverify.ts b/packages/app/src/pages/session/blockers/question-recovery-reverify.ts new file mode 100644 index 00000000..26db7967 --- /dev/null +++ b/packages/app/src/pages/session/blockers/question-recovery-reverify.ts @@ -0,0 +1,59 @@ +import type { Message, Part } from "@opencode-ai/sdk/v2" +import type { ReverifyContext, ReverifyOutcome } from "./question-recovery-clock" +import type { QuestionRecoverySnapshot } from "./question-recovery-snapshot" +import { findRunningQuestionFallbackSession } from "./question-fallback" + +export interface ReverifyDeps { + snapshot: () => QuestionRecoverySnapshot + activeSessionID: () => string | undefined + activeDirectory: () => string + isSessionBusy: (sessionID: string) => boolean + listQuestions: () => Promise + partsByMessageID: () => Record | undefined> + messagesFor: (sessionID: string) => ReadonlyArray | undefined + applyHydration: (sessionID: string, questions: readonly Q[]) => void + warn?: (message: string, payload: Record) => void +} + +// Reverify glue used by `createSessionBlockers`. Lives here as a pure +// function so it can be unit-tested without standing up the full provider +// tree (sdk + sync + permission + language). The four guards run in order: +// 1. snapshot still missingRunning +// 2. active session and directory unchanged since arm +// 3. session still busy +// 4. 
server confirms the running question part is still uncovered +// Transient list() failures ask the clock for one bounded follow-up. +export async function questionRecoveryReverify< + Q extends { sessionID: string; tool?: { messageID: string; callID: string }; id?: string }, +>(deps: ReverifyDeps, sessionID: string, ctx: ReverifyContext): Promise { + const localGuards = () => { + if (deps.snapshot().kind !== "missingRunning") return false + if (deps.activeSessionID() !== sessionID) return false + if (deps.activeDirectory() !== ctx.armedDirectory) return false + if (!deps.isSessionBusy(sessionID)) return false + return true + } + if (!localGuards()) return { proceed: false } + + let filtered: readonly Q[] + try { + const all = await deps.listQuestions() + filtered = all.filter((q) => q.sessionID === sessionID) + } catch (err) { + deps.warn?.("question-recovery: question.list() failed", { sessionID, err }) + return { proceed: false, retry: true } + } + + if (!localGuards()) return { proceed: false } + + const stillUncovered = findRunningQuestionFallbackSession({ + sessionID, + syncQuestions: filtered, + messages: deps.messagesFor(sessionID), + partsByMessageID: deps.partsByMessageID(), + }) + if (stillUncovered === sessionID) return { proceed: true } + + deps.applyHydration(sessionID, filtered) + return { proceed: false } +} diff --git a/packages/app/src/pages/session/blockers/question-recovery-snapshot.test.ts b/packages/app/src/pages/session/blockers/question-recovery-snapshot.test.ts new file mode 100644 index 00000000..8a2986b5 --- /dev/null +++ b/packages/app/src/pages/session/blockers/question-recovery-snapshot.test.ts @@ -0,0 +1,142 @@ +import { describe, expect, test } from "bun:test" +import type { Message, Part, QuestionRequest, ToolState } from "@opencode-ai/sdk/v2" +import { resolveQuestionRecoverySnapshot } from "./question-recovery-snapshot" + +const message = (id: string): Message => ({ id }) as Message + +const toolState = (status: 
ToolState["status"]): ToolState => + ({ + status, + input: {}, + title: "", + metadata: {}, + time: { start: 0 }, + }) as ToolState + +const toolPart = ( + id: string, + tool: string, + status: ToolState["status"] = "running", + attrs?: { messageID?: string; callID?: string }, +): Part => + ({ + id, + type: "tool", + tool, + state: toolState(status), + messageID: attrs?.messageID, + callID: attrs?.callID, + }) as Part + +const syncQ = (id: string, sessionID: string, tool?: { messageID: string; callID: string }): QuestionRequest => + ({ + id, + sessionID, + questions: [{ header: "h", question: "q", options: [] }], + tool, + }) as QuestionRequest + +describe("resolveQuestionRecoverySnapshot", () => { + test("none when no sessionID", () => { + expect( + resolveQuestionRecoverySnapshot({ + sessionID: undefined, + sessionTreeQuestionRequest: undefined, + activeSessionSyncQuestions: [], + activeSessionMessages: undefined, + partsByMessageID: {}, + }), + ).toEqual({ kind: "none" }) + }) + + test("none when no running question and no sync entry", () => { + expect( + resolveQuestionRecoverySnapshot({ + sessionID: "s", + sessionTreeQuestionRequest: undefined, + activeSessionSyncQuestions: [], + activeSessionMessages: [message("m1")], + partsByMessageID: { m1: [toolPart("p1", "todowrite", "running")] }, + }), + ).toEqual({ kind: "none" }) + }) + + test("ready when tree-walked question request resolves (active session)", () => { + expect( + resolveQuestionRecoverySnapshot({ + sessionID: "s", + sessionTreeQuestionRequest: { id: "q1" }, + activeSessionSyncQuestions: [syncQ("q1", "s", { messageID: "m1", callID: "c1" })], + activeSessionMessages: [message("m1")], + partsByMessageID: { m1: [toolPart("p1", "question", "running", { messageID: "m1", callID: "c1" })] }, + }), + ).toEqual({ kind: "ready" }) + }) + + // P2-3 lock: parent ready continues to surface via tree-walked request even + // though the active session is the child. 
+ test("ready when running part lives in parent session and sync has matching entry", () => { + expect( + resolveQuestionRecoverySnapshot({ + sessionID: "child", + sessionTreeQuestionRequest: { id: "q_parent" }, + activeSessionSyncQuestions: [], + activeSessionMessages: [], + partsByMessageID: {}, + }), + ).toEqual({ kind: "ready" }) + }) + + test("missingRunning when active session has running question with no matching sync identity", () => { + expect( + resolveQuestionRecoverySnapshot({ + sessionID: "s", + sessionTreeQuestionRequest: undefined, + activeSessionSyncQuestions: [], + activeSessionMessages: [message("m1")], + partsByMessageID: { m1: [toolPart("p1", "question", "running", { messageID: "m1", callID: "c1" })] }, + }), + ).toEqual({ kind: "missingRunning" }) + }) + + // P1-2 lock: legacy sync entry without tool identity covers one running + // part. Snapshot must not raise missingRunning here, matching fallback. + test("not missingRunning when legacy sync entry without identity covers running part", () => { + expect( + resolveQuestionRecoverySnapshot({ + sessionID: "s", + sessionTreeQuestionRequest: undefined, + activeSessionSyncQuestions: [syncQ("q_legacy", "s")], + activeSessionMessages: [message("m1")], + partsByMessageID: { m1: [toolPart("p1", "question", "running", { messageID: "m1", callID: "c1" })] }, + }), + ).toEqual({ kind: "none" }) + }) + + // P2-3 lock: subagents cannot use the question tool today; missingRunning + // detection only looks at the active session. Cross-session running parts + // are intentionally ignored until subagents gain question access. 
+ test("missingRunning is NOT raised when running part lives in a parent (asymmetric by design)", () => { + expect( + resolveQuestionRecoverySnapshot({ + sessionID: "child", + sessionTreeQuestionRequest: undefined, + activeSessionSyncQuestions: [], + activeSessionMessages: [], + partsByMessageID: { m_parent: [toolPart("p", "question", "running", { messageID: "m_parent", callID: "c1" })] }, + }), + ).toEqual({ kind: "none" }) + }) + + test("ready takes precedence over missingRunning when both could apply", () => { + expect( + resolveQuestionRecoverySnapshot({ + sessionID: "s", + sessionTreeQuestionRequest: { id: "q1" }, + activeSessionSyncQuestions: [], + activeSessionMessages: [message("m1")], + partsByMessageID: { m1: [toolPart("p1", "question", "running", { messageID: "m1", callID: "c1" })] }, + }), + ).toEqual({ kind: "ready" }) + }) +}) diff --git a/packages/app/src/pages/session/blockers/question-recovery-snapshot.ts b/packages/app/src/pages/session/blockers/question-recovery-snapshot.ts new file mode 100644 index 00000000..eedcc806 --- /dev/null +++ b/packages/app/src/pages/session/blockers/question-recovery-snapshot.ts @@ -0,0 +1,32 @@ +import type { Message, Part, QuestionRequest } from "@opencode-ai/sdk/v2" +import { findRunningQuestionFallbackSession } from "./question-fallback" + +export type QuestionRecoverySnapshot = + | { kind: "none" } + | { kind: "ready" } + | { kind: "missingRunning" } + +export interface ResolveSnapshotInput { + sessionID: string | undefined + sessionTreeQuestionRequest: unknown + activeSessionSyncQuestions: ReadonlyArray + activeSessionMessages: ReadonlyArray | undefined + partsByMessageID: Record | undefined> +} + +// Pure reducer: drives the auto-heal clock. Delegates missingRunning detection +// to findRunningQuestionFallbackSession so identity matching + legacy bucket +// pooling stay in lockstep with the existing fallback (#419 / PR #430). 
// Pure reducer: drives the auto-heal clock. Delegates missingRunning
// detection to findRunningQuestionFallbackSession so identity matching and
// legacy bucket pooling stay in lockstep with the existing fallback
// (#419 / PR #430). `ready` wins over `missingRunning` by construction:
// a tree-walked request short-circuits before the fallback runs.
export function resolveQuestionRecoverySnapshot(input: ResolveSnapshotInput): QuestionRecoverySnapshot {
  if (!input.sessionID) return { kind: "none" }
  if (input.sessionTreeQuestionRequest) return { kind: "ready" }

  const fallbackSessionID = findRunningQuestionFallbackSession({
    sessionID: input.sessionID,
    syncQuestions: input.activeSessionSyncQuestions,
    messages: input.activeSessionMessages,
    partsByMessageID: input.partsByMessageID,
  })
  return fallbackSessionID === input.sessionID ? { kind: "missingRunning" } : { kind: "none" }
}
diff --git a/packages/app/src/pages/session/blockers/use-session-blockers.ts b/packages/app/src/pages/session/blockers/use-session-blockers.ts
index 9efbd33b..29dab180 100644
--- a/packages/app/src/pages/session/blockers/use-session-blockers.ts
+++ b/packages/app/src/pages/session/blockers/use-session-blockers.ts
@@ -5,12 +5,19 @@ import { useLanguage } from "@/context/language"
 import { usePermission } from "@/context/permission"
 import { useSDK } from "@/context/sdk"
 import { useSync } from "@/context/sync"
+import { isSessionRunning } from "@/pages/session/session-running-state"
 import { findRunningQuestionFallbackSession } from "./question-fallback"
+import { createQuestionRecoveryClock } from "./question-recovery-clock"
+import { questionRecoveryReverify } from "./question-recovery-reverify"
+import { resolveQuestionRecoverySnapshot } from "./question-recovery-snapshot"
 import { createQuestionRefetchRunner } from "./question-refetch-runner"
 import { refetchPendingQuestionsForSession } from "./question-reconcile"
 import { sessionPermissionRequest, sessionQuestionRequest } from "./request-tree"
-export function createSessionBlockers(input: { sessionID: () => string | undefined }) {
+export function createSessionBlockers(input: {
+  sessionID: () => string | undefined
+  halt?: (sessionID: string) => Promise<void>
+}) {
   const sdk = useSDK()
   const sync = useSync()
   const language = useLanguage()
@@ -29,7 +36,10 @@
export function createSessionBlockers(input: { sessionID: () => string | undefin const sessionID = activeSessionID() return findRunningQuestionFallbackSession({ sessionID, - hasQuestionRequest: !!questionRequest(), + // Pass the per-session entries so fallback can match by (messageID, + // callID) — not the tree-walked sessionQuestionRequest result, which + // would mask local multi-pending loss. See #419. + syncQuestions: sessionID ? (sync.data.question[sessionID] ?? []) : [], messages: sessionID ? sync.data.message[sessionID] : undefined, partsByMessageID: sync.data.part, }) @@ -61,6 +71,59 @@ export function createSessionBlockers(input: { sessionID: () => string | undefin }), ) + // Auto-heal: detect hidden question blockers (running question part but no + // sync coverage) and halt the stuck session after a short delay. The + // composer keeps `recoveringQuestion()` so existing UI/refetch behavior is + // unchanged; the clock fires only as a last-resort cleanup. + const recoverySnapshot = createMemo(() => { + const sessionID = activeSessionID() + return resolveQuestionRecoverySnapshot({ + sessionID, + sessionTreeQuestionRequest: questionRequest(), + activeSessionSyncQuestions: sessionID ? (sync.data.question[sessionID] ?? []) : [], + activeSessionMessages: sessionID ? sync.data.message[sessionID] : undefined, + partsByMessageID: sync.data.part, + }) + }) + + if (input.halt) { + const halt = input.halt + const clock = createQuestionRecoveryClock({ + snapshot: recoverySnapshot, + activeSessionID, + activeDirectory: () => sdk.directory, + halt, + reverify: (sessionID, ctx) => + questionRecoveryReverify( + { + snapshot: recoverySnapshot, + activeSessionID, + activeDirectory: () => sdk.directory, + isSessionBusy: (sid) => + isSessionRunning(sync.data.session_status[sid], sync.data.message[sid]), + listQuestions: async () => { + const result = await sdk.client.question.list() + return result.data ?? 
[] + }, + partsByMessageID: () => sync.data.part, + messagesFor: (sid) => sync.data.message[sid], + applyHydration: (sid, questions) => { + batch(() => { + sync.set("question", sid, reconcile([...questions], { key: "id" })) + }) + }, + warn: (msg, payload) => console.warn(msg, payload), + }, + sessionID, + ctx, + ), + }) + // clock self-cleans via its own onCleanup; the binding above is kept + // only so `clock` is referenced (the side-effect of construction is + // what we want). + void clock + } + const permissionRequest = createMemo(() => { return sessionPermissionRequest(sync.data.session, sync.data.permission, activeSessionID(), (item) => { return !permission.autoResponds(item, sdk.directory) diff --git a/packages/app/src/pages/session/composer/session-composer-state.ts b/packages/app/src/pages/session/composer/session-composer-state.ts index 72ae4149..18fe2957 100644 --- a/packages/app/src/pages/session/composer/session-composer-state.ts +++ b/packages/app/src/pages/session/composer/session-composer-state.ts @@ -6,9 +6,10 @@ import { composerEnabled, composerStateProbe } from "@/testing/session-composer" export function createSessionComposerState(input: { sessionID: () => string | undefined fallbackSessionID?: () => string | undefined + halt?: (sessionID: string) => Promise }) { const activeSessionID = input.sessionID - const blockers = createSessionBlockers({ sessionID: activeSessionID }) + const blockers = createSessionBlockers({ sessionID: activeSessionID, halt: input.halt }) const todo = createSessionTodoModel({ sessionID: activeSessionID, fallbackSessionID: input.fallbackSessionID }) createEffect(() => { diff --git a/packages/app/src/pages/session/use-session-followups.test.ts b/packages/app/src/pages/session/use-session-followups.test.ts index 6db3a93b..c5531b2d 100644 --- a/packages/app/src/pages/session/use-session-followups.test.ts +++ b/packages/app/src/pages/session/use-session-followups.test.ts @@ -79,4 +79,27 @@ describe("session followups", () 
=> { expect(shouldAutoSendFollowup({ ...base, blocked: true })).toBe(false) expect(shouldAutoSendFollowup({ ...base, followupBusy: true })).toBe(false) }) + + // Auto-heal flow: while the recovery clock is armed (running question + // part with no UI dock) the session is still busy AND blocked is false + // (the dock never surfaced). Auto-send must stay off. Once the clock + // halts the session, busy flips false on the next status sync and the + // queued followup must be eligible for auto-send. This locks step 6 of + // the v6 spec merge gate. + test("queued followup auto-sends after halt-induced idle (auto-heal flow)", () => { + const recovering = { + hasSession: true, + hasItem: true, + busy: true, + failed: false, + paused: false, + childSession: false, + blocked: false, + followupBusy: false, + } + expect(shouldAutoSendFollowup(recovering)).toBe(false) + + const afterHalt = { ...recovering, busy: false } + expect(shouldAutoSendFollowup(afterHalt)).toBe(true) + }) }) diff --git a/packages/opencode/src/question/index.ts b/packages/opencode/src/question/index.ts index 6bc6e734..f22625ed 100644 --- a/packages/opencode/src/question/index.ts +++ b/packages/opencode/src/question/index.ts @@ -1,4 +1,4 @@ -import { Deferred, Effect, Layer, Schema, Context } from "effect" +import { Cause, Deferred, Effect, Layer, Schema, Context } from "effect" import { Bus } from "@/bus" import { BusEvent } from "@/bus/bus-event" import { InstanceState } from "@/effect/instance-state" @@ -143,9 +143,19 @@ export namespace Question { Rejected: BusEvent.define("question.rejected", Rejected.zod), } - export class RejectedError extends Schema.TaggedErrorClass()("QuestionRejectedError", {}) { + // `cancelled` distinguishes a session-cancel-driven rejection (signal abort + // or fiber interrupt) from an intentional user dismissal. The processor + // uses this to set metadata.interrupted only on cancel — a dismiss is a + // completed user action, not an interruption. 
The message branches the + // same way so consumers (state.error, logs, telemetry) read accurately + // without each having to inspect the cancelled flag. See #419. + export class RejectedError extends Schema.TaggedErrorClass()("QuestionRejectedError", { + cancelled: Schema.optional(Schema.Boolean), + }) { override get message() { - return "The user dismissed this question" + return this.cancelled + ? "Question cancelled before the user answered it." + : "The user dismissed this question" } } @@ -165,6 +175,10 @@ export namespace Question { sessionID: SessionID questions: ReadonlyArray tool?: Tool + // Production cancellation channel. EffectBridge.run.promise does not + // propagate parent fiber interrupts, so without a signal a session + // cancel leaks the pending entry and the dock stays forever. See #419. + signal?: AbortSignal }) => Effect.Effect, RejectedError> readonly reply: (input: { requestID: QuestionID; answers: ReadonlyArray }) => Effect.Effect readonly reject: (requestID: QuestionID) => Effect.Effect @@ -186,7 +200,7 @@ export namespace Question { yield* Effect.addFinalizer(() => Effect.gen(function* () { for (const item of state.pending.values()) { - yield* Deferred.fail(item.deferred, new RejectedError()) + yield* Deferred.fail(item.deferred, new RejectedError({ cancelled: true })) } state.pending.clear() }), @@ -200,6 +214,7 @@ export namespace Question { sessionID: SessionID questions: ReadonlyArray tool?: Tool + signal?: AbortSignal }) { // Replies are mapped back by label string, so duplicate labels within // a question would make answers ambiguous. Reject before publishing. @@ -241,11 +256,69 @@ export namespace Question { // event payload to mutate the pending entry either. 
yield* bus.publish(Event.Asked, structuredClone(info)) - return yield* Effect.ensuring( - Deferred.await(deferred), - Effect.sync(() => { - pending.delete(id) - }), + // Cancellation: input.signal is the production channel; the + // Effect.onInterrupt arm below is defence for direct fiber kill + // (layer shutdown / supervisor) when signal is undefined. The abort + // callback fires from the JS event loop, so we capture the parent + // Effect context here and re-provide it on a single runFork. + const signal = input.signal + let removeListener: (() => void) | undefined + const sessionID = input.sessionID + const ctx = yield* Effect.context() + const failFromAbort = InstanceState.bind(() => { + const entry = pending.get(id) + if (!entry) return + pending.delete(id) + log.info("rejected", { requestID: id, reason: "aborted" }) + // Publish then fail in a single Effect so order is deterministic + // (subscribers see Rejected before any awaiter unblocks) and any + // failure inside is logged once instead of disappearing into two + // independent forks. 
+ Effect.runFork( + Effect.provide( + Effect.gen(function* () { + yield* bus.publish(Event.Rejected, { sessionID, requestID: id }) + yield* Deferred.fail(entry.deferred, new RejectedError({ cancelled: true })) + }).pipe( + Effect.catchCause((cause) => + Effect.sync(() => + log.error("failFromAbort failed", { + requestID: id, + sessionID, + cause: Cause.pretty(cause), + }), + ), + ), + ), + ctx, + ), + ) + }) + + if (signal) { + if (signal.aborted) { + failFromAbort() + } else { + signal.addEventListener("abort", failFromAbort, { once: true }) + removeListener = () => signal.removeEventListener("abort", failFromAbort) + } + } + + return yield* Deferred.await(deferred).pipe( + Effect.onInterrupt(() => + Effect.gen(function* () { + if (!pending.has(id)) return + pending.delete(id) + log.info("rejected", { requestID: id, reason: "interrupted" }) + yield* bus.publish(Event.Rejected, { sessionID, requestID: id }) + }), + ), + Effect.ensuring( + Effect.sync(() => { + if (removeListener) removeListener() + pending.delete(id) + }), + ), ) }) diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 4066ccef..77afa72b 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -459,13 +459,24 @@ export const layer: Layer.Layer< error, }).record.metadata.diagnostics : toolDiagnostics(match.part) + // The UI's interrupted-hint variant is gated on metadata.interrupted, + // so when a question tool errors out via session cancel (Question.ask + // routed RejectedError through here, not via the cleanup path at the + // bottom of the stream), we must mark it the same way the cleanup + // path does. The `cancelled` flag distinguishes a session cancel from + // an explicit user dismissal — only the former is "interrupted". See #419. 
+ const questionInterrupted = + match.part.tool === "question" && error instanceof Question.RejectedError && error.cancelled === true yield* session.updatePart({ ...match.part, state: { status: "error", input: match.part.state.input, error: errorMessage(error), - metadata: SessionDiagnostics.mergeMetadata(toolStateMetadata(match.part), { diagnostics }), + metadata: SessionDiagnostics.mergeMetadata(toolStateMetadata(match.part), { + diagnostics, + ...(questionInterrupted ? { interrupted: true } : {}), + }), time: { start: match.part.state.time.start, end: Date.now() }, }, }) @@ -761,12 +772,22 @@ export const layer: Layer.Layer< const part = match.part const end = Date.now() const metadata = "metadata" in part.state && isRecord(part.state.metadata) ? part.state.metadata : {} + // Question tool deserves a clearer post-cancel message: the LLM + // reads this string as the tool result, and "Tool execution aborted" + // is ambiguous between "user dismissed your question" and "the run + // was cancelled before they answered". State only the certain fact + // (cancelled before answered), don't claim whether the user saw it + // — they may have. See issue #419. + const errorText = + part.tool === "question" + ? "Question cancelled before the user answered it." + : "Tool execution aborted" yield* session.updatePart({ ...part, state: { ...part.state, status: "error", - error: "Tool execution aborted", + error: errorText, metadata: { ...metadata, interrupted: true }, time: { start: "time" in part.state ? 
part.state.time.start : end, end }, }, diff --git a/packages/opencode/src/tool/question.ts b/packages/opencode/src/tool/question.ts index 76ae8968..85f2e01e 100644 --- a/packages/opencode/src/tool/question.ts +++ b/packages/opencode/src/tool/question.ts @@ -33,6 +33,9 @@ export const QuestionTool = Tool.define - AppRuntime.runPromise(Question.Service.use((svc) => svc.ask(input))) +const ask = ( + input: { sessionID: SessionID; questions: Question.Info[]; tool?: { messageID: any; callID: string } }, + options?: { signal?: AbortSignal }, +) => AppRuntime.runPromise(Question.Service.use((svc) => svc.ask(input)), options) const list = () => AppRuntime.runPromise(Question.Service.use((svc) => svc.list())) @@ -28,6 +31,19 @@ async function rejectAll() { } } +/** Wait until exactly one pending question shows up, then assert it landed. + * Returns the pending request so the caller can drive the next step. */ +async function waitForPending(timeoutMs = 1000) { + const start = Date.now() + while (Date.now() - start < timeoutMs) { + const pending = await list() + if (pending.length === 1) return pending[0]! + await Bun.sleep(10) + } + expect(await list(), "expected exactly one pending question before continuing").toHaveLength(1) + throw new Error("unreachable: assertion above always throws on miss") +} + test("ask - returns pending promise", async () => { await using tmp = await tmpdir({ git: true }) await Instance.provide({ @@ -474,6 +490,158 @@ test("questions stay isolated by directory", async () => { await p2.catch(() => {}) }) +// interrupt path: when the ask fiber is interrupted (e.g. session cancel), +// Question.ask must publish question.rejected so the frontend can clear the +// dock, AND remove the entry from pending so question.list() won't return a +// phantom. See issue #419. +// +// Two complementary coverage paths: +// 1. 
input.signal abort (production cancel route — EffectBridge.run.promise +// breaks fiber interrupt, so the AbortSignal is the only working channel) +// 2. fiber interrupt via runPromise options.signal (defence-in-depth path +// for direct supervisor kill / layer shutdown) + +test("ask - publishes question.rejected on input.signal abort", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const events: { sessionID: SessionID; requestID: QuestionID }[] = [] + const unsub = Bus.subscribe(Question.Event.Rejected, (evt) => { + events.push(evt.properties) + }) + + const controller = new AbortController() + // Pass signal as `input.signal` (NOT to runPromise) so we exercise the + // signal.addEventListener("abort", failFromAbort) branch, which is the + // only one that survives in production where EffectBridge.run.promise + // strips fiber interrupts. + const promise = AppRuntime.runPromise( + Question.Service.use((svc) => + svc.ask({ + sessionID: SessionID.make("ses_signal"), + questions: [ + { + question: "Pick one", + header: "Pick", + options: [ + { label: "A", description: "first" }, + { label: "B", description: "second" }, + ], + }, + ], + signal: controller.signal, + }), + ), + ).catch((err) => err) + + await waitForPending() + + controller.abort() + const result = await promise + + expect(events).toHaveLength(1) + expect(events[0]?.sessionID).toBe(SessionID.make("ses_signal")) + expect(result).toBeInstanceOf(Question.RejectedError) + expect((result as Question.RejectedError).cancelled).toBe(true) + expect((result as Question.RejectedError).message).toBe("Question cancelled before the user answered it.") + + const after = await list() + expect(after).toHaveLength(0) + + unsub() + }, + }) +}) + +test("ask - publishes question.rejected on fiber interrupt", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { 
+ const events: { sessionID: SessionID; requestID: QuestionID }[] = [] + const unsub = Bus.subscribe(Question.Event.Rejected, (evt) => { + events.push(evt.properties) + }) + + const controller = new AbortController() + const promise = ask( + { + sessionID: SessionID.make("ses_interrupt"), + questions: [ + { + question: "Pick one", + header: "Pick", + options: [ + { label: "A", description: "first" }, + { label: "B", description: "second" }, + ], + }, + ], + }, + { signal: controller.signal }, + ).catch(() => {}) + + await waitForPending() + + controller.abort() + await promise + + expect(events).toHaveLength(1) + expect(events[0]?.sessionID).toBe(SessionID.make("ses_interrupt")) + + const after = await list() + expect(after).toHaveLength(0) + + unsub() + }, + }) +}) + +test("reject - leaves cancelled flag false (user dismiss, not session cancel)", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const promise = ask({ + sessionID: SessionID.make("ses_dismiss"), + questions: [ + { + question: "Dismiss me?", + header: "Dismiss", + options: [ + { label: "Yes", description: "Yes" }, + { label: "No", description: "No" }, + ], + }, + ], + }) + + const pending = await list() + await reject(pending[0]!.id) + + const result = await promise.catch((err) => err) + expect(result).toBeInstanceOf(Question.RejectedError) + expect((result as Question.RejectedError).cancelled).toBeFalsy() + expect((result as Question.RejectedError).message).toBe("The user dismissed this question") + }, + }) +}) + +// processor.failToolCall writes `errorMessage(error)` into part.state.error +// for the abort-signal path (failed != cleanup). The message getter has to +// branch on `cancelled` so consumers (state.error, logs, telemetry) read the +// same friendly copy as the legacy fiber-cleanup path. See #419. 
+test("RejectedError - message branches on cancelled flag", () => { + const dismissed = new Question.RejectedError() + expect(dismissed.cancelled).toBeFalsy() + expect(dismissed.message).toBe("The user dismissed this question") + + const cancelled = new Question.RejectedError({ cancelled: true }) + expect(cancelled.cancelled).toBe(true) + expect(cancelled.message).toBe("Question cancelled before the user answered it.") +}) + test("pending question rejects on instance dispose", async () => { await using tmp = await tmpdir({ git: true }) diff --git a/packages/opencode/test/session/processor-effect.test.ts b/packages/opencode/test/session/processor-effect.test.ts index b826ff1b..426b2150 100644 --- a/packages/opencode/test/session/processor-effect.test.ts +++ b/packages/opencode/test/session/processor-effect.test.ts @@ -818,6 +818,83 @@ it.live("session.processor effect tests mark pending tools as aborted on cleanup ), ) +// Question tool aborted on cleanup gets a clearer message than the generic +// "Tool execution aborted". The LLM reads part.state.error as the tool result +// and the generic phrasing made models think the user dismissed the question. +// See issue #419. 
+it.live("session.processor effect tests rewrite aborted question tool error to friendly message", () => + provideTmpdirServer( + ({ dir, llm }) => + Effect.gen(function* () { + const { processors, session, provider } = yield* boot() + + yield* llm.toolHang("question", { + questions: [ + { + question: "Pick one", + header: "Pick", + options: [ + { label: "A", description: "first" }, + { label: "B", description: "second" }, + ], + }, + ], + }) + + const chat = yield* session.create({}) + const parent = yield* user(chat.id, "question abort") + const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) + const mdl = yield* provider.getModel(ref.providerID, ref.modelID) + const handle = yield* processors.create({ + assistantMessage: msg, + sessionID: chat.id, + model: mdl, + }) + + const run = yield* handle + .process({ + user: { + id: parent.id, + sessionID: chat.id, + role: "user", + time: parent.time, + agent: parent.agent, + model: { providerID: ref.providerID, modelID: ref.modelID }, + } satisfies MessageV2.User, + sessionID: chat.id, + model: mdl, + agent: agent(), + system: [], + messages: [{ role: "user", content: "question abort" }], + tools: {}, + }) + .pipe(Effect.forkChild) + + yield* llm.wait(1) + yield* Effect.promise(async () => { + const end = Date.now() + 500 + while (Date.now() < end) { + const parts = await MessageV2.parts(msg.id) + if (parts.some((part) => part.type === "tool")) return + await Bun.sleep(10) + } + }) + yield* Fiber.interrupt(run) + yield* Fiber.await(run) + + const parts = MessageV2.parts(msg.id) + const call = parts.find((part): part is MessageV2.ToolPart => part.type === "tool") + + expect(call?.state.status).toBe("error") + if (call?.state.status === "error") { + expect(call.state.error).toBe("Question cancelled before the user answered it.") + expect(call.state.metadata?.interrupted).toBe(true) + } + }), + { git: true, config: (url) => providerCfg(url) }, + ), +) + it.live("session.processor effect tests record aborted 
errors and idle state", () => provideTmpdirServer( ({ dir, llm }) => diff --git a/packages/ui/src/components/message-part-stale.test.ts b/packages/ui/src/components/message-part-stale.test.ts index 5f223eb5..6e90f8a1 100644 --- a/packages/ui/src/components/message-part-stale.test.ts +++ b/packages/ui/src/components/message-part-stale.test.ts @@ -17,3 +17,62 @@ test("tool file accordions account for tool content gap in sticky offset", () => expect(source).toContain('style={{ "--sticky-accordion-offset": "calc(32px + var(--tool-content-gap))" }}') expect(source).not.toContain('style={{ "--sticky-accordion-offset": "40px" }}') }) + +// Cancelled question variant must identify the case via metadata.interrupted +// (written by processor cleanup) so the check stays decoupled from the exact +// backend error string. See #419. +test("question tool error renders interrupted variant via metadata.interrupted", () => { + const source = readFileSync(new URL("./message-part.tsx", import.meta.url), "utf8") + + expect(source).toContain('part().tool === "question" && partMetadata()?.interrupted === true') + expect(source).toContain('"ui.messagePart.questions.interrupted"') +}) + +test("interrupted i18n key exists in zh and en", () => { + const zh = readFileSync(new URL("../i18n/zh.ts", import.meta.url), "utf8") + const en = readFileSync(new URL("../i18n/en.ts", import.meta.url), "utf8") + + expect(zh).toContain('"ui.messagePart.questions.interrupted":') + expect(en).toContain('"ui.messagePart.questions.interrupted":') +}) + +// Live-stream reactivity: the interrupted hint must reappear when the part's +// metadata.interrupted flips from undefined → true *without* a page reload. +// In Solid that requires reading `partMetadata()` as an accessor over +// `part().state` so the JSX re-evaluates when props.part is replaced — not +// a one-shot snapshot at component setup. Lock the accessor pattern so a +// future "let me memo this once" refactor can't silently break live updates. 
+test("partMetadata is a fresh accessor over part().state, not a setup-time snapshot", () => { + const source = readFileSync(new URL("./message-part.tsx", import.meta.url), "utf8") + + // Defined as () => …, not const partMetadata = props.part.metadata + expect(source).toContain("const partMetadata = () => toolStateMetadata(part().state)") + expect(source).not.toMatch(/const partMetadata\s*=\s*props\.part\.metadata/) + expect(source).not.toMatch(/const partMetadata\s*=\s*createMemo\(\)/) +}) + +// Ensure the metadata extractor itself respects the shape variations the +// live message stream actually emits — including the case where the part +// initially has no metadata key and gains one on the next update. +test("toolStateMetadata extracts interrupted flag across part state shapes", () => { + // Inline reimplementation that mirrors the helper in message-part.tsx + // (kept private there). Drift between the two will trip the structural + // test above before it reaches users. + function toolStateMetadata(state: unknown): Record { + if (!state || typeof state !== "object" || !("metadata" in state)) return {} + const metadata = (state as { metadata: unknown }).metadata + return metadata && typeof metadata === "object" ? (metadata as Record) : {} + } + + expect(toolStateMetadata(undefined).interrupted).toBeUndefined() + expect(toolStateMetadata({}).interrupted).toBeUndefined() + expect(toolStateMetadata({ metadata: undefined }).interrupted).toBeUndefined() + expect(toolStateMetadata({ metadata: {} }).interrupted).toBeUndefined() + expect(toolStateMetadata({ metadata: { interrupted: true } }).interrupted).toBe(true) + // Reactivity contract: a new state object with the flag flipped must + // produce a fresh metadata object so equality checks downstream observe + // the change rather than caching a stale reference. 
+ const before = toolStateMetadata({ metadata: { interrupted: undefined } }) + const after = toolStateMetadata({ metadata: { interrupted: true } }) + expect(before).not.toBe(after) +}) diff --git a/packages/ui/src/components/message-part.tsx b/packages/ui/src/components/message-part.tsx index 8a65cc04..4e631744 100644 --- a/packages/ui/src/components/message-part.tsx +++ b/packages/ui/src/components/message-part.tsx @@ -1384,6 +1384,21 @@ PART_MAPPING["tool"] = function ToolPartDisplay(props) { ) } + // Cancelled question: the run was interrupted before the user + // saw or answered. Show a short, action-oriented hint instead of + // the raw "Question cancelled..." string so non-technical users + // know they can just re-ask. Identify by metadata.interrupted + // (written by processor cleanup) so this is independent of the + // exact error string used in the backend. See #419. + if (part().tool === "question" && partMetadata()?.interrupted === true) { + return ( +
+ + {i18n.t("ui.messagePart.questions.interrupted")} + +
+ ) + } return ( = { "ui.messagePart.option.typeOwnAnswer": "Type your own answer", "ui.messagePart.review.title": "Review your answers", "ui.messagePart.questions.dismissed": "Questions dismissed", + "ui.messagePart.questions.interrupted": "This question was cancelled before it was answered. Ask again below if you want to continue.", "ui.messagePart.compaction": "Session compacted", "ui.messagePart.context.read.one": "Read {{count}} file", "ui.messagePart.context.read.other": "Read {{count}} files", diff --git a/packages/ui/src/i18n/zh.ts b/packages/ui/src/i18n/zh.ts index e2919473..f7666a2f 100644 --- a/packages/ui/src/i18n/zh.ts +++ b/packages/ui/src/i18n/zh.ts @@ -82,6 +82,7 @@ export const dict = { "ui.sessionTurn.status.consideringNextSteps": "正在考虑下一步", "ui.messagePart.questions.dismissed": "问题已忽略", + "ui.messagePart.questions.interrupted": "这个问题已取消,尚未收到回答。如需继续,请在下方重新说明。", "ui.messagePart.compaction": "会话已压缩", "ui.messagePart.context.read.one": "读取 {{count}} 个文件", "ui.messagePart.context.read.other": "读取 {{count}} 个文件",