Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
338 changes: 338 additions & 0 deletions src/controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4930,6 +4930,344 @@ describe("Discord controller flows", () => {
}
});

it("keeps plan questionnaire callbacks active until the pending input is settled", async () => {
const { controller } = await createControllerHarness();
let awaitingInput = true;
let clearPendingInput: (() => Promise<void>) | null = null;
(controller as any).client.startTurn = vi.fn((params: any) => {
clearPendingInput = async () => {
awaitingInput = false;
await params.onPendingInput?.(null);
};
void Promise.resolve().then(() =>
params.onPendingInput?.({
requestId: "req-plan-keepalive",
options: [],
expiresAt: Date.now() + 7 * 24 * 60 * 60_000,
method: "item/tool/requestUserInput",
questionnaire: {
currentIndex: 0,
questions: [
{
index: 0,
id: "style",
header: "Fix Style",
prompt: "How should this behave?",
options: [
{ key: "A", label: "Fallback", description: "Use a fallback." },
{ key: "B", label: "Fail", description: "Keep failing." },
],
guidance: [],
},
],
answers: [null],
responseMode: "structured",
},
}),
);
return {
result: Promise.resolve({
threadId: "thread-1",
}),
getThreadId: () => "thread-1",
queueMessage: vi.fn(async () => false),
interrupt: vi.fn(async () => {}),
isAwaitingInput: () => awaitingInput,
submitPendingInput: vi.fn(async () => false),
submitPendingInputPayload: vi.fn(async () => {
await clearPendingInput?.();
return true;
}),
};
});

await (controller as any).startPlan({
conversation: {
channel: "telegram",
accountId: "default",
conversationId: TEST_TELEGRAM_PEER_ID,
},
binding: null,
workspaceDir: "/repo/openclaw",
prompt: "Plan this.",
announceStart: false,
});

await flushAsyncWork();
await flushAsyncWork();

const key = "telegram::default::telegram-user-1::";
expect((controller as any).activeRuns.get(key)?.mode).toBe("plan");
expect((controller as any).store.getPendingRequestById("req-plan-keepalive")).not.toBeNull();

const callback = ((controller as any).store as any).snapshot.callbacks.find(
(entry: any) =>
entry.kind === "pending-questionnaire" && entry.requestId === "req-plan-keepalive",
);
expect(callback).toBeTruthy();

const reply = vi.fn(async () => {});
await controller.handleTelegramInteractive({
channel: "telegram",
accountId: "default",
conversationId: TEST_TELEGRAM_PEER_ID,
callback: { payload: callback.token },
respond: {
clearButtons: vi.fn(async () => {}),
reply,
editMessage: vi.fn(async () => {}),
},
} as any);

expect(reply).toHaveBeenCalledWith(
expect.objectContaining({ text: "Recorded your answers and sent them to Codex." }),
);
expect((controller as any).activeRuns.get(key)).toBeUndefined();
expect((controller as any).store.getPendingRequestById("req-plan-keepalive")).toBeNull();
});

it("keeps standard pending-input callbacks active until the input is settled", async () => {
const { controller } = await createControllerHarness();
let awaitingInput = true;
let clearPendingInput: (() => Promise<void>) | null = null;
(controller as any).client.startTurn = vi.fn((params: any) => {
clearPendingInput = async () => {
awaitingInput = false;
await params.onPendingInput?.(null);
};
void Promise.resolve().then(() =>
params.onPendingInput?.({
requestId: "req-turn-keepalive",
options: ["Approve Once"],
actions: [
{
kind: "option",
label: "Approve Once",
value: "approve-once",
},
],
expiresAt: Date.now() + 7 * 24 * 60 * 60_000,
promptText: "Codex needs input.",
}),
);
return {
result: Promise.resolve({
threadId: "thread-1",
}),
getThreadId: () => "thread-1",
queueMessage: vi.fn(async () => false),
interrupt: vi.fn(async () => {}),
isAwaitingInput: () => awaitingInput,
submitPendingInput: vi.fn(async () => {
await clearPendingInput?.();
return true;
}),
submitPendingInputPayload: vi.fn(async () => false),
};
});

await (controller as any).startTurn({
conversation: {
channel: "telegram",
accountId: "default",
conversationId: TEST_TELEGRAM_PEER_ID,
},
binding: null,
workspaceDir: "/repo/openclaw",
prompt: "Continue.",
reason: "command",
});

await flushAsyncWork();
await flushAsyncWork();

const key = "telegram::default::telegram-user-1::";
expect((controller as any).activeRuns.get(key)?.mode).toBe("default");
expect((controller as any).store.getPendingRequestById("req-turn-keepalive")).not.toBeNull();

const callback = ((controller as any).store as any).snapshot.callbacks.find(
(entry: any) => entry.kind === "pending-input" && entry.requestId === "req-turn-keepalive",
);
expect(callback).toBeTruthy();

const reply = vi.fn(async () => {});
await controller.handleTelegramInteractive({
channel: "telegram",
accountId: "default",
conversationId: TEST_TELEGRAM_PEER_ID,
callback: { payload: callback.token },
respond: {
clearButtons: vi.fn(async () => {}),
reply,
editMessage: vi.fn(async () => {}),
},
} as any);

expect(reply).toHaveBeenCalledWith(expect.objectContaining({ text: "Sent to Codex." }));
expect((controller as any).activeRuns.get(key)).toBeUndefined();
expect((controller as any).store.getPendingRequestById("req-turn-keepalive")).toBeNull();
});

it("restores the active plan run when questionnaire input arrives after result cleanup", async () => {
const { controller } = await createControllerHarness();
const conversation = {
channel: "telegram",
accountId: "default",
conversationId: TEST_TELEGRAM_PEER_ID,
} as const;
const activeKey = `telegram::default::${TEST_TELEGRAM_PEER_ID}::`;
let onPendingInput: ((state: any) => Promise<void>) | undefined;
const submitPendingInputPayload = vi.fn(async () => true);
(controller as any).client.startTurn = vi.fn((params: any) => {
onPendingInput = params.onPendingInput;
return {
result: Promise.resolve({ threadId: "thread-1" }),
getThreadId: () => "thread-1",
queueMessage: vi.fn(async () => false),
interrupt: vi.fn(async () => {}),
isAwaitingInput: () => false,
submitPendingInput: vi.fn(async () => false),
submitPendingInputPayload,
};
});

await (controller as any).startPlan({
conversation,
binding: null,
workspaceDir: "/repo/openclaw",
prompt: "Ask the breakfast question.",
});

await Promise.resolve();
await Promise.resolve();
(controller as any).activeRuns.delete(activeKey);
expect((controller as any).activeRuns.get(activeKey)).toBeUndefined();

await onPendingInput?.({
requestId: "req-plan-race",
options: [],
expiresAt: Date.now() + 7 * 24 * 60 * 60_000,
method: "item/tool/requestUserInput",
questionnaire: {
currentIndex: 0,
questions: [
{
index: 0,
id: "breakfast",
header: "Breakfast",
prompt: "Do you like cereal?",
options: [
{ key: "A", label: "Yes", description: "Choose yes." },
],
guidance: [],
},
],
answers: [null],
responseMode: "structured",
},
});

expect((controller as any).activeRuns.get(activeKey)?.mode).toBe("plan");
const callback = (controller as any).store.snapshot.callbacks.find((entry: any) =>
entry.kind === "pending-questionnaire" &&
entry.requestId === "req-plan-race" &&
entry.action === "select"
);
expect(callback).toBeTruthy();
const reply = vi.fn(async () => {});

await controller.handleTelegramInteractive({
...conversation,
callback: {
payload: callback.token,
},
respond: {
clearButtons: vi.fn(async () => {}),
reply,
editMessage: vi.fn(async () => {}),
},
} as any);

expect(submitPendingInputPayload).toHaveBeenCalledWith({
answers: {
breakfast: { answers: ["Yes"] },
},
});
expect(reply).toHaveBeenCalledWith({
text: "Recorded your answers and sent them to Codex.",
});
expect((controller as any).activeRuns.get(activeKey)).toBeUndefined();
expect((controller as any).store.getPendingRequestById("req-plan-race")).toBeNull();
});

it("ignores late questionnaire input from an older run when a newer run is already active", async () => {
const { controller } = await createControllerHarness();
const conversation = {
channel: "telegram",
accountId: "default",
conversationId: TEST_TELEGRAM_PEER_ID,
} as const;
const activeKey = `telegram::default::${TEST_TELEGRAM_PEER_ID}::`;
const newerRun = {
getThreadId: () => "thread-new",
queueMessage: vi.fn(async () => false),
interrupt: vi.fn(async () => {}),
isAwaitingInput: () => false,
submitPendingInput: vi.fn(async () => false),
submitPendingInputPayload: vi.fn(async () => false),
};
const olderRun = {
getThreadId: () => "thread-old",
queueMessage: vi.fn(async () => false),
interrupt: vi.fn(async () => {}),
isAwaitingInput: () => false,
submitPendingInput: vi.fn(async () => false),
submitPendingInputPayload: vi.fn(async () => false),
};

(controller as any).activeRuns.set(activeKey, {
conversation,
workspaceDir: "/repo/new",
mode: "plan",
profile: "default",
handle: newerRun,
});

await (controller as any).handlePendingInputState(
conversation,
"/repo/old",
"plan",
"default",
{
requestId: "req-old-run",
options: [],
expiresAt: Date.now() + 7 * 24 * 60 * 60_000,
method: "item/tool/requestUserInput",
questionnaire: {
currentIndex: 0,
questions: [
{
index: 0,
id: "breakfast",
header: "Breakfast",
prompt: "Do you like cereal?",
options: [
{ key: "A", label: "Yes", description: "Choose yes." },
],
guidance: [],
},
],
answers: [null],
responseMode: "structured",
},
},
olderRun as any,
);

expect((controller as any).activeRuns.get(activeKey)?.handle).toBe(newerRun);
expect((controller as any).store.getPendingRequestById("req-old-run")).toBeNull();
});

it("tells the user to log back in when Codex reports OpenAI auth is required", async () => {
const { controller, clientMock, sendMessageTelegram } = await createControllerHarness();
clientMock.readAccount.mockResolvedValue({
Expand Down
Loading