diff --git a/moq-transport/src/serve/datagram.rs b/moq-transport/src/serve/datagram.rs index f7ff2697..f4131866 100644 --- a/moq-transport/src/serve/datagram.rs +++ b/moq-transport/src/serve/datagram.rs @@ -1,9 +1,13 @@ -use std::{fmt, sync::Arc}; +use std::{collections::VecDeque, fmt, sync::Arc}; use crate::watch::State; use super::{ServeError, Track}; +/// Maximum number of datagrams to buffer before dropping old ones. +/// This prevents unbounded memory growth while allowing burst handling. +const MAX_DATAGRAM_BUFFER: usize = 1024; + pub struct Datagrams { pub track: Arc, } @@ -20,11 +24,14 @@ impl Datagrams { } struct DatagramsState { - // The latest datagram - latest: Option, + // Queue of pending datagrams (FIFO) + queue: VecDeque, + + // Global write counter - incremented each time a datagram is added + write_count: u64, - // Increased each time datagram changes. - epoch: u64, + // Number of datagrams dropped from front due to buffer overflow + dropped_count: u64, // Set when the writer or all readers are dropped. closed: Result<(), ServeError>, @@ -33,8 +40,9 @@ struct DatagramsState { impl Default for DatagramsState { fn default() -> Self { Self { - latest: None, - epoch: 0, + queue: VecDeque::with_capacity(256), + write_count: 0, + dropped_count: 0, closed: Ok(()), } } @@ -53,8 +61,14 @@ impl DatagramsWriter { pub fn write(&mut self, datagram: Datagram) -> Result<(), ServeError> { let mut state = self.state.lock_mut().ok_or(ServeError::Cancel)?; - state.latest = Some(datagram); - state.epoch += 1; + // If queue is full, drop oldest datagram to make room + if state.queue.len() >= MAX_DATAGRAM_BUFFER { + state.queue.pop_front(); + state.dropped_count += 1; + } + + state.queue.push_back(datagram); + state.write_count += 1; Ok(()) } @@ -75,15 +89,23 @@ pub struct DatagramsReader { state: State, pub track: Arc, - epoch: u64, + // Track how many datagrams this reader has consumed (absolute count) + // This allows us to calculate our position in the queue + consumed_count: u64, } impl DatagramsReader { fn new(state: State, track: Arc) -> Self { + // Initialize consumed_count to current dropped_count so we start from current position + let initial_dropped = { + let state = state.lock(); + state.dropped_count + }; + Self { state, track, - epoch: 0, + consumed_count: initial_dropped, } } @@ -91,9 +113,24 @@ impl DatagramsReader { loop { { let state = self.state.lock(); - if self.epoch < state.epoch { - self.epoch = state.epoch; - return Ok(state.latest.clone()); + + // Calculate our index in the current queue + // queue index = consumed_count - dropped_count + // If consumed_count < dropped_count, we missed some datagrams (they were dropped) + let queue_index = if self.consumed_count >= state.dropped_count { + (self.consumed_count - state.dropped_count) as usize + } else { + // We're behind - some datagrams were dropped that we haven't seen + // Skip to the beginning of current queue + self.consumed_count = state.dropped_count; + 0 + }; + + // Check if there's a datagram we haven't read yet + if queue_index < state.queue.len() { + let datagram = state.queue.get(queue_index).cloned(); + self.consumed_count += 1; + return Ok(datagram); } state.closed.clone()?; @@ -106,12 +143,12 @@ impl DatagramsReader { } } - // Returns the largest group/sequence + // Returns the largest group/sequence from the most recent datagram pub fn latest(&self) -> Option<(u64, u64)> { let state = self.state.lock(); state - .latest - .as_ref() + .queue + .back() .map(|datagram| (datagram.group_id, datagram.object_id)) } } diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index 49c59e8d..0ee2021f 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -207,18 +207,50 @@ impl Subscriber { /// Handle the reception of a SubscribeOk message from the publisher. fn recv_subscribe_ok(&mut self, msg: &message::SubscribeOk) -> Result<(), SessionError> { - if let Some(subscribe) = self.subscribes.lock().unwrap().get_mut(&msg.id) { + let subscribes = self.subscribes.lock().unwrap(); + let subscribe_ids: Vec = subscribes.keys().cloned().collect(); + log::debug!( + "[SUBSCRIBER] recv_subscribe_ok: id={}, track_alias={}, existing_subscribe_ids={:?}", + msg.id, + msg.track_alias, + subscribe_ids + ); + + if let Some(subscribe) = subscribes.get(&msg.id) { // Map track alias to subscription id for quick lookup when receiving streams/datagrams + log::debug!( + "[SUBSCRIBER] recv_subscribe_ok: MAPPING track_alias={} -> subscribe_id={}", + msg.track_alias, + msg.id + ); + drop(subscribes); // Release the lock before acquiring another + self.subscribe_alias_map .lock() .unwrap() .insert(msg.track_alias, msg.id); + // Log current alias map state + let alias_map = self.subscribe_alias_map.lock().unwrap(); + log::debug!( + "[SUBSCRIBER] recv_subscribe_ok: alias_map now contains: {:?}", + alias_map.iter().collect::>() + ); + drop(alias_map); + // Notify waiting tasks that the alias map has been updated self.subscribe_alias_notify.notify_waiters(); // Notify the subscribe of the successful subscription - subscribe.ok(msg.track_alias)?; + let mut subscribes = self.subscribes.lock().unwrap(); + if let Some(subscribe) = subscribes.get_mut(&msg.id) { + subscribe.ok(msg.track_alias)?; + } + } else { + log::warn!( + "[SUBSCRIBER] recv_subscribe_ok: subscribe_id={} NOT FOUND in subscribes map!", + msg.id + ); } Ok(()) @@ -487,7 +519,7 @@ impl Subscriber { // Check for Immutable Extensions (type 0xB = 11) if object.extension_headers.has(0xB) { - log::info!( + log::debug!( "[SUBSCRIBER] recv_subgroup: object #{} contains IMMUTABLE EXTENSIONS (type 0xB) - will be forwarded", object_count + 1 ); @@ -501,7 +533,7 @@ impl Subscriber { // Check for Prior Group ID Gap (type 0x3C = 60) if object.extension_headers.has(0x3C) { - log::info!( + log::debug!( "[SUBSCRIBER] recv_subgroup: object #{} contains PRIOR GROUP ID GAP (type 0x3C)", object_count + 1 ); @@ -624,7 +656,7 @@ impl Subscriber { object_count += 1; } - log::info!( + log::debug!( "[SUBSCRIBER] recv_subgroup: completed subgroup (group_id={}, subgroup_id={}, {} objects received)", subgroup_writer.info.group_id, subgroup_writer.info.subgroup_id, @@ -659,7 +691,7 @@ impl Subscriber { // Check for Immutable Extensions (type 0xB = 11) if ext_headers.has(0xB) { - log::info!( + log::debug!( "[SUBSCRIBER] recv_datagram: datagram contains IMMUTABLE EXTENSIONS (type 0xB)" ); if let Some(immutable_ext) = ext_headers.get(0xB) { @@ -672,7 +704,7 @@ impl Subscriber { // Check for Prior Group ID Gap (type 0x3C = 60) if ext_headers.has(0x3C) { - log::info!( + log::debug!( "[SUBSCRIBER] recv_datagram: datagram contains PRIOR GROUP ID GAP (type 0x3C)" ); if let Some(gap_ext) = ext_headers.get(0x3C) { @@ -684,6 +716,16 @@ impl Subscriber { } } + // Log current alias map state before lookup + { + let alias_map = self.subscribe_alias_map.lock().unwrap(); + log::debug!( + "[SUBSCRIBER] recv_datagram: looking up track_alias={}, current alias_map={:?}", + datagram.track_alias, + alias_map.iter().collect::>() + ); + } + // Look up the subscribe id for this track alias if let Some(subscribe_id) = self .get_subscribe_id_by_alias(datagram.track_alias, Some(DEFAULT_ALIAS_WAIT_TIME_MS)) @@ -691,25 +733,23 @@ impl Subscriber { { // Look up the subscribe by id if let Some(subscribe) = self.subscribes.lock().unwrap().get_mut(&subscribe_id) { - log::trace!( - "[SUBSCRIBER] recv_datagram: track_alias={}, group_id={}, object_id={}, publisher_priority={}, status={}, payload_length={}", + log::debug!( + "[SUBSCRIBER] recv_datagram: FOUND track_alias={} -> subscribe_id={}, forwarding datagram", datagram.track_alias, - datagram.group_id, - datagram.object_id.unwrap_or(0), - datagram.publisher_priority, - datagram.status.as_ref().map_or("None".to_string(), |s| format!("{:?}", s)), - datagram.payload.as_ref().map_or(0, |p| p.len())); + subscribe_id + ); subscribe.datagram(datagram)?; } } else { + // Log the full state for debugging + let alias_map = self.subscribe_alias_map.lock().unwrap(); + let subscribes = self.subscribes.lock().unwrap(); log::warn!( - "[SUBSCRIBER] recv_datagram: discarded due to unknown track_alias: track_alias={}, group_id={}, object_id={}, publisher_priority={}, status={}, payload_length={}", + "[SUBSCRIBER] recv_datagram: UNKNOWN track_alias={}, alias_map={:?}, subscribe_ids={:?}", datagram.track_alias, - datagram.group_id, - datagram.object_id.unwrap_or(0), - datagram.publisher_priority, - datagram.status.as_ref().map_or("None".to_string(), |s| format!("{:?}", s)), - datagram.payload.as_ref().map_or(0, |p| p.len())); + alias_map.iter().collect::>(), + subscribes.keys().collect::>() + ); } Ok(())