From 205be2d4c532d1cf778e8349af6473a7e4bbc28a Mon Sep 17 00:00:00 2001 From: zhulijin1991 <167979819+zhulijin1991@users.noreply.github.com> Date: Mon, 11 May 2026 19:46:18 +0800 Subject: [PATCH] Route active Feishu card text sends --- src/card/active-streaming-card-store.ts | 109 ++++++++++++++++++++++ src/card/reply-dispatcher.ts | 23 ++++- src/messaging/outbound/actions.ts | 21 ++++- tests/active-streaming-card-store.test.ts | 69 ++++++++++++++ 4 files changed, 220 insertions(+), 2 deletions(-) create mode 100644 src/card/active-streaming-card-store.ts create mode 100644 tests/active-streaming-card-store.test.ts diff --git a/src/card/active-streaming-card-store.ts b/src/card/active-streaming-card-store.ts new file mode 100644 index 00000000..e2f64872 --- /dev/null +++ b/src/card/active-streaming-card-store.ts @@ -0,0 +1,109 @@ +/** + * Copyright (c) 2026 ByteDance Ltd. and/or its affiliates + * SPDX-License-Identifier: MIT + * + * Process-local registry for active Feishu streaming cards. + * + * The shared message tool can deliver a final text reply through the channel + * action path while an inbound turn already owns a streaming card. This bridge + * routes same-session, same-target text-only sends back into that card instead + * of posting a separate message and leaving the card stale. + */ + +import type { ChannelThreadingToolContext } from 'openclaw/plugin-sdk/channel-contract'; +import type { StreamingCardController } from './streaming-card-controller'; + +interface ActiveStreamingCard { + accountId?: string; + chatId: string; + controller: StreamingCardController; +} + +interface RegisterActiveStreamingCardParams { + sessionKey?: string | null; + accountId?: string | null; + chatId: string; + controller: StreamingCardController; +} + +interface DeliverTextToActiveStreamingCardParams { + sessionKey?: string | null; + accountId?: string | null; + to?: string; + text: string; + card?: unknown; + mediaUrl?: string; + toolContext?: ChannelThreadingToolContext; +} + +const activeCards = new Map(); + +export function registerActiveStreamingCard(params: RegisterActiveStreamingCardParams): () => void { + const sessionKey = normalizeKey(params.sessionKey); + if (!sessionKey) return () => {}; + + activeCards.set(sessionKey, { + accountId: params.accountId ?? undefined, + chatId: params.chatId, + controller: params.controller, + }); + + return () => unregisterActiveStreamingCard(params.sessionKey, params.controller); +} + +export function unregisterActiveStreamingCard( + sessionKey?: string | null, + controller?: StreamingCardController, +): void { + const key = normalizeKey(sessionKey); + if (!key) return; + const active = activeCards.get(key); + if (!active) return; + if (controller && active.controller !== controller) return; + activeCards.delete(key); +} + +export async function deliverTextToActiveStreamingCard(params: DeliverTextToActiveStreamingCardParams): Promise<{ + ok: true; + messageId: string; + chatId: string; + routedViaStreamingCard: true; +} | null> { + const key = normalizeKey(params.sessionKey); + if (!key || !params.text.trim()) return null; + if (params.card || params.mediaUrl) return null; + + const active = activeCards.get(key); + if (!active) return null; + if (params.accountId && active.accountId && params.accountId !== active.accountId) return null; + if (!isCurrentTurnTarget(params.to, active.chatId, params.toolContext?.currentChannelId)) return null; + + await active.controller.onDeliver({ text: params.text }); + const messageId = active.controller.cardMessageId; + if (!messageId) return null; + + return { + ok: true, + messageId, + chatId: active.chatId, + routedViaStreamingCard: true, + }; +} + +function isCurrentTurnTarget(to: string | undefined, activeChatId: string, currentChannelId: string | undefined): boolean { + const normalizedTo = normalizeTarget(to); + if (!normalizedTo) return true; + return normalizedTo === normalizeTarget(activeChatId) || normalizedTo === normalizeTarget(currentChannelId); +} + +function normalizeKey(value: unknown): string { + return typeof value === 'string' ? value.trim().toLowerCase() : ''; +} + +function normalizeTarget(value: unknown): string { + if (typeof value !== 'string') return ''; + return value + .trim() + .replace(/^(?:user|chat|group|open_id|open-chat-id):/i, '') + .toLowerCase(); +} diff --git a/src/card/reply-dispatcher.ts b/src/card/reply-dispatcher.ts index a2635c61..d7453ca4 100644 --- a/src/card/reply-dispatcher.ts +++ b/src/card/reply-dispatcher.ts @@ -21,6 +21,7 @@ import { larkLogger } from '../core/lark-logger'; import { sendMediaLark } from '../messaging/outbound/deliver'; import { sendMarkdownCardFeishu, sendMessageFeishu } from '../messaging/outbound/send'; import { type TypingIndicatorState, addTypingIndicator, removeTypingIndicator } from '../messaging/outbound/typing'; +import { registerActiveStreamingCard } from './active-streaming-card-store'; import { splitReasoningText, stripReasoningTags } from './builder'; import { isCardTableLimitError } from './card-error'; import type { CreateFeishuReplyDispatcherParams, FeishuReplyDispatcherResult } from './reply-dispatcher-types'; @@ -101,6 +102,14 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP resolvedFooter, }) : null; + const unregisterActiveCard = controller + ? registerActiveStreamingCard({ + sessionKey, + accountId, + chatId, + controller, + }) + : () => {}; // ---- Static mode unavailable guard ---- // In streaming mode the controller owns its own guard; in static mode @@ -359,10 +368,12 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP if (controller) { if (controller.terminateIfUnavailable('onError', err)) { typingCallbacks.onIdle?.(); + unregisterActiveCard(); return; } await controller.onError(err, info); typingCallbacks.onIdle?.(); + unregisterActiveCard(); return; } @@ -388,6 +399,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP if (controller) { await controller.onIdle(); + unregisterActiveCard(); } typingCallbacks.onIdle?.(); @@ -395,11 +407,20 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP onCleanup: async () => { typingCallbacks.onCleanup?.(); + unregisterActiveCard(); }, }); // ---- Abort card (delegates to controller or no-op for static) ---- - const abortCard = controller ? () => controller.abortCard() : async () => {}; + const abortCard = controller + ? async () => { + try { + await controller.abortCard(); + } finally { + unregisterActiveCard(); + } + } + : async () => {}; return { dispatcher, diff --git a/src/messaging/outbound/actions.ts b/src/messaging/outbound/actions.ts index c7b7ebfd..c369940f 100644 --- a/src/messaging/outbound/actions.ts +++ b/src/messaging/outbound/actions.ts @@ -23,6 +23,7 @@ import { extractToolSend } from 'openclaw/plugin-sdk/tool-send'; import { readStringParam } from 'openclaw/plugin-sdk/param-readers'; import { jsonResult, readReactionParams } from '../../core/sdk-compat'; +import { deliverTextToActiveStreamingCard } from '../../card/active-streaming-card-store'; import { LarkClient } from '../../core/lark-client'; import { getEnabledLarkAccounts } from '../../core/accounts'; import { larkLogger } from '../../core/lark-logger'; @@ -187,7 +188,10 @@ export const feishuMessageActions: ChannelMessageActionAdapter = { try { switch (action) { case 'send': - return await deliverMessage(cfg, readFeishuSendParams(params, toolContext), aid, ctx.mediaLocalRoots); + return await deliverMessage(cfg, readFeishuSendParams(params, toolContext), aid, ctx.mediaLocalRoots, { + sessionKey: ctx.sessionKey, + toolContext, + }); case 'react': return await handleReact(cfg, params, aid); case 'reactions': @@ -225,6 +229,7 @@ async function deliverMessage( sp: FeishuSendParams, accountId?: string, mediaLocalRoots?: readonly string[], + runtimeContext?: { sessionKey?: string | null; toolContext?: ChannelThreadingToolContext }, ) { const { to, text, mediaUrl, fileName, replyToMessageId, replyInThread, card } = sp; @@ -243,6 +248,20 @@ async function deliverMessage( throw new Error('send requires at least one of: message, card, or media.'); } + const activeCardResult = await deliverTextToActiveStreamingCard({ + sessionKey: runtimeContext?.sessionKey, + accountId, + to, + text, + card, + mediaUrl, + toolContext: runtimeContext?.toolContext, + }); + if (activeCardResult) { + log.info(`deliverMessage: routed text into active streaming card, messageId=${activeCardResult.messageId}`); + return jsonResult(activeCardResult); + } + const sendCtx = { cfg, to, replyToMessageId, replyInThread, accountId }; // Send text first if both text and card/media are present. diff --git a/tests/active-streaming-card-store.test.ts b/tests/active-streaming-card-store.test.ts new file mode 100644 index 00000000..7e2ff90e --- /dev/null +++ b/tests/active-streaming-card-store.test.ts @@ -0,0 +1,69 @@ +import { describe, expect, it, vi } from 'vitest'; +import { + deliverTextToActiveStreamingCard, + registerActiveStreamingCard, +} from '../src/card/active-streaming-card-store'; +import type { StreamingCardController } from '../src/card/streaming-card-controller'; + +function createController() { + return { + cardMessageId: 'om_card', + onDeliver: vi.fn().mockResolvedValue(undefined), + } as unknown as StreamingCardController & { onDeliver: ReturnType }; +} + +describe('active streaming card store', () => { + it('routes same-session text-only sends to the active card', async () => { + const controller = createController(); + const unregister = registerActiveStreamingCard({ + sessionKey: 'Agent:Main:Feishu:Direct:User', + accountId: 'default', + chatId: 'oc_chat', + controller, + }); + + try { + const result = await deliverTextToActiveStreamingCard({ + sessionKey: 'agent:main:feishu:direct:user', + accountId: 'default', + to: 'chat:oc_chat', + text: 'final answer', + }); + + expect(result).toEqual({ + ok: true, + messageId: 'om_card', + chatId: 'oc_chat', + routedViaStreamingCard: true, + }); + expect(controller.onDeliver).toHaveBeenCalledWith({ text: 'final answer' }); + } finally { + unregister(); + } + }); + + it('does not route card, media, or different-target sends', async () => { + const controller = createController(); + const unregister = registerActiveStreamingCard({ + sessionKey: 'session-2', + accountId: 'default', + chatId: 'oc_chat', + controller, + }); + + try { + await expect(deliverTextToActiveStreamingCard({ sessionKey: 'session-2', text: 'x', card: {} })).resolves.toBe( + null, + ); + await expect( + deliverTextToActiveStreamingCard({ sessionKey: 'session-2', text: 'x', mediaUrl: 'https://example.test/a.png' }), + ).resolves.toBe(null); + await expect( + deliverTextToActiveStreamingCard({ sessionKey: 'session-2', text: 'x', to: 'oc_other' }), + ).resolves.toBe(null); + expect(controller.onDeliver).not.toHaveBeenCalled(); + } finally { + unregister(); + } + }); +});