Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2852,6 +2852,7 @@ export class CodexAppServerClient {
async compactThread(params: {
sessionKey?: string;
threadId: string;
signal?: AbortSignal;
onProgress?: (progress: CompactProgress) => Promise<void> | void;
}): Promise<CompactResult> {
const connectionPromise = this.ensureConnected();
Expand Down Expand Up @@ -2891,6 +2892,15 @@ export class CodexAppServerClient {
reject?.(new Error(message));
};

const abortListener = () => {
fail("Codex thread compaction canceled.");
};
if (params.signal?.aborted) {
abortListener();
} else {
params.signal?.addEventListener("abort", abortListener, { once: true });
}

const removeNotificationListener = this.addNotificationListener(async (method, notificationParams) => {
const methodLower = method.trim().toLowerCase();
const ids = extractIds(notificationParams);
Expand Down Expand Up @@ -2955,6 +2965,7 @@ export class CodexAppServerClient {
if (settleTimer) {
clearTimeout(settleTimer);
}
params.signal?.removeEventListener("abort", abortListener);
}
}

Expand Down
137 changes: 137 additions & 0 deletions src/controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ async function createControllerHarness() {
threadState.sandbox = params.sandbox;
return { ...threadState };
}),
compactThread: vi.fn(async () => ({})),
startReview: vi.fn(() => ({
result: new Promise(() => {}),
getThreadId: () => "thread-1",
Expand Down Expand Up @@ -6397,6 +6398,142 @@ describe("Discord controller flows", () => {
);
});

it("stops compaction keepalives when cas_stop is used", async () => {
vi.useFakeTimers();
try {
const { controller, clientMock, sendMessageTelegram } = await createControllerHarness();
const conversation = {
channel: "telegram",
accountId: "default",
conversationId: "123",
};
const binding = {
conversation,
sessionKey: "session-1",
threadId: "thread-1",
workspaceDir: "/repo/openclaw",
updatedAt: Date.now(),
};
let compactAbortSignal: AbortSignal | undefined;
clientMock.compactThread = vi.fn(async (params: { signal?: AbortSignal }) => {
compactAbortSignal = params.signal;
await new Promise<void>((_resolve, reject) => {
params.signal?.addEventListener(
"abort",
() => {
reject(new Error("Codex thread compaction canceled."));
},
{ once: true },
);
});
return {};
});

const compactPromise = (controller as any).startCompact({
conversation,
binding,
});

await vi.advanceTimersByTimeAsync(12_000);
const beforeStop = sendMessageTelegram.mock.calls.flatMap((call) => {
const [, text] = call as unknown as [unknown, unknown];
return text === "Codex is still compacting." ? [text] : [];
});
expect(beforeStop).toHaveLength(1);

const reply = await controller.handleCommand(
"cas_stop",
buildTelegramCommandContext({
commandBody: "/cas_stop",
from: "telegram:123",
to: "telegram:123",
}),
);
expect(reply).toEqual({
text: "Stopped Codex compaction updates for this conversation.",
});
expect(compactAbortSignal?.aborted).toBe(true);

await vi.advanceTimersByTimeAsync(30_000);
await compactPromise;

const afterStop = sendMessageTelegram.mock.calls.flatMap((call) => {
const [, text] = call as unknown as [unknown, unknown];
return text === "Codex is still compacting." ? [text] : [];
});
expect(afterStop).toHaveLength(1);
expect((controller as any).activeCompactions.size).toBe(0);
} finally {
vi.useRealTimers();
}
});

it("stops compaction keepalives when cas_detach is used", async () => {
vi.useFakeTimers();
try {
const { controller, clientMock, sendMessageTelegram } = await createControllerHarness();
const conversation = {
channel: "telegram",
accountId: "default",
conversationId: "123",
};
await (controller as any).store.upsertBinding({
conversation,
sessionKey: "session-1",
threadId: "thread-1",
workspaceDir: "/repo/openclaw",
updatedAt: Date.now(),
});
let compactAbortSignal: AbortSignal | undefined;
clientMock.compactThread = vi.fn(async (params: { signal?: AbortSignal }) => {
compactAbortSignal = params.signal;
await new Promise<void>((_resolve, reject) => {
params.signal?.addEventListener(
"abort",
() => {
reject(new Error("Codex thread compaction canceled."));
},
{ once: true },
);
});
return {};
});

const compactPromise = (controller as any).startCompact({
conversation,
binding: (controller as any).store.getBinding(conversation),
});

await vi.advanceTimersByTimeAsync(12_000);
const detachReply = await controller.handleCommand(
"cas_detach",
buildTelegramCommandContext({
commandBody: "/cas_detach",
from: "telegram:123",
to: "telegram:123",
}),
);

expect(detachReply).toEqual({
text: "Detached this conversation from Codex.",
});
expect(compactAbortSignal?.aborted).toBe(true);

await vi.advanceTimersByTimeAsync(30_000);
await compactPromise;

const keepalives = sendMessageTelegram.mock.calls.flatMap((call) => {
const [, text] = call as unknown as [unknown, unknown];
return text === "Codex is still compacting." ? [text] : [];
});
expect(keepalives).toHaveLength(1);
expect((controller as any).store.getBinding(conversation)).toBeNull();
expect((controller as any).activeCompactions.size).toBe(0);
} finally {
vi.useRealTimers();
}
});

it("runs skills from the status card without rewriting the status message", async () => {
const { controller, sendMessageTelegram } = await createControllerHarness();
await (controller as any).store.upsertBinding({
Expand Down
84 changes: 78 additions & 6 deletions src/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ type ActiveRunRecord = {
handle: ActiveCodexRun;
};

type ActiveCompactionRecord = {
conversation: ConversationTarget;
cancel: () => void;
};

const execFileAsync = promisify(execFile);
const require = createRequire(import.meta.url);
const TEXT_ATTACHMENT_FILE_EXTENSIONS = new Set([
Expand Down Expand Up @@ -1362,6 +1367,7 @@ export class CodexPluginController {
private readonly settings;
private readonly client;
private readonly activeRuns = new Map<string, ActiveRunRecord>();
private readonly activeCompactions = new Map<string, ActiveCompactionRecord>();
private readonly threadChangesCache = new Map<string, Promise<boolean | undefined>>();
private readonly store;
private serviceWorkspaceDir?: string;
Expand Down Expand Up @@ -1404,6 +1410,10 @@ export class CodexPluginController {
await active.handle.interrupt().catch(() => undefined);
}
this.activeRuns.clear();
for (const active of this.activeCompactions.values()) {
active.cancel();
}
this.activeCompactions.clear();
await this.client.close().catch(() => undefined);
this.started = false;
}
Expand Down Expand Up @@ -1909,6 +1919,7 @@ export class CodexPluginController {
if (!conversation) {
return { text: "This command needs a Telegram or Discord conversation." };
}
this.cancelActiveCompaction(conversation);
const detachResult = await bindingApi.detachConversationBinding?.();
await this.unbindConversation(conversation);
return {
Expand Down Expand Up @@ -2940,13 +2951,27 @@ export class CodexPluginController {
return { text: "This command needs a Telegram or Discord conversation." };
}
const active = this.activeRuns.get(buildConversationKey(conversation));
if (!active && this.cancelActiveCompaction(conversation)) {
return { text: "Stopped Codex compaction updates for this conversation." };
}
if (!active) {
return { text: "No active Codex run to stop." };
}
await active.handle.interrupt();
return { text: "Stopping Codex now." };
}

private cancelActiveCompaction(conversation: ConversationTarget): boolean {
const key = buildConversationKey(conversation);
const active = this.activeCompactions.get(key);
if (!active) {
return false;
}
this.activeCompactions.delete(key);
active.cancel();
return true;
}

private async handleSteerCommand(
conversation: ConversationTarget | null,
args: string,
Expand Down Expand Up @@ -3052,15 +3077,45 @@ export class CodexPluginController {
binding: StoredBinding;
}): Promise<void> {
const { conversation, binding } = params;
this.cancelActiveCompaction(conversation);
const key = buildConversationKey(conversation);
const profile = this.getPermissionsMode(binding);
const typing = await this.startTypingLease(conversation);
const abortController = new AbortController();
let keepaliveInterval: NodeJS.Timeout | undefined;
let progressTimer: NodeJS.Timeout | undefined;
let cancelled = false;
const cleanupKeepalive = () => {
if (progressTimer) {
clearTimeout(progressTimer);
progressTimer = undefined;
}
if (keepaliveInterval) {
clearInterval(keepaliveInterval);
keepaliveInterval = undefined;
}
};
const cancelCompaction = () => {
if (cancelled) {
return;
}
cancelled = true;
cleanupKeepalive();
abortController.abort();
};
this.activeCompactions.set(key, {
conversation,
cancel: cancelCompaction,
});
let startingUsage = binding.contextUsage;
let latestUsage = startingUsage;
let lastEmittedUsageText = binding.contextUsage ? formatContextUsageText(binding.contextUsage) : undefined;
try {
let keepaliveInterval: NodeJS.Timeout | undefined;
const progressTimer = setTimeout(() => {
progressTimer = setTimeout(() => {
void (async () => {
if (cancelled) {
return;
}
const usageText =
latestUsage ? formatContextUsageText(latestUsage) : undefined;
if (usageText && usageText !== lastEmittedUsageText) {
Expand All @@ -3074,14 +3129,21 @@ export class CodexPluginController {
);
})();
keepaliveInterval = setInterval(() => {
if (cancelled) {
return;
}
void this.sendText(conversation, "Codex is still compacting.");
}, COMPACT_PROGRESS_INTERVAL_MS);
}, COMPACT_PROGRESS_DELAY_MS);
const result = await this.client.compactThread({
profile,
sessionKey: binding.sessionKey,
threadId: binding.threadId,
signal: abortController.signal,
onProgress: async (progress) => {
if (cancelled) {
return;
}
if (progress.usage) {
latestUsage = progress.usage;
startingUsage ??= progress.usage;
Expand All @@ -3091,9 +3153,9 @@ export class CodexPluginController {
}
},
});
clearTimeout(progressTimer);
if (keepaliveInterval) {
clearInterval(keepaliveInterval);
cleanupKeepalive();
if (cancelled) {
return;
}
await this.sendText(
conversation,
Expand All @@ -3117,8 +3179,16 @@ export class CodexPluginController {
}
return;
} catch (error) {
if (abortController.signal.aborted || cancelled) {
return;
}
await this.sendText(conversation, formatFailureText("compact", error));
} finally {
cleanupKeepalive();
const active = this.activeCompactions.get(key);
if (active?.cancel === cancelCompaction) {
this.activeCompactions.delete(key);
}
typing?.stop();
}
}
Expand Down Expand Up @@ -5700,6 +5770,7 @@ export class CodexPluginController {
await this.store.removeCallback(callback.token);
const active = this.activeRuns.get(buildConversationKey(callback.conversation));
if (!active) {
const stoppedCompaction = this.cancelActiveCompaction(callback.conversation);
const statusCard = await this.buildStatusCard(
{
...callback.conversation,
Expand All @@ -5709,7 +5780,7 @@ export class CodexPluginController {
Boolean(binding),
);
await responders.editPicker({
text: statusCard.text,
text: stoppedCompaction ? `${statusCard.text}\n\nCompaction stopped.` : statusCard.text,
buttons: statusCard.buttons ?? [],
});
return;
Expand Down Expand Up @@ -5750,6 +5821,7 @@ export class CodexPluginController {
}
if (callback.kind === "detach-thread") {
await this.store.removeCallback(callback.token);
this.cancelActiveCompaction(callback.conversation);
await responders.detachConversationBinding?.().catch(() => undefined);
await this.unbindConversation(callback.conversation);
const statusCard = await this.buildStatusCard(
Expand Down