Catch up MLS backlog before group sends in app path #513
@@ -1746,7 +1746,7 @@ impl AppCore {
            self.last_outgoing_ts
        };

-       let (client, wrapper, relays, rumor_id_hex) = {
+       let (client, relays, rumor, rumor_id_hex) = {
            let Some(sess) = self.session.as_mut() else {
                return;
            };
@@ -1788,6 +1788,149 @@ impl AppCore {
                },
            );

            let relays: Vec<RelayUrl> = if network_enabled {
                sess.mdk
                    .get_relays(&group.mls_group_id)
                    .ok()
                    .map(|s| s.into_iter().collect())
                    .filter(|v: &Vec<RelayUrl>| !v.is_empty())
                    .unwrap_or_else(|| fallback_relays.clone())
            } else {
                vec![]
            };

            (sess.client.clone(), relays, rumor, rumor_id_hex)
        };

        self.prune_local_outbox(&chat_id);
        self.refresh_chat_list_from_storage();
        self.refresh_current_chat_if_open(&chat_id);

        if !network_enabled {
            let _ = self.core_sender.send(CoreMsg::Internal(Box::new(
                InternalEvent::PublishMessageResult {
                    chat_id,
                    rumor_id: rumor_id_hex,
                    ok: false,
                    error: Some("offline".into()),
                },
            )));
            return;
        }

        let tx = self.core_sender.clone();
        let mut seen_ids = self.processed_group_event_ids.clone();
        let chat_id_for_task = chat_id.clone();
        self.runtime.spawn(async move {
            let filter = Filter::new()
                .kind(Kind::MlsGroupMessage)
                .custom_tags(
                    SingleLetterTag::lowercase(Alphabet::H),
                    vec![chat_id_for_task.clone()],
                )
                .limit(200);
            let (mut backlog_events, error) = match client
                .fetch_events_from(relays.clone(), filter, std::time::Duration::from_secs(8))
                .await
            {
                Ok(evts) => (evts.into_iter().collect::<Vec<Event>>(), None),
                Err(e) => (Vec::new(), Some(format!("group backlog fetch failed: {e}"))),
            };
            backlog_events.retain(|ev| seen_ids.insert(ev.id));
            backlog_events.sort_by_key(|ev| ev.created_at.as_secs());
coderabbitai[bot] marked this conversation as resolved.
            let _ = tx.send(CoreMsg::Internal(Box::new(
                InternalEvent::GroupSendCatchupCompleted {
                    chat_id: chat_id_for_task,
                    rumor,
                    backlog_events,
                    error,
                },
            )));
        });
Comment on lines +1821 to +1850

🔴 Crash safety regression: encrypted wrapper not persisted until after the 8-second async backlog fetch.

In the old code, the rumor was encrypted via `create_message()` and the resulting wrapper was persisted synchronously, before the async send was spawned. In the new code, encryption is deferred to `publish_prepared_group_rumor()`, which only runs after the backlog fetch (with its 8-second timeout) completes. In short: old code persisted the encrypted wrapper before any await; new code defers persistence past an 8-second async gap. The crash-safety window expanded from near-zero to 8+ seconds.
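For illustration, one possible mitigation that keeps the catch-up-first ordering would be to make the outgoing rumor durable before the async gap and replay it on restart. This is only a sketch: `persist_pending_rumor` is a hypothetical helper, not an API in this PR or the existing outbox.

```rust
// Sketch only: record the outgoing rumor durably before any await so a crash
// during the 8-second backlog fetch cannot silently drop the message.
// `persist_pending_rumor` is hypothetical; the real outbox API may differ.
if let Err(e) = self.persist_pending_rumor(&chat_id, &rumor) {
    tracing::warn!(%chat_id, %e, "failed to persist pending rumor before catch-up");
}

let tx = self.core_sender.clone();
self.runtime.spawn(async move {
    // ...fetch the backlog exactly as in this PR...
    // Encryption via create_message() still happens after catch-up; the pending
    // record would be cleared once PublishMessageResult comes back.
});
```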
    }

    pub(super) fn handle_group_send_catchup_completed(
        &mut self,
        chat_id: String,
        rumor: UnsignedEvent,
        backlog_events: Vec<Event>,
        error: Option<String>,
    ) {
        if let Some(err) = error {
            tracing::warn!(%chat_id, %err, "group send backlog fetch failed");
        }
        self.ingest_group_backlog_events(backlog_events);
        self.publish_prepared_group_rumor(chat_id, rumor);
    }

    fn ingest_group_backlog_events(&mut self, backlog_events: Vec<Event>) {
        let mut remaining = backlog_events;
        let mut processed_any = false;
        for _ in 0..3 {
            if remaining.is_empty() {
                break;
            }
            let current = std::mem::take(&mut remaining);
            let mut next = Vec::new();
            for event in current.into_iter() {
                let event_id = event.id;
                let result = {
                    let Some(sess) = self.session.as_mut() else {
                        return;
                    };
                    sess.mdk.process_message(&event)
                };
                match result {
                    Ok(r) => {
                        self.note_processed_group_event_id_in_memory(event_id);
                        processed_any = true;
                        self.handle_message_processing_result(r);
                    }
                    Err(e) => {
                        tracing::debug!(event_id = %event.id.to_hex(), %e, "deferred backlog message");
                        next.push(event);
                    }
                }
            }
            if next.is_empty() {
                break;
            }
            remaining = next;
        }
        if !remaining.is_empty() {
            tracing::warn!(remaining = remaining.len(), "failed to ingest some backlog events");
        }
        if processed_any {
            self.persist_processed_group_event_ids_cache();
        }
    }
Comment on lines +1867 to +1907

🟡 No tests added for new backlog catchup logic

The AGENTS.md rule states: "Always add tests for changes when possible." This PR introduces three new non-trivial functions — `handle_group_send_catchup_completed`, `ingest_group_backlog_events`, and `publish_prepared_group_rumor` — without accompanying tests.
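One low-effort way to get coverage here would be to factor the dedup/ordering step of the spawned task into a free function with no `AppCore` state. The helper below is illustrative only (not in this PR) and assumes the nostr `Event`/`EventId` types already imported in this module.

```rust
use std::collections::HashSet;

// Illustrative helper (not in this PR): the dedup + oldest-first ordering the
// spawned catch-up task performs, extracted so it can be unit tested in isolation.
fn dedup_and_order_backlog(events: Vec<Event>, seen: &mut HashSet<EventId>) -> Vec<Event> {
    let mut out: Vec<Event> = events
        .into_iter()
        .filter(|ev| seen.insert(ev.id)) // drop events we have already processed
        .collect();
    out.sort_by_key(|ev| ev.created_at.as_secs()); // oldest first, matching the PR
    out
}
```

A test could then assert that duplicates are dropped and that events come back ordered by `created_at`, without standing up a session or relay.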
    fn publish_prepared_group_rumor(&mut self, chat_id: String, mut rumor: UnsignedEvent) {
        let network_enabled = self.network_enabled();
        let fallback_relays = self.default_relays();
        let rumor_id_hex = rumor.id().to_hex();

        let mark_publish_failed = |core: &mut Self, reason: &str| {
            core.handle_publish_message_result(
                chat_id.clone(),
                rumor_id_hex.clone(),
                false,
                Some(reason.to_string()),
            );
        };

        let (client, wrapper, relays) = {
            let Some(sess) = self.session.as_mut() else {
                mark_publish_failed(self, "session lost");
                return;
            };
            let Some(group) = sess.groups.get(&chat_id).cloned() else {
                self.toast("Chat not found");
                mark_publish_failed(self, "chat not found");
                return;
            };

            let wrapper = match sess.mdk.create_message(&group.mls_group_id, rumor) {
                Ok(e) => e,
                Err(e) => {
@@ -1821,13 +1964,9 @@ impl AppCore {
                vec![]
            };

-           (sess.client.clone(), wrapper, relays, rumor_id_hex)
+           (sess.client.clone(), wrapper, relays)
        };

        self.prune_local_outbox(&chat_id);
        self.refresh_chat_list_from_storage();
        self.refresh_current_chat_if_open(&chat_id);

        if !network_enabled {
            let _ = self.core_sender.send(CoreMsg::Internal(Box::new(
                InternalEvent::PublishMessageResult {
@@ -771,6 +771,7 @@ pub struct AppCore {
    // Nostr kind:0 profile cache (survives across session refreshes).
    profiles: HashMap<String, ProfileCache>, // hex pubkey -> cached global profile
    group_profiles: HashMap<String, HashMap<String, ProfileCache>>, // chat_id -> (pubkey -> profile)
    processed_group_event_ids: HashSet<EventId>,
    profile_db: Option<rusqlite::Connection>,
coderabbitai[bot] marked this conversation as resolved.
    chat_media_db: Option<rusqlite::Connection>,

@@ -887,6 +888,8 @@ impl AppCore {

        let push_device_id = Self::load_or_create_push_device_id(&data_dir);
        let push_subscribed_chat_ids = Self::load_push_subscriptions(&data_dir);
        let processed_group_event_ids =
            pika_marmot_runtime::load_processed_mls_event_ids(std::path::Path::new(&data_dir));

        let mut this = Self {
            state,

@@ -919,6 +922,7 @@ impl AppCore {
            local_outbox: HashMap::new(),
            profiles,
            group_profiles: HashMap::new(),
            processed_group_event_ids,
            profile_db,
            typing_state: HashMap::new(),
            last_typing_sent: HashMap::new(),

@@ -3139,6 +3143,7 @@ impl AppCore {
        self.cancel_call_offer_timeout();
        self.cancel_voice_recording_ticks();

        self.clear_processed_group_event_ids_cache();
        let root = std::path::Path::new(&self.data_dir);
        match std::fs::read_dir(root) {
            Ok(entries) => {
@@ -3327,6 +3332,12 @@ impl AppCore {
                ok,
                error,
            } => self.handle_publish_message_result(chat_id, rumor_id, ok, error),
            InternalEvent::GroupSendCatchupCompleted {
                chat_id,
                rumor,
                backlog_events,
                error,
            } => self.handle_group_send_catchup_completed(chat_id, rumor, backlog_events, error),
            InternalEvent::ChatMediaUploadCompleted {
                request_id,
                uploaded_url,

@@ -4451,6 +4462,7 @@ impl AppCore {
    }

    pub(crate) fn handle_group_message(&mut self, event: Event) {
        let event_id = event.id;
        let result = {
            let Some(sess) = self.session.as_mut() else {
                tracing::warn!("group_message but no session");

@@ -4465,9 +4477,48 @@ impl AppCore {
                }
            }
        };
        self.note_processed_group_event_id(event_id);
        self.handle_message_processing_result(result);
    }

    fn note_processed_group_event_id(&mut self, event_id: EventId) {
        if !self.processed_group_event_ids.insert(event_id) {
            return;
        }
        self.persist_processed_group_event_ids_cache();
    }

    fn note_processed_group_event_id_in_memory(&mut self, event_id: EventId) {
        self.processed_group_event_ids.insert(event_id);
    }

    fn persist_processed_group_event_ids_cache(&mut self) {
        if let Err(e) = pika_marmot_runtime::persist_processed_mls_event_ids(
            std::path::Path::new(&self.data_dir),
            &self.processed_group_event_ids,
        ) {
            tracing::warn!(%e, "failed to persist processed group event ids");
        }
devin-ai-integration[bot] marked this conversation as resolved.
    }
Comment on lines +4484 to +4502

Bound the processed-event cache by recency in memory too.

Author: sure, but I see this as follow-up hardening, not a blocker for this parity fix.

If you'd like, I can open a tracking issue so the recency bound is queued up as follow-up work. Preference on the default cap? I can proceed with opening the issue now if you confirm.
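For reference, a recency-bounded variant could pair the `HashSet` with an insertion-order queue and evict the oldest entries past a cap. This is a sketch only: the type name and the 10_000 cap are illustrative, and it assumes the nostr `EventId` type (which is `Copy`, `Eq`, and `Hash`) used in this diff.

```rust
use std::collections::{HashSet, VecDeque};

// Sketch of a recency-bounded processed-event cache (name and cap are illustrative).
struct BoundedEventIdCache {
    cap: usize,
    order: VecDeque<EventId>, // insertion order, oldest at the front
    seen: HashSet<EventId>,
}

impl BoundedEventIdCache {
    fn new(cap: usize) -> Self {
        Self { cap, order: VecDeque::new(), seen: HashSet::new() }
    }

    /// Returns true if the id was newly inserted, evicting the oldest entry when full.
    fn insert(&mut self, id: EventId) -> bool {
        if !self.seen.insert(id) {
            return false;
        }
        self.order.push_back(id);
        while self.order.len() > self.cap {
            if let Some(old) = self.order.pop_front() {
                self.seen.remove(&old);
            }
        }
        true
    }
}
```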
    fn clear_processed_group_event_ids_cache(&mut self) {
        self.processed_group_event_ids.clear();
        let path = pika_marmot_runtime::processed_mls_event_ids_path(
            std::path::Path::new(&self.data_dir),
        );
        match std::fs::remove_file(&path) {
            Ok(()) => {}
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
            Err(e) => {
                tracing::warn!(
                    %e,
                    path = %path.display(),
                    "failed to delete processed group event id cache file"
                );
            }
        }
    }

    fn handle_message_processing_result(&mut self, result: MessageProcessingResult) {
        // Phase 1: Extract mls_group_id and optional app message.
        let (mls_group_id, app_msg) = match result {

@@ -4729,6 +4780,7 @@ impl AppCore {
                tracing::info!(path = %db_path.display(), "deleted mdk db on logout");
            }
        }
        self.clear_processed_group_event_ids_cache();
        self.clear_push_subscriptions();
        self.stop_session();
        self.state.auth = AuthState::LoggedOut;
Don't continue to `create_message()` after a failed catch-up. Line 1837 converts a backlog fetch failure into `error`, but lines 1860-1864 only log it and continue. Lines 1899-1901 do the same when backlog ingestion still leaves events behind. That means `publish_prepared_group_rumor()` can still encrypt from a known-incomplete MLS state, which is the exact stale-epoch failure mode this PR is trying to avoid.

Suggested direction:
```diff
 pub(super) fn handle_group_send_catchup_completed(
     &mut self,
     chat_id: String,
     rumor: UnsignedEvent,
     backlog_events: Vec<Event>,
     error: Option<String>,
 ) {
+    let rumor_id_hex = rumor.id().to_hex();
     if let Some(err) = error {
-        tracing::warn!(%chat_id, %err, "group send backlog fetch failed");
+        let delivery = MessageDeliveryState::Failed {
+            reason: format!("group backlog fetch failed: {err}"),
+        };
+        self.delivery_overrides
+            .entry(chat_id.clone())
+            .or_default()
+            .insert(rumor_id_hex.clone(), delivery.clone());
+        self.fail_delivery_or_refresh(&chat_id, &rumor_id_hex, delivery);
+        self.refresh_chat_list_from_storage();
+        return;
     }
-    self.ingest_group_backlog_events(backlog_events);
+    if !self.ingest_group_backlog_events(backlog_events) {
+        let delivery = MessageDeliveryState::Failed {
+            reason: "group backlog catch-up incomplete".into(),
+        };
+        self.delivery_overrides
+            .entry(chat_id.clone())
+            .or_default()
+            .insert(rumor_id_hex.clone(), delivery.clone());
+        self.fail_delivery_or_refresh(&chat_id, &rumor_id_hex, delivery);
+        self.refresh_chat_list_from_storage();
+        return;
+    }
     self.publish_prepared_group_rumor(chat_id, rumor);
 }

-fn ingest_group_backlog_events(&mut self, backlog_events: Vec<Event>) {
+fn ingest_group_backlog_events(&mut self, backlog_events: Vec<Event>) -> bool {
     let mut remaining = backlog_events;
     for _ in 0..3 {
         if remaining.is_empty() {
             break;
         }
@@
-            let Some(sess) = self.session.as_mut() else {
-                return;
+            let Some(sess) = self.session.as_mut() else {
+                return false;
             };
             sess.mdk.process_message(&event)
         };
@@
-    if !remaining.is_empty() {
-        tracing::warn!(remaining = remaining.len(), "failed to ingest some backlog events");
-    }
+    if !remaining.is_empty() {
+        tracing::warn!(remaining = remaining.len(), "failed to ingest some backlog events");
+        return false;
+    }
+    true
 }
```

Also applies to: 1860-1864, 1899-1901
good idea, but this PR is intentionally matching the CLI's current best-effort catch-up behavior (parity) rather than introducing a new fail-closed policy. It could be a good follow-up, though.
@reez, understood: keeping best-effort parity with the CLI makes sense as an incremental step. Would you like me to open a follow-up issue to track the fail-closed catch-up policy (failing the send when backlog fetch errors or ingestion is incomplete), so it doesn't get lost?