Skip to content

Commit 7084636

Browse files
feat: echo STT transcripts to thread before agent reply (#571)
* feat: STT transcript echo to thread (Discord + Slack)

  When STT transcribes a voice message, optionally post the transcript back to the thread (no mentions) before the agent reply so users can verify what was heard. Default is OFF — opt in via [stt] echo_transcript = true.

  - New config: [stt] echo_transcript (default false, opt-in)
  - New helper: stt::post_echo with platform-agnostic ChatAdapter handle — future LINE/Telegram/Teams adapters get echo for free
  - Format: > 🎤 <transcript> per clip, all in one thread message
  - Failure: > 🎤 (transcription failed) line + ⚠️ reaction on the user msg
  - Helm: agents.<name>.stt.echoTranscript (camelCase) wired through configmap
  - Docs: docs/stt.md and docs/config-reference.md updated

  Rebased on top of #567 (gateway config rendering).

  Tests: 133/133 cargo. helm-unittest: 28/28. Clippy --all-targets -D warnings clean.

* fix: close unclosed test fn delimiter + cargo fmt

---------

Co-authored-by: obrutjack <obrutjack@yahoo.com>
1 parent 80cea4f commit 7084636

26 files changed

Lines changed: 1569 additions & 543 deletions

charts/openab/templates/configmap.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,9 @@ data:
167167
api_key = "${STT_API_KEY}"
168168
model = {{ ($cfg.stt).model | default "whisper-large-v3-turbo" | toJson }}
169169
base_url = {{ ($cfg.stt).baseUrl | default "https://api.groq.com/openai/v1" | toJson }}
170+
{{- if hasKey ($cfg.stt | default dict) "echoTranscript" }}
171+
echo_transcript = {{ ($cfg.stt).echoTranscript }}
172+
{{- end }}
170173
{{- end }}
171174
{{- if ($cfg.gateway).enabled }}
172175
{{- if not ($cfg.gateway).url }}

charts/openab/values.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,9 @@ agents:
231231
apiKey: ""
232232
model: "whisper-large-v3-turbo"
233233
baseUrl: "https://api.groq.com/openai/v1"
234+
# Echo the transcribed text back to the thread before the agent reply
235+
# so users can verify STT accuracy. Default: false (opt-in).
236+
echoTranscript: false
234237
gateway:
235238
enabled: false # set to true + provide url to enable the [gateway] config block
236239
deploy: true # set to false to skip Gateway Deployment/Service (config-only mode)

docs/config-reference.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ Speech-to-text transcription for voice messages. Uses an OpenAI-compatible `/aud
204204
| `api_key` | string | `""` | API key for the STT service. When empty and `base_url` contains `groq.com`, the `GROQ_API_KEY` environment variable is used automatically. For local servers, use `api_key = "not-needed"`. |
205205
| `model` | string | `"whisper-large-v3-turbo"` | Model name to use for transcription. |
206206
| `base_url` | string | `"https://api.groq.com/openai/v1"` | Base URL of the STT API. Any OpenAI-compatible `/audio/transcriptions` endpoint works. |
207+
| `echo_transcript` | bool | `false` | When set to `true` and STT runs, post a `> 🎤 <transcript>` message to the thread before the agent reply so users can verify what was heard. Failures show `(transcription failed)` and add a ⚠️ reaction to the original message. |
207208

208209
---
209210

docs/stt.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ enabled = true # default: false
5050
api_key = "${GROQ_API_KEY}" # required for cloud providers
5151
model = "whisper-large-v3-turbo" # default
5252
base_url = "https://api.groq.com/openai/v1" # default
53+
echo_transcript = true # default: false (opt-in)
5354
```
5455

5556
| Field | Required | Default | Description |
@@ -58,6 +59,7 @@ base_url = "https://api.groq.com/openai/v1" # default
5859
| `api_key` | no* || API key for the STT provider. *Auto-detected from `GROQ_API_KEY` env var if not set. For local servers, use any non-empty string (e.g. `"not-needed"`). |
5960
| `model` | no | `whisper-large-v3-turbo` | Whisper model name. Varies by provider. |
6061
| `base_url` | no | `https://api.groq.com/openai/v1` | OpenAI-compatible API base URL. |
62+
| `echo_transcript` | no | `false` | When set to `true` and STT runs, post a `> 🎤 <transcript>` message to the thread before the agent reply so users can verify what was heard. Failures show `(transcription failed)` and add a ⚠️ reaction to the original message. |
6163

6264
## Deployment Options
6365

@@ -147,6 +149,13 @@ helm upgrade openab openab/openab \
147149
--set agents.kiro.stt.baseUrl=https://api.groq.com/openai/v1
148150
```
149151

152+
```bash
153+
helm upgrade openab openab/openab \
154+
--set agents.kiro.stt.enabled=true \
155+
--set agents.kiro.stt.apiKey=gsk_xxx \
156+
--set agents.kiro.stt.echoTranscript=true # opt in to transcript echo
157+
```
158+
150159
## Disabling STT
151160

152161
Omit the `[stt]` section entirely, or set:

src/acp/connection.rs

Lines changed: 55 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use crate::acp::protocol::{ConfigOption, JsonRpcMessage, JsonRpcRequest, JsonRpcResponse, parse_config_options};
1+
use crate::acp::protocol::{
2+
parse_config_options, ConfigOption, JsonRpcMessage, JsonRpcRequest, JsonRpcResponse,
3+
};
24
use anyhow::{anyhow, Result};
35
use serde_json::{json, Value};
46
use std::collections::HashMap;
@@ -10,7 +12,6 @@ use tokio::sync::{mpsc, oneshot, Mutex};
1012
use tokio::task::JoinHandle;
1113
use tracing::{debug, error, info};
1214

13-
1415
/// Pick the most permissive selectable permission option from ACP options.
1516
fn pick_best_option(options: &[Value]) -> Option<String> {
1617
let mut fallback: Option<&Value> = None;
@@ -187,20 +188,39 @@ impl AcpConnection {
187188
// Preserve the real HOME so agents can find OAuth/auth files (~/.codex,
188189
// ~/.claude, ~/.config/gh, etc.). working_dir is already set via
189190
// current_dir() above and is not necessarily the user's home directory.
190-
cmd.env("HOME", std::env::var("HOME").unwrap_or_else(|_| working_dir.into()));
191-
cmd.env("PATH", std::env::var("PATH").unwrap_or_else(|_| "/usr/local/bin:/usr/bin:/bin".into()));
191+
cmd.env(
192+
"HOME",
193+
std::env::var("HOME").unwrap_or_else(|_| working_dir.into()),
194+
);
195+
cmd.env(
196+
"PATH",
197+
std::env::var("PATH").unwrap_or_else(|_| "/usr/local/bin:/usr/bin:/bin".into()),
198+
);
192199
#[cfg(unix)]
193200
{
194-
cmd.env("USER", std::env::var("USER").unwrap_or_else(|_| "agent".into()));
201+
cmd.env(
202+
"USER",
203+
std::env::var("USER").unwrap_or_else(|_| "agent".into()),
204+
);
195205
}
196206
#[cfg(windows)]
197207
{
198208
// Windows requires SystemRoot for DLL loading and basic OS functionality.
199209
// USERPROFILE is the Windows equivalent of HOME.
200-
cmd.env("USERPROFILE", std::env::var("USERPROFILE").unwrap_or_else(|_| working_dir.into()));
201-
cmd.env("USERNAME", std::env::var("USERNAME").unwrap_or_else(|_| "agent".into()));
202-
if let Ok(v) = std::env::var("SystemRoot") { cmd.env("SystemRoot", v); }
203-
if let Ok(v) = std::env::var("SystemDrive") { cmd.env("SystemDrive", v); }
210+
cmd.env(
211+
"USERPROFILE",
212+
std::env::var("USERPROFILE").unwrap_or_else(|_| working_dir.into()),
213+
);
214+
cmd.env(
215+
"USERNAME",
216+
std::env::var("USERNAME").unwrap_or_else(|_| "agent".into()),
217+
);
218+
if let Ok(v) = std::env::var("SystemRoot") {
219+
cmd.env("SystemRoot", v);
220+
}
221+
if let Ok(v) = std::env::var("SystemDrive") {
222+
cmd.env("SystemDrive", v);
223+
}
204224
}
205225
for (k, v) in env {
206226
cmd.env(k, expand_env(v));
@@ -223,8 +243,7 @@ impl AcpConnection {
223243
let mut proc = cmd
224244
.spawn()
225245
.map_err(|e| anyhow!("failed to spawn {command}: {e}"))?;
226-
let child_pgid = proc.id()
227-
.and_then(|pid| i32::try_from(pid).ok());
246+
let child_pgid = proc.id().and_then(|pid| i32::try_from(pid).ok());
228247

229248
let stdout = proc.stdout.take().ok_or_else(|| anyhow!("no stdout"))?;
230249
let stdin = proc.stdin.take().ok_or_else(|| anyhow!("no stdin"))?;
@@ -403,19 +422,22 @@ impl AcpConnection {
403422
.and_then(|c| c.get("loadSession"))
404423
.and_then(|v| v.as_bool())
405424
.unwrap_or(false);
406-
info!(agent = agent_name, load_session = self.supports_load_session, "initialized");
425+
info!(
426+
agent = agent_name,
427+
load_session = self.supports_load_session,
428+
"initialized"
429+
);
407430
Ok(())
408431
}
409432

410433
pub async fn session_new(&mut self, cwd: &str) -> Result<String> {
411434
let resp = self
412-
.send_request(
413-
"session/new",
414-
Some(json!({"cwd": cwd, "mcpServers": []})),
415-
)
435+
.send_request("session/new", Some(json!({"cwd": cwd, "mcpServers": []})))
416436
.await?;
417437

418-
let session_id = resp.result.as_ref()
438+
let session_id = resp
439+
.result
440+
.as_ref()
419441
.and_then(|r| r.get("sessionId"))
420442
.and_then(|s| s.as_str())
421443
.ok_or_else(|| anyhow!("no sessionId in session/new response"))?
@@ -434,7 +456,11 @@ impl AcpConnection {
434456

435457
/// Set a config option (e.g. model, mode) via ACP session/set_config_option.
436458
/// Returns the updated list of all config options.
437-
pub async fn set_config_option(&mut self, config_id: &str, value: &str) -> Result<Vec<ConfigOption>> {
459+
pub async fn set_config_option(
460+
&mut self,
461+
config_id: &str,
462+
value: &str,
463+
) -> Result<Vec<ConfigOption>> {
438464
let session_id = self
439465
.acp_session_id
440466
.as_ref()
@@ -462,7 +488,10 @@ impl AcpConnection {
462488
Err(_) => {
463489
// Fall back: send as a slash command (e.g. "/model claude-sonnet-4")
464490
let cmd = format!("/{config_id} {value}");
465-
info!(cmd, "set_config_option not supported, falling back to prompt");
491+
info!(
492+
cmd,
493+
"set_config_option not supported, falling back to prompt"
494+
);
466495
let _resp = self
467496
.send_request(
468497
"session/prompt",
@@ -503,10 +532,7 @@ impl AcpConnection {
503532
let id = self.next_id();
504533

505534
// Convert content blocks to JSON
506-
let prompt_json: Vec<Value> = content_blocks
507-
.iter()
508-
.map(|b| b.to_json())
509-
.collect();
535+
let prompt_json: Vec<Value> = content_blocks.iter().map(|b| b.to_json()).collect();
510536

511537
let req = JsonRpcRequest::new(
512538
id,
@@ -572,11 +598,15 @@ impl AcpConnection {
572598
#[cfg(unix)]
573599
{
574600
// Stage 1: SIGTERM the process group
575-
unsafe { libc::kill(-pgid, libc::SIGTERM); }
601+
unsafe {
602+
libc::kill(-pgid, libc::SIGTERM);
603+
}
576604
// Stage 2: SIGKILL after brief grace (std::thread survives runtime shutdown)
577605
std::thread::spawn(move || {
578606
std::thread::sleep(std::time::Duration::from_millis(1500));
579-
unsafe { libc::kill(-pgid, libc::SIGKILL); }
607+
unsafe {
608+
libc::kill(-pgid, libc::SIGKILL);
609+
}
580610
});
581611
}
582612
#[cfg(not(unix))]

src/acp/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@ pub mod connection;
22
pub mod pool;
33
pub mod protocol;
44

5+
pub use connection::ContentBlock;
56
pub use pool::SessionPool;
67
pub use protocol::{classify_notification, AcpEvent};
7-
pub use connection::ContentBlock;

src/acp/pool.rs

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,7 @@ pub struct SessionPool {
3232
mapping_path: PathBuf,
3333
}
3434

35-
type EvictionCandidate = (
36-
String,
37-
Arc<Mutex<AcpConnection>>,
38-
Instant,
39-
Option<String>,
40-
);
35+
type EvictionCandidate = (String, Arc<Mutex<AcpConnection>>, Instant, Option<String>);
4136

4237
fn remove_if_same_handle<T>(
4338
map: &mut HashMap<String, Arc<Mutex<T>>>,
@@ -54,10 +49,7 @@ fn remove_if_same_handle<T>(
5449
}
5550
}
5651

57-
fn get_or_insert_gate(
58-
map: &mut HashMap<String, Arc<Mutex<()>>>,
59-
key: &str,
60-
) -> Arc<Mutex<()>> {
52+
fn get_or_insert_gate(map: &mut HashMap<String, Arc<Mutex<()>>>, key: &str) -> Arc<Mutex<()>> {
6153
map.entry(key.to_string())
6254
.or_insert_with(|| Arc::new(Mutex::new(())))
6355
.clone()
@@ -104,7 +96,9 @@ impl SessionPool {
10496
}
10597
};
10698
let tmp = self.mapping_path.with_extension("json.tmp");
107-
if let Err(e) = std::fs::write(&tmp, &data).and_then(|_| std::fs::rename(&tmp, &self.mapping_path)) {
99+
if let Err(e) =
100+
std::fs::write(&tmp, &data).and_then(|_| std::fs::rename(&tmp, &self.mapping_path))
101+
{
108102
warn!(path = %self.mapping_path.display(), error = %e, "failed to persist thread mapping");
109103
}
110104
}
@@ -157,7 +151,12 @@ impl SessionPool {
157151
skipped_locked_candidates += 1;
158152
continue;
159153
};
160-
let candidate = (key, conn_handle, conn.last_active, conn.acp_session_id.clone());
154+
let candidate = (
155+
key,
156+
conn_handle,
157+
conn.last_active,
158+
conn.acp_session_id.clone(),
159+
);
161160
match &eviction_candidate {
162161
Some((_, _, oldest_last_active, _)) if candidate.2 >= *oldest_last_active => {}
163162
_ => eviction_candidate = Some(candidate),
@@ -250,7 +249,9 @@ impl SessionPool {
250249
state.active.insert(thread_id.to_string(), new_conn);
251250
self.save_mapping(&state.suspended);
252251
if !cancel_session_id.is_empty() {
253-
state.cancel_handles.insert(thread_id.to_string(), (cancel_handle, cancel_session_id));
252+
state
253+
.cancel_handles
254+
.insert(thread_id.to_string(), (cancel_handle, cancel_session_id));
254255
}
255256
Ok(())
256257
}
@@ -260,7 +261,9 @@ impl SessionPool {
260261
where
261262
F: for<'a> FnOnce(
262263
&'a mut AcpConnection,
263-
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<R>> + Send + 'a>>,
264+
) -> std::pin::Pin<
265+
Box<dyn std::future::Future<Output = Result<R>> + Send + 'a>,
266+
>,
264267
{
265268
let conn = {
266269
let state = self.state.read().await;
@@ -311,7 +314,10 @@ impl SessionPool {
311314
pub async fn cancel_session(&self, thread_id: &str) -> Result<()> {
312315
let (stdin, session_id) = {
313316
let state = self.state.read().await;
314-
state.cancel_handles.get(thread_id).cloned()
317+
state
318+
.cancel_handles
319+
.get(thread_id)
320+
.cloned()
315321
.ok_or_else(|| anyhow!("no session for thread {thread_id}"))?
316322
};
317323
let data = serde_json::to_string(&serde_json::json!({
@@ -414,7 +420,11 @@ impl SessionPool {
414420
// awaiting a connection lock).
415421
let snapshot: Vec<(String, Arc<Mutex<AcpConnection>>)> = {
416422
let state = self.state.read().await;
417-
state.active.iter().map(|(k, v)| (k.clone(), Arc::clone(v))).collect()
423+
state
424+
.active
425+
.iter()
426+
.map(|(k, v)| (k.clone(), Arc::clone(v)))
427+
.collect()
418428
};
419429

420430
let mut session_ids: Vec<(String, String)> = Vec::new();

0 commit comments

Comments
 (0)