diff --git a/index.ts b/index.ts index 07f2e28..78a700c 100644 --- a/index.ts +++ b/index.ts @@ -26,6 +26,49 @@ const plugin = { return await controller.handleInboundClaim(event); }); + const registerInternalHook = ( + api as OpenClawPluginApi & { + registerHook?: ( + events: string | string[], + handler: (event: unknown) => Promise | void, + opts?: { name?: string; description?: string }, + ) => void; + } + ).registerHook; + if (typeof registerInternalHook === "function") { + registerInternalHook( + "message:transcribed", + async (event) => { + await controller.handleMessageTranscribed(event as { + type?: string; + action?: string; + sessionKey?: string; + context?: Record; + }); + }, + { + name: "codex-transcribed-handoff", + description: "Send transcribed inbound audio to the bound Codex thread as text.", + }, + ); + + registerInternalHook( + "message:preprocessed", + async (event) => { + await controller.handleMessagePreprocessed(event as { + type?: string; + action?: string; + sessionKey?: string; + context?: Record; + }); + }, + { + name: "codex-preprocessed-audio-fallback", + description: "Fallback: transcribe bound inbound audio from mediaPath when transcript events do not arrive.", + }, + ); + } + api.registerInteractiveHandler({ channel: "telegram", namespace: INTERACTIVE_NAMESPACE, diff --git a/src/controller.test.ts b/src/controller.test.ts index 6b590b2..c55f41e 100644 --- a/src/controller.test.ts +++ b/src/controller.test.ts @@ -3642,6 +3642,82 @@ describe("Discord controller flows", () => { ); }); + it("starts a turn when a message:transcribed event arrives for a bound session", async () => { + const { controller } = await createControllerHarness(); + await (controller as any).store.upsertBinding({ + conversation: { + channel: "telegram", + accountId: "default", + conversationId: TEST_TELEGRAM_PEER_ID, + }, + sessionKey: "session-1", + threadId: "thread-1", + workspaceDir: "/repo/openclaw", + updatedAt: Date.now(), + }); + const startTurn = vi.fn(async () => undefined); + (controller as any).startTurn = startTurn; + + await controller.handleMessageTranscribed({ + type: "message", + action: "transcribed", + sessionKey: "session-1", + context: { + transcript: "Hello from voice", + mediaType: "audio/ogg", + }, + }); + + expect(startTurn).toHaveBeenCalledWith( + expect.objectContaining({ + conversation: expect.objectContaining({ conversationId: TEST_TELEGRAM_PEER_ID }), + prompt: "Hello from voice", + reason: "inbound", + }), + ); + }); + + it("falls back to runtime transcription on message:preprocessed when transcript is missing", async () => { + const { controller } = await createControllerHarness(); + await (controller as any).store.upsertBinding({ + conversation: { + channel: "telegram", + accountId: "default", + conversationId: TEST_TELEGRAM_PEER_ID, + }, + sessionKey: "session-1", + threadId: "thread-1", + workspaceDir: "/repo/openclaw", + updatedAt: Date.now(), + }); + const startTurn = vi.fn(async () => undefined); + (controller as any).startTurn = startTurn; + (controller as any).api.runtime = { + mediaUnderstanding: { + transcribeAudioFile: vi.fn(async () => ({ text: "Fallback transcript" })), + }, + }; + (controller as any).api.config = {}; + + await controller.handleMessagePreprocessed({ + type: "message", + action: "preprocessed", + sessionKey: "session-1", + context: { + mediaPath: "/tmp/test.ogg", + mediaType: "audio/ogg", + }, + }); + + expect(startTurn).toHaveBeenCalledWith( + expect.objectContaining({ + conversation: expect.objectContaining({ conversationId: TEST_TELEGRAM_PEER_ID }), + prompt: "Fallback transcript", + reason: "inbound", + }), + ); + }); + it("still ignores unsupported binary document attachments", async () => { const { controller, stateDir } = await createControllerHarness(); const filePath = path.join(stateDir, "tmp", "manual.pdf"); diff --git a/src/controller.ts b/src/controller.ts index c9c5249..49fac7a 100644 --- a/src/controller.ts +++ b/src/controller.ts @@ -1409,6 +1409,120 @@ export class CodexPluginController { } } + async handleMessageTranscribed(event: { + type?: string; + action?: string; + sessionKey?: string; + context?: Record; + }): Promise { + try { + if (!this.settings.enabled) { + return; + } + if (event.type !== "message" || event.action !== "transcribed") { + return; + } + await this.start(); + const sessionKey = typeof event.sessionKey === "string" ? event.sessionKey : ""; + const ctx = event.context ?? {}; + const transcript = typeof ctx.transcript === "string" ? ctx.transcript.trim() : ""; + if (!sessionKey || !transcript) { + return; + } + const binding = this.store.listBindings().find((entry) => entry.sessionKey === sessionKey) ?? null; + if (!binding) { + return; + } + const mediaType = typeof ctx.mediaType === "string" ? ctx.mediaType : ""; + const conversation = binding.conversation; + this.api.logger.info( + `codex message:transcribed starting turn ${this.formatConversationForLog(conversation)} mediaType=${mediaType || ""} prompt="${summarizeTextForLog(transcript)}"`, + ); + await this.startTurn({ + conversation, + binding, + workspaceDir: binding.workspaceDir, + prompt: transcript, + reason: "inbound", + }); + } catch (error) { + const detail = error instanceof Error ? `${error.message}\n${error.stack ?? ""}`.trim() : String(error); + this.api.logger.error(`codex message:transcribed failed: ${detail}`); + } + } + + async handleMessagePreprocessed(event: { + type?: string; + action?: string; + sessionKey?: string; + context?: Record; + }): Promise { + try { + if (!this.settings.enabled) { + return; + } + if (event.type !== "message" || event.action !== "preprocessed") { + return; + } + await this.start(); + const sessionKey = typeof event.sessionKey === "string" ? event.sessionKey : ""; + const ctx = event.context ?? {}; + const mediaPath = typeof ctx.mediaPath === "string" ? ctx.mediaPath : ""; + const mediaType = typeof ctx.mediaType === "string" ? ctx.mediaType : ""; + const existingTranscript = typeof ctx.transcript === "string" ? ctx.transcript.trim() : ""; + if ( + !sessionKey || + !mediaPath || + (!mediaType.startsWith("audio/") && !mediaPath.match(/\.(ogg|oga|opus|mp3|wav|m4a)$/i)) + ) { + return; + } + const binding = this.store.listBindings().find((entry) => entry.sessionKey === sessionKey) ?? null; + if (!binding) { + return; + } + const mediaUnderstanding = (this.api as OpenClawPluginApi & { + runtime?: { + mediaUnderstanding?: { + transcribeAudioFile?: (params: { + filePath: string; + cfg?: unknown; + mime?: string; + }) => Promise<{ text?: string } | null | undefined>; + }; + }; + config?: unknown; + }).runtime?.mediaUnderstanding; + const transcript = + existingTranscript || + (await mediaUnderstanding?.transcribeAudioFile?.({ + filePath: mediaPath, + cfg: (this.api as { config?: unknown }).config, + mime: mediaType || undefined, + }))?.text?.trim() || + ""; + if (!transcript) { + this.api.logger.warn( + `codex message:preprocessed audio fallback produced no transcript ${this.formatConversationForLog(binding.conversation)} mediaPath=${mediaPath}`, + ); + return; + } + this.api.logger.info( + `codex message:preprocessed audio fallback starting turn ${this.formatConversationForLog(binding.conversation)} prompt="${summarizeTextForLog(transcript)}"`, + ); + await this.startTurn({ + conversation: binding.conversation, + binding, + workspaceDir: binding.workspaceDir, + prompt: transcript, + reason: "inbound", + }); + } catch (error) { + const detail = error instanceof Error ? `${error.message}\n${error.stack ?? ""}`.trim() : String(error); + this.api.logger.error(`codex message:preprocessed fallback failed: ${detail}`); + } + } + async handleTelegramInteractive(ctx: PluginInteractiveTelegramHandlerContext): Promise { await this.start(); const bindingApi = asScopedBindingApi(ctx);