From 2c323fca60dd899575513193749180d67d978ad9 Mon Sep 17 00:00:00 2001 From: Hamdan Anwar Sayeed <96612374+s-hamdananwar@users.noreply.github.com> Date: Mon, 6 Jan 2025 20:35:02 +0300 Subject: [PATCH 1/5] feat: added stt update options for oai and deepgram --- plugins/deepgram/src/stt.ts | 187 ++++++++++++++++++++++++++---------- plugins/openai/src/stt.ts | 5 + 2 files changed, 142 insertions(+), 50 deletions(-) diff --git a/plugins/deepgram/src/stt.ts b/plugins/deepgram/src/stt.ts index 213ac457..9453c322 100644 --- a/plugins/deepgram/src/stt.ts +++ b/plugins/deepgram/src/stt.ts @@ -46,6 +46,7 @@ export class STT extends stt.STT { #opts: STTOptions; #logger = log(); label = 'deepgram.STT'; + #streams = new Set(); constructor(opts: Partial = defaultSTTOptions) { super({ @@ -62,25 +63,8 @@ export class STT extends stt.STT { if (this.#opts.detectLanguage) { this.#opts.language = undefined; - } else if ( - this.#opts.language && - !['en-US', 'en'].includes(this.#opts.language) && - [ - 'nova-2-meeting', - 'nova-2-phonecall', - 'nova-2-finance', - 'nova-2-conversationalai', - 'nova-2-voicemail', - 'nova-2-video', - 'nova-2-medical', - 'nova-2-drivethru', - 'nova-2-automotive', - ].includes(this.#opts.model) - ) { - this.#logger.warn( - `${this.#opts.model} does not support language ${this.#opts.language}, falling back to nova-2-general`, - ); - this.#opts.model = 'nova-2-general'; + } else { + this.#opts.model = validateModel(this.#opts.model, this.#opts.language); } } @@ -90,7 +74,54 @@ export class STT extends stt.STT { } stream(): stt.SpeechStream { - return new SpeechStream(this, this.#opts); + const stream = new SpeechStream(this, this.#opts); + this.#streams.add(stream); + return stream; + } + + updateOptions( + language?: STTLanguages, + model?: STTModels, + interimResults?: boolean, + punctuate?: boolean, + smartFormat?: boolean, + noDelay?: boolean, + endpointing?: number, + fillerWords?: boolean, + sampleRate?: number, + numChannels?: number, + keywords?: [string, number][], + profanityFilter?: boolean, + ) { + if (language !== undefined) this.#opts.language = language; + if (model !== undefined) this.#opts.model = model; + if (interimResults !== undefined) this.#opts.interimResults = interimResults; + if (punctuate !== undefined) this.#opts.punctuate = punctuate; + if (smartFormat !== undefined) this.#opts.smartFormat = smartFormat; + if (noDelay !== undefined) this.#opts.noDelay = noDelay; + if (endpointing !== undefined) this.#opts.endpointing = endpointing; + if (fillerWords !== undefined) this.#opts.fillerWords = fillerWords; + if (sampleRate !== undefined) this.#opts.sampleRate = sampleRate; + if (numChannels !== undefined) this.#opts.numChannels = numChannels; + if (keywords !== undefined) this.#opts.keywords = keywords; + if (profanityFilter !== undefined) this.#opts.profanityFilter = profanityFilter; + + this.#streams.forEach((stream) => { + stream.updateOptions( + language, + model, + interimResults, + punctuate, + smartFormat, + noDelay, + endpointing, + fillerWords, + sampleRate, + numChannels, + keywords, + profanityFilter, + ); + }); } } @@ -114,36 +145,7 @@ export class SpeechStream extends stt.SpeechStream { let retries = 0; let ws: WebSocket; while (!this.input.closed) { - const streamURL = new URL(API_BASE_URL_V1); - const params = { - model: this.#opts.model, - punctuate: this.#opts.punctuate, - smart_format: this.#opts.smartFormat, - no_delay: this.#opts.noDelay, - interim_results: this.#opts.interimResults, - encoding: 'linear16', - vad_events: true, - sample_rate: this.#opts.sampleRate, - channels: this.#opts.numChannels, - endpointing: this.#opts.endpointing || false, - filler_words: this.#opts.fillerWords, - keywords: this.#opts.keywords.map((x) => x.join(':')), - profanity_filter: this.#opts.profanityFilter, - language: this.#opts.language, - }; - Object.entries(params).forEach(([k, v]) => { - if (v !== undefined) { - if (typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean') { - streamURL.searchParams.append(k, encodeURIComponent(v)); - } else { - v.forEach((x) => streamURL.searchParams.append('keywords', encodeURIComponent(x))); - } - } - }); - - ws = new WebSocket(streamURL, { - headers: { Authorization: `Token ${this.#opts.apiKey}` }, - }); + ws = this.#connectWs(); try { await new Promise((resolve, reject) => { @@ -302,6 +304,68 @@ export class SpeechStream extends stt.SpeechStream { await Promise.all([sendTask(), listenTask()]); clearInterval(keepalive); } + + updateOptions( + language?: STTLanguages, + model?: STTModels, + interimResults?: boolean, + punctuate?: boolean, + smartFormat?: boolean, + noDelay?: boolean, + endpointing?: number, + fillerWords?: boolean, + sampleRate?: number, + numChannels?: number, + keywords?: [string, number][], + profanityFilter?: boolean, + ) { + if (language !== undefined) this.#opts.language = language; + if (model !== undefined) this.#opts.model = model; + if (interimResults !== undefined) this.#opts.interimResults = interimResults; + if (punctuate !== undefined) this.#opts.punctuate = punctuate; + if (smartFormat !== undefined) this.#opts.smartFormat = smartFormat; + if (noDelay !== undefined) this.#opts.noDelay = noDelay; + if (endpointing !== undefined) this.#opts.endpointing = endpointing; + if (fillerWords !== undefined) this.#opts.fillerWords = fillerWords; + if (sampleRate !== undefined) this.#opts.sampleRate = sampleRate; + if (numChannels !== undefined) this.#opts.numChannels = numChannels; + if (keywords !== undefined) this.#opts.keywords = keywords; + if (profanityFilter !== undefined) this.#opts.profanityFilter = profanityFilter; + } + + #connectWs(): WebSocket { + const streamURL = new URL(API_BASE_URL_V1); + const params = { + model: this.#opts.model, + punctuate: this.#opts.punctuate, + smart_format: this.#opts.smartFormat, + no_delay: this.#opts.noDelay, + interim_results: this.#opts.interimResults, + encoding: 'linear16', + vad_events: true, + sample_rate: this.#opts.sampleRate, + channels: this.#opts.numChannels, + endpointing: this.#opts.endpointing || false, + filler_words: this.#opts.fillerWords, + keywords: this.#opts.keywords.map((x) => x.join(':')), + profanity_filter: this.#opts.profanityFilter, + language: this.#opts.language, + }; + Object.entries(params).forEach(([k, v]) => { + if (v !== undefined) { + if (typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean') { + streamURL.searchParams.append(k, encodeURIComponent(v)); + } else { + v.forEach((x) => streamURL.searchParams.append('keywords', encodeURIComponent(x))); + } + } + }); + + const ws = new WebSocket(streamURL, { + headers: { Authorization: `Token ${this.#opts.apiKey}` }, + }); + return ws; + } } const liveTranscriptionToSpeechData = ( @@ -318,3 +382,26 @@ const liveTranscriptionToSpeechData = ( text: alt['transcript'], })); }; + +const validateModel = ( + model: STTModels, + language: STTLanguages | string | undefined, +): STTModels => { + const logger = log(); + const engModels = [ + 'nova-2-meeting', + 'nova-2-phonecall', + 'nova-2-finance', + 'nova-2-conversationalai', + 'nova-2-voicemail', + 'nova-2-video', + 'nova-2-medical', + 'nova-2-drivethru', + 'nova-2-automotive', + ]; + if (language && !['en-US', 'en'].includes(language) && engModels.includes(model)) { + logger.warn(`${model} does not support language ${language}, falling back to nova-2-general`); + return 'nova-2-general'; + } + return model; +}; diff --git a/plugins/openai/src/stt.ts b/plugins/openai/src/stt.ts index 2a335c93..4b218c38 100644 --- a/plugins/openai/src/stt.ts +++ b/plugins/openai/src/stt.ts @@ -138,4 +138,9 @@ export class STT extends stt.STT { stream(): stt.SpeechStream { throw new Error('Streaming is not supported on OpenAI STT'); } + + update_options(model?: WhisperModels | GroqAudioModels, language?: string): void { + this.#opts.model = model || this.#opts.model; + this.#opts.language = language || this.#opts.language; + } } From e53974634b240aeecceec6bfabb6c30f73735cbe Mon Sep 17 00:00:00 2001 From: Hamdan Anwar Sayeed <96612374+s-hamdananwar@users.noreply.github.com> Date: Fri, 17 Jan 2025 02:24:29 +0300 Subject: [PATCH 2/5] deepgram stt update options close ws --- plugins/deepgram/src/stt.ts | 47 ++++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/plugins/deepgram/src/stt.ts b/plugins/deepgram/src/stt.ts index 9453c322..28565838 100644 --- a/plugins/deepgram/src/stt.ts +++ b/plugins/deepgram/src/stt.ts @@ -130,6 +130,7 @@ export class SpeechStream extends stt.SpeechStream { #audioEnergyFilter: AudioEnergyFilter; #logger = log(); #speaking = false; + #ws: WebSocket; label = 'deepgram.SpeechStream'; constructor(stt: STT, opts: STTOptions) { @@ -137,36 +138,47 @@ export class SpeechStream extends stt.SpeechStream { this.#opts = opts; this.closed = false; this.#audioEnergyFilter = new AudioEnergyFilter(); + this.#ws = new WebSocket(null); this.#run(); } async #run(maxRetry = 32) { let retries = 0; - let ws: WebSocket; + // let ws: WebSocket; while (!this.input.closed) { - ws = this.#connectWs(); + this.#ws = this.#connectWs(); try { await new Promise((resolve, reject) => { - ws.on('open', resolve); - ws.on('error', (error) => reject(error)); - ws.on('close', (code) => reject(`WebSocket returned ${code}`)); + this.#ws.on('open', resolve); + this.#ws.on('error', (error) => reject(error)); + this.#ws.on('close', (code) => { + if (code === 4000) { + // WebSocket closed to update Deepgram STT options + reject('4000'); + } else { + reject(`WebSocket returned ${code}`); + } + }); }); - await this.#runWS(ws); + await this.#runWS(this.#ws); } catch (e) { - if (retries >= maxRetry) { - throw new Error(`failed to connect to Deepgram after ${retries} attempts: ${e}`); - } - - const delay = Math.min(retries * 5, 10); - retries++; + if (e === '4000') { + this.#logger.info('updating Deepgram STT options'); + } else { + if (retries >= maxRetry) { + throw new Error(`failed to connect to Deepgram after ${retries} attempts: ${e}`); + } - this.#logger.warn( - `failed to connect to Deepgram, retrying in ${delay} seconds: ${e} (${retries}/${maxRetry})`, - ); - await new Promise((resolve) => setTimeout(resolve, delay * 1000)); + const delay = Math.min(retries * 5, 10); + retries++; + this.#logger.warn( + `failed to connect to Deepgram, retrying in ${delay} seconds: ${e} (${retries}/${maxRetry})`, + ); + await new Promise((resolve) => setTimeout(resolve, delay * 1000)); + } } } @@ -319,6 +331,7 @@ export class SpeechStream extends stt.SpeechStream { keywords?: [string, number][], profanityFilter?: boolean, ) { + console.log('called update', punctuate); if (language !== undefined) this.#opts.language = language; if (model !== undefined) this.#opts.model = model; if (interimResults !== undefined) this.#opts.interimResults = interimResults; @@ -331,6 +344,8 @@ export class SpeechStream extends stt.SpeechStream { if (numChannels !== undefined) this.#opts.numChannels = numChannels; if (keywords !== undefined) this.#opts.keywords = keywords; if (profanityFilter !== undefined) this.#opts.profanityFilter = profanityFilter; + // Custom close code to handle update options + this.#ws.close(4000); } #connectWs(): WebSocket { From 6fb8243b51e4adb8331f9085d4e10609eb877215 Mon Sep 17 00:00:00 2001 From: Hamdan Anwar Sayeed <96612374+s-hamdananwar@users.noreply.github.com> Date: Tue, 21 Jan 2025 15:51:22 -0600 Subject: [PATCH 3/5] updated params for deepgram stt.updateOptions --- plugins/deepgram/src/stt.ts | 98 ++++++++++++------------------------- 1 file changed, 31 insertions(+), 67 deletions(-) diff --git a/plugins/deepgram/src/stt.ts b/plugins/deepgram/src/stt.ts index 28565838..3502b4ca 100644 --- a/plugins/deepgram/src/stt.ts +++ b/plugins/deepgram/src/stt.ts @@ -79,48 +79,22 @@ export class STT extends stt.STT { return stream; } - updateOptions( - language?: STTLanguages, - model?: STTModels, - interimResults?: boolean, - punctuate?: boolean, - smartFormat?: boolean, - noDelay?: boolean, - endpointing?: number, - fillerWords?: boolean, - sampleRate?: number, - numChannels?: number, - keywords?: [string, number][], - profanityFilter?: boolean, - ) { - if (language !== undefined) this.#opts.language = language; - if (model !== undefined) this.#opts.model = model; - if (interimResults !== undefined) this.#opts.interimResults = interimResults; - if (punctuate !== undefined) this.#opts.punctuate = punctuate; - if (smartFormat !== undefined) this.#opts.smartFormat = smartFormat; - if (noDelay !== undefined) this.#opts.noDelay = noDelay; - if (endpointing !== undefined) this.#opts.endpointing = endpointing; - if (fillerWords !== undefined) this.#opts.fillerWords = fillerWords; - if (sampleRate !== undefined) this.#opts.sampleRate = sampleRate; - if (numChannels !== undefined) this.#opts.numChannels = numChannels; - if (keywords !== undefined) this.#opts.keywords = keywords; - if (profanityFilter !== undefined) this.#opts.profanityFilter = profanityFilter; + updateOptions(opts: Partial) { + if (opts.language !== undefined) this.#opts.language = opts.language; + if (opts.model !== undefined) this.#opts.model = opts.model; + if (opts.interimResults !== undefined) this.#opts.interimResults = opts.interimResults; + if (opts.punctuate !== undefined) this.#opts.punctuate = opts.punctuate; + if (opts.smartFormat !== undefined) this.#opts.smartFormat = opts.smartFormat; + if (opts.noDelay !== undefined) this.#opts.noDelay = opts.noDelay; + if (opts.endpointing !== undefined) this.#opts.endpointing = opts.endpointing; + if (opts.fillerWords !== undefined) this.#opts.fillerWords = opts.fillerWords; + if (opts.sampleRate !== undefined) this.#opts.sampleRate = opts.sampleRate; + if (opts.numChannels !== undefined) this.#opts.numChannels = opts.numChannels; + if (opts.keywords !== undefined) this.#opts.keywords = opts.keywords; + if (opts.profanityFilter !== undefined) this.#opts.profanityFilter = opts.profanityFilter; this.#streams.forEach((stream) => { - stream.updateOptions( - language, - model, - interimResults, - punctuate, - smartFormat, - noDelay, - endpointing, - fillerWords, - sampleRate, - numChannels, - keywords, - profanityFilter, - ); + stream.updateOptions(opts); }); } } @@ -147,6 +121,7 @@ export class SpeechStream extends stt.SpeechStream { let retries = 0; // let ws: WebSocket; while (!this.input.closed) { + console.log('websocket running'); this.#ws = this.#connectWs(); try { @@ -155,6 +130,7 @@ export class SpeechStream extends stt.SpeechStream { this.#ws.on('error', (error) => reject(error)); this.#ws.on('close', (code) => { if (code === 4000) { + console.log('websocket closed'); // WebSocket closed to update Deepgram STT options reject('4000'); } else { @@ -317,33 +293,20 @@ export class SpeechStream extends stt.SpeechStream { clearInterval(keepalive); } - updateOptions( - language?: STTLanguages, - model?: STTModels, - interimResults?: boolean, - punctuate?: boolean, - smartFormat?: boolean, - noDelay?: boolean, - endpointing?: number, - fillerWords?: boolean, - sampleRate?: number, - numChannels?: number, - keywords?: [string, number][], - profanityFilter?: boolean, - ) { - console.log('called update', punctuate); - if (language !== undefined) this.#opts.language = language; - if (model !== undefined) this.#opts.model = model; - if (interimResults !== undefined) this.#opts.interimResults = interimResults; - if (punctuate !== undefined) this.#opts.punctuate = punctuate; - if (smartFormat !== undefined) this.#opts.smartFormat = smartFormat; - if (noDelay !== undefined) this.#opts.noDelay = noDelay; - if (endpointing !== undefined) this.#opts.endpointing = endpointing; - if (fillerWords !== undefined) this.#opts.fillerWords = fillerWords; - if (sampleRate !== undefined) this.#opts.sampleRate = sampleRate; - if (numChannels !== undefined) this.#opts.numChannels = numChannels; - if (keywords !== undefined) this.#opts.keywords = keywords; - if (profanityFilter !== undefined) this.#opts.profanityFilter = profanityFilter; + updateOptions(opts: Partial) { + console.log('called update', opts); + if (opts.language !== undefined) this.#opts.language = opts.language; + if (opts.model !== undefined) this.#opts.model = opts.model; + if (opts.interimResults !== undefined) this.#opts.interimResults = opts.interimResults; + if (opts.punctuate !== undefined) this.#opts.punctuate = opts.punctuate; + if (opts.smartFormat !== undefined) this.#opts.smartFormat = opts.smartFormat; + if (opts.noDelay !== undefined) this.#opts.noDelay = opts.noDelay; + if (opts.endpointing !== undefined) this.#opts.endpointing = opts.endpointing; + if (opts.fillerWords !== undefined) this.#opts.fillerWords = opts.fillerWords; + if (opts.sampleRate !== undefined) this.#opts.sampleRate = opts.sampleRate; + if (opts.numChannels !== undefined) this.#opts.numChannels = opts.numChannels; + if (opts.keywords !== undefined) this.#opts.keywords = opts.keywords; + if (opts.profanityFilter !== undefined) this.#opts.profanityFilter = opts.profanityFilter; // Custom close code to handle update options this.#ws.close(4000); } @@ -366,6 +329,7 @@ export class SpeechStream extends stt.SpeechStream { profanity_filter: this.#opts.profanityFilter, language: this.#opts.language, }; + console.log('params', params); Object.entries(params).forEach(([k, v]) => { if (v !== undefined) { if (typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean') { From 768d512acc646e3291b857ff73e18f45620b29a5 Mon Sep 17 00:00:00 2001 From: Hamdan Anwar Sayeed <96612374+s-hamdananwar@users.noreply.github.com> Date: Wed, 22 Jan 2025 00:38:59 -0600 Subject: [PATCH 4/5] fix --- plugins/deepgram/src/stt.ts | 225 +++++++++++++++++++----------------- 1 file changed, 119 insertions(+), 106 deletions(-) diff --git a/plugins/deepgram/src/stt.ts b/plugins/deepgram/src/stt.ts index 3502b4ca..2be4ea7a 100644 --- a/plugins/deepgram/src/stt.ts +++ b/plugins/deepgram/src/stt.ts @@ -81,6 +81,7 @@ export class STT extends stt.STT { updateOptions(opts: Partial) { if (opts.language !== undefined) this.#opts.language = opts.language; + if (opts.detectLanguage !== undefined) this.#opts.language = undefined; if (opts.model !== undefined) this.#opts.model = opts.model; if (opts.interimResults !== undefined) this.#opts.interimResults = opts.interimResults; if (opts.punctuate !== undefined) this.#opts.punctuate = opts.punctuate; @@ -112,7 +113,7 @@ export class SpeechStream extends stt.SpeechStream { this.#opts = opts; this.closed = false; this.#audioEnergyFilter = new AudioEnergyFilter(); - this.#ws = new WebSocket(null); + this.#ws = this.#connectWs(); this.#run(); } @@ -130,7 +131,6 @@ export class SpeechStream extends stt.SpeechStream { this.#ws.on('error', (error) => reject(error)); this.#ws.on('close', (code) => { if (code === 4000) { - console.log('websocket closed'); // WebSocket closed to update Deepgram STT options reject('4000'); } else { @@ -141,20 +141,16 @@ export class SpeechStream extends stt.SpeechStream { await this.#runWS(this.#ws); } catch (e) { - if (e === '4000') { - this.#logger.info('updating Deepgram STT options'); - } else { - if (retries >= maxRetry) { - throw new Error(`failed to connect to Deepgram after ${retries} attempts: ${e}`); - } - - const delay = Math.min(retries * 5, 10); - retries++; - this.#logger.warn( - `failed to connect to Deepgram, retrying in ${delay} seconds: ${e} (${retries}/${maxRetry})`, - ); - await new Promise((resolve) => setTimeout(resolve, delay * 1000)); + if (retries >= maxRetry) { + throw new Error(`failed to connect to Deepgram after ${retries} attempts: ${e}`); } + + const delay = Math.min(retries * 5, 10); + retries++; + this.#logger.warn( + `failed to connect to Deepgram, retrying in ${delay} seconds: ${e} (${retries}/${maxRetry})`, + ); + await new Promise((resolve) => setTimeout(resolve, delay * 1000)); } } @@ -162,7 +158,7 @@ export class SpeechStream extends stt.SpeechStream { } async #runWS(ws: WebSocket) { - let closing = false; + let shouldExit = false; const keepalive = setInterval(() => { try { @@ -181,121 +177,138 @@ export class SpeechStream extends stt.SpeechStream { samples100Ms, ); - for await (const data of this.input) { - let frames: AudioFrame[]; - if (data === SpeechStream.FLUSH_SENTINEL) { - frames = stream.flush(); - } else if ( - data.sampleRate === this.#opts.sampleRate || - data.channels === this.#opts.numChannels - ) { - frames = stream.write(data.data.buffer); - } else { - throw new Error(`sample rate or channel count of frame does not match`); - } + try { + for await (const data of this.input) { + if (shouldExit) break; + + let frames: AudioFrame[]; + if (data === SpeechStream.FLUSH_SENTINEL) { + frames = stream.flush(); + } else if ( + data.sampleRate === this.#opts.sampleRate || + data.channels === this.#opts.numChannels + ) { + frames = stream.write(data.data.buffer); + } else { + throw new Error(`sample rate or channel count of frame does not match`); + } - for await (const frame of frames) { - if (this.#audioEnergyFilter.pushFrame(frame)) { - ws.send(frame.data.buffer); + for await (const frame of frames) { + if (shouldExit) break; + if (this.#audioEnergyFilter.pushFrame(frame)) { + ws.send(frame.data.buffer); + } } } - } - closing = true; - ws.send(JSON.stringify({ type: 'CloseStream' })); + if (!shouldExit) { + ws.send(JSON.stringify({ type: 'CloseStream' })); + } + } catch (error) { + if (!shouldExit) throw error; + } }; const listenTask = async () => { - new Promise((_, reject) => - ws.once('close', (code, reason) => { - if (!closing) { - this.#logger.error(`WebSocket closed with code ${code}: ${reason}`); - reject(); - } - }), - ); + try { + while (!this.closed) { + if (shouldExit) break; - while (!this.closed) { - try { - await new Promise((resolve) => { + const msg = await new Promise((resolve, reject) => { ws.once('message', (data) => resolve(data)); - }).then((msg) => { - const json = JSON.parse(msg.toString()); - switch (json['type']) { - case 'SpeechStarted': { - // This is a normal case. Deepgram's SpeechStarted events - // are not correlated with speech_final or utterance end. - // It's possible that we receive two in a row without an endpoint - // It's also possible we receive a transcript without a SpeechStarted event. - if (this.#speaking) return; - this.#speaking = true; - this.queue.put({ type: stt.SpeechEventType.START_OF_SPEECH }); - break; - } - // see this page: - // https://developers.deepgram.com/docs/understand-endpointing-interim-results#using-endpointing-speech_final - // for more information about the different types of events - case 'Results': { - const isFinal = json['is_final']; - const isEndpoint = json['speech_final']; - - const alternatives = liveTranscriptionToSpeechData(this.#opts.language!, json); - - // If, for some reason, we didn't get a SpeechStarted event but we got - // a transcript with text, we should start speaking. It's rare but has - // been observed. - if (alternatives[0] && alternatives[0].text) { - if (!this.#speaking) { - this.#speaking = true; - this.queue.put({ type: stt.SpeechEventType.START_OF_SPEECH }); - } - - if (isFinal) { - this.queue.put({ - type: stt.SpeechEventType.FINAL_TRANSCRIPT, - alternatives: [alternatives[0], ...alternatives.slice(1)], - }); - } else { - this.queue.put({ - type: stt.SpeechEventType.INTERIM_TRANSCRIPT, - alternatives: [alternatives[0], ...alternatives.slice(1)], - }); - } - } + ws.once('close', () => reject(new Error('WebSocket closed'))); + }); - // if we receive an endpoint, only end the speech if - // we either had a SpeechStarted event or we have a seen - // a non-empty transcript (deepgram doesn't have a SpeechEnded event) - if (isEndpoint && this.#speaking) { - this.#speaking = false; - this.queue.put({ type: stt.SpeechEventType.END_OF_SPEECH }); + const json = JSON.parse(msg.toString()); + switch (json['type']) { + case 'SpeechStarted': { + // This is a normal case. Deepgram's SpeechStarted events + // are not correlated with speech_final or utterance end. + // It's possible that we receive two in a row without an endpoint + // It's also possible we receive a transcript without a SpeechStarted event. + if (this.#speaking) return; + this.#speaking = true; + this.queue.put({ type: stt.SpeechEventType.START_OF_SPEECH }); + break; + } + // see this page: + // https://developers.deepgram.com/docs/understand-endpointing-interim-results#using-endpointing-speech_final + // for more information about the different types of events + case 'Results': { + const isFinal = json['is_final']; + const isEndpoint = json['speech_final']; + + const alternatives = liveTranscriptionToSpeechData(this.#opts.language!, json); + + // If, for some reason, we didn't get a SpeechStarted event but we got + // a transcript with text, we should start speaking. It's rare but has + // been observed. + if (alternatives[0] && alternatives[0].text) { + if (!this.#speaking) { + this.#speaking = true; + this.queue.put({ type: stt.SpeechEventType.START_OF_SPEECH }); } - break; - } - case 'Metadata': { - break; + if (isFinal) { + this.queue.put({ + type: stt.SpeechEventType.FINAL_TRANSCRIPT, + alternatives: [alternatives[0], ...alternatives.slice(1)], + }); + } else { + this.queue.put({ + type: stt.SpeechEventType.INTERIM_TRANSCRIPT, + alternatives: [alternatives[0], ...alternatives.slice(1)], + }); + } } - default: { - this.#logger.child({ msg: json }).warn('received unexpected message from Deepgram'); - break; + + // if we receive an endpoint, only end the speech if + // we either had a SpeechStarted event or we have a seen + // a non-empty transcript (deepgram doesn't have a SpeechEnded event) + if (isEndpoint && this.#speaking) { + this.#speaking = false; + this.queue.put({ type: stt.SpeechEventType.END_OF_SPEECH }); } + + break; } - }); - } catch (error) { + case 'Metadata': { + break; + } + default: { + this.#logger.child({ msg: json }).warn('received unexpected message from Deepgram'); + break; + } + } + } + } catch (error) { + if (!shouldExit) { this.#logger.child({ error }).warn('unrecoverable error, exiting'); - break; + throw error; } } }; - await Promise.all([sendTask(), listenTask()]); - clearInterval(keepalive); + ws.once('close', (code) => { + if (code === 4000) { + this.#logger.info('WebSocket closed for updating options, restarting tasks'); + shouldExit = true; + } + }); + + try { + await Promise.all([sendTask(), listenTask()]); + } catch (error) { + this.#logger.child({ error }).warn('Error during WebSocket tasks'); + } finally { + clearInterval(keepalive); + } } updateOptions(opts: Partial) { console.log('called update', opts); if (opts.language !== undefined) this.#opts.language = opts.language; + if (opts.detectLanguage !== undefined) this.#opts.language = undefined; if (opts.model !== undefined) this.#opts.model = opts.model; if (opts.interimResults !== undefined) this.#opts.interimResults = opts.interimResults; if (opts.punctuate !== undefined) this.#opts.punctuate = opts.punctuate; From 91dd6baffbe0e908db83b6973dcbfa602e2854bb Mon Sep 17 00:00:00 2001 From: Hamdan Anwar Sayeed <96612374+s-hamdananwar@users.noreply.github.com> Date: Wed, 22 Jan 2025 00:41:01 -0600 Subject: [PATCH 5/5] lint fix --- plugins/deepgram/src/stt.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/plugins/deepgram/src/stt.ts b/plugins/deepgram/src/stt.ts index 2be4ea7a..41c10caa 100644 --- a/plugins/deepgram/src/stt.ts +++ b/plugins/deepgram/src/stt.ts @@ -120,9 +120,7 @@ export class SpeechStream extends stt.SpeechStream { async #run(maxRetry = 32) { let retries = 0; - // let ws: WebSocket; while (!this.input.closed) { - console.log('websocket running'); this.#ws = this.#connectWs(); try { @@ -306,7 +304,6 @@ export class SpeechStream extends stt.SpeechStream { } updateOptions(opts: Partial) { - console.log('called update', opts); if (opts.language !== undefined) this.#opts.language = opts.language; if (opts.detectLanguage !== undefined) this.#opts.language = undefined; if (opts.model !== undefined) this.#opts.model = opts.model; @@ -342,7 +339,6 @@ export class SpeechStream extends stt.SpeechStream { profanity_filter: this.#opts.profanityFilter, language: this.#opts.language, }; - console.log('params', params); Object.entries(params).forEach(([k, v]) => { if (v !== undefined) { if (typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean') {