diff --git a/rust/src/core/chat_media.rs b/rust/src/core/chat_media.rs index 8886077b0..0a4831cfa 100644 --- a/rust/src/core/chat_media.rs +++ b/rust/src/core/chat_media.rs @@ -1746,7 +1746,7 @@ impl AppCore { self.last_outgoing_ts }; - let (client, wrapper, relays, rumor_id_hex) = { + let (client, relays, rumor, rumor_id_hex) = { let Some(sess) = self.session.as_mut() else { return; }; @@ -1788,6 +1788,149 @@ impl AppCore { }, ); + let relays: Vec = if network_enabled { + sess.mdk + .get_relays(&group.mls_group_id) + .ok() + .map(|s| s.into_iter().collect()) + .filter(|v: &Vec| !v.is_empty()) + .unwrap_or_else(|| fallback_relays.clone()) + } else { + vec![] + }; + + (sess.client.clone(), relays, rumor, rumor_id_hex) + }; + + self.prune_local_outbox(&chat_id); + self.refresh_chat_list_from_storage(); + self.refresh_current_chat_if_open(&chat_id); + + if !network_enabled { + let _ = self.core_sender.send(CoreMsg::Internal(Box::new( + InternalEvent::PublishMessageResult { + chat_id, + rumor_id: rumor_id_hex, + ok: false, + error: Some("offline".into()), + }, + ))); + return; + } + + let tx = self.core_sender.clone(); + let mut seen_ids = self.processed_group_event_ids.clone(); + let chat_id_for_task = chat_id.clone(); + self.runtime.spawn(async move { + let filter = Filter::new() + .kind(Kind::MlsGroupMessage) + .custom_tags( + SingleLetterTag::lowercase(Alphabet::H), + vec![chat_id_for_task.clone()], + ) + .limit(200); + let (mut backlog_events, error) = match client + .fetch_events_from(relays.clone(), filter, std::time::Duration::from_secs(8)) + .await + { + Ok(evts) => (evts.into_iter().collect::>(), None), + Err(e) => (Vec::new(), Some(format!("group backlog fetch failed: {e}"))), + }; + backlog_events.retain(|ev| seen_ids.insert(ev.id)); + backlog_events.sort_by_key(|ev| ev.created_at.as_secs()); + + let _ = tx.send(CoreMsg::Internal(Box::new( + InternalEvent::GroupSendCatchupCompleted { + chat_id: chat_id_for_task, + rumor, + backlog_events, + error, + }, + ))); + }); + } + + pub(super) fn handle_group_send_catchup_completed( + &mut self, + chat_id: String, + rumor: UnsignedEvent, + backlog_events: Vec, + error: Option, + ) { + if let Some(err) = error { + tracing::warn!(%chat_id, %err, "group send backlog fetch failed"); + } + self.ingest_group_backlog_events(backlog_events); + self.publish_prepared_group_rumor(chat_id, rumor); + } + + fn ingest_group_backlog_events(&mut self, backlog_events: Vec) { + let mut remaining = backlog_events; + let mut processed_any = false; + for _ in 0..3 { + if remaining.is_empty() { + break; + } + let current = std::mem::take(&mut remaining); + let mut next = Vec::new(); + for event in current.into_iter() { + let event_id = event.id; + let result = { + let Some(sess) = self.session.as_mut() else { + return; + }; + sess.mdk.process_message(&event) + }; + match result { + Ok(r) => { + self.note_processed_group_event_id_in_memory(event_id); + processed_any = true; + self.handle_message_processing_result(r); + } + Err(e) => { + tracing::debug!(event_id = %event.id.to_hex(), %e, "deferred backlog message"); + next.push(event); + } + } + } + if next.is_empty() { + break; + } + remaining = next; + } + if !remaining.is_empty() { + tracing::warn!(remaining = remaining.len(), "failed to ingest some backlog events"); + } + if processed_any { + self.persist_processed_group_event_ids_cache(); + } + } + + fn publish_prepared_group_rumor(&mut self, chat_id: String, mut rumor: UnsignedEvent) { + let network_enabled = self.network_enabled(); + let fallback_relays = self.default_relays(); + let rumor_id_hex = rumor.id().to_hex(); + + let mark_publish_failed = |core: &mut Self, reason: &str| { + core.handle_publish_message_result( + chat_id.clone(), + rumor_id_hex.clone(), + false, + Some(reason.to_string()), + ); + }; + + let (client, wrapper, relays) = { + let Some(sess) = self.session.as_mut() else { + mark_publish_failed(self, "session lost"); + return; + }; + let Some(group) = sess.groups.get(&chat_id).cloned() else { + self.toast("Chat not found"); + mark_publish_failed(self, "chat not found"); + return; + }; + let wrapper = match sess.mdk.create_message(&group.mls_group_id, rumor) { Ok(e) => e, Err(e) => { @@ -1821,13 +1964,9 @@ impl AppCore { vec![] }; - (sess.client.clone(), wrapper, relays, rumor_id_hex) + (sess.client.clone(), wrapper, relays) }; - self.prune_local_outbox(&chat_id); - self.refresh_chat_list_from_storage(); - self.refresh_current_chat_if_open(&chat_id); - if !network_enabled { let _ = self.core_sender.send(CoreMsg::Internal(Box::new( InternalEvent::PublishMessageResult { diff --git a/rust/src/core/mod.rs b/rust/src/core/mod.rs index b25512f29..1f200a724 100644 --- a/rust/src/core/mod.rs +++ b/rust/src/core/mod.rs @@ -771,6 +771,7 @@ pub struct AppCore { // Nostr kind:0 profile cache (survives across session refreshes). profiles: HashMap, // hex pubkey -> cached global profile group_profiles: HashMap>, // chat_id -> (pubkey -> profile) + processed_group_event_ids: HashSet, profile_db: Option, chat_media_db: Option, @@ -887,6 +888,8 @@ impl AppCore { let push_device_id = Self::load_or_create_push_device_id(&data_dir); let push_subscribed_chat_ids = Self::load_push_subscriptions(&data_dir); + let processed_group_event_ids = + pika_marmot_runtime::load_processed_mls_event_ids(std::path::Path::new(&data_dir)); let mut this = Self { state, @@ -919,6 +922,7 @@ impl AppCore { local_outbox: HashMap::new(), profiles, group_profiles: HashMap::new(), + processed_group_event_ids, profile_db, typing_state: HashMap::new(), last_typing_sent: HashMap::new(), @@ -3139,6 +3143,7 @@ impl AppCore { self.cancel_call_offer_timeout(); self.cancel_voice_recording_ticks(); + self.clear_processed_group_event_ids_cache(); let root = std::path::Path::new(&self.data_dir); match std::fs::read_dir(root) { Ok(entries) => { @@ -3327,6 +3332,12 @@ impl AppCore { ok, error, } => self.handle_publish_message_result(chat_id, rumor_id, ok, error), + InternalEvent::GroupSendCatchupCompleted { + chat_id, + rumor, + backlog_events, + error, + } => self.handle_group_send_catchup_completed(chat_id, rumor, backlog_events, error), InternalEvent::ChatMediaUploadCompleted { request_id, uploaded_url, @@ -4451,6 +4462,7 @@ impl AppCore { } pub(crate) fn handle_group_message(&mut self, event: Event) { + let event_id = event.id; let result = { let Some(sess) = self.session.as_mut() else { tracing::warn!("group_message but no session"); @@ -4465,9 +4477,48 @@ impl AppCore { } } }; + self.note_processed_group_event_id(event_id); self.handle_message_processing_result(result); } + fn note_processed_group_event_id(&mut self, event_id: EventId) { + if !self.processed_group_event_ids.insert(event_id) { + return; + } + self.persist_processed_group_event_ids_cache(); + } + + fn note_processed_group_event_id_in_memory(&mut self, event_id: EventId) { + self.processed_group_event_ids.insert(event_id); + } + + fn persist_processed_group_event_ids_cache(&mut self) { + if let Err(e) = pika_marmot_runtime::persist_processed_mls_event_ids( + std::path::Path::new(&self.data_dir), + &self.processed_group_event_ids, + ) { + tracing::warn!(%e, "failed to persist processed group event ids"); + } + } + + fn clear_processed_group_event_ids_cache(&mut self) { + self.processed_group_event_ids.clear(); + let path = pika_marmot_runtime::processed_mls_event_ids_path( + std::path::Path::new(&self.data_dir), + ); + match std::fs::remove_file(&path) { + Ok(()) => {} + Err(e) if e.kind() == std::io::ErrorKind::NotFound => {} + Err(e) => { + tracing::warn!( + %e, + path = %path.display(), + "failed to delete processed group event id cache file" + ); + } + } + } + fn handle_message_processing_result(&mut self, result: MessageProcessingResult) { // Phase 1: Extract mls_group_id and optional app message. let (mls_group_id, app_msg) = match result { @@ -4729,6 +4780,7 @@ impl AppCore { tracing::info!(path = %db_path.display(), "deleted mdk db on logout"); } } + self.clear_processed_group_event_ids_cache(); self.clear_push_subscriptions(); self.stop_session(); self.state.auth = AuthState::LoggedOut; diff --git a/rust/src/updates.rs b/rust/src/updates.rs index 46afce20b..147ae9864 100644 --- a/rust/src/updates.rs +++ b/rust/src/updates.rs @@ -55,6 +55,12 @@ pub enum InternalEvent { ok: bool, error: Option, }, + GroupSendCatchupCompleted { + chat_id: String, + rumor: nostr_sdk::prelude::UnsignedEvent, + backlog_events: Vec, + error: Option, + }, ChatMediaUploadCompleted { request_id: String, uploaded_url: Option,