Skip to content

Commit 98c22e4

Browse files
committed
testing 1
1 parent 8f6060a commit 98c22e4

File tree

4 files changed

+40
-32
lines changed

4 files changed

+40
-32
lines changed

agents/src/pipeline/agent_output.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,10 @@ const stringSynthesisTask = (text: string, handle: SynthesisHandle): Cancellable
148148
if (cancelled || audio === SynthesizeStream.END_OF_STREAM) {
149149
break;
150150
}
151-
writer.write(audio.frame);
151+
await writer.write(audio.frame);
152152
}
153-
writer.write(SynthesisHandle.FLUSH_SENTINEL);
153+
await writer.write(SynthesisHandle.FLUSH_SENTINEL);
154+
writer.releaseLock();
154155

155156
resolve(text);
156157
});
@@ -177,9 +178,10 @@ const streamSynthesisTask = (
177178
if (audio === SynthesizeStream.END_OF_STREAM) {
178179
break;
179180
}
180-
writer.write(audio.frame);
181+
await writer.write(audio.frame);
181182
}
182-
writer.write(SynthesisHandle.FLUSH_SENTINEL);
183+
await writer.write(SynthesisHandle.FLUSH_SENTINEL);
184+
writer.releaseLock();
183185
};
184186
readGeneratedAudio();
185187

agents/src/stt/stt.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import type { AudioFrame } from '@livekit/rtc-node';
55
import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter';
66
import { EventEmitter } from 'node:events';
7-
import type { ReadableStream } from 'node:stream/web';
7+
import type { ReadableStream, WritableStreamDefaultWriter } from 'node:stream/web';
88
import { TransformStream } from 'node:stream/web';
99
import type { STTMetrics } from '../metrics/base.js';
1010
import type { AudioBuffer } from '../utils.js';
@@ -148,12 +148,11 @@ export abstract class SpeechStream implements AsyncIterableIterator<SpeechEvent>
148148
protected inputClosed = false;
149149
#stt: STT;
150150
#outputReadable: ReadableStream<SpeechEvent>;
151+
#writer: WritableStreamDefaultWriter<AudioFrame | typeof SpeechStream.FLUSH_SENTINEL>;
151152

152153
constructor(stt: STT) {
153154
this.#stt = stt;
154-
this.output.writable.close().then(() => {
155-
this.inputClosed = true;
156-
});
155+
this.#writer = this.input.writable.getWriter();
157156
const [r1, r2] = this.output.readable.tee();
158157
this.#outputReadable = r1;
159158
this.monitorMetrics(r2);
@@ -185,7 +184,7 @@ export abstract class SpeechStream implements AsyncIterableIterator<SpeechEvent>
185184
if (this.closed) {
186185
throw new Error('Stream is closed');
187186
}
188-
this.input.writable.getWriter().write(frame);
187+
this.#writer.write(frame);
189188
}
190189

191190
/** Flush the STT, causing it to process all pending text */
@@ -196,7 +195,7 @@ export abstract class SpeechStream implements AsyncIterableIterator<SpeechEvent>
196195
if (this.closed) {
197196
throw new Error('Stream is closed');
198197
}
199-
this.input.writable.getWriter().write(SpeechStream.FLUSH_SENTINEL);
198+
this.#writer.write(SpeechStream.FLUSH_SENTINEL);
200199
}
201200

202201
/** Mark the input as ended and forbid additional pushes */
@@ -207,6 +206,7 @@ export abstract class SpeechStream implements AsyncIterableIterator<SpeechEvent>
207206
if (this.closed) {
208207
throw new Error('Stream is closed');
209208
}
209+
this.inputClosed = true;
210210
this.input.writable.close();
211211
}
212212

@@ -228,6 +228,7 @@ export abstract class SpeechStream implements AsyncIterableIterator<SpeechEvent>
228228
this.input.writable.close();
229229
this.output.writable.close();
230230
this.closed = true;
231+
this.inputClosed = true;
231232
}
232233

233234
[Symbol.asyncIterator](): SpeechStream {

agents/src/tts/tts.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import type { AudioFrame } from '@livekit/rtc-node';
55
import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter';
66
import { EventEmitter } from 'node:events';
7-
import type { ReadableStream } from 'node:stream/web';
7+
import type { ReadableStream, WritableStreamDefaultWriter } from 'node:stream/web';
88
import { TransformStream } from 'node:stream/web';
99
import type { TTSMetrics } from '../metrics/base.js';
1010
import { mergeFrames } from '../utils.js';
@@ -122,12 +122,14 @@ export abstract class SynthesizeStream
122122
#metricsPendingTexts: string[] = [];
123123
#metricsText = '';
124124
#outputReadable: ReadableStream<SynthesizedAudio | typeof SynthesizeStream.END_OF_STREAM>;
125+
#writer: WritableStreamDefaultWriter<string | typeof SynthesizeStream.FLUSH_SENTINEL>;
125126

126127
constructor(tts: TTS) {
127128
this.#tts = tts;
128129
this.output.writable.close().then(() => {
129130
this.inputClosed = true;
130131
});
132+
this.#writer = this.input.writable.getWriter();
131133
const [r1, r2] = this.output.readable.tee();
132134
this.#outputReadable = r1;
133135
this.monitorMetrics(r2);
@@ -187,7 +189,7 @@ export abstract class SynthesizeStream
187189
if (this.closed) {
188190
throw new Error('Stream is closed');
189191
}
190-
this.input.writable.getWriter().write(text);
192+
this.#writer.write(text);
191193
}
192194

193195
/** Flush the TTS, causing it to process all pending text */
@@ -202,7 +204,7 @@ export abstract class SynthesizeStream
202204
if (this.closed) {
203205
throw new Error('Stream is closed');
204206
}
205-
this.input.writable.getWriter().write(SynthesizeStream.FLUSH_SENTINEL);
207+
this.#writer.write(SynthesizeStream.FLUSH_SENTINEL);
206208
}
207209

208210
/** Mark the input as ended and forbid additional pushes */
@@ -213,7 +215,7 @@ export abstract class SynthesizeStream
213215
if (this.closed) {
214216
throw new Error('Stream is closed');
215217
}
216-
this.input.writable.close();
218+
this.#writer.close();
217219
}
218220

219221
async next(): Promise<IteratorResult<SynthesizedAudio | typeof SynthesizeStream.END_OF_STREAM>> {
@@ -231,6 +233,7 @@ export abstract class SynthesizeStream
231233

232234
/** Close both the input and output of the TTS stream */
233235
close() {
236+
this.#writer.close();
234237
this.input.writable.close();
235238
this.output.writable.close();
236239
this.closed = true;

agents/src/vad.ts

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@
44
import type { AudioFrame } from '@livekit/rtc-node';
55
import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter';
66
import { EventEmitter } from 'node:events';
7-
import type { ReadableStream } from 'node:stream/web';
7+
import type {
8+
ReadableStream,
9+
ReadableStreamDefaultReader,
10+
WritableStreamDefaultWriter,
11+
} from 'node:stream/web';
812
import { TransformStream } from 'node:stream/web';
913
import type { VADMetrics } from './metrics/base.js';
1014

@@ -87,15 +91,14 @@ export abstract class VADStream implements AsyncIterableIterator<VADEvent> {
8791
protected inputClosed = false;
8892
#vad: VAD;
8993
#lastActivityTime = BigInt(0);
90-
#outputReadable: ReadableStream<VADEvent>;
94+
#writer: WritableStreamDefaultWriter<AudioFrame | typeof VADStream.FLUSH_SENTINEL>;
95+
#reader: ReadableStreamDefaultReader<VADEvent>;
9196

9297
constructor(vad: VAD) {
9398
this.#vad = vad;
94-
this.output.writable.close().then(() => {
95-
this.inputClosed = true;
96-
});
9799
const [r1, r2] = this.output.readable.tee();
98-
this.#outputReadable = r1;
100+
this.#reader = r1.getReader();
101+
this.#writer = this.input.writable.getWriter();
99102
this.monitorMetrics(r2);
100103
}
101104

@@ -137,7 +140,7 @@ export abstract class VADStream implements AsyncIterableIterator<VADEvent> {
137140
if (this.closed) {
138141
throw new Error('Stream is closed');
139142
}
140-
this.input.writable.getWriter().write(frame);
143+
this.#writer.write(frame);
141144
}
142145

143146
flush() {
@@ -147,7 +150,8 @@ export abstract class VADStream implements AsyncIterableIterator<VADEvent> {
147150
if (this.closed) {
148151
throw new Error('Stream is closed');
149152
}
150-
this.input.writable.getWriter().write(VADStream.FLUSH_SENTINEL);
153+
this.inputClosed = true;
154+
this.#writer.write(VADStream.FLUSH_SENTINEL);
151155
}
152156

153157
endInput() {
@@ -157,20 +161,18 @@ export abstract class VADStream implements AsyncIterableIterator<VADEvent> {
157161
if (this.closed) {
158162
throw new Error('Stream is closed');
159163
}
164+
this.inputClosed = true;
160165
this.input.writable.close();
161166
}
162167

163168
async next(): Promise<IteratorResult<VADEvent>> {
164-
return this.#outputReadable
165-
.getReader()
166-
.read()
167-
.then(({ value }) => {
168-
if (value) {
169-
return { value, done: false };
170-
} else {
171-
return { value: undefined, done: true };
172-
}
173-
});
169+
return this.#reader.read().then(({ value }) => {
170+
if (value) {
171+
return { value, done: false };
172+
} else {
173+
return { value: undefined, done: true };
174+
}
175+
});
174176
}
175177

176178
close() {

0 commit comments

Comments
 (0)