From 42b37848990559a846f02e8ec9472596d65062c4 Mon Sep 17 00:00:00 2001 From: Igor Somov Date: Thu, 18 Sep 2025 22:52:04 -0300 Subject: [PATCH 1/3] Add MSC4357 live messaging support --- src/base.rs | 34 ++- src/message/compose.rs | 55 +++-- src/message/live.rs | 379 ++++++++++++++++++++++++++++++++++ src/message/live_detection.rs | 215 +++++++++++++++++++ src/message/mod.rs | 26 ++- src/notifications.rs | 8 +- src/windows/room/chat.rs | 348 ++++++++++++++++++++++++++++++- src/worker.rs | 193 ++++++++++++++--- 8 files changed, 1202 insertions(+), 56 deletions(-) create mode 100644 src/message/live.rs create mode 100644 src/message/live_detection.rs diff --git a/src/base.rs b/src/base.rs index bb4f491d..b6af3ac0 100644 --- a/src/base.rs +++ b/src/base.rs @@ -178,6 +178,7 @@ pub enum MessageAction { /// and error when it doesn't recognize it. The second [bool] argument forces it to be /// interpreted literally when it is `true`. Unreact(Option, bool), + } /// An action taken in the currently selected space. @@ -898,6 +899,9 @@ pub struct RoomInfo { /// A map of message identifiers to thread replies. threads: HashMap, + /// Set of event IDs for messages that are live (being typed) + pub live_message_ids: HashSet, + /// Whether the scrollback for this room is currently being fetched. pub fetching: bool, @@ -929,6 +933,7 @@ impl Default for RoomInfo { user_receipts: Default::default(), reactions: Default::default(), threads: Default::default(), + live_message_ids: Default::default(), fetching: Default::default(), fetch_id: Default::default(), fetch_last: Default::default(), @@ -1087,6 +1092,7 @@ impl RoomInfo { let event_id = msg.event_id; let new_msgtype = msg.new_content; + let Some(EventLocation::Message(thread, key)) = self.keys.get(&event_id) else { return; }; @@ -1103,9 +1109,12 @@ impl RoomInfo { return; }; + // Update live status based on live_message_ids + msg.is_live = self.live_message_ids.contains(&event_id); + match &mut msg.event { MessageEvent::Original(orig) => { - orig.content.apply_replacement(new_msgtype); + orig.content.apply_replacement(new_msgtype.clone()); }, MessageEvent::Local(_, content) => { content.apply_replacement(new_msgtype); @@ -1165,22 +1174,37 @@ impl RoomInfo { let event_id = msg.event_id().to_owned(); let key = (msg.origin_server_ts().into(), event_id.clone()); + // Check if this is a live message + let is_live = self.live_message_ids.contains(&event_id); + let loc = EventLocation::Message(None, key.clone()); - self.keys.insert(event_id, loc); - self.messages.insert_message(key, msg); + self.keys.insert(event_id.clone(), loc); + + // Convert to Message and set is_live flag if needed + let mut message: Message = msg.into(); + message.is_live = is_live; + + self.messages.insert_message(key, message); } fn insert_thread(&mut self, msg: RoomMessageEvent, thread_root: OwnedEventId) { let event_id = msg.event_id().to_owned(); let key = (msg.origin_server_ts().into(), event_id.clone()); + // Check if this is a live message + let is_live = self.live_message_ids.contains(&event_id); + let replies = self .threads .entry(thread_root.clone()) .or_insert_with(|| Messages::thread(thread_root.clone())); let loc = EventLocation::Message(Some(thread_root), key.clone()); - self.keys.insert(event_id, loc); - replies.insert_message(key, msg); + self.keys.insert(event_id.clone(), loc); + + let mut message: Message = msg.into(); + message.is_live = is_live; + + replies.insert_message(key, message); } /// Insert a new message event. diff --git a/src/message/compose.rs b/src/message/compose.rs index 8fd6ad00..bda8ce9e 100644 --- a/src/message/compose.rs +++ b/src/message/compose.rs @@ -16,7 +16,7 @@ use matrix_sdk::ruma::events::room::message::{ }; #[derive(Clone, Debug, Default)] -enum SlashCommand { +pub enum SlashCommand { /// Send an emote message. Emote, @@ -47,6 +47,9 @@ enum SlashCommand { /// Send a message with heart effects in clients that show them. SpaceInvaders, + + /// Send a live message that updates as you type (MSC4357) + Live, } impl SlashCommand { @@ -95,6 +98,11 @@ impl SlashCommand { Default::default(), )? }, + SlashCommand::Live => { + // Live messages are handled specially in the chat window + // This should not be called directly + return Err(anyhow::anyhow!("Live message")); + }, }; Ok(msgtype) @@ -104,6 +112,7 @@ impl SlashCommand { fn parse_slash_command_inner(input: &str) -> IResult<&str, SlashCommand> { let (input, _) = space0(input)?; let (input, slash) = alt(( + // Commands that require text after them (with space) value(SlashCommand::Emote, tag("/me ")), value(SlashCommand::Html, tag("/h ")), value(SlashCommand::Html, tag("/html ")), @@ -118,6 +127,7 @@ fn parse_slash_command_inner(input: &str) -> IResult<&str, SlashCommand> { value(SlashCommand::Rainfall, tag("/rainfall ")), value(SlashCommand::Snowfall, tag("/snowfall ")), value(SlashCommand::SpaceInvaders, tag("/spaceinvaders ")), + value(SlashCommand::Live, tag("/live ")), ))(input)?; let (input, _) = space0(input)?; @@ -170,12 +180,23 @@ fn text_to_message_content(input: String) -> TextMessageEventContent { } } +#[allow(dead_code)] pub fn text_to_message(input: String) -> RoomMessageEventContent { - let msg = parse_slash_command(input.as_str()) - .and_then(|(input, slash)| slash.to_message(input)) - .unwrap_or_else(|_| MessageType::Text(text_to_message_content(input))); + text_to_message_with_command(input).0 +} - RoomMessageEventContent::new(msg) +pub fn text_to_message_with_command(input: String) -> (RoomMessageEventContent, Option) { + match parse_slash_command(input.as_str()) { + Ok((text, slash)) => { + let msg = slash.to_message(text) + .unwrap_or_else(|_| MessageType::Text(text_to_message_content(text.to_string()))); + (RoomMessageEventContent::new(msg), Some(slash)) + }, + Err(_) => { + let msg = MessageType::Text(text_to_message_content(input)); + (RoomMessageEventContent::new(msg), None) + } + } } #[cfg(test)] @@ -330,8 +351,7 @@ pub mod tests { assert_eq!(content.body, "bold"); assert_eq!(content.formatted.unwrap().body, "bold"); - let MessageType::Text(content) = text_to_message("/plain bold".into()).msgtype - else { + let MessageType::Text(content) = text_to_message("/plain bold".into()).msgtype else { panic!("Expected MessageType::Text"); }; assert_eq!(content.body, "bold"); @@ -343,33 +363,40 @@ pub mod tests { assert_eq!(content.body, "bold"); assert!(content.formatted.is_none(), "{:?}", content.formatted); - let MessageType::Emote(content) = text_to_message("/me *bold*".into()).msgtype else { + let (msg, _) = text_to_message_with_command("/me *bold*".into()); + let MessageType::Emote(content) = msg.msgtype else { panic!("Expected MessageType::Emote"); }; assert_eq!(content.body, "*bold*"); assert_eq!(content.formatted.unwrap().body, "

bold

\n"); - let content = text_to_message("/confetti hello".into()).msgtype; + let (msg, _) = text_to_message_with_command("/confetti hello".into()); + let content = msg.msgtype; assert_eq!(content.msgtype(), "nic.custom.confetti"); assert_eq!(content.body(), "hello"); - let content = text_to_message("/fireworks hello".into()).msgtype; + let (msg, _) = text_to_message_with_command("/fireworks hello".into()); + let content = msg.msgtype; assert_eq!(content.msgtype(), "nic.custom.fireworks"); assert_eq!(content.body(), "hello"); - let content = text_to_message("/hearts hello".into()).msgtype; + let (msg, _) = text_to_message_with_command("/hearts hello".into()); + let content = msg.msgtype; assert_eq!(content.msgtype(), "io.element.effect.hearts"); assert_eq!(content.body(), "hello"); - let content = text_to_message("/rainfall hello".into()).msgtype; + let (msg, _) = text_to_message_with_command("/rainfall hello".into()); + let content = msg.msgtype; assert_eq!(content.msgtype(), "io.element.effect.rainfall"); assert_eq!(content.body(), "hello"); - let content = text_to_message("/snowfall hello".into()).msgtype; + let (msg, _) = text_to_message_with_command("/snowfall hello".into()); + let content = msg.msgtype; assert_eq!(content.msgtype(), "io.element.effect.snowfall"); assert_eq!(content.body(), "hello"); - let content = text_to_message("/spaceinvaders hello".into()).msgtype; + let (msg, _) = text_to_message_with_command("/spaceinvaders hello".into()); + let content = msg.msgtype; assert_eq!(content.msgtype(), "io.element.effects.space_invaders"); assert_eq!(content.body(), "hello"); } diff --git a/src/message/live.rs b/src/message/live.rs new file mode 100644 index 00000000..07633c66 --- /dev/null +++ b/src/message/live.rs @@ -0,0 +1,379 @@ +//! Live messaging support for Matrix (MSC4357) +//! +//! This module implements live message composition where messages are sent +//! as they're being typed and continuously updated until finalized. + +use std::collections::HashMap; +use std::time::{Duration, Instant}; +use matrix_sdk::ruma::{EventId, OwnedEventId, OwnedRoomId}; +use serde_json::{json, Value}; + +/// Minimum interval between live message updates +const MIN_UPDATE_INTERVAL: Duration = Duration::from_secs(2); + +/// Rate limit window for tracking update frequency +const RATE_LIMIT_WINDOW: Duration = Duration::from_secs(60); + +/// Maximum updates allowed per window +const MAX_UPDATES_PER_WINDOW: usize = 30; + +/// The live message marker key used in event content +pub const LIVE_MESSAGE_MARKER: &str = "org.matrix.msc4357.live"; + +/// Live message session state +#[derive(Debug, Clone)] +pub struct LiveMessageSession { + /// The event ID of the initial message + pub event_id: OwnedEventId, + + /// Current content of the message + pub current_content: String, + + /// Timestamp of the last update sent + pub last_update: Instant, + + /// Number of updates sent for this message + pub update_count: usize, + + /// Whether this session is still active + pub is_active: bool, +} + +/// Rate limiter to prevent excessive updates +#[derive(Debug)] +struct RateLimiter { + /// Track update counts per room + update_counts: HashMap>, +} + +impl RateLimiter { + fn new() -> Self { + Self { + update_counts: HashMap::new(), + } + } + + fn can_send_update(&mut self, room_id: &OwnedRoomId) -> bool { + let now = Instant::now(); + let updates = self.update_counts.entry(room_id.clone()).or_default(); + + // Remove old updates outside the window + updates.retain(|&t| now.duration_since(t) < RATE_LIMIT_WINDOW); + + // Check if we're under the limit + updates.len() < MAX_UPDATES_PER_WINDOW + } + + fn record_update(&mut self, room_id: OwnedRoomId) { + let updates = self.update_counts.entry(room_id).or_default(); + updates.push(Instant::now()); + } +} + +/// Manager for live message sessions +#[derive(Debug)] +pub struct LiveMessageManager { + /// Active live message sessions per room + sessions: HashMap, + + /// Rate limiter for updates + rate_limiter: RateLimiter, + + /// Whether live messaging is enabled + pub enabled: bool, +} + +impl LiveMessageManager { + /// Create a new live message manager + pub fn new() -> Self { + Self { + sessions: HashMap::new(), + rate_limiter: RateLimiter::new(), + enabled: false, + } + } + + /// Start a new live message session + pub fn start_session(&mut self, room_id: OwnedRoomId, initial_content: String) -> Option { + tracing::info!("[LIVE] LiveMessageManager::start_session called, enabled={}", self.enabled); + if !self.enabled { + tracing::warn!("[LIVE] LiveMessageManager is disabled, cannot start session"); + return None; + } + + // Generate temporary event ID - use a valid format + let timestamp = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis(); + let temp_event_id = format!("$temp_{}_{}", timestamp, room_id.as_str().replace(":", "_")); + let event_id = temp_event_id.parse().ok()?; + + let session = LiveMessageSession { + event_id, + current_content: initial_content, + last_update: Instant::now(), + update_count: 0, + is_active: true, + }; + + self.sessions.insert(room_id.clone(), session.clone()); + self.rate_limiter.record_update(room_id); + + Some(session) + } + + /// Set the real event ID received from server + pub fn set_event_id(&mut self, room_id: &OwnedRoomId, event_id: OwnedEventId) { + if let Some(session) = self.sessions.get_mut(room_id) { + session.event_id = event_id; + } + } + + /// Check if we still have a temporary event ID + pub fn has_temp_event_id(&self, room_id: &OwnedRoomId) -> bool { + self.sessions + .get(room_id) + .map(|s| s.event_id.as_str().starts_with("$temp_") || s.event_id.as_str().contains("local")) + .unwrap_or(false) + } + + /// Update an existing session + pub fn update_session(&mut self, room_id: &OwnedRoomId, new_content: String) -> Option { + if !self.enabled { + return None; + } + + let session = self.sessions.get_mut(room_id)?; + + if !session.is_active { + return None; + } + + session.current_content = new_content; + session.last_update = Instant::now(); + session.update_count += 1; + + self.rate_limiter.record_update(room_id.clone()); + + Some(session.clone()) + } + + /// Check if we should send an update now + pub fn should_send_update(&mut self, room_id: &OwnedRoomId) -> bool { + if !self.enabled { + return false; + } + + let session = match self.sessions.get(room_id) { + Some(s) if s.is_active => s, + _ => return false, + }; + + // Don't send if we have a temp ID + if self.has_temp_event_id(room_id) { + return false; + } + + // Check minimum interval + if session.last_update.elapsed() < MIN_UPDATE_INTERVAL { + return false; + } + + // Check rate limit + self.rate_limiter.can_send_update(room_id) + } + + /// Get the current session for a room + pub fn get_session(&self, room_id: &OwnedRoomId) -> Option<&LiveMessageSession> { + self.sessions.get(room_id) + } + + /// End a live message session + pub fn end_session(&mut self, room_id: &OwnedRoomId) -> Option { + self.sessions.remove(room_id) + } +} + +/// Create initial live message JSON with fields +pub fn create_initial_live_message_with_fields( + content: &str, + custom_fields: Option, +) -> Value { + let mut json = json!({ + "msgtype": "m.text", + "body": content, + LIVE_MESSAGE_MARKER: true + }); + + if let Some(custom) = custom_fields { + if let Some(obj) = json.as_object_mut() { + if let Some(custom_obj) = custom.as_object() { + for (key, value) in custom_obj { + if !matches!(key.as_str(), "msgtype" | "body") { + obj.insert(key.clone(), value.clone()); + } + } + } + } + } + + json +} + +/// Create a live message update JSON with custom fields +pub fn create_live_update_json_with_fields( + original_event_id: &EventId, + new_content: &str, + is_final: bool, + custom_fields: Option, +) -> Value { + let mut json = json!({ + "msgtype": "m.text", + "body": format!("* {}", new_content), + "m.new_content": { + "msgtype": "m.text", + "body": new_content, + }, + "m.relates_to": { + "rel_type": "m.replace", + "event_id": original_event_id.to_string(), + } + }); + + // Add live marker to new_content unless it's final + if !is_final { + if let Some(new_content_obj) = json["m.new_content"].as_object_mut() { + new_content_obj.insert(LIVE_MESSAGE_MARKER.to_string(), json!(true)); + tracing::debug!("[LIVE JSON] Added live marker to update"); + } + } else { + tracing::info!("[LIVE JSON] Creating FINAL update WITHOUT live marker"); + } + + // Add custom fields if provided + if let Some(custom) = custom_fields { + if let Some(obj) = json.as_object_mut() { + if let Some(custom_obj) = custom.as_object() { + for (key, value) in custom_obj { + if !matches!(key.as_str(), "msgtype" | "body" | "m.new_content" | "m.relates_to") { + obj.insert(key.clone(), value.clone()); + } + } + } + } + } + + json +} + +/// Send a new live message with custom fields +pub async fn send_live_message_with_custom_fields( + room: &matrix_sdk::Room, + content: &str, + custom_fields: Option, +) -> Result { + let json = create_initial_live_message_with_fields(content, custom_fields); + + // Log for debugging + tracing::debug!("Sending initial live message JSON: {}", serde_json::to_string_pretty(&json).unwrap_or_default()); + + let resp = room.send_raw("m.room.message", json).await?; + Ok(resp.event_id) +} + +/// Send a live update with custom fields +pub async fn send_live_update_with_custom_fields( + room: &matrix_sdk::Room, + original_event_id: &EventId, + new_content: &str, + is_final: bool, + custom_fields: Option, +) -> Result { + let json = create_live_update_json_with_fields( + original_event_id, + new_content, + is_final, + custom_fields, + ); + + // Log for debugging + tracing::debug!("Sending live update JSON: {}", serde_json::to_string_pretty(&json).unwrap_or_default()); + + let resp = room.send_raw("m.room.message", json).await?; + Ok(resp.event_id) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_create_initial_live_message() { + let content = "Hello world"; + let json = create_initial_live_message_with_fields(content, None); + + // Check that the JSON has the required fields + assert_eq!(json["msgtype"], "m.text"); + assert_eq!(json["body"], "Hello world"); + assert_eq!(json["org.matrix.msc4357.live"], true); + } + + #[test] + fn test_create_live_update_with_marker() { + use std::str::FromStr; + let event_id = OwnedEventId::from_str("$test_event_id:example.org").unwrap(); + let content = "Updated content"; + let json = create_live_update_json_with_fields(&event_id, content, false, None); + + // Check update structure + assert_eq!(json["msgtype"], "m.text"); + assert!(json["body"].as_str().unwrap().starts_with("* ")); + assert_eq!(json["m.new_content"]["body"], "Updated content"); + assert_eq!(json["m.new_content"]["org.matrix.msc4357.live"], true); + assert_eq!(json["m.relates_to"]["rel_type"], "m.replace"); + assert_eq!(json["m.relates_to"]["event_id"], "$test_event_id:example.org"); + } + + #[test] + fn test_create_final_update_without_marker() { + use std::str::FromStr; + let event_id = OwnedEventId::from_str("$test_event_id:example.org").unwrap(); + let content = "Final content"; + let json = create_live_update_json_with_fields(&event_id, content, true, None); + + // Check that final update doesn't have live marker + assert_eq!(json["m.new_content"]["body"], "Final content"); + assert!(json["m.new_content"]["org.matrix.msc4357.live"].is_null()); + } + + #[test] + fn test_live_message_manager_session() { + use std::str::FromStr; + let mut manager = LiveMessageManager::new(); + manager.enabled = true; + + let room_id = OwnedRoomId::from_str("!test:example.org").unwrap(); + let content = "Test message".to_string(); + + // Start a session + let session = manager.start_session(room_id.clone(), content.clone()); + assert!(session.is_some()); + + let session = session.unwrap(); + assert_eq!(session.current_content, "Test message"); + assert!(session.is_active); + + // Get the session + let retrieved = manager.get_session(&room_id); + assert!(retrieved.is_some()); + assert_eq!(retrieved.unwrap().current_content, "Test message"); + + // End the session + let ended = manager.end_session(&room_id); + assert!(ended.is_some()); + + // Session should be gone + assert!(manager.get_session(&room_id).is_none()); + } +} \ No newline at end of file diff --git a/src/message/live_detection.rs b/src/message/live_detection.rs new file mode 100644 index 00000000..ecea9949 --- /dev/null +++ b/src/message/live_detection.rs @@ -0,0 +1,215 @@ +//! Live message detection utilities +//! +//! This module provides centralized logic for detecting whether a Matrix event +//! contains the MSC4357 live message marker. + +use matrix_sdk::ruma::serde::Raw; +use serde_json::Value; + +/// The MSC4357 live message marker key +pub const LIVE_MARKER: &str = "org.matrix.msc4357.live"; + +/// Check if a raw JSON string contains the live message marker +/// +/// This function checks for the presence of the live marker in: +/// 1. Root level (for initial live messages) +/// 2. Inside m.new_content (for replacement/edit messages) +/// +/// # Arguments +/// * `json_str` - The raw JSON string of the event +/// +/// # Returns +/// * `true` if the event has the live marker set to true +/// * `false` otherwise +pub fn has_live_marker_in_json(json_str: &str) -> bool { + // Quick string check first for performance + if !json_str.contains(LIVE_MARKER) { + return false; + } + + // Parse JSON and check properly + if let Ok(json) = serde_json::from_str::(json_str) { + has_live_marker_in_value(&json) + } else { + // Fallback to simple string search if parsing fails + json_str.contains(&format!(r#""{}":true"#, LIVE_MARKER)) + } +} + +/// Check if a JSON value contains the live message marker +/// +/// Checks both at root level and inside m.new_content for replacements +pub fn has_live_marker_in_value(json: &Value) -> bool { + // Check in m.new_content first (for replacements) + if let Some(new_content) = json.get("m.new_content") { + if let Some(marker) = new_content.get(LIVE_MARKER) { + return marker.as_bool().unwrap_or(false); + } + } + + // Check at root level (for initial messages) + if let Some(marker) = json.get(LIVE_MARKER) { + return marker.as_bool().unwrap_or(false); + } + + // Also check in content field (for sync events) + if let Some(content) = json.get("content") { + // Check in content.m.new_content for replacements + if let Some(new_content) = content.get("m.new_content") { + if let Some(marker) = new_content.get(LIVE_MARKER) { + return marker.as_bool().unwrap_or(false); + } + } + // Check in content root for initial messages + if let Some(marker) = content.get(LIVE_MARKER) { + return marker.as_bool().unwrap_or(false); + } + } + + false +} + +/// Check if a Raw event contains the live message marker +pub fn has_live_marker_in_raw(raw: &Raw) -> bool { + has_live_marker_in_json(raw.json().get()) +} + +/// Determine if a replacement event is a live update or final update +/// +/// Returns Some(true) if it's a live update, Some(false) if it's final, +/// or None if it's not a replacement +#[allow(dead_code)] +pub fn is_live_replacement(json: &Value) -> Option { + // Check if this is a replacement + let is_replacement = json.get("m.relates_to") + .and_then(|r| r.get("rel_type")) + .and_then(|t| t.as_str()) + .map(|t| t == "m.replace") + .unwrap_or(false); + + if !is_replacement { + // Also check in content.m.relates_to for sync events + let is_replacement_in_content = json.get("content") + .and_then(|c| c.get("m.relates_to")) + .and_then(|r| r.get("rel_type")) + .and_then(|t| t.as_str()) + .map(|t| t == "m.replace") + .unwrap_or(false); + + if !is_replacement_in_content { + return None; + } + } + + // It's a replacement, check if it has the live marker + Some(has_live_marker_in_value(json)) +} + +/// Extract the target event ID from a replacement event +#[allow(dead_code)] +pub fn get_replacement_target_id(json: &Value) -> Option { + // Check in m.relates_to + if let Some(event_id) = json.get("m.relates_to") + .and_then(|r| r.get("event_id")) + .and_then(|id| id.as_str()) { + return Some(event_id.to_string()); + } + + // Check in content.m.relates_to for sync events + json.get("content") + .and_then(|c| c.get("m.relates_to")) + .and_then(|r| r.get("event_id")) + .and_then(|id| id.as_str()) + .map(|id| id.to_string()) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_has_live_marker_in_json_root() { + let json = json!({ + "msgtype": "m.text", + "body": "Hello", + "org.matrix.msc4357.live": true + }); + let json_str = serde_json::to_string(&json).unwrap(); + assert!(has_live_marker_in_json(&json_str)); + } + + #[test] + fn test_has_live_marker_in_json_new_content() { + let json = json!({ + "msgtype": "m.text", + "body": "* Hello", + "m.new_content": { + "msgtype": "m.text", + "body": "Hello", + "org.matrix.msc4357.live": true + }, + "m.relates_to": { + "rel_type": "m.replace", + "event_id": "$test" + } + }); + let json_str = serde_json::to_string(&json).unwrap(); + assert!(has_live_marker_in_json(&json_str)); + } + + #[test] + fn test_has_live_marker_in_sync_event() { + let json = json!({ + "content": { + "msgtype": "m.text", + "body": "Hello", + "org.matrix.msc4357.live": true + }, + "event_id": "$test" + }); + assert!(has_live_marker_in_value(&json)); + } + + #[test] + fn test_no_live_marker() { + let json = json!({ + "msgtype": "m.text", + "body": "Hello" + }); + let json_str = serde_json::to_string(&json).unwrap(); + assert!(!has_live_marker_in_json(&json_str)); + } + + #[test] + fn test_is_live_replacement() { + let live_replacement = json!({ + "m.new_content": { + "body": "Hello", + "org.matrix.msc4357.live": true + }, + "m.relates_to": { + "rel_type": "m.replace", + "event_id": "$test" + } + }); + assert_eq!(is_live_replacement(&live_replacement), Some(true)); + + let final_replacement = json!({ + "m.new_content": { + "body": "Hello" + }, + "m.relates_to": { + "rel_type": "m.replace", + "event_id": "$test" + } + }); + assert_eq!(is_live_replacement(&final_replacement), Some(false)); + + let not_replacement = json!({ + "msgtype": "m.text", + "body": "Hello" + }); + assert_eq!(is_live_replacement(¬_replacement), None); + } +} \ No newline at end of file diff --git a/src/message/mod.rs b/src/message/mod.rs index 723a575e..c1377a33 100644 --- a/src/message/mod.rs +++ b/src/message/mod.rs @@ -48,7 +48,7 @@ use matrix_sdk::ruma::{ }; use ratatui::{ - style::{Modifier as StyleModifier, Style}, + style::{Color, Modifier as StyleModifier, Style}, symbols::line::THICK_VERTICAL, text::{Line, Span, Text}, }; @@ -67,10 +67,18 @@ use crate::{ mod compose; mod html; +mod live; +pub mod live_detection; mod printer; mod state; -pub use self::compose::text_to_message; +pub use self::compose::text_to_message_with_command; +pub use self::live::{ + LiveMessageManager, + LiveMessageSession, + send_live_message_with_custom_fields, + send_live_update_with_custom_fields, +}; use self::state::{body_cow_state, html_state}; pub use html::TreeGenState; @@ -478,6 +486,7 @@ impl MessageEvent { ) } + pub fn body(&self) -> Cow<'_, str> { match self { MessageEvent::EncryptedOriginal(_) => "[Unable to decrypt message]".into(), @@ -844,6 +853,7 @@ pub struct Message { pub downloaded: bool, pub html: Option, pub image_preview: ImageStatus, + pub is_live: bool, } impl Message { @@ -851,6 +861,9 @@ impl Message { let html = event.html(); let downloaded = false; + // The is_live flag will be set explicitly by the caller if needed + let is_live = false; + Message { event, sender, @@ -858,9 +871,11 @@ impl Message { downloaded, html, image_preview: ImageStatus::None, + is_live, } } + pub fn reply_to(&self) -> Option { let content = match &self.event { MessageEvent::EncryptedOriginal(_) => return None, @@ -914,11 +929,17 @@ impl Message { style = style.add_modifier(StyleModifier::ITALIC); } + // Apply user color first if enabled (but it will be overridden for live messages) if settings.tunables.message_user_color { let color = settings.get_user_color(&self.sender); style = style.fg(color); } + // Make live messages appear gray to indicate they're still being composed + if self.is_live { + style = style.fg(Color::Rgb(100, 100, 100)); + } + return style; } @@ -1069,6 +1090,7 @@ impl Message { msg.to_mut().push_str(" \u{2705}"); } + let mut proto = None; let placeholder = match &self.image_preview { ImageStatus::None => None, diff --git a/src/notifications.rs b/src/notifications.rs index b4938fe2..2a1098cc 100644 --- a/src/notifications.rs +++ b/src/notifications.rs @@ -142,7 +142,7 @@ async fn send_notification_bell(store: &AsyncProgramStore) { async fn send_notification_desktop( summary: &str, body: Option<&str>, - room_id: OwnedRoomId, + _room_id: OwnedRoomId, _store: &AsyncProgramStore, sound_hint: Option<&str>, ) { @@ -166,17 +166,19 @@ async fn send_notification_desktop( match desktop_notification.show() { Err(err) => tracing::error!("Failed to send notification: {err}"), + #[cfg(all(unix, not(target_os = "macos")))] Ok(handle) => { - #[cfg(all(unix, not(target_os = "macos")))] _store .lock() .await .application .open_notifications - .entry(room_id) + .entry(_room_id) .or_default() .push(NotificationHandle(Some(handle))); }, + #[cfg(not(all(unix, not(target_os = "macos"))))] + Ok(_) => {}, } } diff --git a/src/windows/room/chat.rs b/src/windows/room/chat.rs index 6cf74245..66bf99b1 100644 --- a/src/windows/room/chat.rs +++ b/src/windows/room/chat.rs @@ -87,12 +87,16 @@ use crate::base::{ }; use crate::message::{ - text_to_message, + text_to_message_with_command, + LiveMessageManager, + LiveMessageSession, Message, MessageEvent, MessageKey, MessageTimeStamp, TreeGenState, + send_live_message_with_custom_fields, + send_live_update_with_custom_fields, }; use crate::worker::Requester; @@ -112,6 +116,9 @@ pub struct ChatState { reply_to: Option, editing: Option, + + live_messages: LiveMessageManager, + live_event_id_receiver: Option>, } impl ChatState { @@ -135,6 +142,9 @@ impl ChatState { reply_to: None, editing: None, + + live_messages: LiveMessageManager::new(), + live_event_id_receiver: None, } } @@ -545,8 +555,55 @@ impl ChatState { msg.trim_end().to_string() }; - let mut msg = text_to_message(msg); + // Check if this is a /live message + let is_live_message = msg.trim_end().starts_with("/live "); + + if is_live_message { + // Remove "/live " prefix to get actual message + let actual_msg = msg.trim_end().strip_prefix("/live ").unwrap_or("").to_string(); + + // Check if we have an active live session that needs finalizing + if let Some(session) = self.live_messages.end_session(&self.room_id) { + if actual_msg.is_empty() { + // Cancel the live message by redacting it + let _ = room.redact(&session.event_id, Some("Live message cancelled"), None).await; + + self.tbox.reset(); + return Ok(EditInfo::from(InfoMessage::from("Live message cancelled"))); + } else { + // Send final update with content + tracing::info!("[LIVE FINAL] Sending final update for {:?} with content: {:?}", + session.event_id, actual_msg); + let result = send_live_update_with_custom_fields( + &room, + &session.event_id, + &actual_msg, + true, // final + None, + ).await; + + match result { + Ok(event_id) => { + tracing::info!("[LIVE FINAL] Final update sent successfully: {:?}", event_id); + }, + Err(e) => { + tracing::error!("[LIVE FINAL] Failed to send final update: {:?}", e); + } + } + + self.tbox.reset(); + return Ok(EditInfo::from(InfoMessage::from("Live message completed"))); + } + } else { + // No active session - just clear input + self.tbox.reset(); + return Ok(EditInfo::from(InfoMessage::from("No active live session"))); + } + } + + let (mut msg, _cmd) = text_to_message_with_command(msg); + // Process message relations (editing, threading, replies) if let Some((_, event_id)) = &self.editing { msg.relates_to = Some(Relation::Replacement(Replacement::new( event_id.clone(), @@ -566,8 +623,7 @@ impl ChatState { msg = msg.make_reply_to(m, ForwardThread::Yes, AddMentions::No); } - // XXX: second parameter can be a locally unique transaction id. - // Useful for doing retries. + // Send the normal message let resp = room.send(msg.clone()).await.map_err(IambError::from)?; let event_id = resp.event_id; @@ -666,7 +722,7 @@ impl ChatState { } pub fn typing_notice( - &self, + &mut self, act: &EditorAction, ctx: &ProgramContext, store: &mut ProgramStore, @@ -675,11 +731,274 @@ impl ChatState { return; } - if !store.application.settings.tunables.typing_notice_send { + if store.application.settings.tunables.typing_notice_send { + store.application.worker.typing_notice(self.room_id.clone()); + } + + // Handle live message updates if in live mode + self.handle_live_typing(store); + } + + /// Check if we should send a live update based on timer + /// This is called periodically from draw() to ensure updates are sent even when user isn't typing + fn check_live_updates(&mut self, store: &mut ProgramStore) { + // Check if we have a /live message in progress + let current_text = self.tbox.get(); + let current_text_str = current_text.trim_end().to_string(); + + // Check if we're in live mode + let is_live_command = current_text_str == "/live" || current_text_str.starts_with("/live "); + if !is_live_command { + return; + } + + // Extract content + let live_content = if current_text_str == "/live" { + String::new() + } else { + current_text_str.strip_prefix("/live ").unwrap_or("").to_string() + }; + + // First check if we have a pending event ID to update + if let Some(rx) = self.live_event_id_receiver.as_mut() { + // Try to receive the event ID without blocking + match rx.try_recv() { + Ok(event_id) => { + // Update the session with the real event ID + self.live_messages.set_event_id(&self.room_id, event_id.clone()); + tracing::debug!("Updated live session with real event ID: {:?}", event_id); + self.live_event_id_receiver = None; // Clear the receiver + }, + Err(tokio::sync::oneshot::error::TryRecvError::Empty) => { + // Still waiting for event ID, continue + }, + Err(tokio::sync::oneshot::error::TryRecvError::Closed) => { + // Channel closed without sending, clear it + self.live_event_id_receiver = None; + } + } + } + + // Check if there's an active session + if let Some(session) = self.live_messages.get_session(&self.room_id) { + // Skip if we have a temporary event ID + if self.live_messages.has_temp_event_id(&self.room_id) { + return; + } + + // Check if content differs and enough time has passed for an update + if live_content != session.current_content && self.live_messages.should_send_update(&self.room_id) { + // Send update based on timer + if let Some(updated_session) = self.live_messages.update_session( + &self.room_id, + live_content, + ) { + self.send_live_update(updated_session, store); + } + } + } + } + + fn handle_live_typing(&mut self, store: &mut ProgramStore) { + let current_text = self.tbox.get(); + let current_text_str = current_text.trim_end().to_string(); + + // Check if we're in live mode (text starts with "/live" with or without space) + let is_live_command = current_text_str == "/live" || current_text_str.starts_with("/live "); + + if !is_live_command { + // User completely removed /live prefix - cancel the message + if let Some(session) = self.live_messages.get_session(&self.room_id) { + // Only redact if we have a real event ID (not temporary) + if !self.live_messages.has_temp_event_id(&self.room_id) { + tracing::info!("[LIVE] User removed /live prefix completely, redacting message"); + + let worker = store.application.worker.clone(); + let room_id = self.room_id.clone(); + let event_id = session.event_id.clone(); + + tokio::spawn(async move { + if let Some(room) = worker.client.get_room(&room_id) { + let _ = room.redact(&event_id, Some("Live message cancelled"), None).await; + tracing::info!("[LIVE] Message redacted: {:?}", event_id); + } + }); + } + + self.live_messages.end_session(&self.room_id); + } return; } - store.application.worker.typing_notice(self.room_id.clone()); + // Extract the actual message content after "/live " or empty if just "/live" + let live_content = if current_text_str == "/live" { + String::new() + } else { + current_text_str.strip_prefix("/live ").unwrap_or("").to_string() + }; + + tracing::debug!("[LIVE] handle_live_typing: text_len={}, live_content_len={}, has_session={}", + current_text_str.len(), + live_content.len(), + self.live_messages.get_session(&self.room_id).is_some()); + + // Enable live messaging for this session + self.live_messages.enabled = true; + + // Check if there's an active session + if let Some(session) = self.live_messages.get_session(&self.room_id) { + // We have an active session - check if we should send an update + + // Skip if we have a temporary event ID (initial message still being sent) + if self.live_messages.has_temp_event_id(&self.room_id) { + return; + } + + // Check if content changed (including becoming empty) + if live_content != session.current_content { + // Update the local echo to reflect the new content + let event_id = session.event_id.clone(); + let info = store.application.rooms.get_or_default(self.room_id.clone()); + let thread = self.scrollback.get_thread_mut(info); + + // Find and update the local echo message + let key = (MessageTimeStamp::LocalEcho, event_id.clone()); + if let Some(msg) = thread.get_mut(&key) { + // Update the message content + if let MessageEvent::Local(_, content) = &mut msg.event { + *content = Box::new(RoomMessageEventContent::text_plain(&live_content)); + tracing::info!("[LIVE ECHO] Updated local echo content to: {:?}", live_content); + } + } + + // Check if we just typed a boundary character (space, punctuation) + let just_typed_boundary = live_content.len() > session.current_content.len() && + live_content.chars().last() + .map(|c| c.is_whitespace() || c.is_ascii_punctuation()) + .unwrap_or(false); + + // Also send update if user deleted significant amount of text or made content empty + let significant_deletion = live_content.len() < session.current_content.len() - 5; + let became_empty = live_content.is_empty() && !session.current_content.is_empty(); + + if just_typed_boundary || significant_deletion || became_empty { + // Send immediately + if let Some(updated_session) = self.live_messages.update_session( + &self.room_id, + live_content, + ) { + self.send_live_update(updated_session, store); + } + } + // Timer-based updates are handled by check_live_updates() + } + } else { + // No active session - start a new one if we have content after "/live " + if !live_content.is_empty() && self.live_event_id_receiver.is_none() { + tracing::info!("[LIVE] Starting new session with text: '{}'", live_content); + + // Start a live session with current content + if let Some(session) = self.live_messages.start_session( + self.room_id.clone(), + live_content.clone(), + ) { + tracing::info!("[LIVE] Session started with event_id: {:?}", session.event_id); + // Send initial message asynchronously + let worker = store.application.worker.clone(); + let room_id = self.room_id.clone(); + let session_content = session.current_content.clone(); + + // Create a channel to communicate back the event ID + use tokio::sync::oneshot; + let (tx, rx) = oneshot::channel(); + + // Store the receiver so we can check it later + self.live_event_id_receiver = Some(rx); + + // Spawn the task to send the initial message + tokio::spawn(async move { + if let Some(room) = worker.client.get_room(&room_id) { + use serde_json::json; + let custom_fields = json!({ + "org.matrix.msc4357.session_start": true, + "org.matrix.msc4357.animation": { + "smooth": true, + } + }); + + tracing::info!("Sending initial live message: content={:?}", session_content); + + match send_live_message_with_custom_fields( + &room, + &session_content, + Some(custom_fields) + ).await { + Ok(event_id) => { + tracing::info!("Initial live message sent successfully: {:?}", event_id); + // Send event ID back through channel + let _ = tx.send(event_id); + }, + Err(e) => { + tracing::error!("Failed to send initial live message: {:?}", e); + } + } + } + }); + + return; // Don't send update immediately after initial + } + } + } + } + + + fn send_live_update(&self, session: LiveMessageSession, store: &mut ProgramStore) { + let room_id = self.room_id.clone(); + let worker = store.application.worker.clone(); + let event_id = session.event_id.clone(); + let content = session.current_content.clone(); + let update_count = session.update_count; + + // Use a zero-width space for empty content to keep the message visible + let display_content = if content.is_empty() { + "\u{200B}".to_string() // Zero-width space + } else { + content.clone() + }; + + tracing::info!("Sending live update #{}: event_id={:?}, content={:?}", + update_count, event_id, display_content); + + // Send the update in the background + tokio::spawn(async move { + if let Some(room) = worker.client.get_room(&room_id) { + use serde_json::json; + // Add custom fields for live message tracking and animation hints + let custom_fields = json!({ + "org.matrix.msc4357.update_number": update_count, + "org.matrix.msc4357.animation": { + "smooth": true, // Enable smooth character-by-character animation + } + }); + + let result = send_live_update_with_custom_fields( + &room, + &event_id, + &display_content, + false, // not final + Some(custom_fields) + ).await; + + match result { + Ok(new_event_id) => { + tracing::info!("Live update sent successfully: {:?}", new_event_id); + }, + Err(e) => { + tracing::error!("Failed to send live update: {:?}", e); + } + } + } + }); } } @@ -702,6 +1021,12 @@ macro_rules! delegate { impl WindowOps for ChatState { fn draw(&mut self, area: Rect, buf: &mut Buffer, focused: bool, store: &mut ProgramStore) { + // Check for live message updates periodically when drawing + // This ensures timer-based updates work even when user isn't typing + if self.live_messages.get_session(&self.room_id).is_some() { + self.check_live_updates(store); + } + Chat::new(store).focus(focused).render(area, buf, self) } @@ -727,6 +1052,9 @@ impl WindowOps for ChatState { reply_to: None, editing: None, + + live_messages: LiveMessageManager::new(), + live_event_id_receiver: None, } } @@ -975,7 +1303,11 @@ impl StatefulWidget for Chat<'_> { Paragraph::new(desc_spans).render(descarea, buf); } - let prompt = if self.focused { "> " } else { " " }; + let prompt = if self.focused { + "> " + } else { + " " + }; let tbox = TextBox::new().prompt(prompt); tbox.render(textarea, buf, &mut state.tbox); diff --git a/src/worker.rs b/src/worker.rs index 22a6fdbf..514c9f00 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -16,7 +16,7 @@ use gethostname::gethostname; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::Semaphore; use tokio::task::JoinHandle; -use tracing::{error, warn}; +use tracing::{error, info, warn}; use url::Url; use matrix_sdk::{ @@ -51,7 +51,7 @@ use matrix_sdk::{ room::{ encryption::RoomEncryptionEventContent, member::OriginalSyncRoomMemberEvent, - message::{MessageType, RoomMessageEventContent}, + message::{MessageType, Relation, RoomMessageEvent, RoomMessageEventContent}, name::RoomNameEventContent, redaction::OriginalSyncRoomRedactionEvent, }, @@ -71,6 +71,7 @@ use matrix_sdk::{ serde::Raw, EventEncryptionAlgorithm, EventId, + MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, @@ -116,7 +117,7 @@ const IAMB_DEVICE_NAME: &str = "iamb"; const IAMB_USER_AGENT: &str = "iamb"; const MIN_MSG_LOAD: u32 = 50; -type MessageFetchResult = IambResult<(Option, Vec<(AnyTimelineEvent, Vec)>)>; +type MessageFetchResult = IambResult<(Option, Vec<(AnyTimelineEvent, Vec, bool)>)>; fn initial_devname() -> String { format!("{} on {}", IAMB_DEVICE_NAME, gethostname().to_string_lossy()) @@ -294,7 +295,12 @@ async fn load_older_one( let mut msgs = vec![]; for ev in chunk.into_iter() { - let Ok(msg) = ev.into_raw().deserialize() else { + let raw = ev.into_raw(); + + // Check if this is a live message using centralized detection + let is_live = crate::message::live_detection::has_live_marker_in_raw(&raw); + + let Ok(msg) = raw.deserialize() else { continue; }; @@ -311,7 +317,7 @@ async fn load_older_one( }; let msg = msg.into_full_event(room_id.to_owned()); - msgs.push((msg, receipts)); + msgs.push((msg, receipts, is_live)); } Ok((end, msgs)) @@ -333,7 +339,12 @@ fn load_insert( match res { Ok((fetch_id, msgs)) => { - for (msg, receipts) in msgs.into_iter() { + // First pass: collect all messages and replacements separately + let mut normal_messages = Vec::new(); + let mut replacement_messages = Vec::new(); + let mut other_events = Vec::new(); + + for (msg, receipts, is_live) in msgs.into_iter() { let sender = msg.sender().to_owned(); let _ = presences.get_or_default(sender); @@ -343,29 +354,140 @@ fn load_insert( match msg { AnyTimelineEvent::MessageLike(AnyMessageLikeEvent::RoomEncrypted(msg)) => { - info.insert_encrypted(msg); + other_events.push(AnyTimelineEvent::MessageLike(AnyMessageLikeEvent::RoomEncrypted(msg))); }, AnyTimelineEvent::MessageLike(AnyMessageLikeEvent::RoomMessage(msg)) => { - info.insert_with_preview( - room_id.clone(), - store.clone(), - picker.clone(), - msg, - settings, - client.media(), - ); + // Check if this is a replacement/edit event + if let RoomMessageEvent::Original(ref orig) = msg { + if let Some(Relation::Replacement(_)) = &orig.content.relates_to { + // This is an edit - save it for later + // We'll process live status only for the LATEST edit + replacement_messages.push((msg, is_live)); + } else { + // Normal message - for history loading, we shouldn't mark as live + // Live messages should always have at least one replacement + normal_messages.push(msg); + } + } else { + // Other message types + normal_messages.push(msg); + } }, AnyTimelineEvent::MessageLike(AnyMessageLikeEvent::Reaction(ev)) => { - info.insert_reaction(ev); + other_events.push(AnyTimelineEvent::MessageLike(AnyMessageLikeEvent::Reaction(ev))); }, AnyTimelineEvent::MessageLike(_) => { continue; }, AnyTimelineEvent::State(msg) => { if settings.tunables.state_event_display { - info.insert_any_state(msg.into()); + other_events.push(AnyTimelineEvent::State(msg)); + } + }, + } + } + + // Second pass: insert normal messages first + info!("[LOADING] Processing {} normal messages", normal_messages.len()); + for msg in normal_messages { + let event_id = msg.event_id(); + let body = if let RoomMessageEvent::Original(ref orig) = msg { + Some(orig.content.body()) + } else { + None + }; + info!( + "[MESSAGE] Loading normal message {} with body: {:?}", + event_id, + body + ); + info.insert_with_preview( + room_id.clone(), + store.clone(), + picker.clone(), + msg, + settings, + client.media(), + ); + } + + // Third pass: apply replacements after all messages are loaded + // Group replacements by the event they're replacing and keep only the latest + info!("[LOADING] Processing {} replacement messages", replacement_messages.len()); + + // Group replacements by target event ID, keeping track of the latest one + let mut latest_replacements: HashMap = HashMap::new(); + + for (msg, is_live) in replacement_messages { + if let RoomMessageEvent::Original(ref orig) = msg { + if let Some(Relation::Replacement(ref repl)) = &orig.content.relates_to { + let target_id = repl.event_id.clone(); + let timestamp = orig.origin_server_ts; + + // Only keep this replacement if it's newer than what we have + match latest_replacements.get(&target_id) { + Some((existing_ts, _, _)) if existing_ts > ×tamp => { + // We already have a newer replacement, skip this one + info!( + "[M.REPLACE] Skipping older replacement {} -> {} (ts: {:?}, is_live: {})", + msg.event_id(), + target_id, + timestamp, + is_live + ); + continue; + } + _ => { + // This is newer or first, keep it + info!( + "[M.REPLACE] Keeping replacement {} -> {} (ts: {:?}, is_live: {})", + msg.event_id(), + target_id, + timestamp, + is_live + ); + latest_replacements.insert(target_id.clone(), (timestamp, msg.clone(), is_live)); + } } + } + } + } + + // Apply only the latest replacement for each message and update live status + info!("[LOADING] Applying {} latest replacements", latest_replacements.len()); + for (target_id, (_timestamp, msg, is_live)) in latest_replacements { + if let RoomMessageEvent::Original(ref orig) = msg { + info!( + "[M.REPLACE] Applying LATEST replacement {} -> {} with body: {:?}, is_live: {}", + msg.event_id(), + target_id, + orig.content.body(), + is_live + ); + + // Update live_message_ids based on the LATEST edit's live status + if is_live { + info.live_message_ids.insert(target_id.clone()); + } else { + info.live_message_ids.remove(&target_id); + } + } + info.insert(msg); + } + + // Fourth pass: handle other events + for event in other_events { + match event { + AnyTimelineEvent::MessageLike(AnyMessageLikeEvent::RoomEncrypted(msg)) => { + info.insert_encrypted(msg); + }, + AnyTimelineEvent::MessageLike(AnyMessageLikeEvent::Reaction(ev)) => { + info.insert_reaction(ev); + }, + AnyTimelineEvent::State(msg) => { + info.insert_any_state(msg.into()); }, + _ => {} } } @@ -976,18 +1098,26 @@ impl ClientWorker { ); let _ = self.client.add_event_handler( - |ev: SyncMessageLikeEvent, + |ev: Raw>, room: MatrixRoom, client: Client, store: Ctx| { async move { let room_id = room.room_id(); - if let Some(msg) = ev.as_original() { - if let MessageType::VerificationRequest(_) = msg.content.msgtype { + // Check for live marker in raw JSON + let is_live = crate::message::live_detection::has_live_marker_in_raw(&ev); + + // Deserialize the event + let Ok(msg_ev) = ev.deserialize() else { + return; + }; + + if let Some(orig) = msg_ev.as_original() { + if let MessageType::VerificationRequest(_) = orig.content.msgtype { if let Some(request) = client .encryption() - .get_verification_request(ev.sender(), ev.event_id()) + .get_verification_request(msg_ev.sender(), msg_ev.event_id()) .await { request.accept().await.expect("Failed to accept request"); @@ -997,15 +1127,30 @@ impl ClientWorker { let mut locked = store.lock().await; - let sender = ev.sender().to_owned(); + let sender = msg_ev.sender().to_owned(); let _ = locked.application.presences.get_or_default(sender); let ChatStore { rooms, picker, settings, .. } = &mut locked.application; let info = rooms.get_or_default(room_id.to_owned()); - update_event_receipts(info, &room, ev.event_id()).await; + update_event_receipts(info, &room, msg_ev.event_id()).await; + + // Update live_message_ids based on event type + if let Some(orig) = msg_ev.as_original() { + if let Some(Relation::Replacement(ref repl)) = &orig.content.relates_to { + // Update live status for the target message + if is_live { + info.live_message_ids.insert(repl.event_id.clone()); + } else { + info.live_message_ids.remove(&repl.event_id); + } + } else if is_live { + // New live message + info.live_message_ids.insert(msg_ev.event_id().to_owned()); + } + } - let full_ev = ev.into_full_event(room_id.to_owned()); + let full_ev = msg_ev.into_full_event(room_id.to_owned()); info.insert_with_preview( room_id.to_owned(), store.clone(), From af6994f90b8d685023dd1d717c7b7cde840bcb55 Mon Sep 17 00:00:00 2001 From: Igor Somov Date: Fri, 19 Sep 2025 10:48:32 -0300 Subject: [PATCH 2/3] Exclude live messages from notifications Live messages (MSC4357) are now filtered out in the notification handler to prevent notifications for incomplete messages that are still being typed. This fixes the issue where users would receive notifications for messages with just a few letters. Also simplified the worker.rs load_insert logic by clarifying comments and relying on the centralized live detection. --- src/notifications.rs | 6 ++++++ src/worker.rs | 23 +++++++++++------------ 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/src/notifications.rs b/src/notifications.rs index 2a1098cc..713da3d0 100644 --- a/src/notifications.rs +++ b/src/notifications.rs @@ -76,6 +76,12 @@ pub async fn register_notifications( let room_id = room.room_id().to_owned(); match notification.event { RawAnySyncOrStrippedTimelineEvent::Sync(e) => { + // Skip notifications for live messages + if crate::message::live_detection::has_live_marker_in_raw(&e) { + tracing::debug!("Skipping notification for live message"); + return; + } + match parse_full_notification(e, room, show_message).await { Ok((summary, body, server_ts)) => { if server_ts < startup_ts { diff --git a/src/worker.rs b/src/worker.rs index 514c9f00..0fc6a527 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -339,7 +339,7 @@ fn load_insert( match res { Ok((fetch_id, msgs)) => { - // First pass: collect all messages and replacements separately + // First pass: collect all messages and categorize them let mut normal_messages = Vec::new(); let mut replacement_messages = Vec::new(); let mut other_events = Vec::new(); @@ -360,16 +360,15 @@ fn load_insert( // Check if this is a replacement/edit event if let RoomMessageEvent::Original(ref orig) = msg { if let Some(Relation::Replacement(_)) = &orig.content.relates_to { - // This is an edit - save it for later - // We'll process live status only for the LATEST edit + // This is an edit - save it for later processing + // The is_live flag is already determined from the raw event replacement_messages.push((msg, is_live)); } else { - // Normal message - for history loading, we shouldn't mark as live - // Live messages should always have at least one replacement + // Normal message normal_messages.push(msg); } } else { - // Other message types + // Other message types (redacted, etc.) normal_messages.push(msg); } }, @@ -415,7 +414,7 @@ fn load_insert( // Group replacements by the event they're replacing and keep only the latest info!("[LOADING] Processing {} replacement messages", replacement_messages.len()); - // Group replacements by target event ID, keeping track of the latest one + // Group replacements by target event ID, keeping only the latest one let mut latest_replacements: HashMap = HashMap::new(); for (msg, is_live) in replacement_messages { @@ -424,10 +423,10 @@ fn load_insert( let target_id = repl.event_id.clone(); let timestamp = orig.origin_server_ts; - // Only keep this replacement if it's newer than what we have + // Keep only the latest replacement for each target match latest_replacements.get(&target_id) { Some((existing_ts, _, _)) if existing_ts > ×tamp => { - // We already have a newer replacement, skip this one + // Skip older replacement info!( "[M.REPLACE] Skipping older replacement {} -> {} (ts: {:?}, is_live: {})", msg.event_id(), @@ -435,7 +434,6 @@ fn load_insert( timestamp, is_live ); - continue; } _ => { // This is newer or first, keep it @@ -453,7 +451,7 @@ fn load_insert( } } - // Apply only the latest replacement for each message and update live status + // Apply only the latest replacement for each message info!("[LOADING] Applying {} latest replacements", latest_replacements.len()); for (target_id, (_timestamp, msg, is_live)) in latest_replacements { if let RoomMessageEvent::Original(ref orig) = msg { @@ -465,7 +463,8 @@ fn load_insert( is_live ); - // Update live_message_ids based on the LATEST edit's live status + // Update live status based on the replacement's is_live flag + // The is_live flag comes from centralized detection in load_older_one if is_live { info.live_message_ids.insert(target_id.clone()); } else { From 194e758bb846f9e9bb324bd5b6a3ee50ee3ddea1 Mon Sep 17 00:00:00 2001 From: Igor Somov Date: Sun, 21 Sep 2025 18:23:14 -0300 Subject: [PATCH 3/3] Optimize message loading by filtering redundant edits Since messages come in reverse chronological order, the first edit we encounter is the latest. This commit: - Tracks which edits we've already seen the latest version of - Skips loading older edits when we already have a newer one - Skips edits for messages that have bundled replacements - Significantly reduces the number of events to process, especially for live messages This optimization works for both encrypted and unencrypted rooms, as we're just filtering duplicate edits regardless of content. --- src/worker.rs | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/src/worker.rs b/src/worker.rs index 0fc6a527..8ff5ea56 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -2,7 +2,7 @@ //! //! The worker thread handles asynchronous work, and can receive messages from the main thread that //! block on a reply from the async worker. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; use std::fmt::{Debug, Formatter}; use std::ops::{Deref, DerefMut}; @@ -59,7 +59,9 @@ use matrix_sdk::{ typing::SyncTypingEvent, AnyInitialStateEvent, AnyMessageLikeEvent, + AnySyncMessageLikeEvent, AnySyncStateEvent, + AnySyncTimelineEvent, AnyTimelineEvent, EmptyStateKey, InitialStateEvent, @@ -293,6 +295,11 @@ async fn load_older_one( let Messages { end, chunk, .. } = room.messages(opts).await.map_err(IambError::from)?; let mut msgs = vec![]; + // Track which events we've already seen the latest version of + // Since events come in reverse chronological order, the first edit we see is the latest + let mut seen_latest_edits: HashMap = HashMap::new(); + // Track original events that have bundled replacements + let mut has_bundled_replacement: HashSet = HashSet::new(); for ev in chunk.into_iter() { let raw = ev.into_raw(); @@ -300,10 +307,56 @@ async fn load_older_one( // Check if this is a live message using centralized detection let is_live = crate::message::live_detection::has_live_marker_in_raw(&raw); + // Check for bundled relations + let raw_json_str = raw.json().get(); + if let Ok(json_value) = serde_json::from_str::(raw_json_str) { + if let Some(unsigned) = json_value.get("unsigned") { + if let Some(relations) = unsigned.get("m.relations") { + if relations.get("m.replace").is_some() { + // This event has bundled replacement - track it + if let Some(event_id_val) = json_value.get("event_id") { + if let Some(event_id_str) = event_id_val.as_str() { + if let Ok(event_id) = OwnedEventId::try_from(event_id_str) { + has_bundled_replacement.insert(event_id); + } + } + } + } + } + } + } + let Ok(msg) = raw.deserialize() else { continue; }; + // Skip obsolete edits + if let AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(ref room_msg)) = msg { + if let SyncMessageLikeEvent::Original(ref orig) = room_msg { + if let Some(Relation::Replacement(ref repl)) = &orig.content.relates_to { + let target_id = &repl.event_id; + + // Check if we already have a newer edit for this target + if seen_latest_edits.contains_key(target_id) { + // Skip this older edit + tracing::debug!("Skipping obsolete edit {} for {}", msg.event_id(), target_id); + continue; + } + + // Check if the original has bundled replacement + if has_bundled_replacement.contains(target_id) { + // Skip this edit as the original already has bundled latest version + tracing::debug!("Skipping edit {} - original {} has bundled replacement", + msg.event_id(), target_id); + continue; + } + + // This is the latest edit we've seen for this target + seen_latest_edits.insert(target_id.clone(), msg.event_id().to_owned()); + } + } + } + let event_id = msg.event_id(); let receipts = match room .load_event_receipts(ReceiptType::Read, ReceiptThread::Main, event_id)