Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions src/card/reply-dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
return;
}

let textHandledByController = false;

// ---- Streaming card mode ----
if (controller) {
if (meta?.kind === 'tool' && shouldRouteToolPayloadToCard(payload, toolUseDisplay.showToolUse)) {
Expand All @@ -242,18 +244,20 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
if (controller.cardMessageId) {
if (payload.isReasoning === true) {
await controller.onReasoningStream({ ...payload, text: controllerText });
return;
textHandledByController = true;
if (payloadMediaUrls.length === 0) return;
} else {
await controller.onDeliver({ ...payload, text: controllerText });
textHandledByController = true;
}
await controller.onDeliver({ ...payload, text: controllerText });
return;
}
// Card creation failed — fall through to static delivery
log.warn('deliver: card creation failed, falling back to static delivery');
}
}

// ---- Static text delivery ----
if (text.trim()) {
if (text.trim() && !textHandledByController) {
if (shouldUseCard(text)) {
const chunks = core.channel.text.chunkTextWithMode(text, textChunkLimit, chunkMode);
log.info('deliver: sending card chunks', {
Expand Down Expand Up @@ -356,6 +360,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
replyToMessageId,
replyInThread,
});
controller?.markMediaDelivered();
} catch (mediaErr) {
if (staticGuard?.terminate('deliver.media', mediaErr)) return;
log.error('deliver: static media send failed', {
Expand Down
13 changes: 11 additions & 2 deletions src/card/streaming-card-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ export class StreamingCardController {
private createEpoch = 0;
private _terminalReason: TerminalReason | null = null;
private dispatchFullyComplete = false;
private visibleMediaDelivered = false;
private cardCreationPromise: Promise<void> | null = null;
private disposeShutdownHook: (() => void) | null = null;
private readonly dispatchStartTime = Date.now();
Expand Down Expand Up @@ -690,8 +691,10 @@ export class StreamingCardController {
const isNoReplyLeak =
!this.text.completedText && SILENT_REPLY_TOKEN.startsWith(this.text.accumulatedText.trim());
const displayText =
this.text.completedText || (isNoReplyLeak ? '' : this.text.accumulatedText) || EMPTY_REPLY_FALLBACK_TEXT;
if (!this.text.completedText && !this.text.accumulatedText) {
this.text.completedText ||
(isNoReplyLeak ? '' : this.text.accumulatedText) ||
EMPTY_REPLY_FALLBACK_TEXT;
if (!this.text.completedText && !this.text.accumulatedText && !this.visibleMediaDelivered) {
log.warn('reply completed without visible text, using empty-reply fallback');
}

Expand Down Expand Up @@ -763,10 +766,16 @@ export class StreamingCardController {
log.debug('markFullyComplete', {
completedTextLen: this.text.completedText.length,
accumulatedTextLen: this.text.accumulatedText.length,
visibleMediaDelivered: this.visibleMediaDelivered,
});
this.dispatchFullyComplete = true;
}

markMediaDelivered(): void {
this.captureToolUseElapsed();
this.visibleMediaDelivered = true;
}

async abortCard(): Promise<void> {
try {
this.captureToolUseElapsed();
Expand Down
106 changes: 94 additions & 12 deletions tests/reply-dispatcher-media.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,23 @@ import { beforeEach, describe, expect, it, vi } from 'vitest';
// Module mocks
// ---------------------------------------------------------------------------

const replyModeState = vi.hoisted(() => ({ mode: 'static' as 'static' | 'streaming' }));
const streamingControllerInstances = vi.hoisted(
() =>
[] as Array<{
cardMessageId: string | null;
ensureCardCreated: ReturnType<typeof vi.fn>;
isAborted: boolean;
isTerminated: boolean;
markMediaDelivered: ReturnType<typeof vi.fn>;
onDeliver: ReturnType<typeof vi.fn>;
onIdle: ReturnType<typeof vi.fn>;
onReasoningStream: ReturnType<typeof vi.fn>;
onToolPayload: ReturnType<typeof vi.fn>;
shouldSkipForUnavailable: ReturnType<typeof vi.fn>;
}>,
);

vi.mock('openclaw/plugin-sdk/channel-runtime', () => ({
createReplyPrefixContext: () => ({
responsePrefix: '',
Expand Down Expand Up @@ -76,24 +93,43 @@ vi.mock('../src/card/card-error', () => ({
isCardTableLimitError: () => false,
}));
vi.mock('../src/card/reply-mode', () => ({
resolveReplyMode: () => 'static',
resolveReplyMode: () => replyModeState.mode,
expandAutoMode: ({ mode }: { mode: string }) => mode,
shouldUseCard: () => false,
}));
vi.mock('../src/card/streaming-card-controller', () => ({
StreamingCardController: class {},
StreamingCardController: class {
cardMessageId: string | null = 'om_stream_card';
isAborted = false;
isTerminated = false;
ensureCardCreated = vi.fn().mockResolvedValue(undefined);
markMediaDelivered = vi.fn();
onDeliver = vi.fn().mockResolvedValue(undefined);
onIdle = vi.fn().mockResolvedValue(undefined);
onReasoningStream = vi.fn().mockResolvedValue(undefined);
onToolPayload = vi.fn().mockResolvedValue(undefined);
shouldSkipForUnavailable = vi.fn().mockReturnValue(false);

constructor() {
streamingControllerInstances.push(this);
}
},
}));

let terminateReturn = true;
const terminateCalls: Array<{ source: string; err: unknown }> = [];
vi.mock('../src/card/unavailable-guard', () => ({
UnavailableGuard: class {
shouldSkip() { return false; }
shouldSkip() {
return false;
}
terminate(source: string, err?: unknown) {
terminateCalls.push({ source, err });
return terminateReturn;
}
get isTerminated() { return false; }
get isTerminated() {
return false;
}
},
}));

Expand All @@ -115,22 +151,34 @@ interface TestContext {
sentMedia: unknown[];
}

function createDispatcher(options: {
sendMediaImpl?: (payload: unknown) => Promise<void>;
terminateReturn?: boolean;
} = {}): TestContext {
function createDispatcher(
options: {
replyMode?: 'static' | 'streaming';
sendMediaImpl?: (payload: unknown) => Promise<void>;
terminateReturn?: boolean;
} = {},
): TestContext {
const sentText: unknown[] = [];
const sentCards: unknown[] = [];
const sentMedia: unknown[] = [];

// Reset module-level state
replyModeState.mode = options.replyMode ?? 'static';
terminateReturn = options.terminateReturn ?? true;
terminateCalls.length = 0;

mockSendMessageFeishu.mockImplementation(async (payload: unknown) => { sentText.push(payload); });
mockSendMarkdownCardFeishu.mockImplementation(async (payload: unknown) => { sentCards.push(payload); });
mockSendMessageFeishu.mockImplementation(async (payload: unknown) => {
sentText.push(payload);
});
mockSendMarkdownCardFeishu.mockImplementation(async (payload: unknown) => {
sentCards.push(payload);
});

const sendMediaImpl = options.sendMediaImpl ?? (async (payload: unknown) => { sentMedia.push(payload); });
const sendMediaImpl =
options.sendMediaImpl ??
(async (payload: unknown) => {
sentMedia.push(payload);
});
mockSendMediaLark.mockImplementation(sendMediaImpl);

const result = createFeishuReplyDispatcher({
Expand Down Expand Up @@ -166,6 +214,8 @@ function createDispatcher(options: {
beforeEach(() => {
vi.clearAllMocks();
terminateCalls.length = 0;
streamingControllerInstances.length = 0;
replyModeState.mode = 'static';
});

describe('reply-dispatcher media delivery', () => {
Expand Down Expand Up @@ -196,10 +246,42 @@ describe('reply-dispatcher media delivery', () => {
]);
});

it('streaming mixed payload routes text to the active card and still sends media', async () => {
const ctx = createDispatcher({ replyMode: 'streaming' });

await ctx.dispatcher.deliver({
text: 'created image',
mediaUrl: 'https://example.com/image.png',
});

const controller = streamingControllerInstances[0];
expect(controller.ensureCardCreated).toHaveBeenCalledTimes(1);
expect(controller.onDeliver).toHaveBeenCalledWith(expect.objectContaining({ text: 'created image' }));
expect(ctx.sentText).toHaveLength(0);
expect(ctx.sentCards).toHaveLength(0);
expect(ctx.sentMedia).toHaveLength(1);
expect((ctx.sentMedia[0] as { mediaUrl: string }).mediaUrl).toBe('https://example.com/image.png');
});

it('streaming media-only payload marks delivered media as a visible result', async () => {
const ctx = createDispatcher({ replyMode: 'streaming' });

await ctx.dispatcher.deliver({
text: '',
mediaUrl: 'https://example.com/image.png',
});

const controller = streamingControllerInstances[0];
expect(ctx.sentMedia).toHaveLength(1);
expect(controller.markMediaDelivered).toHaveBeenCalledTimes(1);
});

it('failed media send triggers staticGuard terminate', async () => {
const mediaError = new Error('bot removed from chat');
const ctx = createDispatcher({
sendMediaImpl: async () => { throw mediaError; },
sendMediaImpl: async () => {
throw mediaError;
},
terminateReturn: true,
});

Expand Down