Skip to content

Commit a0c49c9

Browse files
authored
Publish transcriptions additionally via text stream APIs (#348)
1 parent 77b69d1 commit a0c49c9

File tree

4 files changed

+108
-46
lines changed

4 files changed

+108
-46
lines changed

.changeset/heavy-cats-unite.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@livekit/agents": patch
3+
---
4+
5+
Publish transcriptions additionally via text stream APIs

agents/src/constants.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
export const ATTRIBUTE_TRANSCRIPTION_TRACK_ID = 'lk.transcribed_track_id';
5+
export const ATTRIBUTE_TRANSCRIPTION_FINAL = 'lk.transcription_final';
6+
export const TOPIC_TRANSCRIPTION = 'lk.transcription';
7+
export const TOPIC_CHAT = 'lk.chat';

agents/src/multimodal/multimodal_agent.ts

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ import {
2020
} from '@livekit/rtc-node';
2121
import { EventEmitter } from 'node:events';
2222
import { AudioByteStream } from '../audio.js';
23+
import {
24+
ATTRIBUTE_TRANSCRIPTION_FINAL,
25+
ATTRIBUTE_TRANSCRIPTION_TRACK_ID,
26+
TOPIC_TRANSCRIPTION,
27+
} from '../constants.js';
2328
import * as llm from '../llm/index.js';
2429
import { log } from '../log.js';
2530
import type { MultimodalLLMMetrics } from '../metrics/base.js';
@@ -251,8 +256,8 @@ export class MultimodalAgent extends EventEmitter {
251256
if (message.contentType === 'text') return;
252257

253258
const synchronizer = new TextAudioSynchronizer(defaultTextSyncOptions);
254-
synchronizer.on('textUpdated', (text) => {
255-
this.#publishTranscription(
259+
synchronizer.on('textUpdated', async (text) => {
260+
await this.#publishTranscription(
256261
this.room!.localParticipant!.identity!,
257262
this.#getLocalTrackSid()!,
258263
text.text,
@@ -302,25 +307,31 @@ export class MultimodalAgent extends EventEmitter {
302307
});
303308

304309
// eslint-disable-next-line @typescript-eslint/no-explicit-any
305-
this.#session.on('input_speech_committed', (ev: any) => {
310+
this.#session.on('input_speech_committed', async (ev: any) => {
306311
// openai.realtime.InputSpeechCommittedEvent
307312
const participantIdentity = this.linkedParticipant?.identity;
308313
const trackSid = this.subscribedTrack?.sid;
309314
if (participantIdentity && trackSid) {
310-
this.#publishTranscription(participantIdentity, trackSid, '…', false, ev.itemId);
315+
await this.#publishTranscription(participantIdentity, trackSid, '…', false, ev.itemId);
311316
} else {
312317
this.#logger.error('Participant or track not set');
313318
}
314319
});
315320

316321
// eslint-disable-next-line @typescript-eslint/no-explicit-any
317-
this.#session.on('input_speech_transcription_completed', (ev: any) => {
322+
this.#session.on('input_speech_transcription_completed', async (ev: any) => {
318323
// openai.realtime.InputSpeechTranscriptionCompletedEvent
319324
const transcription = ev.transcript;
320325
const participantIdentity = this.linkedParticipant?.identity;
321326
const trackSid = this.subscribedTrack?.sid;
322327
if (participantIdentity && trackSid) {
323-
this.#publishTranscription(participantIdentity, trackSid, transcription, true, ev.itemId);
328+
await this.#publishTranscription(
329+
participantIdentity,
330+
trackSid,
331+
transcription,
332+
true,
333+
ev.itemId,
334+
);
324335
} else {
325336
this.#logger.error('Participant or track not set');
326337
}
@@ -332,7 +343,7 @@ export class MultimodalAgent extends EventEmitter {
332343
this.#logger.child({ transcription }).debug('committed user speech');
333344
});
334345

335-
this.#session.on('input_speech_started', (ev: any) => {
346+
this.#session.on('input_speech_started', async (ev: any) => {
336347
this.emit('user_started_speaking');
337348
if (this.#playingHandle && !this.#playingHandle.done) {
338349
this.#playingHandle.interrupt();
@@ -349,7 +360,7 @@ export class MultimodalAgent extends EventEmitter {
349360
const participantIdentity = this.linkedParticipant?.identity;
350361
const trackSid = this.subscribedTrack?.sid;
351362
if (participantIdentity && trackSid) {
352-
this.#publishTranscription(participantIdentity, trackSid, '…', false, ev.itemId);
363+
await this.#publishTranscription(participantIdentity, trackSid, '…', false, ev.itemId);
353364
}
354365
});
355366

@@ -475,13 +486,13 @@ export class MultimodalAgent extends EventEmitter {
475486
return this.#localTrackSid;
476487
}
477488

478-
#publishTranscription(
489+
async #publishTranscription(
479490
participantIdentity: string,
480491
trackSid: string,
481492
text: string,
482493
isFinal: boolean,
483494
id: string,
484-
): void {
495+
): Promise<void> {
485496
this.#logger.debug(
486497
`Publishing transcription ${participantIdentity} ${trackSid} ${text} ${isFinal} ${id}`,
487498
);
@@ -504,6 +515,17 @@ export class MultimodalAgent extends EventEmitter {
504515
},
505516
],
506517
});
518+
519+
const stream = await this.room.localParticipant.streamText({
520+
topic: TOPIC_TRANSCRIPTION,
521+
senderIdentity: participantIdentity,
522+
attributes: {
523+
[ATTRIBUTE_TRANSCRIPTION_TRACK_ID]: trackSid,
524+
[ATTRIBUTE_TRANSCRIPTION_FINAL]: isFinal.toString(),
525+
},
526+
});
527+
await stream.write(text);
528+
await stream.close();
507529
}
508530

509531
#updateState() {

agents/src/pipeline/pipeline_agent.ts

Lines changed: 64 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ import {
1717
import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter';
1818
import { randomUUID } from 'node:crypto';
1919
import EventEmitter from 'node:events';
20+
import {
21+
ATTRIBUTE_TRANSCRIPTION_FINAL,
22+
ATTRIBUTE_TRANSCRIPTION_TRACK_ID,
23+
TOPIC_TRANSCRIPTION,
24+
} from '../constants.js';
2025
import type {
2126
CallableFunctionResult,
2227
FunctionCallInfo,
@@ -518,28 +523,21 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter<
518523
this.emit(VPAEvent.USER_STOPPED_SPEAKING);
519524
this.#deferredValidation.onHumanEndOfSpeech(event);
520525
});
521-
this.#humanInput.on(HumanInputEvent.INTERIM_TRANSCRIPT, (event) => {
526+
this.#humanInput.on(HumanInputEvent.INTERIM_TRANSCRIPT, async (event) => {
522527
if (!this.#transcriptionId) {
523528
this.#transcriptionId = randomUUID();
524529
}
525530
this.#transcribedInterimText = event.alternatives![0].text;
526531

527-
this.#room!.localParticipant!.publishTranscription({
528-
participantIdentity: this.#humanInput!.participant.identity,
529-
trackSid: this.#humanInput!.subscribedTrack!.sid!,
530-
segments: [
531-
{
532-
text: this.#transcribedInterimText,
533-
id: this.#transcriptionId,
534-
final: true,
535-
startTime: BigInt(0),
536-
endTime: BigInt(0),
537-
language: '',
538-
},
539-
],
540-
});
532+
await this.#publishTranscription(
533+
this.#humanInput!.participant.identity,
534+
this.#humanInput!.subscribedTrack!.sid!,
535+
this.#transcribedInterimText,
536+
false,
537+
this.#transcriptionId,
538+
);
541539
});
542-
this.#humanInput.on(HumanInputEvent.FINAL_TRANSCRIPT, (event) => {
540+
this.#humanInput.on(HumanInputEvent.FINAL_TRANSCRIPT, async (event) => {
543541
const newTranscript = event.alternatives![0].text;
544542
if (!newTranscript) return;
545543

@@ -550,20 +548,14 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter<
550548
this.#lastFinalTranscriptTime = Date.now();
551549
this.transcribedText += (this.transcribedText ? ' ' : '') + newTranscript;
552550

553-
this.#room!.localParticipant!.publishTranscription({
554-
participantIdentity: this.#humanInput!.participant.identity,
555-
trackSid: this.#humanInput!.subscribedTrack!.sid!,
556-
segments: [
557-
{
558-
text: this.transcribedText,
559-
id: this.#transcriptionId,
560-
final: true,
561-
startTime: BigInt(0),
562-
endTime: BigInt(0),
563-
language: '',
564-
},
565-
],
566-
});
551+
await this.#publishTranscription(
552+
this.#humanInput!.participant.identity,
553+
this.#humanInput!.subscribedTrack!.sid!,
554+
this.transcribedText,
555+
true,
556+
this.#transcriptionId,
557+
);
558+
567559
this.#transcriptionId = undefined;
568560

569561
if (
@@ -894,18 +886,54 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter<
894886
handle.setDone();
895887
}
896888

889+
async #publishTranscription(
890+
participantIdentity: string,
891+
trackSid: string,
892+
text: string,
893+
isFinal: boolean,
894+
id: string,
895+
) {
896+
this.#room!.localParticipant!.publishTranscription({
897+
participantIdentity: participantIdentity,
898+
trackSid: trackSid,
899+
segments: [
900+
{
901+
text: text,
902+
final: isFinal,
903+
id: id,
904+
startTime: BigInt(0),
905+
endTime: BigInt(0),
906+
language: '',
907+
},
908+
],
909+
});
910+
const stream = await this.#room!.localParticipant!.streamText({
911+
senderIdentity: participantIdentity,
912+
topic: TOPIC_TRANSCRIPTION,
913+
attributes: {
914+
[ATTRIBUTE_TRANSCRIPTION_TRACK_ID]: trackSid,
915+
[ATTRIBUTE_TRANSCRIPTION_FINAL]: isFinal.toString(),
916+
},
917+
});
918+
await stream.write(text);
919+
await stream.close();
920+
}
921+
897922
#synthesizeAgentSpeech(
898923
speechId: string,
899924
source: string | LLMStream | AsyncIterable<string>,
900925
): SynthesisHandle {
901926
const synchronizer = new TextAudioSynchronizer(defaultTextSyncOptions);
902-
synchronizer.on('textUpdated', (text) => {
927+
// TODO: where possible we would want to use deltas instead of full text segments, esp for LLM streams over the streamText API
928+
synchronizer.on('textUpdated', async (text) => {
903929
this.#agentTranscribedText = text.text;
904-
this.#room!.localParticipant!.publishTranscription({
905-
participantIdentity: this.#room!.localParticipant!.identity,
906-
trackSid: this.#agentPublication!.sid!,
907-
segments: [text],
908-
});
930+
await this.#publishTranscription(
931+
this.#room!.localParticipant!.identity!,
932+
this.#agentPublication?.sid ?? '',
933+
text.text,
934+
text.final,
935+
text.id,
936+
);
909937
});
910938

911939
if (!this.#agentOutput) {

0 commit comments

Comments
 (0)