diff --git a/plugins/deepgram/src/stt.ts b/plugins/deepgram/src/stt.ts
index 213ac457..41c10caa 100644
--- a/plugins/deepgram/src/stt.ts
+++ b/plugins/deepgram/src/stt.ts
@@ -46,6 +46,7 @@ export class STT extends stt.STT {
   #opts: STTOptions;
   #logger = log();
   label = 'deepgram.STT';
+  #streams = new Set<SpeechStream>();
 
   constructor(opts: Partial<STTOptions> = defaultSTTOptions) {
     super({
@@ -62,25 +63,8 @@ export class STT extends stt.STT {
 
     if (this.#opts.detectLanguage) {
       this.#opts.language = undefined;
-    } else if (
-      this.#opts.language &&
-      !['en-US', 'en'].includes(this.#opts.language) &&
-      [
-        'nova-2-meeting',
-        'nova-2-phonecall',
-        'nova-2-finance',
-        'nova-2-conversationalai',
-        'nova-2-voicemail',
-        'nova-2-video',
-        'nova-2-medical',
-        'nova-2-drivethru',
-        'nova-2-automotive',
-      ].includes(this.#opts.model)
-    ) {
-      this.#logger.warn(
-        `${this.#opts.model} does not support language ${this.#opts.language}, falling back to nova-2-general`,
-      );
-      this.#opts.model = 'nova-2-general';
+    } else {
+      this.#opts.model = validateModel(this.#opts.model, this.#opts.language);
     }
   }
 
@@ -90,7 +74,29 @@
   }
 
   stream(): stt.SpeechStream {
-    return new SpeechStream(this, this.#opts);
+    const stream = new SpeechStream(this, this.#opts);
+    this.#streams.add(stream);
+    return stream;
+  }
+
+  updateOptions(opts: Partial<STTOptions>) {
+    if (opts.language !== undefined) this.#opts.language = opts.language;
+    if (opts.detectLanguage !== undefined) this.#opts.language = undefined;
+    if (opts.model !== undefined) this.#opts.model = opts.model;
+    if (opts.interimResults !== undefined) this.#opts.interimResults = opts.interimResults;
+    if (opts.punctuate !== undefined) this.#opts.punctuate = opts.punctuate;
+    if (opts.smartFormat !== undefined) this.#opts.smartFormat = opts.smartFormat;
+    if (opts.noDelay !== undefined) this.#opts.noDelay = opts.noDelay;
+    if (opts.endpointing !== undefined) this.#opts.endpointing = opts.endpointing;
+    if (opts.fillerWords !== undefined) this.#opts.fillerWords = opts.fillerWords;
+    if (opts.sampleRate !== undefined) this.#opts.sampleRate = opts.sampleRate;
+    if (opts.numChannels !== undefined) this.#opts.numChannels = opts.numChannels;
+    if (opts.keywords !== undefined) this.#opts.keywords = opts.keywords;
+    if (opts.profanityFilter !== undefined) this.#opts.profanityFilter = opts.profanityFilter;
+
+    this.#streams.forEach((stream) => {
+      stream.updateOptions(opts);
+    });
   }
 }
 
@@ -99,6 +105,7 @@ export class SpeechStream extends stt.SpeechStream {
   #audioEnergyFilter: AudioEnergyFilter;
   #logger = log();
   #speaking = false;
+  #ws: WebSocket;
   label = 'deepgram.SpeechStream';
 
   constructor(stt: STT, opts: STTOptions) {
@@ -106,53 +113,31 @@
     this.#opts = opts;
     this.closed = false;
     this.#audioEnergyFilter = new AudioEnergyFilter();
+    this.#ws = this.#connectWs();
 
     this.#run();
   }
 
   async #run(maxRetry = 32) {
     let retries = 0;
-    let ws: WebSocket;
     while (!this.input.closed) {
-      const streamURL = new URL(API_BASE_URL_V1);
-      const params = {
-        model: this.#opts.model,
-        punctuate: this.#opts.punctuate,
-        smart_format: this.#opts.smartFormat,
-        no_delay: this.#opts.noDelay,
-        interim_results: this.#opts.interimResults,
-        encoding: 'linear16',
-        vad_events: true,
-        sample_rate: this.#opts.sampleRate,
-        channels: this.#opts.numChannels,
-        endpointing: this.#opts.endpointing || false,
-        filler_words: this.#opts.fillerWords,
-        keywords: this.#opts.keywords.map((x) => x.join(':')),
-        profanity_filter: this.#opts.profanityFilter,
-        language: this.#opts.language,
-      };
-      Object.entries(params).forEach(([k, v]) => {
-        if (v !== undefined) {
-          if (typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean') {
-            streamURL.searchParams.append(k, encodeURIComponent(v));
-          } else {
-            v.forEach((x) => streamURL.searchParams.append('keywords', encodeURIComponent(x)));
-          }
-        }
-      });
-
-      ws = new WebSocket(streamURL, {
-        headers: { Authorization: `Token ${this.#opts.apiKey}` },
-      });
+      this.#ws = this.#connectWs();
 
       try {
         await new Promise((resolve, reject) => {
-          ws.on('open', resolve);
-          ws.on('error', (error) => reject(error));
-          ws.on('close', (code) => reject(`WebSocket returned ${code}`));
+          this.#ws.on('open', resolve);
+          this.#ws.on('error', (error) => reject(error));
+          this.#ws.on('close', (code) => {
+            if (code === 4000) {
+              // WebSocket closed to update Deepgram STT options
+              reject('4000');
+            } else {
+              reject(`WebSocket returned ${code}`);
+            }
+          });
         });
 
-        await this.#runWS(ws);
+        await this.#runWS(this.#ws);
       } catch (e) {
         if (retries >= maxRetry) {
           throw new Error(`failed to connect to Deepgram after ${retries} attempts: ${e}`);
@@ -160,7 +145,6 @@
 
         const delay = Math.min(retries * 5, 10);
         retries++;
-
         this.#logger.warn(
           `failed to connect to Deepgram, retrying in ${delay} seconds: ${e} (${retries}/${maxRetry})`,
         );
@@ -172,7 +156,7 @@
   }
 
   async #runWS(ws: WebSocket) {
-    let closing = false;
+    let shouldExit = false;
 
     const keepalive = setInterval(() => {
      try {
@@ -191,116 +175,184 @@
       samples100Ms,
     );
 
-    for await (const data of this.input) {
-      let frames: AudioFrame[];
-      if (data === SpeechStream.FLUSH_SENTINEL) {
-        frames = stream.flush();
-      } else if (
-        data.sampleRate === this.#opts.sampleRate ||
-        data.channels === this.#opts.numChannels
-      ) {
-        frames = stream.write(data.data.buffer);
-      } else {
-        throw new Error(`sample rate or channel count of frame does not match`);
-      }
+    try {
+      for await (const data of this.input) {
+        if (shouldExit) break;
+
+        let frames: AudioFrame[];
+        if (data === SpeechStream.FLUSH_SENTINEL) {
+          frames = stream.flush();
+        } else if (
+          data.sampleRate === this.#opts.sampleRate &&
+          data.channels === this.#opts.numChannels
+        ) {
+          frames = stream.write(data.data.buffer);
+        } else {
+          throw new Error(`sample rate or channel count of frame does not match`);
+        }
 
-      for await (const frame of frames) {
-        if (this.#audioEnergyFilter.pushFrame(frame)) {
-          ws.send(frame.data.buffer);
+        for await (const frame of frames) {
+          if (shouldExit) break;
+          if (this.#audioEnergyFilter.pushFrame(frame)) {
+            ws.send(frame.data.buffer);
+          }
         }
       }
-    }
 
-    closing = true;
-    ws.send(JSON.stringify({ type: 'CloseStream' }));
+      if (!shouldExit) {
+        ws.send(JSON.stringify({ type: 'CloseStream' }));
+      }
+    } catch (error) {
+      if (!shouldExit) throw error;
+    }
   };
 
     const listenTask = async () => {
-      new Promise((_, reject) =>
-        ws.once('close', (code, reason) => {
-          if (!closing) {
-            this.#logger.error(`WebSocket closed with code ${code}: ${reason}`);
-            reject();
-          }
-        }),
-      );
+      try {
+        while (!this.closed) {
+          if (shouldExit) break;
 
-      while (!this.closed) {
-        try {
-          await new Promise((resolve) => {
+          const msg = await new Promise<RawData>((resolve, reject) => {
             ws.once('message', (data) => resolve(data));
-          }).then((msg) => {
-            const json = JSON.parse(msg.toString());
-            switch (json['type']) {
-              case 'SpeechStarted': {
-                // This is a normal case. Deepgram's SpeechStarted events
-                // are not correlated with speech_final or utterance end.
-                // It's possible that we receive two in a row without an endpoint
-                // It's also possible we receive a transcript without a SpeechStarted event.
-                if (this.#speaking) return;
-                this.#speaking = true;
-                this.queue.put({ type: stt.SpeechEventType.START_OF_SPEECH });
-                break;
-              }
-              // see this page:
-              // https://developers.deepgram.com/docs/understand-endpointing-interim-results#using-endpointing-speech_final
-              // for more information about the different types of events
-              case 'Results': {
-                const isFinal = json['is_final'];
-                const isEndpoint = json['speech_final'];
-
-                const alternatives = liveTranscriptionToSpeechData(this.#opts.language!, json);
-
-                // If, for some reason, we didn't get a SpeechStarted event but we got
-                // a transcript with text, we should start speaking. It's rare but has
-                // been observed.
-                if (alternatives[0] && alternatives[0].text) {
-                  if (!this.#speaking) {
-                    this.#speaking = true;
-                    this.queue.put({ type: stt.SpeechEventType.START_OF_SPEECH });
-                  }
-
-                  if (isFinal) {
-                    this.queue.put({
-                      type: stt.SpeechEventType.FINAL_TRANSCRIPT,
-                      alternatives: [alternatives[0], ...alternatives.slice(1)],
-                    });
-                  } else {
-                    this.queue.put({
-                      type: stt.SpeechEventType.INTERIM_TRANSCRIPT,
-                      alternatives: [alternatives[0], ...alternatives.slice(1)],
-                    });
-                  }
-                }
+            ws.once('close', () => reject(new Error('WebSocket closed')));
+          });
+
 
-                // if we receive an endpoint, only end the speech if
-                // we either had a SpeechStarted event or we have a seen
-                // a non-empty transcript (deepgram doesn't have a SpeechEnded event)
-                if (isEndpoint && this.#speaking) {
-                  this.#speaking = false;
-                  this.queue.put({ type: stt.SpeechEventType.END_OF_SPEECH });
-                }
-                break;
-              }
-              case 'Metadata': {
-                break;
-              }
-              default: {
-                this.#logger.child({ msg: json }).warn('received unexpected message from Deepgram');
-                break;
-              }
-            }
-          });
-        } catch (error) {
+          const json = JSON.parse(msg.toString());
+          switch (json['type']) {
+            case 'SpeechStarted': {
+              // This is a normal case. Deepgram's SpeechStarted events
+              // are not correlated with speech_final or utterance end.
+              // It's possible that we receive two in a row without an endpoint.
+              // It's also possible we receive a transcript without a SpeechStarted event.
+              if (this.#speaking) break;
+              this.#speaking = true;
+              this.queue.put({ type: stt.SpeechEventType.START_OF_SPEECH });
+              break;
+            }
+            // see this page:
+            // https://developers.deepgram.com/docs/understand-endpointing-interim-results#using-endpointing-speech_final
+            // for more information about the different types of events
+            case 'Results': {
+              const isFinal = json['is_final'];
+              const isEndpoint = json['speech_final'];
+
+              const alternatives = liveTranscriptionToSpeechData(this.#opts.language!, json);
+
+              // If, for some reason, we didn't get a SpeechStarted event but we got
+              // a transcript with text, we should start speaking. It's rare but has
+              // been observed.
+              if (alternatives[0] && alternatives[0].text) {
+                if (!this.#speaking) {
+                  this.#speaking = true;
+                  this.queue.put({ type: stt.SpeechEventType.START_OF_SPEECH });
+                }
+
+                if (isFinal) {
+                  this.queue.put({
+                    type: stt.SpeechEventType.FINAL_TRANSCRIPT,
+                    alternatives: [alternatives[0], ...alternatives.slice(1)],
+                  });
+                } else {
+                  this.queue.put({
+                    type: stt.SpeechEventType.INTERIM_TRANSCRIPT,
+                    alternatives: [alternatives[0], ...alternatives.slice(1)],
+                  });
+                }
+              }
+
+              // if we receive an endpoint, only end the speech if
+              // we either had a SpeechStarted event or we have seen
+              // a non-empty transcript (deepgram doesn't have a SpeechEnded event)
+              if (isEndpoint && this.#speaking) {
+                this.#speaking = false;
+                this.queue.put({ type: stt.SpeechEventType.END_OF_SPEECH });
+              }
+
+              break;
+            }
+            case 'Metadata': {
+              break;
+            }
+            default: {
+              this.#logger.child({ msg: json }).warn('received unexpected message from Deepgram');
+              break;
+            }
+          }
+        }
+      } catch (error) {
+        if (!shouldExit) {
           this.#logger.child({ error }).warn('unrecoverable error, exiting');
-          break;
+          throw error;
         }
       }
     };
 
-    await Promise.all([sendTask(), listenTask()]);
-    clearInterval(keepalive);
+    ws.once('close', (code) => {
+      if (code === 4000) {
+        this.#logger.info('WebSocket closed for updating options, restarting tasks');
+        shouldExit = true;
+      }
+    });
+
+    try {
+      await Promise.all([sendTask(), listenTask()]);
+    } catch (error) {
+      this.#logger.child({ error }).warn('Error during WebSocket tasks');
+    } finally {
+      clearInterval(keepalive);
+    }
+  }
+
+  updateOptions(opts: Partial<STTOptions>) {
+    if (opts.language !== undefined) this.#opts.language = opts.language;
+    if (opts.detectLanguage !== undefined) this.#opts.language = undefined;
+    if (opts.model !== undefined) this.#opts.model = opts.model;
+    if (opts.interimResults !== undefined) this.#opts.interimResults = opts.interimResults;
+    if (opts.punctuate !== undefined) this.#opts.punctuate = opts.punctuate;
+    if (opts.smartFormat !== undefined) this.#opts.smartFormat = opts.smartFormat;
+    if (opts.noDelay !== undefined) this.#opts.noDelay = opts.noDelay;
+    if (opts.endpointing !== undefined) this.#opts.endpointing = opts.endpointing;
+    if (opts.fillerWords !== undefined) this.#opts.fillerWords = opts.fillerWords;
+    if (opts.sampleRate !== undefined) this.#opts.sampleRate = opts.sampleRate;
+    if (opts.numChannels !== undefined) this.#opts.numChannels = opts.numChannels;
+    if (opts.keywords !== undefined) this.#opts.keywords = opts.keywords;
+    if (opts.profanityFilter !== undefined) this.#opts.profanityFilter = opts.profanityFilter;
+    // Custom close code to handle update options
+    this.#ws.close(4000);
+  }
+
+  #connectWs(): WebSocket {
+    const streamURL = new URL(API_BASE_URL_V1);
+    const params = {
+      model: this.#opts.model,
+      punctuate: this.#opts.punctuate,
+      smart_format: this.#opts.smartFormat,
+      no_delay: this.#opts.noDelay,
+      interim_results: this.#opts.interimResults,
+      encoding: 'linear16',
+      vad_events: true,
+      sample_rate: this.#opts.sampleRate,
+      channels: this.#opts.numChannels,
+      endpointing: this.#opts.endpointing || false,
+      filler_words: this.#opts.fillerWords,
+      keywords: this.#opts.keywords.map((x) => x.join(':')),
+      profanity_filter: this.#opts.profanityFilter,
+      language: this.#opts.language,
+    };
+    Object.entries(params).forEach(([k, v]) => {
+      if (v !== undefined) {
+        if (typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean') {
+          streamURL.searchParams.append(k, encodeURIComponent(v));
+        } else {
+          v.forEach((x) => streamURL.searchParams.append('keywords', encodeURIComponent(x)));
+        }
+      }
+    });
+
+    const ws = new WebSocket(streamURL, {
+      headers: { Authorization: `Token ${this.#opts.apiKey}` },
+    });
+    return ws;
   }
 }
 
@@ -318,3 +370,26 @@ const liveTranscriptionToSpeechData = (
     text: alt['transcript'],
   }));
 };
+
+const validateModel = (
+  model: STTModels,
+  language: STTLanguages | string | undefined,
+): STTModels => {
+  const logger = log();
+  const engModels = [
+    'nova-2-meeting',
+    'nova-2-phonecall',
+    'nova-2-finance',
+    'nova-2-conversationalai',
+    'nova-2-voicemail',
+    'nova-2-video',
+    'nova-2-medical',
+    'nova-2-drivethru',
+    'nova-2-automotive',
+  ];
+  if (language && !['en-US', 'en'].includes(language) && engModels.includes(model)) {
+    logger.warn(`${model} does not support language ${language}, falling back to nova-2-general`);
+    return 'nova-2-general';
+  }
+  return model;
+};
diff --git a/plugins/openai/src/stt.ts b/plugins/openai/src/stt.ts
index 2a335c93..4b218c38 100644
--- a/plugins/openai/src/stt.ts
+++ b/plugins/openai/src/stt.ts
@@ -138,4 +138,9 @@ export class STT extends stt.STT {
   stream(): stt.SpeechStream {
     throw new Error('Streaming is not supported on OpenAI STT');
   }
+
+  updateOptions(model?: WhisperModels | GroqAudioModels, language?: string): void {
+    this.#opts.model = model ?? this.#opts.model;
+    this.#opts.language = language ?? this.#opts.language;
+  }
 }
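For reviewers, a minimal consumer-side sketch of the new reconnect-on-update flow. This is a hypothetical usage example, not part of the diff: the import path and the option values are assumptions; only `stream()` and `updateOptions()` come from this PR.

```ts
import { STT } from '@livekit/agents-plugin-deepgram'; // assumed package name

// Option values are illustrative; anything omitted falls back to defaultSTTOptions.
const deepgram = new STT({ model: 'nova-2-general', language: 'en-US' });
const speechStream = deepgram.stream(); // registered in STT's #streams set

// Mid-session switch: STT.updateOptions patches the shared #opts, then fans
// out to every tracked SpeechStream. Each stream closes its WebSocket with
// app code 4000; #run recognizes that code, reconnects via #connectWs(), and
// resumes with the updated query parameters instead of treating it as an error.
deepgram.updateOptions({ language: 'fr' });
```

The design choice worth noting is the reserved close code: 4000 sits in the application-defined range of the WebSocket spec, which lets `#run` distinguish an intentional options-driven restart from a genuine connection failure that should count against `maxRetry`.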