diff --git a/src/controller.test.ts b/src/controller.test.ts index af9973e..e5e1c4f 100644 --- a/src/controller.test.ts +++ b/src/controller.test.ts @@ -4930,6 +4930,344 @@ describe("Discord controller flows", () => { } }); + it("keeps plan questionnaire callbacks active until the pending input is settled", async () => { + const { controller } = await createControllerHarness(); + let awaitingInput = true; + let clearPendingInput: (() => Promise) | null = null; + (controller as any).client.startTurn = vi.fn((params: any) => { + clearPendingInput = async () => { + awaitingInput = false; + await params.onPendingInput?.(null); + }; + void Promise.resolve().then(() => + params.onPendingInput?.({ + requestId: "req-plan-keepalive", + options: [], + expiresAt: Date.now() + 7 * 24 * 60 * 60_000, + method: "item/tool/requestUserInput", + questionnaire: { + currentIndex: 0, + questions: [ + { + index: 0, + id: "style", + header: "Fix Style", + prompt: "How should this behave?", + options: [ + { key: "A", label: "Fallback", description: "Use a fallback." }, + { key: "B", label: "Fail", description: "Keep failing." }, + ], + guidance: [], + }, + ], + answers: [null], + responseMode: "structured", + }, + }), + ); + return { + result: Promise.resolve({ + threadId: "thread-1", + }), + getThreadId: () => "thread-1", + queueMessage: vi.fn(async () => false), + interrupt: vi.fn(async () => {}), + isAwaitingInput: () => awaitingInput, + submitPendingInput: vi.fn(async () => false), + submitPendingInputPayload: vi.fn(async () => { + await clearPendingInput?.(); + return true; + }), + }; + }); + + await (controller as any).startPlan({ + conversation: { + channel: "telegram", + accountId: "default", + conversationId: TEST_TELEGRAM_PEER_ID, + }, + binding: null, + workspaceDir: "/repo/openclaw", + prompt: "Plan this.", + announceStart: false, + }); + + await flushAsyncWork(); + await flushAsyncWork(); + + const key = "telegram::default::telegram-user-1::"; + expect((controller as any).activeRuns.get(key)?.mode).toBe("plan"); + expect((controller as any).store.getPendingRequestById("req-plan-keepalive")).not.toBeNull(); + + const callback = ((controller as any).store as any).snapshot.callbacks.find( + (entry: any) => + entry.kind === "pending-questionnaire" && entry.requestId === "req-plan-keepalive", + ); + expect(callback).toBeTruthy(); + + const reply = vi.fn(async () => {}); + await controller.handleTelegramInteractive({ + channel: "telegram", + accountId: "default", + conversationId: TEST_TELEGRAM_PEER_ID, + callback: { payload: callback.token }, + respond: { + clearButtons: vi.fn(async () => {}), + reply, + editMessage: vi.fn(async () => {}), + }, + } as any); + + expect(reply).toHaveBeenCalledWith( + expect.objectContaining({ text: "Recorded your answers and sent them to Codex." }), + ); + expect((controller as any).activeRuns.get(key)).toBeUndefined(); + expect((controller as any).store.getPendingRequestById("req-plan-keepalive")).toBeNull(); + }); + + it("keeps standard pending-input callbacks active until the input is settled", async () => { + const { controller } = await createControllerHarness(); + let awaitingInput = true; + let clearPendingInput: (() => Promise) | null = null; + (controller as any).client.startTurn = vi.fn((params: any) => { + clearPendingInput = async () => { + awaitingInput = false; + await params.onPendingInput?.(null); + }; + void Promise.resolve().then(() => + params.onPendingInput?.({ + requestId: "req-turn-keepalive", + options: ["Approve Once"], + actions: [ + { + kind: "option", + label: "Approve Once", + value: "approve-once", + }, + ], + expiresAt: Date.now() + 7 * 24 * 60 * 60_000, + promptText: "Codex needs input.", + }), + ); + return { + result: Promise.resolve({ + threadId: "thread-1", + }), + getThreadId: () => "thread-1", + queueMessage: vi.fn(async () => false), + interrupt: vi.fn(async () => {}), + isAwaitingInput: () => awaitingInput, + submitPendingInput: vi.fn(async () => { + await clearPendingInput?.(); + return true; + }), + submitPendingInputPayload: vi.fn(async () => false), + }; + }); + + await (controller as any).startTurn({ + conversation: { + channel: "telegram", + accountId: "default", + conversationId: TEST_TELEGRAM_PEER_ID, + }, + binding: null, + workspaceDir: "/repo/openclaw", + prompt: "Continue.", + reason: "command", + }); + + await flushAsyncWork(); + await flushAsyncWork(); + + const key = "telegram::default::telegram-user-1::"; + expect((controller as any).activeRuns.get(key)?.mode).toBe("default"); + expect((controller as any).store.getPendingRequestById("req-turn-keepalive")).not.toBeNull(); + + const callback = ((controller as any).store as any).snapshot.callbacks.find( + (entry: any) => entry.kind === "pending-input" && entry.requestId === "req-turn-keepalive", + ); + expect(callback).toBeTruthy(); + + const reply = vi.fn(async () => {}); + await controller.handleTelegramInteractive({ + channel: "telegram", + accountId: "default", + conversationId: TEST_TELEGRAM_PEER_ID, + callback: { payload: callback.token }, + respond: { + clearButtons: vi.fn(async () => {}), + reply, + editMessage: vi.fn(async () => {}), + }, + } as any); + + expect(reply).toHaveBeenCalledWith(expect.objectContaining({ text: "Sent to Codex." })); + expect((controller as any).activeRuns.get(key)).toBeUndefined(); + expect((controller as any).store.getPendingRequestById("req-turn-keepalive")).toBeNull(); + }); + + it("restores the active plan run when questionnaire input arrives after result cleanup", async () => { + const { controller } = await createControllerHarness(); + const conversation = { + channel: "telegram", + accountId: "default", + conversationId: TEST_TELEGRAM_PEER_ID, + } as const; + const activeKey = `telegram::default::${TEST_TELEGRAM_PEER_ID}::`; + let onPendingInput: ((state: any) => Promise) | undefined; + const submitPendingInputPayload = vi.fn(async () => true); + (controller as any).client.startTurn = vi.fn((params: any) => { + onPendingInput = params.onPendingInput; + return { + result: Promise.resolve({ threadId: "thread-1" }), + getThreadId: () => "thread-1", + queueMessage: vi.fn(async () => false), + interrupt: vi.fn(async () => {}), + isAwaitingInput: () => false, + submitPendingInput: vi.fn(async () => false), + submitPendingInputPayload, + }; + }); + + await (controller as any).startPlan({ + conversation, + binding: null, + workspaceDir: "/repo/openclaw", + prompt: "Ask the breakfast question.", + }); + + await Promise.resolve(); + await Promise.resolve(); + (controller as any).activeRuns.delete(activeKey); + expect((controller as any).activeRuns.get(activeKey)).toBeUndefined(); + + await onPendingInput?.({ + requestId: "req-plan-race", + options: [], + expiresAt: Date.now() + 7 * 24 * 60 * 60_000, + method: "item/tool/requestUserInput", + questionnaire: { + currentIndex: 0, + questions: [ + { + index: 0, + id: "breakfast", + header: "Breakfast", + prompt: "Do you like cereal?", + options: [ + { key: "A", label: "Yes", description: "Choose yes." }, + ], + guidance: [], + }, + ], + answers: [null], + responseMode: "structured", + }, + }); + + expect((controller as any).activeRuns.get(activeKey)?.mode).toBe("plan"); + const callback = (controller as any).store.snapshot.callbacks.find((entry: any) => + entry.kind === "pending-questionnaire" && + entry.requestId === "req-plan-race" && + entry.action === "select" + ); + expect(callback).toBeTruthy(); + const reply = vi.fn(async () => {}); + + await controller.handleTelegramInteractive({ + ...conversation, + callback: { + payload: callback.token, + }, + respond: { + clearButtons: vi.fn(async () => {}), + reply, + editMessage: vi.fn(async () => {}), + }, + } as any); + + expect(submitPendingInputPayload).toHaveBeenCalledWith({ + answers: { + breakfast: { answers: ["Yes"] }, + }, + }); + expect(reply).toHaveBeenCalledWith({ + text: "Recorded your answers and sent them to Codex.", + }); + expect((controller as any).activeRuns.get(activeKey)).toBeUndefined(); + expect((controller as any).store.getPendingRequestById("req-plan-race")).toBeNull(); + }); + + it("ignores late questionnaire input from an older run when a newer run is already active", async () => { + const { controller } = await createControllerHarness(); + const conversation = { + channel: "telegram", + accountId: "default", + conversationId: TEST_TELEGRAM_PEER_ID, + } as const; + const activeKey = `telegram::default::${TEST_TELEGRAM_PEER_ID}::`; + const newerRun = { + getThreadId: () => "thread-new", + queueMessage: vi.fn(async () => false), + interrupt: vi.fn(async () => {}), + isAwaitingInput: () => false, + submitPendingInput: vi.fn(async () => false), + submitPendingInputPayload: vi.fn(async () => false), + }; + const olderRun = { + getThreadId: () => "thread-old", + queueMessage: vi.fn(async () => false), + interrupt: vi.fn(async () => {}), + isAwaitingInput: () => false, + submitPendingInput: vi.fn(async () => false), + submitPendingInputPayload: vi.fn(async () => false), + }; + + (controller as any).activeRuns.set(activeKey, { + conversation, + workspaceDir: "/repo/new", + mode: "plan", + profile: "default", + handle: newerRun, + }); + + await (controller as any).handlePendingInputState( + conversation, + "/repo/old", + "plan", + "default", + { + requestId: "req-old-run", + options: [], + expiresAt: Date.now() + 7 * 24 * 60 * 60_000, + method: "item/tool/requestUserInput", + questionnaire: { + currentIndex: 0, + questions: [ + { + index: 0, + id: "breakfast", + header: "Breakfast", + prompt: "Do you like cereal?", + options: [ + { key: "A", label: "Yes", description: "Choose yes." }, + ], + guidance: [], + }, + ], + answers: [null], + responseMode: "structured", + }, + }, + olderRun as any, + ); + + expect((controller as any).activeRuns.get(activeKey)?.handle).toBe(newerRun); + expect((controller as any).store.getPendingRequestById("req-old-run")).toBeNull(); + }); + it("tells the user to log back in when Codex reports OpenAI auth is required", async () => { const { controller, clientMock, sendMessageTelegram } = await createControllerHarness(); clientMock.readAccount.mockResolvedValue({ diff --git a/src/controller.ts b/src/controller.ts index 4ba6fb1..ca70f83 100644 --- a/src/controller.ts +++ b/src/controller.ts @@ -166,6 +166,7 @@ type ActiveRunRecord = { mode: "default" | "plan" | "review"; profile: PermissionsMode; handle: ActiveCodexRun; + cleanupWhenInputSettles?: boolean; }; const execFileAsync = promisify(execFile); @@ -1362,6 +1363,7 @@ export class CodexPluginController { private readonly settings; private readonly client; private readonly activeRuns = new Map(); + private readonly settledRuns = new WeakSet(); private readonly threadChangesCache = new Map>(); private readonly store; private serviceWorkspaceDir?: string; @@ -2331,6 +2333,54 @@ export class CodexPluginController { return !this.activeRuns.has(key); } + private ensureActiveRunRegistration(params: { + conversation: ConversationTarget; + workspaceDir: string; + mode: ActiveRunRecord["mode"]; + profile: PermissionsMode; + run: ActiveCodexRun; + reason: string; + }): boolean { + const key = buildConversationKey(params.conversation); + const active = this.activeRuns.get(key); + if (active?.handle === params.run) { + return true; + } + if (active && active.handle !== params.run) { + this.api.logger.warn( + `codex ignoring ${params.reason} for stale run ${this.formatConversationForLog(params.conversation)} activeMode=${active.mode} incomingMode=${params.mode}`, + ); + return false; + } + this.activeRuns.set(key, { + conversation: params.conversation, + workspaceDir: params.workspaceDir, + mode: params.mode, + profile: params.profile, + handle: params.run, + cleanupWhenInputSettles: this.settledRuns.has(params.run), + }); + this.api.logger.warn( + `codex restored active run from ${params.reason} ${this.formatConversationForLog(params.conversation)} mode=${params.mode}`, + ); + return true; + } + + private async maybeCleanupSettledInteractiveRun( + conversation: ConversationTarget, + run: ActiveCodexRun, + reason: string, + ): Promise { + const active = this.activeRuns.get(buildConversationKey(conversation)); + if (!active || active.handle !== run || !active.cleanupWhenInputSettles) { + return; + } + if (this.store.getPendingRequestByConversation(conversation)) { + return; + } + await this.finalizeActiveRun(conversation, run, reason); + } + private async readEffectiveThreadState(binding: StoredBinding): Promise<{ state: ThreadState | undefined; effectiveState: ThreadState | undefined; @@ -3555,7 +3605,14 @@ export class CodexPluginController { this.api.logger.debug?.( `codex turn pending input ${state ? "received" : "cleared"} ${this.formatConversationForLog(params.conversation)} questionnaire=${state?.questionnaire ? "yes" : "no"}`, ); - await this.handlePendingInputState(params.conversation, params.workspaceDir, state, run); + await this.handlePendingInputState( + params.conversation, + params.workspaceDir, + params.collaborationMode?.mode === "plan" ? "plan" : "default", + profile, + state, + run, + ); }, onFileEdits: async (text) => { await this.sendText(params.conversation, text); @@ -3644,15 +3701,8 @@ export class CodexPluginController { }) .finally(async () => { typing?.stop(); - this.activeRuns.delete(key); - const pending = this.store.getPendingRequestByConversation(params.conversation); - if (pending) { - await this.store.removePendingRequest(pending.requestId); - } - await this.applyPendingBindingPermissionsModeMigration(params.conversation); - this.api.logger.debug?.( - `codex turn cleaned up ${this.formatConversationForLog(params.conversation)}`, - ); + this.settledRuns.add(run); + await this.finalizeActiveRun(params.conversation, run, "turn settled"); }); } @@ -3816,7 +3866,14 @@ export class CodexPluginController { this.api.logger.debug( `codex plan pending input ${state ? `received (questionnaire=${state.questionnaire ? "yes" : "no"})` : "cleared"}`, ); - await this.handlePendingInputState(params.conversation, params.workspaceDir, state, run); + await this.handlePendingInputState( + params.conversation, + params.workspaceDir, + "plan", + profile, + state, + run, + ); }, onInterrupted: async () => { await this.sendText(params.conversation, formatInterruptedText("plan")); @@ -3917,12 +3974,8 @@ export class CodexPluginController { .finally(async () => { stopProgressTimer(); typing?.stop(); - this.activeRuns.delete(key); - const pending = this.store.getPendingRequestByConversation(params.conversation); - if (pending) { - await this.store.removePendingRequest(pending.requestId); - } - await this.applyPendingBindingPermissionsModeMigration(params.conversation); + this.settledRuns.add(run); + await this.finalizeActiveRun(params.conversation, run, "plan settled"); }); } @@ -3993,7 +4046,14 @@ export class CodexPluginController { if (state) { stopProgressTimer(); } - await this.handlePendingInputState(params.conversation, params.workspaceDir, state, run); + await this.handlePendingInputState( + params.conversation, + params.workspaceDir, + "review", + profile, + state, + run, + ); }, onInterrupted: async () => { await this.sendText(params.conversation, "Codex review stopped."); @@ -4080,26 +4140,66 @@ export class CodexPluginController { .finally(async () => { stopProgressTimer(); typing?.stop(); - this.activeRuns.delete(key); - const pending = this.store.getPendingRequestByConversation(params.conversation); - if (pending) { - await this.store.removePendingRequest(pending.requestId); - } - await this.applyPendingBindingPermissionsModeMigration(params.conversation); + this.settledRuns.add(run); + await this.finalizeActiveRun(params.conversation, run, "review settled"); }); } + private async finalizeActiveRun( + conversation: ConversationTarget, + run: ActiveCodexRun, + reason: string, + ): Promise { + const key = buildConversationKey(conversation); + const active = this.activeRuns.get(key); + if (!active || active.handle !== run) { + this.api.logger.debug?.( + `codex skipped active run cleanup ${this.formatConversationForLog(conversation)} reason=${reason} active=${active ? "other-run" : "none"}`, + ); + return; + } + const pending = this.store.getPendingRequestByConversation(conversation); + const awaitingInput = typeof run.isAwaitingInput === "function" && run.isAwaitingInput(); + if (awaitingInput || pending) { + active.cleanupWhenInputSettles = true; + this.api.logger.debug?.( + `codex deferred active run cleanup ${this.formatConversationForLog(conversation)} reason=${reason} awaitingInput=${awaitingInput ? "yes" : "no"} pendingRequest=${pending?.requestId ?? "none"}`, + ); + return; + } + this.activeRuns.delete(key); + await this.applyPendingBindingPermissionsModeMigration(conversation); + this.api.logger.debug?.( + `codex cleaned up active run ${this.formatConversationForLog(conversation)} reason=${reason}`, + ); + } + private async handlePendingInputState( conversation: ConversationTarget, workspaceDir: string, + mode: ActiveRunRecord["mode"], + profile: PermissionsMode, state: PendingInputState | null, run: ActiveCodexRun, ): Promise { + if (state) { + if (!this.ensureActiveRunRegistration({ + conversation, + workspaceDir, + mode, + profile, + run, + reason: "pending input", + })) { + return; + } + } if (!state) { const existing = this.store.getPendingRequestByConversation(conversation); if (existing) { await this.store.removePendingRequest(existing.requestId); } + await this.maybeCleanupSettledInteractiveRun(conversation, run, "pending input settled"); return; } if (state.questionnaire) { @@ -4310,6 +4410,11 @@ export class CodexPluginController { return false; } await this.store.removePendingRequest(pending.requestId); + await this.maybeCleanupSettledInteractiveRun( + conversation, + run, + "questionnaire submitted", + ); await this.sendText(conversation, "Recorded your answers and sent them to Codex."); return true; } @@ -5308,6 +5413,9 @@ export class CodexPluginController { } const active = this.activeRuns.get(buildConversationKey(callback.conversation)); if (!active) { + this.api.logger.warn( + `codex pending-input callback missing active run ${this.formatConversationForLog(callback.conversation)} requestId=${callback.requestId} pendingRequest=${pending.requestId} thread=${pending.threadId || ""}`, + ); await responders.reply("No active Codex run is waiting for input."); return; } @@ -5331,6 +5439,9 @@ export class CodexPluginController { } const active = this.activeRuns.get(buildConversationKey(callback.conversation)); if (!active) { + this.api.logger.warn( + `codex questionnaire callback missing active run ${this.formatConversationForLog(callback.conversation)} requestId=${callback.requestId} pendingRequest=${pending.requestId} thread=${pending.threadId || ""}`, + ); await responders.reply("No active Codex run is waiting for input."); return; } @@ -5413,6 +5524,11 @@ export class CodexPluginController { } await responders.clear().catch(() => undefined); await this.store.removePendingRequest(pending.requestId); + await this.maybeCleanupSettledInteractiveRun( + callback.conversation, + active.handle, + "questionnaire submitted", + ); if (callback.conversation.channel !== "discord") { await responders.reply("Recorded your answers and sent them to Codex."); }