diff --git a/docs/config-reference.md b/docs/config-reference.md index f01f0245..7be165e9 100644 --- a/docs/config-reference.md +++ b/docs/config-reference.md @@ -78,6 +78,15 @@ Custom Gateway adapter for platforms like Telegram, LINE, Feishu/Lark, and Googl | `allow_all_users` | bool \| omit | auto-detect | `true` = any user; `false` = only `allowed_users`. Omitted = inferred from list. | | `allowed_users` | string[] | `[]` | User IDs to allow. Only checked when `allow_all_users` resolves to false. | +### Gateway Environment Variables + +| Variable | Description | Default | +|----------|-------------|---------| +| `GATEWAY_MEDIA_BASE_URL` | Public base URL for media proxy (e.g. `https://gateway.example.com`). | `http://localhost:8080` | +| `GATEWAY_MEDIA_STORE_TTL` | Seconds to keep media in memory before expiration. | `300` | +| `GATEWAY_MEDIA_STORE_MAX_ENTRIES` | Maximum number of media items to keep in memory. | `1000` | + + --- ## `[agent]` diff --git a/docs/line.md b/docs/line.md index a7fe4f8d..9a8730b5 100644 --- a/docs/line.md +++ b/docs/line.md @@ -84,6 +84,7 @@ In the LINE Developers Console → **Messaging API** tab → scan the QR code wi - **1:1 chat** — send a message to the bot, get an AI agent response - **Group chat** — add the bot to a group, it responds to all messages - **Webhook signature validation** — HMAC-SHA256 via `LINE_CHANNEL_SECRET` +- **Media Support** — Support for incoming images and audio/voice messages via media proxy. ### Not Supported (LINE API limitations) diff --git a/docs/telegram.md b/docs/telegram.md index d7dd9ae0..1de5be91 100644 --- a/docs/telegram.md +++ b/docs/telegram.md @@ -168,6 +168,10 @@ explain VPC peering ← ignored in groups DMs and replies within forum topics always trigger the agent (no @mention needed). +### Media support + +Incoming images and audio/voice messages are supported via media proxy. The gateway downloads the media and provides a temporary URL to the agent for analysis. 
+ ### Emoji reactions The bot shows status reactions on your message as the agent works: diff --git a/gateway/src/adapters/feishu.rs b/gateway/src/adapters/feishu.rs index 922fae34..3abae2b5 100644 --- a/gateway/src/adapters/feishu.rs +++ b/gateway/src/adapters/feishu.rs @@ -189,12 +189,14 @@ mod event_types { pub event: Option, pub challenge: Option, #[serde(rename = "type")] + #[allow(dead_code)] pub event_type_field: Option, } #[derive(Debug, Deserialize)] pub struct FeishuEventHeader { pub event_id: Option, + #[allow(dead_code)] pub event_type: Option, } @@ -231,6 +233,7 @@ mod event_types { pub struct FeishuMention { pub key: Option, pub id: Option, + #[allow(dead_code)] pub name: Option, } @@ -781,6 +784,7 @@ pub async fn start_websocket( } /// Single WebSocket connection lifecycle. +#[allow(clippy::too_many_arguments)] async fn ws_connect_loop( token_cache: &Arc, bot_open_id_store: &Arc>>, @@ -848,7 +852,7 @@ async fn ws_connect_loop( ack.payload = Some(b"{\"code\":200}".to_vec()); let ack_bytes = ack.encode_to_vec(); let _ = ws_tx.send( - tokio_tungstenite::tungstenite::Message::Binary(ack_bytes.into()) + tokio_tungstenite::tungstenite::Message::Binary(ack_bytes) ).await; } } @@ -869,6 +873,7 @@ async fn ws_connect_loop( } /// Process a single WebSocket text message. 
+#[allow(clippy::too_many_arguments)] async fn handle_ws_message( text: &str, bot_open_id_store: &Arc>>, @@ -1080,8 +1085,8 @@ fn markdown_to_post(md: &str) -> serde_json::Value { let line = raw_lines[li]; // Detect fenced code block let trimmed = line.trim_start(); - if trimmed.starts_with("```") { - let lang = trimmed[3..].trim().to_string(); + if let Some(rest) = trimmed.strip_prefix("```") { + let lang = rest.trim().to_string(); let mut code = String::new(); li += 1; while li < raw_lines.len() { @@ -1154,8 +1159,8 @@ fn parse_inline(line: &str) -> Vec { } if close_ticks == ticks { // Found matching close — content between is literal - for j in i..end { - buf.push(chars[j]); + for ch in chars.iter().take(end).skip(i) { + buf.push(*ch); } i = end + close_ticks; break 'outer; @@ -1167,8 +1172,8 @@ fn parse_inline(line: &str) -> Vec { } if end >= len { // No matching close — treat backticks as literal - for j in i..len { - buf.push(chars[j]); + for ch in chars.iter().skip(i) { + buf.push(*ch); } i = len; } @@ -1194,8 +1199,8 @@ fn parse_inline(line: &str) -> Vec { } if close_run == run { // Found matching close — strip both, keep inner text - for j in after..scan { - buf.push(chars[j]); + for ch in chars.iter().take(scan).skip(after) { + buf.push(*ch); } i = scan + close_run; found_close = true; @@ -1351,13 +1356,15 @@ pub async fn download_feishu_image( let ext = if mime == "image/gif" { "gif" } else { "jpg" }; Some(crate::schema::Attachment { attachment_type: "image".into(), - filename: format!("{}.{}", image_key, ext), - mime_type: mime, - data, - size: compressed.len() as u64, + url: None, + filename: Some(format!("{}.{}", image_key, ext)), + mime_type: Some(mime), + data: Some(data), + size: Some(compressed.len() as u64), }) } + /// Download a Feishu file by message_id + file_key → base64 Attachment (text files only). 
pub async fn download_feishu_file( client: &reqwest::Client, @@ -1413,13 +1420,15 @@ pub async fn download_feishu_file( let data = base64::engine::general_purpose::STANDARD.encode(text.as_bytes()); Some(crate::schema::Attachment { attachment_type: "text_file".into(), - filename: file_name.to_string(), - mime_type: "text/plain".into(), - data, - size: bytes.len() as u64, + url: None, + filename: Some(file_name.to_string()), + mime_type: Some("text/plain".into()), + data: Some(data), + size: Some(bytes.len() as u64), }) } + /// Send a post (rich text) message to a feishu chat_id. /// Returns the sent message_id on success, None on failure. /// When `reply_to` is Some(root_id), uses the reply API to stay in a thread. diff --git a/gateway/src/adapters/googlechat.rs b/gateway/src/adapters/googlechat.rs index 71a2ff7e..1655642b 100644 --- a/gateway/src/adapters/googlechat.rs +++ b/gateway/src/adapters/googlechat.rs @@ -64,6 +64,7 @@ pub struct GoogleChatSpace { pub name: String, #[serde(rename = "type")] pub space_type: Option, + #[allow(dead_code)] pub space_type_renamed: Option, } diff --git a/gateway/src/adapters/line.rs b/gateway/src/adapters/line.rs index 3bfc36d0..59481e6e 100644 --- a/gateway/src/adapters/line.rs +++ b/gateway/src/adapters/line.rs @@ -90,45 +90,138 @@ pub async fn webhook( let Some(ref msg) = event.message else { continue; }; - if msg.message_type != "text" { + + let mut attachments = vec![]; + let mut text = msg.text.clone().unwrap_or_default(); + + // Handle Image/Audio attachments (Issue #690 Phase 1) + if let Some(ref access_token) = state.line_access_token { + if msg.message_type == "image" || msg.message_type == "audio" { + let url = format!("https://api-data.line.me/v2/bot/message/{}/content", msg.id); + let client = reqwest::Client::new(); + let resp = client.get(url).bearer_auth(access_token).send().await; + + if let Ok(r) = resp { + if !r.status().is_success() { + warn!(status = %r.status(), id = %msg.id, "failed to download LINE 
media"); + continue; + } + // Issue #690 review fix: Check file size before downloading + let content_length = r + .headers() + .get("content-length") + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()) + .unwrap_or(0); + + if content_length > state.media_max_file_size { + warn!( + size = content_length, + max = state.media_max_file_size, + id = %msg.id, + "LINE media too large, skipping" + ); + continue; + } + + let mime = r + .headers() + .get("content-type") + .and_then(|v| v.to_str().ok()) + .unwrap_or(if msg.message_type == "image" { + "image/jpeg" + } else { + "audio/x-m4a" + }) + .to_string(); + + if let Ok(data) = r.bytes().await { + let uuid = uuid::Uuid::new_v4().to_string(); + let size = data.len() as u64; + let proxied = { + let mut store = + state.media_store.lock().unwrap_or_else(|e| e.into_inner()); + if store.len() >= state.media_max_entries { + warn!( + size = store.len(), + "media store full, skipping LINE media proxy" + ); + false + } else { + store.insert( + uuid.clone(), + (data.to_vec(), mime.clone(), std::time::Instant::now()), + ); + true + } + }; + if proxied { + attachments.push(Attachment { + attachment_type: msg.message_type.clone(), + url: Some(format!("{}/media/{}", state.public_url, uuid)), + mime_type: Some(mime), + filename: Some(format!( + "line-{}.{}", + msg.id, + if msg.message_type == "image" { + "jpg" + } else { + "m4a" + } + )), + size: Some(size), + data: None, + }); + if text.is_empty() { + text = format!("[{}]", msg.message_type); + } + info!(id = %msg.id, uuid = %uuid, "proxied LINE inbound media"); + } + } + } + } else if msg.message_type != "text" { + // Issue #690 review fix: Warn when media message is dropped due to missing access_token + warn!( + msg_type = %msg.message_type, + "LINE media message dropped (access_token not configured)" + ); + continue; + } + } else if msg.message_type != "text" { + warn!( + msg_type = %msg.message_type, + "LINE media message dropped (access_token not configured)" + ); 
continue; } - let Some(ref text) = msg.text else { - continue; - }; - if text.trim().is_empty() { + + if text.trim().is_empty() && attachments.is_empty() { continue; } let source = event.source.as_ref(); let (channel_id, channel_type) = match source { - Some(s) if s.source_type == "group" => { - match s.group_id.as_deref() { - Some(id) if !id.is_empty() => (id.to_string(), "group".to_string()), - _ => { - warn!("LINE group event missing groupId, skipping"); - continue; - } + Some(s) if s.source_type == "group" => match s.group_id.as_deref() { + Some(id) if !id.is_empty() => (id.to_string(), "group".to_string()), + _ => { + warn!("LINE group event missing groupId, skipping"); + continue; } - } - Some(s) if s.source_type == "room" => { - match s.room_id.as_deref() { - Some(id) if !id.is_empty() => (id.to_string(), "room".to_string()), - _ => { - warn!("LINE room event missing roomId, skipping"); - continue; - } + }, + Some(s) if s.source_type == "room" => match s.room_id.as_deref() { + Some(id) if !id.is_empty() => (id.to_string(), "room".to_string()), + _ => { + warn!("LINE room event missing roomId, skipping"); + continue; } - } - Some(s) => { - match s.user_id.as_deref() { - Some(id) if !id.is_empty() => (id.to_string(), "user".to_string()), - _ => { - warn!("LINE user event missing userId, skipping"); - continue; - } + }, + Some(s) => match s.user_id.as_deref() { + Some(id) if !id.is_empty() => (id.to_string(), "user".to_string()), + _ => { + warn!("LINE user event missing userId, skipping"); + continue; } - } + }, None => { warn!("LINE event missing source, skipping"); continue; @@ -138,7 +231,7 @@ pub async fn webhook( .and_then(|s| s.user_id.as_deref()) .unwrap_or("unknown"); - let gateway_event = GatewayEvent::new( + let mut gateway_event = GatewayEvent::new( "line", ChannelInfo { id: channel_id.clone(), @@ -151,10 +244,11 @@ pub async fn webhook( display_name: user_id.into(), is_bot: false, }, - text, + &text, &msg.id, vec![], ); + gateway_event.attachments 
= attachments; // Cache the reply token for hybrid Reply/Push dispatch if let Some(ref reply_token) = event.reply_token { diff --git a/gateway/src/adapters/telegram.rs b/gateway/src/adapters/telegram.rs index 6ae01624..d1e228c9 100644 --- a/gateway/src/adapters/telegram.rs +++ b/gateway/src/adapters/telegram.rs @@ -27,6 +27,25 @@ struct TelegramMessage { text: Option, #[serde(default)] entities: Vec, + photo: Option>, + voice: Option, + audio: Option, + caption: Option, +} + +#[derive(Debug, Deserialize)] +struct TelegramPhotoSize { + file_id: String, +} + +#[derive(Debug, Deserialize)] +struct TelegramVoice { + file_id: String, +} + +#[derive(Debug, Deserialize)] +struct TelegramAudio { + file_id: String, } #[derive(Debug, Deserialize)] @@ -75,10 +94,117 @@ pub async fn webhook( let Some(msg) = update.message else { return axum::http::StatusCode::OK; }; - let Some(text) = msg.text.as_deref() else { - return axum::http::StatusCode::OK; + + let mut text = msg + .text + .clone() + .unwrap_or_else(|| msg.caption.clone().unwrap_or_default()); + let mut attachments = vec![]; + + // Handle Image/Audio attachments (Issue #690 Phase 1) + let media_info = if let Some(ref photos) = msg.photo { + photos.last().map(|p| (p.file_id.clone(), "image")) + } else { + msg.voice + .as_ref() + .map(|v| (v.file_id.clone(), "audio")) + .or_else(|| msg.audio.as_ref().map(|a| (a.file_id.clone(), "audio"))) }; - if text.trim().is_empty() { + + if let (Some((file_id, m_type)), Some(ref token)) = (media_info, &state.telegram_bot_token) { + let client = reqwest::Client::new(); + // 1. getFile to get file_path + let url = format!("{TELEGRAM_API_BASE}/bot{token}/getFile?file_id={file_id}"); + if let Ok(resp) = client.get(url).send().await { + if !resp.status().is_success() { + warn!(status = %resp.status(), id = %file_id, "Telegram getFile failed"); + } else if let Ok(body) = resp.json::().await { + if let Some(file_path) = body["result"]["file_path"].as_str() { + // 2. 
Download the file + let download_url = format!("{TELEGRAM_API_BASE}/file/bot{token}/{file_path}"); + if let Ok(r) = client.get(download_url).send().await { + if !r.status().is_success() { + warn!(status = %r.status(), id = %file_id, "failed to download Telegram media"); + } else { + // Issue #690 review fix: Check file size before downloading + let content_length = r + .headers() + .get("content-length") + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()) + .unwrap_or(0); + + if content_length > state.media_max_file_size { + warn!( + size = content_length, + max = state.media_max_file_size, + id = %file_id, + "Telegram media too large, skipping" + ); + } else { + let mime = r + .headers() + .get("content-type") + .and_then(|v| v.to_str().ok()) + .unwrap_or(if m_type == "image" { + "image/jpeg" + } else { + "audio/ogg" + }) + .to_string(); + + if let Ok(data) = r.bytes().await { + let uuid = uuid::Uuid::new_v4().to_string(); + let size = data.len() as u64; + let proxied = { + let mut store = + state.media_store.lock().unwrap_or_else(|e| e.into_inner()); + if store.len() >= state.media_max_entries { + warn!( + size = store.len(), + "media store full, skipping Telegram media proxy" + ); + false + } else { + store.insert( + uuid.clone(), + ( + data.to_vec(), + mime.clone(), + std::time::Instant::now(), + ), + ); + true + } + }; + if proxied { + attachments.push(Attachment { + attachment_type: m_type.into(), + url: Some(format!("{}/media/{}", state.public_url, uuid)), + mime_type: Some(mime), + filename: Some(format!( + "telegram-{}.{}", + file_id, + if m_type == "image" { "jpg" } else { "ogg" } + )), + size: Some(size), + data: None, + }); + if text.is_empty() { + text = format!("[{}]", m_type); + } + info!(id = %file_id, uuid = %uuid, "proxied Telegram inbound media"); + } + } + } + } + } + } + } + } + } + + if text.trim().is_empty() && attachments.is_empty() { return axum::http::StatusCode::OK; } @@ -107,7 +233,7 @@ pub async fn webhook( }) .collect(); 
- let event = GatewayEvent::new( + let mut event = GatewayEvent::new( "telegram", ChannelInfo { id: msg.chat.id.to_string(), @@ -120,10 +246,11 @@ pub async fn webhook( display_name, is_bot: from.map(|u| u.is_bot).unwrap_or(false), }, - text, + &text, &msg.message_id.to_string(), mentions, ); + event.attachments = attachments; let json = serde_json::to_string(&event).unwrap(); info!(chat_id = %msg.chat.id, sender = %sender_name, "telegram → gateway"); diff --git a/gateway/src/main.rs b/gateway/src/main.rs index 51703dd4..191d5a14 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -31,6 +31,11 @@ pub const REPLY_TOKEN_TTL_SECS: u64 = 50; /// if webhooks arrive faster than OAB can reply (e.g. OAB offline, spam burst). pub const REPLY_TOKEN_CACHE_MAX: usize = 10_000; +// --- Media Store for Inbound Media Proxy (Issue #690) --- + +/// uuid -> (binary data, mime type, created_at) +pub type MediaStore = Arc, String, Instant)>>>; + // --- App state (shared across all adapters) --- pub struct AppState { @@ -55,10 +60,17 @@ pub struct AppState { /// Broadcast channel: gateway → OAB (events from all platforms) pub event_tx: broadcast::Sender, /// Cache: event_id → (LINE replyToken, timestamp). - /// Global across all OAB WebSocket clients. LINE reply tokens are single-use: - /// the first client to `remove()` a token wins the free Reply API call; - /// other clients for the same event naturally fall back to Push API. 
pub reply_token_cache: ReplyTokenCache, + /// Inbound media proxy store (UUID -> data) + pub media_store: MediaStore, + /// Public base URL for the gateway (used to construct attachment URLs) + pub public_url: String, + /// TTL for media in memory + pub media_ttl: u64, + /// Maximum number of entries in the media store + pub media_max_entries: usize, + /// Maximum file size for media downloads (in bytes, Issue #690 review fix) + pub media_max_file_size: u64, } // --- WebSocket handler (OAB connects here) --- @@ -86,11 +98,22 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: info!("OAB client connected via WebSocket"); + let last_event_id: Arc>> = Arc::new(Mutex::new(std::collections::HashMap::new())); + // Forward gateway events → OAB + let last_event_id_for_send = last_event_id.clone(); let send_task = tokio::spawn(async move { loop { tokio::select! { Ok(event_json) = event_rx.recv() => { + // Track the last event ID sent to this client per channel + if let Ok(v) = serde_json::from_str::(&event_json) { + if let (Some(eid), Some(cid)) = (v["event_id"].as_str(), v["channel"]["id"].as_str()) { + let mut last = last_event_id_for_send.lock().await; + last.insert(cid.to_string(), eid.to_string()); + } + } + if ws_tx.send(Message::Text(event_json.into())).await.is_err() { break; } @@ -104,12 +127,25 @@ async fn handle_oab_connection(state: Arc, socket: axum::extract::ws:: // Track per-message reaction state (Telegram replaces all reactions atomically) let reaction_state: Arc>>> = Arc::new(Mutex::new(HashMap::new())); + let last_event_id_for_recv = last_event_id.clone(); let recv_task = tokio::spawn(async move { let client = reqwest::Client::new(); while let Some(Ok(msg)) = ws_rx.next().await { if let Message::Text(text) = msg { - match serde_json::from_str::(&*text) { - Ok(reply) => { + match serde_json::from_str::(&text) { + Ok(mut reply) => { + // Auto-fill reply_to if empty using the last event sent to this client. 
+                        // Rationale: This ensures agents don't need to track event IDs for simple replies.
+                        // Note: reply_to is used by the LINE adapter to look up replyTokens; auto-filling
+                        // allows LINE hybrid Reply/Push dispatch to work even if the agent omits reply_to.
+                        // Issue #690 review fix: Only auto-fill if channel.id is non-empty to prevent
+                        // cross-channel contamination (e.g., LINE reply tokens used for wrong messages).
+                        if reply.reply_to.is_empty() && !reply.channel.id.is_empty() {
+                            let last = last_event_id_for_recv.lock().await;
+                            if let Some(eid) = last.get(&reply.channel.id) {
+                                reply.reply_to = eid.clone();
+                            }
+                        }
                         info!(
                             platform = %reply.platform,
                             channel = %reply.channel.id,
@@ -191,6 +227,29 @@ async fn health() -> &'static str {
     "ok"
 }
 
+async fn media_handler(
+    State(state): State<Arc<AppState>>,
+    axum::extract::Path(uuid): axum::extract::Path<String>,
+) -> impl IntoResponse {
+    let entry = {
+        let cache = state.media_store.lock().unwrap_or_else(|e| e.into_inner());
+        cache.get(&uuid).cloned()
+    };
+
+    if let Some((data, mime, created_at)) = entry {
+        // Check TTL before serving (Issue #690 review fix)
+        if created_at.elapsed().as_secs() >= state.media_ttl {
+            return axum::http::StatusCode::NOT_FOUND.into_response();
+        }
+        axum::response::Response::builder()
+            .header("content-type", mime)
+            .body(axum::body::Body::from(data))
+            .unwrap()
+    } else {
+        axum::http::StatusCode::NOT_FOUND.into_response()
+    }
+}
+
 #[tokio::main]
 async fn main() -> Result<()> {
     tracing_subscriber::fmt()
@@ -199,19 +258,69 @@
         )
         .init();
 
     let listen_addr = std::env::var("GATEWAY_LISTEN").unwrap_or_else(|_| "0.0.0.0:8080".into());
     let ws_token = std::env::var("GATEWAY_WS_TOKEN").ok();
+
+    // NOTE(review): resolved merge conflict ("Updated upstream" vs "Stashed changes").
+    // Kept the upstream media-proxy config; from the stash kept only the Cloudflare
+    // Tunnel variable (referenced below). The other stashed lines duplicated existing
+    // config loading (TELEGRAM_BOT_TOKEN is already read later via .ok()).
+    // Optional Cloudflare Tunnel domain for logging / external reference
+    let cloudflare_tunnel_url = std::env::var("CLOUDFLARE_TUNNEL_URL").ok();
+
+    // Inbound media proxy config (Issue #690)
+    let public_url = std::env::var("GATEWAY_MEDIA_BASE_URL")
+        .or_else(|_| std::env::var("GATEWAY_PUBLIC_URL"))
+        .unwrap_or_else(|_| "http://localhost:8080".into());
+
+    // Warn if public_url looks like a loopback address (Issue #690 review fix)
+    if public_url.contains("localhost") || public_url.contains("127.0.0.1") {
+        warn!(
+            public_url = %public_url,
+            "GATEWAY_PUBLIC_URL looks like a loopback address. Media attachments will not be reachable from other pods/containers. Set GATEWAY_PUBLIC_URL to the gateway's cluster-internal Service URL (e.g. http://openab-gateway:8080) or its external ingress URL."
+        );
+    }
+
+    let media_ttl = std::env::var("GATEWAY_MEDIA_STORE_TTL")
+        .ok()
+        .and_then(|v| v.parse::<u64>().ok())
+        .unwrap_or(300);
+    let media_max_entries = std::env::var("GATEWAY_MEDIA_STORE_MAX_ENTRIES")
+        .ok()
+        .and_then(|v| v.parse::<usize>().ok())
+        .unwrap_or(1000);
+    // Issue #690 review fix: per-file size limit to prevent OOM
+    let media_max_file_size = std::env::var("GATEWAY_MEDIA_MAX_FILE_SIZE")
+        .ok()
+        .and_then(|v| v.parse::<u64>().ok())
+        .unwrap_or(10 * 1024 * 1024); // 10MB default
 
     if ws_token.is_none() {
         warn!("GATEWAY_WS_TOKEN
not set — WebSocket connections are NOT authenticated (insecure)"); } + if let Some(ref url) = cloudflare_tunnel_url { + info!("Using Cloudflare Tunnel URL: {}", url); + } let (event_tx, _) = broadcast::channel::(256); let reply_token_cache: ReplyTokenCache = Arc::new(std::sync::Mutex::new(HashMap::new())); + let media_store: MediaStore = Arc::new(std::sync::Mutex::new(HashMap::new())); let mut app = Router::new() .route("/ws", get(ws_handler)) - .route("/health", get(health)); + .route("/health", get(health)) + .route("/media/{uuid}", get(media_handler)); // Telegram adapter let telegram_bot_token = std::env::var("TELEGRAM_BOT_TOKEN").ok(); @@ -335,6 +444,11 @@ async fn main() -> Result<()> { ws_token, event_tx, reply_token_cache, + media_store, + public_url, + media_ttl, + media_max_entries, + media_max_file_size, }); // Background task: sweep expired reply tokens every REPLY_TOKEN_TTL_SECS @@ -361,6 +475,28 @@ async fn main() -> Result<()> { }); } + // Background task: sweep expired media every 60 seconds (Issue #690) + { + let store = state.media_store.clone(); + let ttl = state.media_ttl; + tokio::spawn(async move { + loop { + tokio::time::sleep(std::time::Duration::from_secs(60)).await; + let mut cache = store.lock().unwrap_or_else(|e| e.into_inner()); + let before = cache.len(); + cache.retain(|_, (_, _, t)| t.elapsed().as_secs() < ttl); + let after = cache.len(); + if before != after { + info!( + removed = before - after, + remaining = after, + "media store sweep" + ); + } + } + }); + } + // Periodic cleanup of stale Teams service_url entries (TTL: 4 hours) { let state_for_cleanup = state.clone(); diff --git a/gateway/src/schema.rs b/gateway/src/schema.rs index a38554df..462141cf 100644 --- a/gateway/src/schema.rs +++ b/gateway/src/schema.rs @@ -14,6 +14,24 @@ pub struct GatewayEvent { pub content: Content, pub mentions: Vec, pub message_id: String, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub attachments: Vec, +} + +#[derive(Clone, 
Debug, Serialize, Deserialize)] +pub struct Attachment { + #[serde(rename = "type")] + pub attachment_type: String, // "image", "audio", etc. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub url: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub mime_type: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub filename: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub size: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub data: Option, // Legacy base64 support } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -41,15 +59,6 @@ pub struct Content { pub attachments: Vec, } -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Attachment { - #[serde(rename = "type")] - pub attachment_type: String, // "image", "text_file" - pub filename: String, - pub mime_type: String, - pub data: String, // base64 encoded - pub size: u64, // size in bytes (after compression for images) -} // --- Reply schema (ADR openab.gateway.reply.v1) --- @@ -107,6 +116,7 @@ impl GatewayEvent { }, mentions, message_id: message_id.into(), + attachments: vec![], } } } diff --git a/src/cron.rs b/src/cron.rs index 1aec1621..0a41b3cd 100644 --- a/src/cron.rs +++ b/src/cron.rs @@ -1,4 +1,4 @@ -use crate::adapter::{AdapterRouter, ChatAdapter, ChannelRef, SenderContext}; +use crate::adapter::{AdapterRouter, ChannelRef, ChatAdapter, SenderContext}; use crate::config::CronJobConfig; use crate::format; use chrono::{Timelike, Utc}; @@ -24,9 +24,7 @@ pub fn parse_cron_expr(expr: &str) -> Result { /// schedule has an event at exactly that minute. 
pub fn should_fire(schedule: &Schedule, tz: Tz) -> bool { let now = Utc::now().with_timezone(&tz); - let minute_start = now - .with_second(0).unwrap() - .with_nanosecond(0).unwrap(); + let minute_start = now.with_second(0).unwrap().with_nanosecond(0).unwrap(); let query_from = minute_start - chrono::Duration::seconds(1); schedule .after(&query_from) @@ -39,20 +37,35 @@ pub fn should_fire(schedule: &Schedule, tz: Tz) -> bool { const VALID_PLATFORMS: &[&str] = &["discord", "slack"]; /// Validate all cronjob configs (fail-fast on bad cron expressions or timezones). -pub fn validate_cronjobs(cronjobs: &[CronJobConfig], configured_platforms: &[&str]) -> anyhow::Result<()> { +pub fn validate_cronjobs( + cronjobs: &[CronJobConfig], + configured_platforms: &[&str], +) -> anyhow::Result<()> { for (i, job) in cronjobs.iter().enumerate() { - if !job.enabled { continue; } + if !job.enabled { + continue; + } parse_cron_expr(&job.schedule).map_err(|e| { - anyhow::anyhow!("cronjobs[{i}]: invalid cron expression {:?}: {e}", job.schedule) + anyhow::anyhow!( + "cronjobs[{i}]: invalid cron expression {:?}: {e}", + job.schedule + ) })?; job.timezone.parse::().map_err(|e| { anyhow::anyhow!("cronjobs[{i}]: invalid timezone {:?}: {e}", job.timezone) })?; if !VALID_PLATFORMS.contains(&job.platform.as_str()) { - anyhow::bail!("cronjobs[{i}]: unknown platform {:?} (expected one of: {VALID_PLATFORMS:?})", job.platform); + anyhow::bail!( + "cronjobs[{i}]: unknown platform {:?} (expected one of: {VALID_PLATFORMS:?})", + job.platform + ); } if !configured_platforms.contains(&job.platform.as_str()) { - anyhow::bail!("cronjobs[{i}]: platform {:?} is not configured — add [{}] to config.toml", job.platform, job.platform); + anyhow::bail!( + "cronjobs[{i}]: platform {:?} is not configured — add [{}] to config.toml", + job.platform, + job.platform + ); } } Ok(()) @@ -183,7 +196,9 @@ pub async fn run_scheduler( if baseline_jobs.is_empty() && usercron_jobs.is_empty() { if usercron_path.is_some() { - 
info!("no cronjobs yet, but usercron_path is set — scheduler will watch for cronjob.toml"); + info!( + "no cronjobs yet, but usercron_path is set — scheduler will watch for cronjob.toml" + ); } else { debug!("no cronjobs configured, scheduler not started"); return; @@ -191,14 +206,23 @@ pub async fn run_scheduler( } let total = baseline_jobs.len() + usercron_jobs.len(); - info!(baseline = baseline_jobs.len(), usercron = usercron_jobs.len(), total, "cron scheduler started"); + info!( + baseline = baseline_jobs.len(), + usercron = usercron_jobs.len(), + total, + "cron scheduler started" + ); let in_flight: Arc>> = Arc::new(Mutex::new(HashSet::new())); // Align to next minute boundary let now = Utc::now(); let secs_into_minute = now.timestamp() % 60; - let align_delay = if secs_into_minute == 0 { 0 } else { 60 - secs_into_minute as u64 }; + let align_delay = if secs_into_minute == 0 { + 0 + } else { + 60 - secs_into_minute as u64 + }; if align_delay > 0 { debug!(align_secs = align_delay, "aligning to next minute boundary"); tokio::time::sleep(std::time::Duration::from_secs(align_delay)).await; @@ -301,7 +325,10 @@ async fn fire_cronjob( adapters: &HashMap>, in_flight: Arc>>, ) { - let _guard = InFlightGuard { idx, set: in_flight }; + let _guard = InFlightGuard { + idx, + set: in_flight, + }; let adapter = match adapters.get(&job.platform) { Some(a) => a.clone(), @@ -319,7 +346,13 @@ async fn fire_cronjob( origin_event_id: None, }; - let trigger_msg = match adapter.send_message(&thread_channel, &format!("🕐 [{}]: {}", job.sender_name, job.message)).await { + let trigger_msg = match adapter + .send_message( + &thread_channel, + &format!("🕐 [{}]: {}", job.sender_name, job.message), + ) + .await + { Ok(msg) => msg, Err(e) => { error!(channel = %job.channel, error = %e, "failed to send cron message"); @@ -331,11 +364,19 @@ async fn fire_cronjob( thread_channel.clone() } else { let thread_name = format::shorten_thread_name(&job.message); - match 
adapter.create_thread(&thread_channel, &trigger_msg, &thread_name).await { + match adapter + .create_thread(&thread_channel, &trigger_msg, &thread_name) + .await + { Ok(ch) => ch, Err(e) => { error!(channel = %job.channel, error = %e, "failed to create cron thread"); - let _ = adapter.send_message(&thread_channel, &format!("⚠️ cronjob: failed to create thread: {e}")).await; + let _ = adapter + .send_message( + &thread_channel, + &format!("⚠️ cronjob: failed to create thread: {e}"), + ) + .await; return; } } @@ -347,8 +388,15 @@ async fn fire_cronjob( sender_name: job.sender_name.clone(), display_name: job.sender_name.clone(), channel: job.platform.clone(), - channel_id: reply_channel.parent_id.as_deref().unwrap_or(&reply_channel.channel_id).to_string(), - thread_id: reply_channel.thread_id.clone().or(Some(reply_channel.channel_id.clone())), + channel_id: reply_channel + .parent_id + .as_deref() + .unwrap_or(&reply_channel.channel_id) + .to_string(), + thread_id: reply_channel + .thread_id + .clone() + .or(Some(reply_channel.channel_id.clone())), is_bot: true, timestamp: Some(Utc::now().to_rfc3339()), }; @@ -372,7 +420,9 @@ async fn fire_cronjob( .await { error!("cron handle_message error: {e}"); - let _ = adapter.send_message(&reply_channel, &format!("⚠️ cronjob error: {e}")).await; + let _ = adapter + .send_message(&reply_channel, &format!("⚠️ cronjob error: {e}")) + .await; } } @@ -502,12 +552,16 @@ thread_id = "789" fn load_usercron_valid_file() { let dir = tempfile::tempdir().unwrap(); let path = dir.path().join("cronjob.toml"); - std::fs::write(&path, r#" + std::fs::write( + &path, + r#" [[jobs]] schedule = "* * * * *" channel = "123" message = "ping" -"#).unwrap(); +"#, + ) + .unwrap(); let jobs = load_usercron_file(&path, &["discord"]); assert_eq!(jobs.len(), 1); assert_eq!(jobs[0].message, "ping"); @@ -526,7 +580,9 @@ message = "ping" fn load_usercron_skips_invalid_entries() { let dir = tempfile::tempdir().unwrap(); let path = 
dir.path().join("cronjob.toml"); - std::fs::write(&path, r#" + std::fs::write( + &path, + r#" [[jobs]] schedule = "* * * * *" channel = "123" @@ -536,7 +592,9 @@ message = "good" schedule = "bad cron" channel = "456" message = "bad" -"#).unwrap(); +"#, + ) + .unwrap(); let jobs = load_usercron_file(&path, &["discord"]); assert_eq!(jobs.len(), 1); assert_eq!(jobs[0].message, "good"); @@ -546,7 +604,9 @@ message = "bad" fn load_usercron_skips_unconfigured_platform() { let dir = tempfile::tempdir().unwrap(); let path = dir.path().join("cronjob.toml"); - std::fs::write(&path, r#" + std::fs::write( + &path, + r#" [[jobs]] schedule = "* * * * *" channel = "123" @@ -557,7 +617,9 @@ schedule = "* * * * *" channel = "456" message = "slack job" platform = "slack" -"#).unwrap(); +"#, + ) + .unwrap(); // Only discord configured let jobs = load_usercron_file(&path, &["discord"]); assert_eq!(jobs.len(), 1); @@ -569,9 +631,14 @@ platform = "slack" #[test] fn validate_cronjobs_valid_passes() { let jobs = vec![CronJobConfig { - enabled: true, schedule: "0 9 * * 1-5".into(), channel: "123".into(), - message: "hi".into(), platform: "discord".into(), sender_name: "test".into(), - thread_id: None, timezone: "UTC".into(), + enabled: true, + schedule: "0 9 * * 1-5".into(), + channel: "123".into(), + message: "hi".into(), + platform: "discord".into(), + sender_name: "test".into(), + thread_id: None, + timezone: "UTC".into(), }]; assert!(validate_cronjobs(&jobs, &["discord"]).is_ok()); } @@ -579,9 +646,14 @@ platform = "slack" #[test] fn validate_cronjobs_invalid_cron_fails() { let jobs = vec![CronJobConfig { - enabled: true, schedule: "bad".into(), channel: "123".into(), - message: "hi".into(), platform: "discord".into(), sender_name: "test".into(), - thread_id: None, timezone: "UTC".into(), + enabled: true, + schedule: "bad".into(), + channel: "123".into(), + message: "hi".into(), + platform: "discord".into(), + sender_name: "test".into(), + thread_id: None, + timezone: "UTC".into(), }]; 
let err = validate_cronjobs(&jobs, &["discord"]).unwrap_err(); assert!(err.to_string().contains("invalid cron expression")); @@ -590,9 +662,14 @@ platform = "slack" #[test] fn validate_cronjobs_invalid_timezone_fails() { let jobs = vec![CronJobConfig { - enabled: true, schedule: "* * * * *".into(), channel: "123".into(), - message: "hi".into(), platform: "discord".into(), sender_name: "test".into(), - thread_id: None, timezone: "Mars/Olympus".into(), + enabled: true, + schedule: "* * * * *".into(), + channel: "123".into(), + message: "hi".into(), + platform: "discord".into(), + sender_name: "test".into(), + thread_id: None, + timezone: "Mars/Olympus".into(), }]; let err = validate_cronjobs(&jobs, &["discord"]).unwrap_err(); assert!(err.to_string().contains("invalid timezone")); @@ -601,9 +678,14 @@ platform = "slack" #[test] fn validate_cronjobs_unknown_platform_fails() { let jobs = vec![CronJobConfig { - enabled: true, schedule: "* * * * *".into(), channel: "123".into(), - message: "hi".into(), platform: "telegram".into(), sender_name: "test".into(), - thread_id: None, timezone: "UTC".into(), + enabled: true, + schedule: "* * * * *".into(), + channel: "123".into(), + message: "hi".into(), + platform: "telegram".into(), + sender_name: "test".into(), + thread_id: None, + timezone: "UTC".into(), }]; let err = validate_cronjobs(&jobs, &["discord"]).unwrap_err(); assert!(err.to_string().contains("unknown platform")); @@ -612,9 +694,14 @@ platform = "slack" #[test] fn validate_cronjobs_unconfigured_platform_fails() { let jobs = vec![CronJobConfig { - enabled: true, schedule: "* * * * *".into(), channel: "123".into(), - message: "hi".into(), platform: "slack".into(), sender_name: "test".into(), - thread_id: None, timezone: "UTC".into(), + enabled: true, + schedule: "* * * * *".into(), + channel: "123".into(), + message: "hi".into(), + platform: "slack".into(), + sender_name: "test".into(), + thread_id: None, + timezone: "UTC".into(), }]; let err = 
validate_cronjobs(&jobs, &["discord"]).unwrap_err(); assert!(err.to_string().contains("not configured")); @@ -623,9 +710,14 @@ platform = "slack" #[test] fn validate_cronjobs_disabled_with_invalid_cron_passes() { let jobs = vec![CronJobConfig { - enabled: false, schedule: "bad".into(), channel: "123".into(), - message: "hi".into(), platform: "discord".into(), sender_name: "test".into(), - thread_id: None, timezone: "UTC".into(), + enabled: false, + schedule: "bad".into(), + channel: "123".into(), + message: "hi".into(), + platform: "discord".into(), + sender_name: "test".into(), + thread_id: None, + timezone: "UTC".into(), }]; assert!(validate_cronjobs(&jobs, &["discord"]).is_ok()); } @@ -633,9 +725,14 @@ platform = "slack" #[test] fn validate_cronjobs_enabled_with_invalid_cron_still_fails() { let jobs = vec![CronJobConfig { - enabled: true, schedule: "bad".into(), channel: "123".into(), - message: "hi".into(), platform: "discord".into(), sender_name: "test".into(), - thread_id: None, timezone: "UTC".into(), + enabled: true, + schedule: "bad".into(), + channel: "123".into(), + message: "hi".into(), + platform: "discord".into(), + sender_name: "test".into(), + thread_id: None, + timezone: "UTC".into(), }]; assert!(validate_cronjobs(&jobs, &["discord"]).is_err()); } diff --git a/src/discord.rs b/src/discord.rs index 13987dea..09e918f5 100644 --- a/src/discord.rs +++ b/src/discord.rs @@ -256,6 +256,12 @@ impl Handler { #[serenity::async_trait] impl EventHandler for Handler { async fn message(&self, ctx: Context, msg: Message) { + tracing::info!( + author = %msg.author.name, + content = %msg.content, + channel_id = %msg.channel_id, + "received discord message" + ); let bot_id = ctx.cache.current_user().id; // Early multibot detection: cache that another bot is present. 
diff --git a/src/gateway.rs b/src/gateway.rs index 8aed6aab..6d76a9e1 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -1,5 +1,7 @@ use crate::acp::ContentBlock; -use crate::adapter::{AdapterRouter, ChannelRef, ChatAdapter, MessageRef, SenderContext}; +use crate::adapter::{AdapterRouter, ChannelRef, ChatAdapter, MessageContext, MessageRef, SenderContext}; +use crate::config::SttConfig; +use crate::media; use anyhow::Result; use async_trait::async_trait; use futures_util::{SinkExt, StreamExt}; @@ -27,6 +29,24 @@ struct GatewayEvent { #[allow(dead_code)] mentions: Vec, message_id: String, + #[serde(default)] + attachments: Vec, +} + +#[derive(Clone, Debug, Deserialize)] +struct GwAttachment { + #[serde(rename = "type")] + attachment_type: String, + #[serde(default)] + url: Option, + #[serde(default)] + mime_type: Option, + #[serde(default)] + filename: Option, + #[serde(default)] + size: Option, + #[serde(default)] + data: Option, } #[derive(Clone, Debug, Deserialize)] @@ -55,16 +75,7 @@ struct GwContent { attachments: Vec, } -#[derive(Clone, Debug, Deserialize)] -struct GwAttachment { - #[serde(rename = "type")] - attachment_type: String, - filename: String, - mime_type: String, - data: String, - #[allow(dead_code)] - size: u64, -} + #[derive(Serialize)] struct GatewayReply { @@ -107,12 +118,16 @@ struct GatewayResponse { // --- GatewayAdapter: ChatAdapter over WebSocket --- type PendingRequests = Arc>>>; -type SharedWsTx = Arc, +type SharedWsTx = Arc< + Mutex< + futures_util::stream::SplitSink< + tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream, + >, + Message, + >, >, - Message, ->>>; +>; pub struct GatewayAdapter { ws_tx: SharedWsTx, @@ -487,6 +502,81 @@ pub struct GatewayParams { pub allow_all_users: bool, pub allowed_users: Vec, pub streaming: bool, + pub stt_config: SttConfig, +} + +fn attachment_filename(attachment: &GwAttachment) -> &str { + attachment + .filename + .as_deref() + .unwrap_or(match attachment.attachment_type.as_str() 
{ + "image" => "gateway-image", + "audio" => "gateway-audio", + _ => "gateway-attachment", + }) +} + +async fn build_attachment_blocks( + attachments: &[GwAttachment], + stt_config: &SttConfig, +) -> Vec { + let mut blocks = Vec::new(); + + for attachment in attachments { + match attachment.attachment_type.as_str() { + "image" => { + if let Some(block) = media::download_and_encode_image( + attachment.url.as_deref().unwrap_or(""), + attachment.mime_type.as_deref(), + attachment_filename(attachment), + attachment.size.unwrap_or(0), + None, + ) + .await + { + blocks.push(block); + } else { + // Issue #690 review fix: Don't leak internal URL to LLM on failure + blocks.push(ContentBlock::Text { + text: "[Image attachment failed to load]".to_string(), + }); + } + } + "audio" => { + let mut added_transcript = false; + if stt_config.enabled { + if let Some(transcript) = media::download_and_transcribe( + attachment.url.as_deref().unwrap_or(""), + attachment_filename(attachment), + attachment.mime_type.as_deref().unwrap_or("audio/ogg"), + attachment.size.unwrap_or(0), + stt_config, + None, + ) + .await + { + blocks.push(ContentBlock::Text { + text: format!("[Voice message transcript]: {transcript}"), + }); + added_transcript = true; + } + } + if !added_transcript { + blocks.push(ContentBlock::Text { + text: format!("[Audio attachment URL]: {}", attachment.url.as_deref().unwrap_or("N/A")), + }); + } + } + _ => blocks.push(ContentBlock::Text { + text: format!( + "[{} attachment URL]: {}", + attachment.attachment_type, attachment.url.as_deref().unwrap_or("N/A") + ), + }), + } + } + + blocks } pub async fn run_gateway_adapter( @@ -505,6 +595,7 @@ pub async fn run_gateway_adapter( let allow_all_users = params.allow_all_users; let allowed_users = params.allowed_users; let streaming = params.streaming; + let stt_config = params.stt_config; let connect_url = match ¶ms.token { Some(token) => { @@ -548,8 +639,12 @@ pub async fn run_gateway_adapter( let (ws_tx, mut ws_rx) = 
ws_stream.split(); let ws_tx: SharedWsTx = Arc::new(Mutex::new(ws_tx)); let pending: PendingRequests = Arc::new(Mutex::new(HashMap::new())); - let adapter: Arc = - Arc::new(GatewayAdapter::new(ws_tx.clone(), pending.clone(), platform, streaming)); + let adapter: Arc = Arc::new(GatewayAdapter::new( + ws_tx.clone(), + pending.clone(), + platform, + streaming, + )); let slash_ws_tx = ws_tx.clone(); // for fire-and-forget slash command responses let mut tasks: tokio::task::JoinSet<()> = tokio::task::JoinSet::new(); @@ -648,32 +743,8 @@ pub async fn run_gateway_adapter( let adapter = adapter.clone(); let prompt = event.content.text.clone(); - let sender_name = event.sender.name.clone(); - let sender_id = event.sender.id.clone(); - let dispatcher = dispatcher.clone(); - - // Convert gateway attachments to ContentBlocks - let mut extra_blocks = Vec::new(); - for att in &event.content.attachments { - match att.attachment_type.as_str() { - "image" => { - extra_blocks.push(ContentBlock::Image { - media_type: att.mime_type.clone(), - data: att.data.clone(), - }); - } - "text_file" => { - use base64::Engine; - if let Ok(bytes) = base64::engine::general_purpose::STANDARD.decode(&att.data) { - let text = String::from_utf8_lossy(&bytes); - extra_blocks.push(ContentBlock::Text { - text: format!("```{}\n{}\n```", att.filename, text), - }); - } - } - _ => {} - } - } + let attachments = event.attachments.clone(); + let stt_config = stt_config.clone(); // Slash command interception for gateway platforms // (Feishu/LINE/Telegram don't have native slash commands) @@ -710,6 +781,7 @@ pub async fn run_gateway_adapter( } } + let router = router.clone(); tasks.spawn(async move { // If supergroup with no thread_id, create a forum topic let thread_channel = if event.channel.channel_type == "supergroup" @@ -727,33 +799,20 @@ pub async fn run_gateway_adapter( channel.clone() }; - let thread_id = thread_channel - .thread_id - .as_deref() - .unwrap_or(&thread_channel.channel_id); - let thread_key 
= dispatcher.key( - &thread_channel.platform, - thread_id, - &sender_id, - ); - let estimated_tokens = - crate::dispatch::estimate_tokens(&prompt, &extra_blocks); - let buf_msg = crate::dispatch::BufferedMessage { - sender_json, - sender_name, - prompt, + let extra_blocks = + build_attachment_blocks(&attachments, &stt_config).await; + + let ctx = MessageContext { + thread_channel: thread_channel.clone(), + sender_json: sender_json.clone(), + prompt: prompt.clone(), extra_blocks, - trigger_msg, - arrived_at: std::time::Instant::now(), - estimated_tokens, - // TODO: implement gateway multibot detection + trigger_msg: trigger_msg.clone(), other_bot_present: false, }; - if let Err(e) = dispatcher - .submit(thread_key, thread_channel, adapter, buf_msg) - .await - { - error!("gateway dispatcher submit error: {e}"); + + if let Err(e) = router.handle_message(&adapter, ctx).await { + error!("gateway message handling error: {e}"); } }); } diff --git a/src/main.rs b/src/main.rs index 04a0937f..188762f3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -269,6 +269,7 @@ async fn main() -> anyhow::Result<()> { allow_all_users: config::resolve_allow_all(gw_cfg.allow_all_users, &gw_cfg.allowed_users), allowed_users: gw_cfg.allowed_users, streaming: gw_cfg.streaming, + stt_config: cfg.stt.clone(), }; let gw_router = router.clone(); Some(tokio::spawn(async move {