Skip to content

Commit ece515b

Browse files
committed
testing 2
1 parent 98c22e4 commit ece515b

File tree

7 files changed

+114
-126
lines changed

7 files changed

+114
-126
lines changed

agents/src/llm/llm.ts

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,14 @@ export abstract class LLMStream implements AsyncIterableIterator<ChatChunk> {
6868
#llm: LLM;
6969
#chatCtx: ChatContext;
7070
#fncCtx?: FunctionContext;
71-
#outputReadable: ReadableStream<ChatChunk>;
71+
#reader: ReadableStreamDefaultReader<ChatChunk>;
7272

7373
constructor(llm: LLM, chatCtx: ChatContext, fncCtx?: FunctionContext) {
7474
this.#llm = llm;
7575
this.#chatCtx = chatCtx;
7676
this.#fncCtx = fncCtx;
7777
const [r1, r2] = this.output.readable.tee();
78-
this.#outputReadable = r1;
78+
this.#reader = r1.getReader();
7979
this.monitorMetrics(r2);
8080
}
8181

@@ -140,20 +140,16 @@ export abstract class LLMStream implements AsyncIterableIterator<ChatChunk> {
140140
}
141141

142142
async next(): Promise<IteratorResult<ChatChunk>> {
143-
return this.#outputReadable
144-
.getReader()
145-
.read()
146-
.then(({ value }) => {
147-
if (value) {
148-
return { value, done: false };
149-
} else {
150-
return { value: undefined, done: true };
151-
}
152-
});
143+
return this.#reader.read().then(({ value }) => {
144+
if (value) {
145+
return { value, done: false };
146+
} else {
147+
return { value: undefined, done: true };
148+
}
149+
});
153150
}
154151

155152
close() {
156-
this.output.writable.close();
157153
this.closed = true;
158154
}
159155

agents/src/pipeline/pipeline_agent.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter<
255255
#agentPublication?: LocalTrackPublication;
256256
#lastFinalTranscriptTime?: number;
257257
#lastSpeechTime?: number;
258+
#writer: WritableStreamDefaultWriter<SpeechHandle | typeof VoicePipelineAgent.FLUSH_SENTINEL>;
258259

259260
constructor(
260261
/** Voice Activity Detection instance. */
@@ -289,6 +290,8 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter<
289290
this.#validateReplyIfPossible.bind(this),
290291
this.#opts.minEndpointingDelay,
291292
);
293+
294+
this.#writer = this.#speechQueue.writable.getWriter();
292295
}
293296

294297
get fncCtx(): FunctionContext | undefined {
@@ -924,9 +927,8 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter<
924927
}
925928

926929
#addSpeechForPlayout(handle: SpeechHandle) {
927-
const writer = this.#speechQueue.writable.getWriter();
928-
writer.write(handle);
929-
writer.write(VoicePipelineAgent.FLUSH_SENTINEL);
930+
this.#writer.write(handle);
931+
this.#writer.write(VoicePipelineAgent.FLUSH_SENTINEL);
930932
this.#speechQueueOpen.resolve();
931933
}
932934

agents/src/stt/stt.ts

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -147,14 +147,14 @@ export abstract class SpeechStream implements AsyncIterableIterator<SpeechEvent>
147147
protected closed = false;
148148
protected inputClosed = false;
149149
#stt: STT;
150-
#outputReadable: ReadableStream<SpeechEvent>;
150+
#reader: ReadableStreamDefaultReader<SpeechEvent>;
151151
#writer: WritableStreamDefaultWriter<AudioFrame | typeof SpeechStream.FLUSH_SENTINEL>;
152152

153153
constructor(stt: STT) {
154154
this.#stt = stt;
155155
this.#writer = this.input.writable.getWriter();
156156
const [r1, r2] = this.output.readable.tee();
157-
this.#outputReadable = r1;
157+
this.#reader = r1.getReader();
158158
this.monitorMetrics(r2);
159159
}
160160

@@ -211,22 +211,18 @@ export abstract class SpeechStream implements AsyncIterableIterator<SpeechEvent>
211211
}
212212

213213
async next(): Promise<IteratorResult<SpeechEvent>> {
214-
return this.#outputReadable
215-
.getReader()
216-
.read()
217-
.then(({ value }) => {
218-
if (value) {
219-
return { value, done: false };
220-
} else {
221-
return { value: undefined, done: true };
222-
}
223-
});
214+
return this.#reader.read().then(({ value }) => {
215+
if (value) {
216+
return { value, done: false };
217+
} else {
218+
return { value: undefined, done: true };
219+
}
220+
});
224221
}
225222

226223
/** Close both the input and output of the STT stream */
227224
close() {
228225
this.input.writable.close();
229-
this.output.writable.close();
230226
this.closed = true;
231227
this.inputClosed = true;
232228
}

agents/src/tokenize/token_stream.ts

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,15 @@ export class BufferedTokenStream implements AsyncIterableIterator<TokenData> {
1818
#inBuf = '';
1919
#outBuf = '';
2020
#currentSegmentId: string;
21+
#writer: WritableStreamDefaultWriter<TokenData>;
22+
#reader: ReadableStreamDefaultReader<TokenData>;
2123

2224
constructor(func: TokenizeFunc, minTokenLength: number, minContextLength: number) {
2325
this.#func = func;
2426
this.#minTokenLength = minTokenLength;
2527
this.#minContextLength = minContextLength;
28+
this.#reader = this.queue.readable.getReader();
29+
this.#writer = this.queue.writable.getWriter();
2630

2731
this.#currentSegmentId = randomUUID();
2832
}
@@ -33,8 +37,6 @@ export class BufferedTokenStream implements AsyncIterableIterator<TokenData> {
3337
throw new Error('Stream is closed');
3438
}
3539

36-
const writer = this.queue.writable.getWriter();
37-
3840
this.#inBuf += text;
3941
if (this.#inBuf.length < this.#minContextLength) return;
4042

@@ -52,7 +54,7 @@ export class BufferedTokenStream implements AsyncIterableIterator<TokenData> {
5254

5355
this.#outBuf += tokText;
5456
if (this.#outBuf.length >= this.#minTokenLength) {
55-
writer.write({ token: this.#outBuf, segmentId: this.#currentSegmentId });
57+
this.#writer.write({ token: this.#outBuf, segmentId: this.#currentSegmentId });
5658
this.#outBuf = '';
5759
}
5860

@@ -72,8 +74,6 @@ export class BufferedTokenStream implements AsyncIterableIterator<TokenData> {
7274
throw new Error('Stream is closed');
7375
}
7476

75-
const writer = this.queue.writable.getWriter();
76-
7777
if (this.#inBuf || this.#outBuf) {
7878
const tokens = this.#func(this.#inBuf);
7979
if (tokens) {
@@ -87,7 +87,7 @@ export class BufferedTokenStream implements AsyncIterableIterator<TokenData> {
8787
}
8888

8989
if (this.#outBuf) {
90-
writer.write({ token: this.#outBuf, segmentId: this.#currentSegmentId });
90+
this.#writer.write({ token: this.#outBuf, segmentId: this.#currentSegmentId });
9191
}
9292

9393
this.#currentSegmentId = randomUUID();
@@ -107,22 +107,21 @@ export class BufferedTokenStream implements AsyncIterableIterator<TokenData> {
107107
}
108108

109109
async next(): Promise<IteratorResult<TokenData>> {
110-
return this.queue.readable
111-
.getReader()
112-
.read()
113-
.then(({ value }) => {
114-
if (value) {
115-
return { value, done: false };
116-
} else {
117-
return { value: undefined, done: true };
118-
}
119-
});
110+
return this.#reader.read().then(({ value }) => {
111+
if (value) {
112+
return { value, done: false };
113+
} else {
114+
return { value: undefined, done: true };
115+
}
116+
});
120117
}
121118

122119
/** Close both the input and output of the token stream */
123120
close() {
124-
this.queue.writable.close();
125-
this.closed = true;
121+
if (!this.closed) {
122+
this.#writer.close();
123+
this.closed = true;
124+
}
126125
}
127126

128127
[Symbol.asyncIterator](): BufferedTokenStream {

agents/src/tokenize/tokenizer.ts

Lines changed: 49 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -31,27 +31,30 @@ export abstract class SentenceStream {
3131
>();
3232
protected output = new TransformStream<TokenData, TokenData>();
3333
#closed = false;
34+
#inputClosed = false;
35+
#reader = this.output.readable.getReader();
36+
#writer = this.input.writable.getWriter();
3437

3538
get closed(): boolean {
3639
return this.#closed;
3740
}
3841

3942
/** Push a string of text to the tokenizer */
4043
pushText(text: string) {
41-
// if (this.input.closed) {
42-
// throw new Error('Input is closed');
43-
// }
44+
if (this.#inputClosed) {
45+
throw new Error('Input is closed');
46+
}
4447
if (this.#closed) {
4548
throw new Error('Stream is closed');
4649
}
47-
this.input.writable.getWriter().write(text);
50+
this.#writer.write(text);
4851
}
4952

5053
/** Flush the tokenizer, causing it to process all pending text */
5154
flush() {
52-
// if (this.input.closed) {
53-
// throw new Error('Input is closed');
54-
// }
55+
if (this.#inputClosed) {
56+
throw new Error('Input is closed');
57+
}
5558
if (this.#closed) {
5659
throw new Error('Stream is closed');
5760
}
@@ -60,32 +63,31 @@ export abstract class SentenceStream {
6063

6164
/** Mark the input as ended and forbid additional pushes */
6265
endInput() {
63-
// if (this.input.closed) {
64-
// throw new Error('Input is closed');
65-
// }
66+
if (this.#inputClosed) {
67+
throw new Error('Input is closed');
68+
}
6669
if (this.#closed) {
6770
throw new Error('Stream is closed');
6871
}
69-
this.input.writable.close();
72+
this.#writer.close();
73+
this.#inputClosed = true;
7074
}
7175

7276
async next(): Promise<IteratorResult<TokenData>> {
73-
return this.output.readable
74-
.getReader()
75-
.read()
76-
.then(({ value }) => {
77-
if (value) {
78-
return { value, done: false };
79-
} else {
80-
return { value: undefined, done: true };
81-
}
82-
});
77+
return this.#reader.read().then(({ value }) => {
78+
if (value) {
79+
return { value, done: false };
80+
} else {
81+
return { value: undefined, done: true };
82+
}
83+
});
8384
}
8485

8586
/** Close both the input and output of the tokenizer stream */
8687
close() {
87-
this.input.writable.close();
88-
this.output.writable.close();
88+
if (!this.#inputClosed) {
89+
this.endInput();
90+
}
8991
this.#closed = true;
9092
}
9193

@@ -110,6 +112,9 @@ export abstract class WordStream {
110112
string | typeof WordStream.FLUSH_SENTINEL
111113
>();
112114
protected output = new TransformStream<TokenData, TokenData>();
115+
#writer = this.input.writable.getWriter();
116+
#reader = this.output.readable.getReader();
117+
#inputClosed = false;
113118
#closed = false;
114119

115120
get closed(): boolean {
@@ -118,54 +123,51 @@ export abstract class WordStream {
118123

119124
/** Push a string of text to the tokenizer */
120125
pushText(text: string) {
121-
// if (this.input.closed) {
122-
// throw new Error('Input is closed');
123-
// }
126+
if (this.#inputClosed) {
127+
throw new Error('Input is closed');
128+
}
124129
if (this.#closed) {
125130
throw new Error('Stream is closed');
126131
}
127-
this.input.writable.getWriter().write(text);
132+
this.#writer.write(text);
128133
}
129134

130135
/** Flush the tokenizer, causing it to process all pending text */
131136
flush() {
132-
// if (this.input.closed) {
133-
// throw new Error('Input is closed');
134-
// }
137+
if (this.#inputClosed) {
138+
throw new Error('Input is closed');
139+
}
135140
if (this.#closed) {
136141
throw new Error('Stream is closed');
137142
}
138-
this.input.writable.getWriter().write(WordStream.FLUSH_SENTINEL);
143+
this.#writer.write(WordStream.FLUSH_SENTINEL);
139144
}
140145

141146
/** Mark the input as ended and forbid additional pushes */
142147
endInput() {
143-
// if (this.input.closed) {
144-
// throw new Error('Input is closed');
145-
// }
148+
if (this.#inputClosed) {
149+
throw new Error('Input is closed');
150+
}
146151
if (this.#closed) {
147152
throw new Error('Stream is closed');
148153
}
149-
this.input.writable.close();
154+
this.#inputClosed = true;
150155
}
151156

152157
async next(): Promise<IteratorResult<TokenData>> {
153-
return this.output.readable
154-
.getReader()
155-
.read()
156-
.then(({ value }) => {
157-
if (value) {
158-
return { value, done: false };
159-
} else {
160-
return { value: undefined, done: true };
161-
}
162-
});
158+
return this.#reader.read().then(({ value }) => {
159+
if (value) {
160+
return { value, done: false };
161+
} else {
162+
return { value: undefined, done: true };
163+
}
164+
});
163165
}
164166

165167
/** Close both the input and output of the tokenizer stream */
166168
close() {
167-
this.input.writable.close();
168-
this.output.writable.close();
169+
this.endInput();
170+
this.#writer.close();
169171
this.#closed = true;
170172
}
171173

0 commit comments

Comments
 (0)