diff --git a/refact-agent/engine/src/background_tasks.rs b/refact-agent/engine/src/background_tasks.rs index 5ea2c44a9..60869533a 100644 --- a/refact-agent/engine/src/background_tasks.rs +++ b/refact-agent/engine/src/background_tasks.rs @@ -44,7 +44,6 @@ pub async fn start_background_tasks(gcx: Arc>) -> Backgro tokio::spawn(crate::vecdb::vdb_highlev::vecdb_background_reload(gcx.clone())), // this in turn can create global_context::vec_db tokio::spawn(crate::integrations::sessions::remove_expired_sessions_background_task(gcx.clone())), tokio::spawn(crate::git::cleanup::git_shadow_cleanup_background_task(gcx.clone())), - tokio::spawn(crate::cloud::threads_sub::watch_threads_subscription(gcx.clone())), ]); let ast = gcx.clone().read().await.ast_service.clone(); if let Some(ast_service) = ast { diff --git a/refact-agent/engine/src/cloud/subchat.rs b/refact-agent/engine/src/cloud/subchat.rs index 9981f35f7..9f574027a 100644 --- a/refact-agent/engine/src/cloud/subchat.rs +++ b/refact-agent/engine/src/cloud/subchat.rs @@ -102,7 +102,7 @@ pub async fn subchat( }; let thread_id = thread.ft_id.clone(); - let connection_result = initialize_connection(&cmd_address_url, &api_key, &located_fgroup_id).await; + let connection_result = initialize_connection(&cmd_address_url, &api_key, &located_fgroup_id, &app_searchable_id).await; let mut connection = match connection_result { Ok(conn) => conn, Err(err) => return Err(format!("Failed to initialize WebSocket connection: {}", err)), diff --git a/refact-agent/engine/src/cloud/threads_processing.rs b/refact-agent/engine/src/cloud/threads_processing.rs index c85087cff..f792e1b3a 100644 --- a/refact-agent/engine/src/cloud/threads_processing.rs +++ b/refact-agent/engine/src/cloud/threads_processing.rs @@ -337,7 +337,6 @@ pub async fn process_thread_event( basic_info: BasicStuff, cmd_address_url: String, api_key: String, - app_searchable_id: String, located_fgroup_id: String, ) -> Result<(), String> { if thread_payload.ft_need_tool_calls == -1 @@ -345,17 +344,6 @@ pub async fn process_thread_event( || !thread_payload.ft_locked_by.is_empty() { return Ok(()); } - if let Some(ft_app_searchable) = thread_payload.ft_app_searchable.clone() { - if ft_app_searchable != app_searchable_id { - info!("thread `{}` has different `app_searchable` id, skipping it: {} != {}", - thread_payload.ft_id, app_searchable_id, ft_app_searchable - ); - return Ok(()); - } - } else { - info!("thread `{}` doesn't have the `app_searchable` id, skipping it", thread_payload.ft_id); - return Ok(()); - } if let Some(error) = thread_payload.ft_error.as_ref() { info!("thread `{}` has the error: `{}`. Skipping it", thread_payload.ft_id, error); return Ok(()); diff --git a/refact-agent/engine/src/cloud/threads_sub.rs b/refact-agent/engine/src/cloud/threads_sub.rs index f9aed471d..c0561109a 100644 --- a/refact-agent/engine/src/cloud/threads_sub.rs +++ b/refact-agent/engine/src/cloud/threads_sub.rs @@ -37,10 +37,9 @@ pub struct BasicStuff { pub workspaces: Vec, } -// XXX use xxx_subs::filter for ft_app_capture const THREADS_SUBSCRIPTION_QUERY: &str = r#" - subscription ThreadsPageSubs($located_fgroup_id: String!) { - threads_in_group(located_fgroup_id: $located_fgroup_id) { + subscription ThreadsPageSubs($located_fgroup_id: String!, $filter: [String!]) { + threads_in_group(located_fgroup_id: $located_fgroup_id, filter: $filter) { news_action news_payload_id news_payload { @@ -78,19 +77,23 @@ pub async fn watch_threads_subscription(gcx: Arc>) { let restart_flag = gcx.read().await.threads_subscription_restart_flag.clone(); restart_flag.store(false, Ordering::SeqCst); } - let located_fgroup_id = if let Some(located_fgroup_id) = gcx.read().await.active_group_id.clone() { - located_fgroup_id - } else { - warn!("no active group is set, skipping threads subscription"); - tokio::time::sleep(Duration::from_secs(RECONNECT_DELAY_SECONDS)).await; - continue; + let (located_fgroup_id, app_searchable_id) = { + let gcx_locked = gcx.read().await; + let located_fgroup_id = if let Some(located_fgroup_id) = gcx_locked.active_group_id.clone() { + located_fgroup_id + } else { + warn!("no active group is set, skipping threads subscription"); + tokio::time::sleep(Duration::from_secs(RECONNECT_DELAY_SECONDS)).await; + continue; + }; + (located_fgroup_id, gcx_locked.app_searchable_id.clone()) }; info!( - "starting subscription for threads_in_group with fgroup_id=\"{}\"", - located_fgroup_id + "starting subscription for threads_in_group with fgroup_id=\"{}\" and app_searchable_id=\"{}\"", + located_fgroup_id, app_searchable_id ); - let connection_result = initialize_connection(&address_url, &api_key, &located_fgroup_id).await; + let connection_result = initialize_connection(&address_url, &api_key, &located_fgroup_id, &app_searchable_id).await; let mut connection = match connection_result { Ok(conn) => conn, Err(err) => { @@ -129,6 +132,7 @@ pub async fn initialize_connection( cmd_address_url: &str, api_key: &str, located_fgroup_id: &str, + app_searchable_id: &str, ) -> Result< futures::stream::SplitStream< tokio_tungstenite::WebSocketStream< @@ -208,7 +212,8 @@ pub async fn initialize_connection( "payload": { "query": THREADS_SUBSCRIPTION_QUERY, "variables": { - "located_fgroup_id": located_fgroup_id + "located_fgroup_id": located_fgroup_id, + "filter": [format!("ft_app_searchable:eq:{}", app_searchable_id)] } } }); @@ -231,7 +236,6 @@ async fn actual_subscription_loop( located_fgroup_id: &str, ) -> Result<(), String> { info!("cloud threads subscription started, waiting for events..."); - let app_searchable_id = gcx.read().await.app_searchable_id.clone(); let basic_info = get_basic_info(cmd_address_url, api_key).await?; while let Some(msg) = connection.next().await { if gcx.clone().read().await.shutdown_flag.load(Ordering::SeqCst) { @@ -266,11 +270,10 @@ async fn actual_subscription_loop( let basic_info_clone = basic_info.clone(); let cmd_address_url_clone = cmd_address_url.to_string(); let api_key_clone = api_key.to_string(); - let app_searchable_id_clone = app_searchable_id.clone(); let located_fgroup_id_clone = located_fgroup_id.to_string(); tokio::spawn(async move { crate::cloud::threads_processing::process_thread_event( - gcx_clone, payload_clone, basic_info_clone, cmd_address_url_clone, api_key_clone, app_searchable_id_clone, located_fgroup_id_clone + gcx_clone, payload_clone, basic_info_clone, cmd_address_url_clone, api_key_clone, located_fgroup_id_clone ).await }); } else { diff --git a/refact-agent/engine/src/constants.rs b/refact-agent/engine/src/constants.rs index 8fb3df11e..22c978fa7 100644 --- a/refact-agent/engine/src/constants.rs +++ b/refact-agent/engine/src/constants.rs @@ -1,35 +1,56 @@ +use std::net::IpAddr; + use tracing::info; use url::Url; const BASE_REFACT_URL: &str = "app.refact.ai"; -/// Extracts the host (and optional port) from a URL string, e.g.: -/// ws://app.refact.ai/v1/graphql -> app.refact.ai -/// https://example.com:8080/path -> example.com:8080 -/// app.refact.ai -> app.refact.ai -fn extract_base_host(address: &str) -> String { +/// Extracts the host (and optional port) from a URL string and determines if the protocol is secure, e.g.: +/// ws://app.refact.ai/v1/graphql -> (app.refact.ai, Some(false)) +/// https://example.com:8080/path -> (example.com:8080, Some(true)) +/// app.refact.ai -> (app.refact.ai, None) +fn get_host_and_is_protocol_secure(address: &str) -> (String, Option) { if let Ok(url) = Url::parse(address) { if let Some(host) = url.host_str() { - return if let Some(port) = url.port() { + let host_with_port = if let Some(port) = url.port() { format!("{}:{}", host, port) } else { host.to_string() - } + }; + + let is_secure = match url.scheme() { + "https" | "wss" => Some(true), + "http" | "ws" => Some(false), + _ => None, + }; + + return (host_with_port, is_secure); } } + let mut address = address; - for prefix in ["ws://", "wss://", "http://", "https://"] { + let mut is_secure = None; + + for (prefix, secure) in [ + ("https://", Some(true)), + ("wss://", Some(true)), + ("http://", Some(false)), + ("ws://", Some(false)), + ] { if let Some(stripped) = address.strip_prefix(prefix) { address = stripped; + is_secure = secure; break; } } - let address = address; - if let Some(idx) = address.find('/') { + + let host = if let Some(idx) = address.find('/') { address[..idx].to_string() } else { address.to_string() - } + }; + + (host, is_secure) } fn is_localhost(address: &str) -> bool { @@ -50,8 +71,11 @@ fn is_localhost(address: &str) -> bool { address.to_string() } }; + if let Ok(ip) = address.parse::() { + return ip.is_loopback(); + } match address.to_ascii_lowercase().as_str() { - "localhost" | "127.0.0.1" | "::1" | "[::1]" => true, + "localhost" | "127.0.0.1" | "::1" | "[::1]" | "host.docker.internal" => true, _ => false, } } @@ -60,9 +84,13 @@ pub fn get_cloud_url(cmd_address_url: &str) -> String { let final_address = if cmd_address_url.to_lowercase() == "refact" { format!("https://{}/v1", BASE_REFACT_URL) } else { - let base_part = extract_base_host(cmd_address_url); - let protocol = if is_localhost(&base_part) { "http" } else { "https" }; - format!("{}://{}/v1", protocol, base_part) + let (host, is_secure) = get_host_and_is_protocol_secure(cmd_address_url); + let protocol = match is_secure { + Some(true) => "https", + Some(false) => "http", + None => if is_localhost(&host) { "http" } else { "https" }, + }; + format!("{}://{}/v1", protocol, host) }; info!("resolved cloud url: {}", final_address); final_address @@ -72,9 +100,13 @@ pub fn get_graphql_ws_url(cmd_address_url: &str) -> String { let final_address = if cmd_address_url.to_lowercase() == "refact" { format!("ws://{}/v1/graphql", BASE_REFACT_URL) } else { - let base_part = extract_base_host(cmd_address_url); - let protocol = if is_localhost(&base_part) { "ws" } else { "wss" }; - format!("{}://{}/v1/graphql", protocol, base_part) + let (host, is_secure) = get_host_and_is_protocol_secure(cmd_address_url); + let protocol = match is_secure { + Some(true) => "wss", + Some(false) => "ws", + None => if is_localhost(&host) { "ws" } else { "wss" }, + }; + format!("{}://{}/v1/graphql", protocol, host) }; info!("resolved graphql ws url: {}", final_address); final_address @@ -84,9 +116,13 @@ pub fn get_graphql_url(cmd_address_url: &str) -> String { let final_address = if cmd_address_url.to_lowercase() == "refact" { format!("https://{}/v1/graphql", BASE_REFACT_URL) } else { - let base_part = extract_base_host(cmd_address_url); - let protocol = if is_localhost(&base_part) { "http" } else { "https" }; - format!("{}://{}/v1/graphql", protocol, base_part) + let (host, is_secure) = get_host_and_is_protocol_secure(cmd_address_url); + let protocol = match is_secure { + Some(true) => "https", + Some(false) => "http", + None => if is_localhost(&host) { "http" } else { "https" }, + }; + format!("{}://{}/v1/graphql", protocol, host) }; info!("resolved graphql url: {}", final_address); final_address diff --git a/refact-agent/engine/src/files_in_workspace.rs b/refact-agent/engine/src/files_in_workspace.rs index a90b1252b..39d4805f0 100644 --- a/refact-agent/engine/src/files_in_workspace.rs +++ b/refact-agent/engine/src/files_in_workspace.rs @@ -696,11 +696,18 @@ pub async fn on_workspaces_init(gcx: Arc>) -> i32 { // Called from lsp and lsp_like // Not called from main.rs as part of initialization - let folders = gcx.read().await.documents_state.workspace_folders.lock().unwrap().clone(); - let old_app_searchable_id = gcx.read().await.app_searchable_id.clone(); - let new_app_searchable_id = get_app_searchable_id(&folders); + let (folders, old_app_searchable_id, cmdline_app_searchable_id) = { + let gcx_locked = gcx.read().await; + let folders = gcx_locked.documents_state.workspace_folders.lock().unwrap().clone(); + ( + folders, + gcx_locked.app_searchable_id.clone(), + gcx_locked.cmdline.app_searchable_id.clone(), + ) + }; + let new_app_searchable_id = get_app_searchable_id(&folders, &cmdline_app_searchable_id); if old_app_searchable_id != new_app_searchable_id { - gcx.write().await.app_searchable_id = get_app_searchable_id(&folders); + gcx.write().await.app_searchable_id = new_app_searchable_id; crate::cloud::threads_sub::trigger_threads_subscription_restart(gcx.clone()).await; } watcher_init(gcx.clone()).await; diff --git a/refact-agent/engine/src/global_context.rs b/refact-agent/engine/src/global_context.rs index 425aa1091..fc32a04f0 100644 --- a/refact-agent/engine/src/global_context.rs +++ b/refact-agent/engine/src/global_context.rs @@ -104,6 +104,8 @@ pub struct CommandLine { #[structopt(long, help="An pre-setup active group id")] pub active_group_id: Option, + #[structopt(long, default_value="", help="Custom app_searchable_id. It overrides the generated default.")] + pub app_searchable_id: String, } pub struct AtCommandsPreviewCache { @@ -195,10 +197,10 @@ pub async fn migrate_to_config_folder( } #[cfg(target_os = "macos")] -pub fn get_app_searchable_id(workspace_folders: &[PathBuf]) -> String { +pub fn get_local_machine_id() -> String { use std::process::Command; use rand::Rng; - + // Try multiple methods to get a unique machine identifier on macOS let machine_id = { // First attempt: Use system_profiler to get hardware UUID (most reliable) @@ -217,13 +219,13 @@ pub fn get_app_searchable_id(workspace_folders: &[PathBuf]) -> String { .map(|s| s.trim().to_string()) }) }); - + if let Some(uuid) = hardware_uuid { if !uuid.trim().is_empty() { return uuid; } } - + // Second attempt: Try to get the serial number let serial_number = Command::new("system_profiler") .args(&["SPHardwareDataType"]) @@ -239,13 +241,13 @@ pub fn get_app_searchable_id(workspace_folders: &[PathBuf]) -> String { .map(|s| s.trim().to_string()) }) }); - + if let Some(serial) = serial_number { if !serial.trim().is_empty() { return serial; } } - + // Third attempt: Try to get the MAC address using ifconfig let mac_address = Command::new("ifconfig") .args(&["en0"]) @@ -261,30 +263,24 @@ pub fn get_app_searchable_id(workspace_folders: &[PathBuf]) -> String { .map(|s| s.trim().replace(":", "")) }) }); - + if let Some(mac) = mac_address { if !mac.trim().is_empty() && mac != "000000000000" { return mac; } } - + // Final fallback: Generate a random ID and store it persistently // This is just a temporary solution in case all other methods fail let mut rng = rand::thread_rng(); format!("macos-{:016x}", rng.gen::()) }; - let folders = workspace_folders - .iter() - .map(|p| p.file_name().unwrap_or_default().to_string_lossy().to_string()) - .collect::>() - .join(";"); - - format!("{}-{}", machine_id, folders) + machine_id } #[cfg(all(not(target_os = "windows"), not(target_os = "macos")))] -pub fn get_app_searchable_id(workspace_folders: &[PathBuf]) -> String { +pub fn get_local_machine_id() -> String { let mac = pnet_datalink::interfaces() .into_iter() .find(|iface: &pnet_datalink::NetworkInterface| { @@ -293,30 +289,32 @@ pub fn get_app_searchable_id(workspace_folders: &[PathBuf]) -> String { .and_then(|iface| iface.mac) .map(|mac| mac.to_string().replace(":", "")) .unwrap_or_else(|| "no-mac".to_string()); - - let folders = workspace_folders - .iter() - .map(|p| p.file_name().unwrap_or_default().to_string_lossy().to_string()) - .collect::>() - .join(";"); - - format!("{}-{}", mac, folders) + mac } #[cfg(target_os = "windows")] -pub fn get_app_searchable_id(workspace_folders: &[PathBuf]) -> String { +pub fn get_local_machine_id() -> String { use winreg::enums::*; use winreg::RegKey; let machine_guid = RegKey::predef(HKEY_LOCAL_MACHINE) .open_subkey("SOFTWARE\\Microsoft\\Cryptography") .and_then(|key| key.get_value::("MachineGuid")) .unwrap_or_else(|_| "no-machine-guid".to_string()); + machine_guid +} + +pub fn get_app_searchable_id(workspace_folders: &[PathBuf], cmdline_app_searchable_id: &str) -> String { + if !cmdline_app_searchable_id.is_empty() { + return cmdline_app_searchable_id.to_string(); + } + let folders = workspace_folders .iter() .map(|p| p.file_name().unwrap_or_default().to_string_lossy().to_string()) .collect::>() .join(";"); - format!("{}-{}", machine_guid, folders) + + format!("{}-{}", get_local_machine_id(), folders) } pub async fn try_load_caps_quickly_if_not_present( @@ -497,7 +495,7 @@ pub async fn create_global_context( init_shadow_repos_background_task_holder: BackgroundTasksHolder::new(vec![]), init_shadow_repos_lock: Arc::new(AMutex::new(false)), git_operations_abort_flag: Arc::new(AtomicBool::new(false)), - app_searchable_id: get_app_searchable_id(&workspace_dirs), + app_searchable_id: get_app_searchable_id(&workspace_dirs, &cmdline.app_searchable_id), threads_subscription_restart_flag: Arc::new(AtomicBool::new(false)), }; let gcx = Arc::new(ARwLock::new(cx)); diff --git a/refact-agent/engine/src/main.rs b/refact-agent/engine/src/main.rs index 2221473f0..56dcf23e6 100644 --- a/refact-agent/engine/src/main.rs +++ b/refact-agent/engine/src/main.rs @@ -189,6 +189,12 @@ async fn main() { background_tasks.push_back(spawn_lsp_task(gcx.clone(), cmdline.clone()).await.unwrap()) } } + if main_handle.is_none() { + main_handle = Some(tokio::spawn(crate::cloud::threads_sub::watch_threads_subscription(gcx.clone()))); + } else { + background_tasks.push_back(tokio::spawn(crate::cloud::threads_sub::watch_threads_subscription(gcx.clone()))); + } + if main_handle.is_some() { let _ = main_handle.unwrap().await; }