1
1
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
2
2
//
3
3
// SPDX-License-Identifier: Apache-2.0
4
- import { type AudioBuffer , AudioByteStream , AudioEnergyFilter , log , stt } from '@livekit/agents' ;
4
+ import {
5
+ type AudioBuffer ,
6
+ AudioByteStream ,
7
+ AudioEnergyFilter ,
8
+ Future ,
9
+ log ,
10
+ stt ,
11
+ } from '@livekit/agents' ;
5
12
import type { AudioFrame } from '@livekit/rtc-node' ;
6
13
import { type RawData , WebSocket } from 'ws' ;
7
14
import type { STTLanguages , STTModels } from './models.js' ;
@@ -98,7 +105,11 @@ export class STT extends stt.STT {
98
105
throw new Error ( 'Recognize is not supported on Deepgram STT' ) ;
99
106
}
100
107
101
- stream ( ) : stt . SpeechStream {
108
// Overlays `opts` on the options currently stored on this STT instance.
// Keys present in `opts` replace the stored values; all other keys are kept.
// NOTE(review): stream() hands `this.#opts` to each new SpeechStream at call time, and this
// method assigns a fresh object instead of mutating the old one, so streams created earlier
// presumably do not observe the change - confirm whether that is intended.
+ updateOptions ( opts : Partial < STTOptions > ) {
109
// Shallow merge via object spread; later spread (`opts`) wins on key collisions.
+ this . #opts = { ...this . #opts, ...opts } ;
110
+ }
111
+
112
// Creates a new SpeechStream for this STT instance, passing the instance itself and the
// options stored in `#opts` at the moment of the call.
+ stream ( ) : SpeechStream {
102
113
return new SpeechStream ( this , this . #opts) ;
103
114
}
104
115
}
@@ -108,6 +119,7 @@ export class SpeechStream extends stt.SpeechStream {
108
119
#audioEnergyFilter: AudioEnergyFilter ;
109
120
#logger = log ( ) ;
110
121
#speaking = false ;
122
+ #resetWS = new Future ( ) ;
111
123
label = 'deepgram.SpeechStream' ;
112
124
113
125
constructor ( stt : STT , opts : STTOptions ) {
@@ -184,7 +196,13 @@ export class SpeechStream extends stt.SpeechStream {
184
196
this . closed = true ;
185
197
}
186
198
199
// Overlays `opts` on this stream's stored options and then resolves `#resetWS`.
// #runWS races `this.#resetWS.await` against its send/listen/monitor tasks, so resolving the
// future ends that wait; #runWS then sets `closing`, closes the socket and clears the keepalive.
// NOTE(review): whether a new connection is subsequently opened with the updated options is
// decided by the caller of #runWS, which is not visible here - confirm.
// NOTE(review): #runWS replaces `#resetWS` with a new Future on entry, so a resolve() issued
// before #runWS has started presumably has no effect on that run - confirm.
+ updateOptions ( opts : Partial < STTOptions > ) {
200
// Shallow merge via object spread; keys in `opts` replace the stored values.
+ this . #opts = { ...this . #opts, ...opts } ;
201
// Signal the currently running #runWS (if any) to stop waiting and tear down its socket.
+ this . #resetWS. resolve ( ) ;
202
+ }
203
+
187
204
async #runWS( ws : WebSocket ) {
205
+ this . #resetWS = new Future ( ) ;
188
206
let closing = false ;
189
207
190
208
const keepalive = setInterval ( ( ) => {
@@ -238,7 +256,7 @@ export class SpeechStream extends stt.SpeechStream {
238
256
) ;
239
257
240
258
const listenTask = async ( ) => {
241
- while ( ! this . closed ) {
259
+ while ( ! this . closed && ! closing ) {
242
260
try {
243
261
await new Promise < RawData > ( ( resolve ) => {
244
262
ws . once ( 'message' , ( data ) => resolve ( data ) ) ;
@@ -312,7 +330,9 @@ export class SpeechStream extends stt.SpeechStream {
312
330
}
313
331
} ;
314
332
315
- await Promise . all ( [ sendTask ( ) , listenTask ( ) , wsMonitor ] ) ;
333
+ await Promise . race ( [ this . #resetWS. await , Promise . all ( [ sendTask ( ) , listenTask ( ) , wsMonitor ] ) ] ) ;
334
+ closing = true ;
335
+ ws . close ( ) ;
316
336
clearInterval ( keepalive ) ;
317
337
}
318
338
}
0 commit comments