Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions src/card/active-streaming-card-store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/**
* Copyright (c) 2026 ByteDance Ltd. and/or its affiliates
* SPDX-License-Identifier: MIT
*
* Process-local registry for active Feishu streaming cards.
*
* The shared message tool can deliver a final text reply through the channel
* action path while an inbound turn already owns a streaming card. This bridge
* routes same-session, same-target text-only sends back into that card instead
* of posting a separate message and leaving the card stale.
*/

import type { ChannelThreadingToolContext } from 'openclaw/plugin-sdk/channel-contract';
import type { StreamingCardController } from './streaming-card-controller';

interface ActiveStreamingCard {
accountId?: string;
chatId: string;
controller: StreamingCardController;
}

interface RegisterActiveStreamingCardParams {
sessionKey?: string | null;
accountId?: string | null;
chatId: string;
controller: StreamingCardController;
}

interface DeliverTextToActiveStreamingCardParams {
sessionKey?: string | null;
accountId?: string | null;
to?: string;
text: string;
card?: unknown;
mediaUrl?: string;
toolContext?: ChannelThreadingToolContext;
}

const activeCards = new Map<string, ActiveStreamingCard>();

export function registerActiveStreamingCard(params: RegisterActiveStreamingCardParams): () => void {
const sessionKey = normalizeKey(params.sessionKey);
if (!sessionKey) return () => {};

activeCards.set(sessionKey, {
accountId: params.accountId ?? undefined,
chatId: params.chatId,
controller: params.controller,
});

return () => unregisterActiveStreamingCard(params.sessionKey, params.controller);
}

export function unregisterActiveStreamingCard(
sessionKey?: string | null,
controller?: StreamingCardController,
): void {
const key = normalizeKey(sessionKey);
if (!key) return;
const active = activeCards.get(key);
if (!active) return;
if (controller && active.controller !== controller) return;
activeCards.delete(key);
}

export async function deliverTextToActiveStreamingCard(params: DeliverTextToActiveStreamingCardParams): Promise<{
ok: true;
messageId: string;
chatId: string;
routedViaStreamingCard: true;
} | null> {
const key = normalizeKey(params.sessionKey);
if (!key || !params.text.trim()) return null;
if (params.card || params.mediaUrl) return null;

const active = activeCards.get(key);
if (!active) return null;
if (params.accountId && active.accountId && params.accountId !== active.accountId) return null;
if (!isCurrentTurnTarget(params.to, active.chatId, params.toolContext?.currentChannelId)) return null;

await active.controller.onDeliver({ text: params.text });
const messageId = active.controller.cardMessageId;
if (!messageId) return null;

return {
ok: true,
messageId,
chatId: active.chatId,
routedViaStreamingCard: true,
};
}

function isCurrentTurnTarget(to: string | undefined, activeChatId: string, currentChannelId: string | undefined): boolean {
const normalizedTo = normalizeTarget(to);
if (!normalizedTo) return true;
return normalizedTo === normalizeTarget(activeChatId) || normalizedTo === normalizeTarget(currentChannelId);
}

function normalizeKey(value: unknown): string {
return typeof value === 'string' ? value.trim().toLowerCase() : '';
}

function normalizeTarget(value: unknown): string {
if (typeof value !== 'string') return '';
return value
.trim()
.replace(/^(?:user|chat|group|open_id|open-chat-id):/i, '')
.toLowerCase();
}
23 changes: 22 additions & 1 deletion src/card/reply-dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { larkLogger } from '../core/lark-logger';
import { sendMediaLark } from '../messaging/outbound/deliver';
import { sendMarkdownCardFeishu, sendMessageFeishu } from '../messaging/outbound/send';
import { type TypingIndicatorState, addTypingIndicator, removeTypingIndicator } from '../messaging/outbound/typing';
import { registerActiveStreamingCard } from './active-streaming-card-store';
import { splitReasoningText, stripReasoningTags } from './builder';
import { isCardTableLimitError } from './card-error';
import type { CreateFeishuReplyDispatcherParams, FeishuReplyDispatcherResult } from './reply-dispatcher-types';
Expand Down Expand Up @@ -101,6 +102,14 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
resolvedFooter,
})
: null;
const unregisterActiveCard = controller
? registerActiveStreamingCard({
sessionKey,
accountId,
chatId,
controller,
})
: () => {};

// ---- Static mode unavailable guard ----
// In streaming mode the controller owns its own guard; in static mode
Expand Down Expand Up @@ -359,10 +368,12 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
if (controller) {
if (controller.terminateIfUnavailable('onError', err)) {
typingCallbacks.onIdle?.();
unregisterActiveCard();
return;
}
await controller.onError(err, info);
typingCallbacks.onIdle?.();
unregisterActiveCard();
return;
}

Expand All @@ -388,18 +399,28 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP

if (controller) {
await controller.onIdle();
unregisterActiveCard();
}

typingCallbacks.onIdle?.();
},

onCleanup: async () => {
typingCallbacks.onCleanup?.();
unregisterActiveCard();
},
});

// ---- Abort card (delegates to controller or no-op for static) ----
const abortCard = controller ? () => controller.abortCard() : async () => {};
const abortCard = controller
? async () => {
try {
await controller.abortCard();
} finally {
unregisterActiveCard();
}
}
: async () => {};

return {
dispatcher,
Expand Down
21 changes: 20 additions & 1 deletion src/messaging/outbound/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { extractToolSend } from 'openclaw/plugin-sdk/tool-send';
import { readStringParam } from 'openclaw/plugin-sdk/param-readers';
import { jsonResult, readReactionParams } from '../../core/sdk-compat';

import { deliverTextToActiveStreamingCard } from '../../card/active-streaming-card-store';
import { LarkClient } from '../../core/lark-client';
import { getEnabledLarkAccounts } from '../../core/accounts';
import { larkLogger } from '../../core/lark-logger';
Expand Down Expand Up @@ -187,7 +188,10 @@ export const feishuMessageActions: ChannelMessageActionAdapter = {
try {
switch (action) {
case 'send':
return await deliverMessage(cfg, readFeishuSendParams(params, toolContext), aid, ctx.mediaLocalRoots);
return await deliverMessage(cfg, readFeishuSendParams(params, toolContext), aid, ctx.mediaLocalRoots, {
sessionKey: ctx.sessionKey,
toolContext,
});
case 'react':
return await handleReact(cfg, params, aid);
case 'reactions':
Expand Down Expand Up @@ -225,6 +229,7 @@ async function deliverMessage(
sp: FeishuSendParams,
accountId?: string,
mediaLocalRoots?: readonly string[],
runtimeContext?: { sessionKey?: string | null; toolContext?: ChannelThreadingToolContext },
) {
const { to, text, mediaUrl, fileName, replyToMessageId, replyInThread, card } = sp;

Expand All @@ -243,6 +248,20 @@ async function deliverMessage(
throw new Error('send requires at least one of: message, card, or media.');
}

const activeCardResult = await deliverTextToActiveStreamingCard({
sessionKey: runtimeContext?.sessionKey,
accountId,
to,
text,
card,
mediaUrl,
toolContext: runtimeContext?.toolContext,
});
if (activeCardResult) {
log.info(`deliverMessage: routed text into active streaming card, messageId=${activeCardResult.messageId}`);
return jsonResult(activeCardResult);
}

const sendCtx = { cfg, to, replyToMessageId, replyInThread, accountId };

// Send text first if both text and card/media are present.
Expand Down
69 changes: 69 additions & 0 deletions tests/active-streaming-card-store.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { describe, expect, it, vi } from 'vitest';
import {
deliverTextToActiveStreamingCard,
registerActiveStreamingCard,
} from '../src/card/active-streaming-card-store';
import type { StreamingCardController } from '../src/card/streaming-card-controller';

function createController() {
return {
cardMessageId: 'om_card',
onDeliver: vi.fn().mockResolvedValue(undefined),
} as unknown as StreamingCardController & { onDeliver: ReturnType<typeof vi.fn> };
}

describe('active streaming card store', () => {
it('routes same-session text-only sends to the active card', async () => {
const controller = createController();
const unregister = registerActiveStreamingCard({
sessionKey: 'Agent:Main:Feishu:Direct:User',
accountId: 'default',
chatId: 'oc_chat',
controller,
});

try {
const result = await deliverTextToActiveStreamingCard({
sessionKey: 'agent:main:feishu:direct:user',
accountId: 'default',
to: 'chat:oc_chat',
text: 'final answer',
});

expect(result).toEqual({
ok: true,
messageId: 'om_card',
chatId: 'oc_chat',
routedViaStreamingCard: true,
});
expect(controller.onDeliver).toHaveBeenCalledWith({ text: 'final answer' });
} finally {
unregister();
}
});

it('does not route card, media, or different-target sends', async () => {
const controller = createController();
const unregister = registerActiveStreamingCard({
sessionKey: 'session-2',
accountId: 'default',
chatId: 'oc_chat',
controller,
});

try {
await expect(deliverTextToActiveStreamingCard({ sessionKey: 'session-2', text: 'x', card: {} })).resolves.toBe(
null,
);
await expect(
deliverTextToActiveStreamingCard({ sessionKey: 'session-2', text: 'x', mediaUrl: 'https://example.test/a.png' }),
).resolves.toBe(null);
await expect(
deliverTextToActiveStreamingCard({ sessionKey: 'session-2', text: 'x', to: 'oc_other' }),
).resolves.toBe(null);
expect(controller.onDeliver).not.toHaveBeenCalled();
} finally {
unregister();
}
});
});