From 5d1f42461c4719d2bde7ad40d33d1aa9e83abfef Mon Sep 17 00:00:00 2001 From: Manish Date: Fri, 19 Dec 2025 00:12:42 +0530 Subject: [PATCH 1/6] refactor: simplified remote manager --- Cargo.lock | 1 + moq-relay-ietf/Cargo.toml | 2 +- moq-relay-ietf/src/producer.rs | 27 +- moq-relay-ietf/src/relay.rs | 19 +- moq-relay-ietf/src/remote.rs | 567 +++++++++++++-------------------- 5 files changed, 237 insertions(+), 379 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 65af2d8c..cfc98acc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1308,6 +1308,7 @@ dependencies = [ "serde_json", "thiserror 2.0.17", "tokio", + "tokio-util", "tower-http", "tracing", "tracing-subscriber", diff --git a/moq-relay-ietf/Cargo.toml b/moq-relay-ietf/Cargo.toml index 0df99745..773f0ba4 100644 --- a/moq-relay-ietf/Cargo.toml +++ b/moq-relay-ietf/Cargo.toml @@ -30,7 +30,7 @@ url = "2" # Async stuff tokio = { version = "1", features = ["full"] } -# tokio-util = "0.7" +tokio-util = "0.7" futures = "0.3" async-trait = "0.1" diff --git a/moq-relay-ietf/src/producer.rs b/moq-relay-ietf/src/producer.rs index 23ea49f3..8f17352a 100644 --- a/moq-relay-ietf/src/producer.rs +++ b/moq-relay-ietf/src/producer.rs @@ -4,18 +4,18 @@ use moq_transport::{ session::{Publisher, SessionError, Subscribed, TrackStatusRequested}, }; -use crate::{Locals, RemotesConsumer}; +use crate::{Locals, RemoteManager}; /// Producer of tracks to a remote Subscriber #[derive(Clone)] pub struct Producer { publisher: Publisher, locals: Locals, - remotes: Option, + remotes: RemoteManager, } impl Producer { - pub fn new(publisher: Publisher, locals: Locals, remotes: Option) -> Self { + pub fn new(publisher: Publisher, locals: Locals, remotes: RemoteManager) -> Self { Self { publisher, locals, @@ -89,21 +89,16 @@ impl Producer { } } - if let Some(remotes) = self.remotes { - // Check remote tracks second, and serve from remote if possible - match remotes.route(&namespace).await { - Ok(remote) => { - if let Some(remote) = remote { - if let Some(track) = remote.subscribe(&namespace, &track_name)? { - log::info!("serving subscribe from remote: {:?}", track.info); - return Ok(subscribed.serve(track.reader).await?); - } - } - } - Err(e) => { - log::error!("failed to route to remote: {}", e); + match self.remotes.subscribe(&namespace, &track_name).await { + Ok(track) => { + if let Some(track) = track { + log::info!("serving subscribe from remote: {:?}", track.info); + return Ok(subscribed.serve(track).await?); } } + Err(e) => { + log::error!("failed to route to remote: {}", e); + } } // Track not found - close the subscription with not found error let err = ServeError::not_found_ctx(format!( diff --git a/moq-relay-ietf/src/relay.rs b/moq-relay-ietf/src/relay.rs index f40e576f..5d913ec2 100644 --- a/moq-relay-ietf/src/relay.rs +++ b/moq-relay-ietf/src/relay.rs @@ -6,9 +6,7 @@ use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; use moq_native_ietf::quic::{self, Endpoint}; use url::Url; -use crate::{ - Consumer, Coordinator, Locals, Producer, Remotes, RemotesConsumer, RemotesProducer, Session, -}; +use crate::{Consumer, Coordinator, Locals, Producer, RemoteManager, Session}; // A type alias for boxed future type ServerFuture = Pin< @@ -56,7 +54,7 @@ pub struct Relay { announce_url: Option, mlog_dir: Option, locals: Locals, - remotes: Option<(RemotesProducer, RemotesConsumer)>, + remotes: RemoteManager, coordinator: Arc, } @@ -101,18 +99,14 @@ impl Relay { .collect::>(); // Create remote manager - uses coordinator for namespace lookups - let remotes = Remotes { - coordinator: config.coordinator.clone(), - quic: remote_clients[0].clone(), - } - .produce(); + let remotes = RemoteManager::new(config.coordinator.clone(), remote_clients); Ok(Self { quic_endpoints: endpoints, announce_url: config.announce, mlog_dir: config.mlog_dir, locals, - remotes: Some(remotes), + remotes, coordinator: config.coordinator, }) } @@ -122,10 +116,7 @@ impl Relay { let mut tasks = FuturesUnordered::new(); // Split remotes producer/consumer and spawn producer task - let remotes = self.remotes.map(|(producer, consumer)| { - tasks.push(producer.run().boxed()); - consumer - }); + let remotes = self.remotes; // Start the forwarder, if any let forward_producer = if let Some(url) = &self.announce_url { diff --git a/moq-relay-ietf/src/remote.rs b/moq-relay-ietf/src/remote.rs index 05fe407c..1d69bcb0 100644 --- a/moq-relay-ietf/src/remote.rs +++ b/moq-relay-ietf/src/remote.rs @@ -1,416 +1,287 @@ use std::collections::HashMap; - -use std::collections::VecDeque; -use std::fmt; -use std::ops; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::sync::Weak; -use futures::stream::FuturesUnordered; -use futures::FutureExt; -use futures::StreamExt; use moq_native_ietf::quic; use moq_transport::coding::TrackNamespace; -use moq_transport::serve::{Track, TrackReader, TrackWriter}; -use moq_transport::watch::State; +use moq_transport::serve::{Track, TrackReader}; +use tokio::sync::Mutex; +use tokio_util::sync::CancellationToken; use url::Url; use crate::Coordinator; -/// Information about remote origins. -pub struct Remotes { - /// The client we use to fetch/store origin information. - pub coordinator: Arc, - - // A QUIC endpoint we'll use to fetch from other origins. - pub quic: quic::Client, -} - -impl Remotes { - pub fn produce(self) -> (RemotesProducer, RemotesConsumer) { - let (send, recv) = State::default().split(); - let info = Arc::new(self); - - let producer = RemotesProducer::new(info.clone(), send); - let consumer = RemotesConsumer::new(info, recv); - - (producer, consumer) - } -} - -#[derive(Default)] -struct RemotesState { - lookup: HashMap, - requested: VecDeque, -} - -// Clone for convenience, but there should only be one instance of this +/// Manages connections to remote relays. +/// +/// When a subscription request comes in for a namespace that isn't local, +/// RemoteManager uses the coordinator to find which remote relay serves it, +/// establishes a connection if needed, and subscribes to the track. #[derive(Clone)] -pub struct RemotesProducer { - info: Arc, - state: State, +pub struct RemoteManager { + coordinator: Arc, + clients: Vec, + remotes: Arc>>, } -impl RemotesProducer { - fn new(info: Arc, state: State) -> Self { - Self { info, state } - } - - /// Block until the next remote requested by a consumer. - async fn next(&mut self) -> Option { - loop { - { - let state = self.state.lock(); - if !state.requested.is_empty() { - return state.into_mut()?.requested.pop_front(); - } - - state.modified()? - } - .await; +impl RemoteManager { + /// Create a new RemoteManager. + pub fn new(coordinator: Arc, clients: Vec) -> Self { + Self { + coordinator, + clients, + remotes: Arc::new(Mutex::new(HashMap::new())), } } - /// Run the remotes producer to serve remote requests. - pub async fn run(mut self) -> anyhow::Result<()> { - let mut tasks = FuturesUnordered::new(); - - loop { - tokio::select! { - Some(mut remote) = self.next() => { - let url = remote.url.clone(); - - // Spawn a task to serve the remote - tasks.push(async move { - let info = remote.info.clone(); + /// Subscribe to a track from a remote relay. + /// + /// This will: + /// 1. Use the coordinator to lookup which relay serves the namespace + /// 2. Connect to that relay if not already connected + /// 3. Subscribe to the specific track + /// + /// Returns None if the namespace isn't found in any remote relay. + pub async fn subscribe( + &self, + namespace: &TrackNamespace, + track_name: &str, + ) -> anyhow::Result> { + // Ask coordinator where this namespace lives + let (origin, client) = match self.coordinator.lookup(namespace).await { + Ok((origin, client)) => (origin, client), + Err(e) => { + log::error!("failed to lookup namespace: {}", e); + return Ok(None); // Namespace not found anywhere + } + }; - log::warn!("serving remote: {:?}", info); + let url = origin.url(); - // Run the remote producer - if let Err(err) = remote.run().await { - log::warn!("failed serving remote: {:?}, error: {}", info, err); - } + // Get or create a connection to the remote relay + let remote = match self.get_or_connect(&url, client.as_ref()).await { + Ok(remote) => remote, + Err(e) => { + log::error!("failed to connect to remote relay {}: {}", url, e); + // Remove failed connection from cache + self.remove(&url).await; + return Err(e); + } + }; - url - }); + // Subscribe to the track on the remote + match remote + .subscribe(namespace.clone(), track_name.to_string()) + .await + { + Ok(reader) => Ok(reader), + Err(e) => { + // If subscription fails, check if connection is dead and remove it + if !remote.is_connected() { + log::warn!("remote connection {} is dead, removing from cache", url); + self.remove(&url).await; } - - // Handle finished remote producers - res = tasks.next(), if !tasks.is_empty() => { - let url = res.unwrap(); - - if let Some(mut state) = self.state.lock_mut() { - state.lookup.remove(&url); - } - }, - else => return Ok(()), + Err(e) } } } -} - -impl ops::Deref for RemotesProducer { - type Target = Remotes; - - fn deref(&self) -> &Self::Target { - &self.info - } -} - -#[derive(Clone)] -pub struct RemotesConsumer { - pub info: Arc, - state: State, -} -impl RemotesConsumer { - fn new(info: Arc, state: State) -> Self { - Self { info, state } - } - - /// Route to a remote origin based on the namespace. - pub async fn route( + /// Get an existing remote connection or create a new one. + async fn get_or_connect( &self, - namespace: &TrackNamespace, - ) -> anyhow::Result> { - // Always fetch the origin instead of using the (potentially invalid) cache. - let (origin, client) = self.coordinator.lookup(namespace).await?; - - // Check if we already have a remote for this origin - let state = self.state.lock(); - if let Some(remote) = state.lookup.get(&origin.url()).cloned() { - return Ok(Some(remote)); + url: &Url, + client: Option<&quic::Client>, + ) -> anyhow::Result { + let mut remotes = self.remotes.lock().await; + + // Check if we already have a connection + if let Some(remote) = remotes.get(url) { + if remote.is_connected() { + return Ok(remote.clone()); + } + // Connection is dead, remove it + log::info!("removing dead connection to remote relay: {}", url); + remotes.remove(url); } - // Create a new remote for this origin - let mut state = match state.into_mut() { - Some(state) => state, - None => return Ok(None), + // Get client, with proper error handling for empty clients vec + let client = match client { + Some(c) => c, + None => self.clients.first().ok_or_else(|| { + anyhow::anyhow!("no QUIC clients configured for remote connections") + })?, }; - let remote = Remote { - url: origin.url(), - remotes: self.info.clone(), - client, - }; - - // Produce the remote - let (writer, reader) = remote.produce(); - state.requested.push_back(writer); + // Create a new connection with its own QUIC client + log::info!("connecting to remote relay: {}", url); + let remote = Remote::connect(url.clone(), client).await?; - // Insert the remote into our Map - state.lookup.insert(origin.url(), reader.clone()); + remotes.insert(url.clone(), remote.clone()); - Ok(Some(reader)) + Ok(remote) } -} -impl ops::Deref for RemotesConsumer { - type Target = Remotes; - - fn deref(&self) -> &Self::Target { - &self.info + /// Remove a remote connection (called when connection fails). + pub async fn remove(&self, url: &Url) { + let mut remotes = self.remotes.lock().await; + if let Some(remote) = remotes.remove(url) { + // Cancel the session task when removing + remote.shutdown(); + } } -} - -pub struct Remote { - pub remotes: Arc, - pub url: Url, - pub client: Option, -} -impl fmt::Debug for Remote { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Remote") - .field("url", &self.url.to_string()) - .finish() + /// Shutdown all remote connections. + pub async fn shutdown(&self) { + let mut remotes = self.remotes.lock().await; + for (url, remote) in remotes.drain() { + log::info!("shutting down remote connection: {}", url); + remote.shutdown(); + } } } -impl ops::Deref for Remote { - type Target = Remotes; - - fn deref(&self) -> &Self::Target { - &self.remotes - } +/// A connection to a single remote relay with its own QUIC client. +#[derive(Clone)] +pub struct Remote { + url: Url, + subscriber: moq_transport::session::Subscriber, + /// Track subscriptions - maps (namespace, track_name) to track reader + tracks: Arc>>, + /// Flag indicating if the connection is still alive + connected: Arc, + /// Cancellation token for the session task + cancel: CancellationToken, } impl Remote { - /// Create a new broadcast. - pub fn produce(self) -> (RemoteProducer, RemoteConsumer) { - let (send, recv) = State::default().split(); - let info = Arc::new(self); - - let consumer = RemoteConsumer::new(info.clone(), recv); - let producer = RemoteProducer::new(info, send); - - (producer, consumer) - } -} - -#[derive(Default)] -struct RemoteState { - tracks: HashMap<(TrackNamespace, String), RemoteTrackWeak>, - requested: VecDeque, -} - -pub struct RemoteProducer { - pub info: Arc, - state: State, -} - -impl RemoteProducer { - fn new(info: Arc, state: State) -> Self { - Self { info, state } - } - - pub async fn run(&mut self) -> anyhow::Result<()> { - let client = if let Some(client) = &self.info.client { - client - } else { - &self.quic - }; - // TODO reuse QUIC and MoQ sessions - let (session, _quic_client_initial_cid) = client.connect(&self.url, None).await?; + /// Connect to a remote relay with a dedicated QUIC client. + async fn connect(url: Url, client: &quic::Client) -> anyhow::Result { + // Connect to the remote relay (DNS resolution happens inside connect) + let (session, _cid) = client.connect(&url, None).await?; let (session, subscriber) = moq_transport::session::Subscriber::connect(session).await?; - // Run the session - let mut session = session.run().boxed(); - let mut tasks = FuturesUnordered::new(); + let connected = Arc::new(AtomicBool::new(true)); + let cancel = CancellationToken::new(); - let mut done = None; + // Spawn a task to run the session + let session_url = url.clone(); + let session_connected = connected.clone(); + let session_cancel = cancel.clone(); - // Serve requested tracks - loop { + tokio::spawn(async move { tokio::select! { - track = self.next(), if done.is_none() => { - let track = match track { - Ok(Some(track)) => track, - Ok(None) => { done = Some(Ok(())); continue }, - Err(err) => { done = Some(Err(err)); continue }, - }; - - let info = track.info.clone(); - let mut subscriber = subscriber.clone(); - - tasks.push(async move { - if let Err(err) = subscriber.subscribe(track).await { - log::warn!("failed serving track: {:?}, error: {}", info, err); - } - }); - } - _ = tasks.next(), if !tasks.is_empty() => {}, - - // Keep running the session - res = &mut session, if !tasks.is_empty() || done.is_none() => return Ok(res?), - - else => return done.unwrap(), - } - } - } - - /// Block until the next track requested by a consumer. - async fn next(&self) -> anyhow::Result> { - loop { - let notify = { - let state = self.state.lock(); - - // Check if we have any requested tracks - if !state.requested.is_empty() { - return Ok(state - .into_mut() - .and_then(|mut state| state.requested.pop_front())); + result = session.run() => { + if let Err(err) = result { + log::warn!("remote session closed: {} - {}", session_url, err); + } else { + log::info!("remote session closed normally: {}", session_url); + } } - - match state.modified() { - Some(notified) => notified, - None => return Ok(None), + _ = session_cancel.cancelled() => { + log::info!("remote session cancelled: {}", session_url); } - }; + } + // Mark connection as dead + session_connected.store(false, Ordering::Release); + }); - notify.await - } + Ok(Self { + url, + subscriber, + tracks: Arc::new(Mutex::new(HashMap::new())), + connected, + cancel, + }) } -} -impl ops::Deref for RemoteProducer { - type Target = Remote; - - fn deref(&self) -> &Self::Target { - &self.info + /// Check if the connection is still alive. + pub fn is_connected(&self) -> bool { + self.connected.load(Ordering::Acquire) } -} - -#[derive(Clone)] -pub struct RemoteConsumer { - pub info: Arc, - state: State, -} -impl RemoteConsumer { - fn new(info: Arc, state: State) -> Self { - Self { info, state } + /// Shutdown the remote connection. + pub fn shutdown(&self) { + self.cancel.cancel(); + self.connected.store(false, Ordering::Release); } - /// Request a track from the broadcast. - pub fn subscribe( + /// Subscribe to a track on this remote relay. + pub async fn subscribe( &self, - namespace: &TrackNamespace, - name: &str, - ) -> anyhow::Result> { - let key = (namespace.clone(), name.to_string()); - let state = self.state.lock(); - if let Some(track) = state.tracks.get(&key) { - if let Some(track) = track.upgrade() { - return Ok(Some(track)); - } + namespace: TrackNamespace, + track_name: String, + ) -> anyhow::Result> { + // Check connection state first + if !self.is_connected() { + return Err(anyhow::anyhow!( + "remote connection to {} is closed", + self.url + )); } - let mut state = match state.into_mut() { - Some(state) => state, - None => return Ok(None), - }; - - let (writer, reader) = Track::new(namespace.clone(), name.to_string()).produce(); - let reader = RemoteTrackReader::new(reader, self.state.clone()); + let key = (namespace.clone(), track_name.clone()); - // Insert the track into our Map so we deduplicate future requests. - state.tracks.insert(key, reader.downgrade()); - state.requested.push_back(writer); + // Hold lock for entire check-and-insert to prevent race conditions + let mut tracks = self.tracks.lock().await; - Ok(Some(reader)) - } -} - -impl ops::Deref for RemoteConsumer { - type Target = Remote; - - fn deref(&self) -> &Self::Target { - &self.info - } -} - -#[derive(Clone)] -pub struct RemoteTrackReader { - pub reader: TrackReader, - drop: Arc, -} - -impl RemoteTrackReader { - fn new(reader: TrackReader, parent: State) -> Self { - let drop = Arc::new(RemoteTrackDrop { - parent, - key: (reader.namespace.clone(), reader.name.clone()), - }); - - Self { reader, drop } - } - - fn downgrade(&self) -> RemoteTrackWeak { - RemoteTrackWeak { - reader: self.reader.clone(), - drop: Arc::downgrade(&self.drop), + // Check if we already have this track + if let Some(reader) = tracks.get(&key) { + return Ok(Some(reader.clone())); } - } -} -impl ops::Deref for RemoteTrackReader { - type Target = TrackReader; - - fn deref(&self) -> &Self::Target { - &self.reader - } -} - -impl ops::DerefMut for RemoteTrackReader { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.reader - } -} + // Create a new track and subscribe + let (writer, reader) = Track::new(namespace.clone(), track_name.clone()).produce(); + + // Insert BEFORE spawning to prevent race with removal in the spawned task + tracks.insert(key.clone(), reader.clone()); + + // Drop lock before spawning async task + drop(tracks); + + // Subscribe to the track on the remote + let mut subscriber = self.subscriber.clone(); + let track_key = key; + let tracks_clone = self.tracks.clone(); + let url = self.url.clone(); + + tokio::spawn(async move { + log::info!( + "subscribing to remote track: {} - {}/{}", + url, + track_key.0, + track_key.1 + ); + + if let Err(err) = subscriber.subscribe(writer).await { + log::warn!( + "failed subscribing to remote track: {} - {}/{} - {}", + url, + track_key.0, + track_key.1, + err + ); + // NOTE(itzmanish): should we assume the connection is bad? + // connected.store(false, Ordering::Release); + } -struct RemoteTrackWeak { - reader: TrackReader, - drop: Weak, -} + // Remove track from map when subscription ends + tracks_clone.lock().await.remove(&track_key); + log::debug!( + "remote track subscription ended: {} - {}/{}", + url, + track_key.0, + track_key.1 + ); + }); -impl RemoteTrackWeak { - fn upgrade(&self) -> Option { - Some(RemoteTrackReader { - reader: self.reader.clone(), - drop: self.drop.upgrade()?, - }) + Ok(Some(reader)) } } -struct RemoteTrackDrop { - parent: State, - key: (TrackNamespace, String), -} - -impl Drop for RemoteTrackDrop { - fn drop(&mut self) { - if let Some(mut parent) = self.parent.lock_mut() { - parent.tracks.remove(&self.key); - } +impl std::fmt::Debug for Remote { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Remote") + .field("url", &self.url.to_string()) + .field("connected", &self.is_connected()) + .finish() } } From cc4f63b754975427d06901a8ffe1c1bc51e6510d Mon Sep 17 00:00:00 2001 From: Scott Godin Date: Mon, 15 Dec 2025 16:47:17 -0500 Subject: [PATCH 2/6] If we receive a stream header with an unknown track alias, then wait for up to 1 second for SubscribeOk to arrive -remove panic if Fetch stream header --- moq-transport/src/session/mod.rs | 3 +- moq-transport/src/session/subscriber.rs | 83 +++++++++++++++++++++---- 2 files changed, 72 insertions(+), 14 deletions(-) diff --git a/moq-transport/src/session/mod.rs b/moq-transport/src/session/mod.rs index e5d9efb6..3b01dbd1 100644 --- a/moq-transport/src/session/mod.rs +++ b/moq-transport/src/session/mod.rs @@ -385,7 +385,8 @@ impl Session { subscriber .as_mut() .ok_or(SessionError::RoleViolation)? - .recv_datagram(datagram)?; + .recv_datagram(datagram) + .await?; } } } diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index a9fa2e16..955a33cc 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -2,8 +2,11 @@ use std::{ collections::{hash_map, HashMap}, io, sync::{atomic, Arc, Mutex}, + time::Duration, }; +use tokio::sync::Notify; + use crate::{ coding::{Decode, TrackNamespace}, data, @@ -16,6 +19,9 @@ use crate::watch::Queue; use super::{Announced, AnnouncedRecv, Reader, Session, SessionError, Subscribe, SubscribeRecv}; +// Default timeout for waiting for subscribe aliases to become available via SUBSCRIBE_OK (1 second) +const DEFAULT_ALIAS_WAIT_TIME_MS: u64 = 1000; + // TODO remove Clone. #[derive(Clone)] pub struct Subscriber { @@ -31,6 +37,9 @@ pub struct Subscriber { /// Map of track alias to subscription id for quick lookup when receiving streams/datagrams. subscribe_alias_map: Arc>>, + /// Notify when subscribe alias map is updated + subscribe_alias_notify: Arc, + /// The queue we will write any outbound control messages we want to send, the session run_send task /// will process the queue and send the message on the control stream. outgoing: Queue, @@ -60,6 +69,7 @@ impl Subscriber { outgoing, next_requestid, mlog, + subscribe_alias_notify: Arc::new(Notify::new()), } } @@ -204,6 +214,9 @@ impl Subscriber { .unwrap() .insert(msg.track_alias, msg.id); + // Notify waiting tasks that the alias map has been updated + self.subscribe_alias_notify.notify_waiters(); + // Notify the subscribe of the successful subscription subscribe.ok(msg.track_alias)?; } @@ -258,13 +271,47 @@ impl Subscriber { self.announced.lock().unwrap().remove(namespace); } - /// Get a subscribe id by track alias. - fn get_subscribe_id_by_alias(&mut self, track_alias: u64) -> Option { - self.subscribe_alias_map - .lock() - .unwrap() - .get(&track_alias) - .cloned() + /// Get a subscribe id by track alias, waiting up to the specified timeout if not present. + /// If timeout_ms is None, only check if already present and return None if not. + async fn get_subscribe_id_by_alias( + &self, + track_alias: u64, + timeout_ms: Option, + ) -> Option { + // If no timeout specified, don't wait + let timeout_ms = match timeout_ms { + Some(ms) => ms, + None => { + // Just check once + return self + .subscribe_alias_map + .lock() + .unwrap() + .get(&track_alias) + .cloned(); + } + }; + + // Wait for it to appear, checking after each notification + let timeout_duration = Duration::from_millis(timeout_ms); + tokio::time::timeout(timeout_duration, async { + loop { + // Check first + if let Some(id) = self + .subscribe_alias_map + .lock() + .unwrap() + .get(&track_alias) + .cloned() + { + return id; + } + // Then wait for notification + self.subscribe_alias_notify.notified().await; + } + }) + .await + .ok() } /// Handle reception of a new stream from the QUIC session. @@ -282,6 +329,11 @@ impl Subscriber { stream_header.header_type ); + // No fetch support yet + if !stream_header.header_type.is_subgroup() { + return Err(SessionError::unimplemented("non-SUBGROUP stream types")); + } + // Log subgroup header parsed/received if let Some(ref subgroup_header) = stream_header.subgroup_header { if let Some(ref mlog) = self.mlog { @@ -294,7 +346,6 @@ impl Subscriber { } } - // No fetch support yet, so panic if fetch_header for now (via unwrap below) let track_alias = stream_header.subgroup_header.as_ref().unwrap().track_alias; log::trace!( "[SUBSCRIBER] recv_stream: stream for subscription track_alias={}", @@ -309,9 +360,9 @@ impl Subscriber { track_alias, err ); - // The writer is closed, so we should teriminate. + // The writer is closed, so we should terminate. // TODO it would be nice to do this immediately when the Writer is closed. - if let Some(subscribe_id) = self.get_subscribe_id_by_alias(track_alias) { + if let Some(subscribe_id) = self.get_subscribe_id_by_alias(track_alias, None).await { if let Some(subscribe) = self.remove_subscribe(subscribe_id) { subscribe.error(err.clone())?; } @@ -342,7 +393,10 @@ impl Subscriber { let writer = { // Look up the subscribe id for this track alias - if let Some(subscribe_id) = self.get_subscribe_id_by_alias(track_alias) { + if let Some(subscribe_id) = self + .get_subscribe_id_by_alias(track_alias, Some(DEFAULT_ALIAS_WAIT_TIME_MS)) + .await + { // Look up the subscribe by id let mut subscribes = self.subscribes.lock().unwrap(); let subscribe = subscribes.get_mut(&subscribe_id).ok_or_else(|| { @@ -577,7 +631,7 @@ impl Subscriber { } /// Handle reception of a datagram from the QUIC session. - pub fn recv_datagram(&mut self, datagram: bytes::Bytes) -> Result<(), SessionError> { + pub async fn recv_datagram(&mut self, datagram: bytes::Bytes) -> Result<(), SessionError> { let mut cursor = io::Cursor::new(datagram); let datagram = data::Datagram::decode(&mut cursor)?; @@ -627,7 +681,10 @@ impl Subscriber { } // Look up the subscribe id for this track alias - if let Some(subscribe_id) = self.get_subscribe_id_by_alias(datagram.track_alias) { + if let Some(subscribe_id) = self + .get_subscribe_id_by_alias(datagram.track_alias, Some(DEFAULT_ALIAS_WAIT_TIME_MS)) + .await + { // Look up the subscribe by id if let Some(subscribe) = self.subscribes.lock().unwrap().get_mut(&subscribe_id) { log::trace!( From 2fe94606e43842a7927c9c8e038f869e844b75e2 Mon Sep 17 00:00:00 2001 From: Scott Godin Date: Wed, 17 Dec 2025 14:40:17 -0500 Subject: [PATCH 3/6] - register for notification before checking map to avoid race --- moq-transport/src/session/subscriber.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index 955a33cc..49c59e8d 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -296,7 +296,10 @@ impl Subscriber { let timeout_duration = Duration::from_millis(timeout_ms); tokio::time::timeout(timeout_duration, async { loop { - // Check first + // Register for notification before checking map + let notified = self.subscribe_alias_notify.notified(); + + // Check Map for alias if let Some(id) = self .subscribe_alias_map .lock() @@ -306,8 +309,9 @@ impl Subscriber { { return id; } - // Then wait for notification - self.subscribe_alias_notify.notified().await; + + // Alias not present yet, wait for notification + notified.await; } }) .await From 11d189196a23896eb25f80adb4ae964d2776a600 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 18 Dec 2025 00:43:07 +0000 Subject: [PATCH 4/6] chore: release --- Cargo.lock | 14 +++---- moq-api/CHANGELOG.md | 6 +++ moq-api/Cargo.toml | 2 +- moq-clock-ietf/CHANGELOG.md | 12 ++++++ moq-clock-ietf/Cargo.toml | 6 +-- moq-native-ietf/CHANGELOG.md | 18 ++++++++ moq-native-ietf/Cargo.toml | 4 +- moq-pub/CHANGELOG.md | 9 ++++ moq-pub/Cargo.toml | 6 +-- moq-relay-ietf/CHANGELOG.md | 23 +++++++++++ moq-relay-ietf/Cargo.toml | 6 +-- moq-sub/CHANGELOG.md | 10 +++++ moq-sub/Cargo.toml | 6 +-- moq-transport/CHANGELOG.md | 80 ++++++++++++++++++++++++++++++++++++ moq-transport/Cargo.toml | 2 +- 15 files changed, 181 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cfc98acc..a53c8b33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1203,7 +1203,7 @@ dependencies = [ [[package]] name = "moq-api" -version = "0.2.4" +version = "0.2.5" dependencies = [ "axum", "clap", @@ -1228,7 +1228,7 @@ dependencies = [ [[package]] name = "moq-clock-ietf" -version = "0.6.5" +version = "0.6.6" dependencies = [ "anyhow", "chrono", @@ -1245,7 +1245,7 @@ dependencies = [ [[package]] name = "moq-native-ietf" -version = "0.5.5" +version = "0.6.0" dependencies = [ "anyhow", "clap", @@ -1268,7 +1268,7 @@ dependencies = [ [[package]] name = "moq-pub" -version = "0.8.5" +version = "0.8.6" dependencies = [ "anyhow", "bytes", @@ -1289,7 +1289,7 @@ dependencies = [ [[package]] name = "moq-relay-ietf" -version = "0.7.5" +version = "0.7.6" dependencies = [ "anyhow", "async-trait", @@ -1318,7 +1318,7 @@ dependencies = [ [[package]] name = "moq-sub" -version = "0.3.4" +version = "0.4.0" dependencies = [ "anyhow", "clap", @@ -1337,7 +1337,7 @@ dependencies = [ [[package]] name = "moq-transport" -version = "0.11.0" +version = "0.12.0" dependencies = [ "bytes", "futures", diff --git a/moq-api/CHANGELOG.md b/moq-api/CHANGELOG.md index 391451bc..93bc6109 100644 --- a/moq-api/CHANGELOG.md +++ b/moq-api/CHANGELOG.md @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.2.5](https://github.com/cloudflare/moq-rs/compare/moq-api-v0.2.4...moq-api-v0.2.5) - 2025-12-18 + +### Other + +- Bump redis to fix deprecation warning during build + ## [0.2.4](https://github.com/englishm/moq-rs/compare/moq-api-v0.2.3...moq-api-v0.2.4) - 2025-09-15 ### Other diff --git a/moq-api/Cargo.toml b/moq-api/Cargo.toml index 7d4072c7..7ea2747a 100644 --- a/moq-api/Cargo.toml +++ b/moq-api/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Luke Curley"] repository = "https://github.com/englishm/moq-rs" license = "MIT OR Apache-2.0" -version = "0.2.4" +version = "0.2.5" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] diff --git a/moq-clock-ietf/CHANGELOG.md b/moq-clock-ietf/CHANGELOG.md index 1d22f19a..9b285228 100644 --- a/moq-clock-ietf/CHANGELOG.md +++ b/moq-clock-ietf/CHANGELOG.md @@ -6,6 +6,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.6.6](https://github.com/cloudflare/moq-rs/compare/moq-clock-ietf-v0.6.5...moq-clock-ietf-v0.6.6) - 2025-12-18 + +### Other + +- Add extension header support to datagrams +- Fix Datagram Support +- Wire Up Track Status Handling +- moq-clock-ietf variable renames and comments added +- Print CID for clock sessions +- Add --qlog-dir CLI argument to QUIC configuration +- -clock demo - task out reception of new streams so we don't need to wait for previous stream to end + ## [0.6.5](https://github.com/englishm/moq-rs/compare/moq-clock-ietf-v0.6.4...moq-clock-ietf-v0.6.5) - 2025-09-15 ### Other diff --git a/moq-clock-ietf/Cargo.toml b/moq-clock-ietf/Cargo.toml index bc97649d..73a52b46 100644 --- a/moq-clock-ietf/Cargo.toml +++ b/moq-clock-ietf/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Luke Curley"] repository = "https://github.com/englishm/moq-rs" license = "MIT OR Apache-2.0" -version = "0.6.5" +version = "0.6.6" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] @@ -14,8 +14,8 @@ categories = ["multimedia", "network-programming", "web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -moq-native-ietf = { path = "../moq-native-ietf", version = "0.5" } -moq-transport = { path = "../moq-transport", version = "0.11" } +moq-native-ietf = { path = "../moq-native-ietf", version = "0.6" } +moq-transport = { path = "../moq-transport", version = "0.12" } # QUIC url = "2" diff --git a/moq-native-ietf/CHANGELOG.md b/moq-native-ietf/CHANGELOG.md index d9420bee..faa28e8d 100644 --- a/moq-native-ietf/CHANGELOG.md +++ b/moq-native-ietf/CHANGELOG.md @@ -6,6 +6,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.6.0](https://github.com/cloudflare/moq-rs/compare/moq-native-ietf-v0.5.5...moq-native-ietf-v0.6.0) - 2025-12-18 + +### Other + +- Update moq-native-ietf/src/quic.rs +- Print CID for clock sessions +- cargo fmt +- Refactor mlog feature for better layering +- First pass of 'mlog' support +- Implement per-connection qlog file generation +- Thread qlog_dir and base_server_config to accept_session +- Store qlog_dir and base_server_config in Server struct +- Validate qlog directory exists at startup +- Add --qlog-dir CLI argument to QUIC configuration +- Enable qlog feature of quinn +- Log QUIC CIDs for accepted connections +- Use newer quinn + ## [0.5.5](https://github.com/englishm/moq-rs/compare/moq-native-ietf-v0.5.4...moq-native-ietf-v0.5.5) - 2025-09-15 ### Other diff --git a/moq-native-ietf/Cargo.toml b/moq-native-ietf/Cargo.toml index 7815678d..cdb22d35 100644 --- a/moq-native-ietf/Cargo.toml +++ b/moq-native-ietf/Cargo.toml @@ -5,14 +5,14 @@ authors = ["Luke Curley"] repository = "https://github.com/englishm/moq-rs" license = "MIT OR Apache-2.0" -version = "0.5.5" +version = "0.6.0" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] categories = ["multimedia", "network-programming", "web-programming"] [dependencies] -moq-transport = { path = "../moq-transport", version = "0.11" } +moq-transport = { path = "../moq-transport", version = "0.12" } web-transport = { workspace = true } web-transport-quinn = "0.3" diff --git a/moq-pub/CHANGELOG.md b/moq-pub/CHANGELOG.md index 453dccce..5ececa26 100644 --- a/moq-pub/CHANGELOG.md +++ b/moq-pub/CHANGELOG.md @@ -6,6 +6,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.8.6](https://github.com/cloudflare/moq-rs/compare/moq-pub-v0.8.5...moq-pub-v0.8.6) - 2025-12-18 + +### Other + +- moq-transport variable renames and comments added +- Log CID +- Print CID for clock sessions +- Add --qlog-dir CLI argument to QUIC configuration + ## [0.8.5](https://github.com/englishm/moq-rs/compare/moq-pub-v0.8.4...moq-pub-v0.8.5) - 2025-09-15 ### Other diff --git a/moq-pub/Cargo.toml b/moq-pub/Cargo.toml index 23af605d..24198d1e 100644 --- a/moq-pub/Cargo.toml +++ b/moq-pub/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Mike English", "Luke Curley"] repository = "https://github.com/englishm/moq-rs" license = "MIT OR Apache-2.0" -version = "0.8.5" +version = "0.8.6" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] @@ -14,8 +14,8 @@ categories = ["multimedia", "network-programming", "web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -moq-native-ietf = { path = "../moq-native-ietf", version = "0.5" } -moq-transport = { path = "../moq-transport", version = "0.11" } +moq-native-ietf = { path = "../moq-native-ietf", version = "0.6" } +moq-transport = { path = "../moq-transport", version = "0.12" } moq-catalog = { path = "../moq-catalog", version = "0.2" } url = "2" diff --git a/moq-relay-ietf/CHANGELOG.md b/moq-relay-ietf/CHANGELOG.md index 65ace645..c0e14e1c 100644 --- a/moq-relay-ietf/CHANGELOG.md +++ b/moq-relay-ietf/CHANGELOG.md @@ -6,6 +6,29 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.7.6](https://github.com/cloudflare/moq-rs/compare/moq-relay-ietf-v0.7.5...moq-relay-ietf-v0.7.6) - 2025-12-18 + +### Other + +- Use correlation IDs in errors +- cargo fmt +- Add support for nested namespaces +- Revert "Add support for namespace hierachies" +- Address PR feedback +- cargo fmt +- Add support for namespace hierachies +- Wire Up Track Status Handling +- moq-relay-ietf variable renames and comments added +- Update moq-relay-ietf/src/relay.rs +- Print CID for clock sessions +- Add --mlog-serve +- Refactor mlog feature for better layering +- First pass of 'mlog' support +- Allow either CID or CID_server.qlog paths +- Add --qlog-serve +- Wire qlog_dir CLI argument through moq-relay-ietf +- Add --qlog-dir CLI argument to QUIC configuration + ## [0.7.5](https://github.com/englishm/moq-rs/compare/moq-relay-ietf-v0.7.4...moq-relay-ietf-v0.7.5) - 2025-09-15 ### Other diff --git a/moq-relay-ietf/Cargo.toml b/moq-relay-ietf/Cargo.toml index 773f0ba4..e8a4406f 100644 --- a/moq-relay-ietf/Cargo.toml +++ b/moq-relay-ietf/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Luke Curley", "Manish Kumar Pandit"] repository = "https://github.com/cloudflare/moq-rs" license = "MIT OR Apache-2.0" -version = "0.7.5" +version = "0.7.6" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] @@ -20,8 +20,8 @@ name = "moq-relay-ietf" path = "src/bin/moq-relay-ietf/main.rs" [dependencies] -moq-transport = { path = "../moq-transport", version = "0.11" } -moq-native-ietf = { path = "../moq-native-ietf", version = "0.5" } +moq-transport = { path = "../moq-transport", version = "0.12" } +moq-native-ietf = { path = "../moq-native-ietf", version = "0.6" } moq-api = { path = "../moq-api", version = "0.2" } web-transport = { workspace = true } diff --git a/moq-sub/CHANGELOG.md b/moq-sub/CHANGELOG.md index 685698a7..9eedc258 100644 --- a/moq-sub/CHANGELOG.md +++ b/moq-sub/CHANGELOG.md @@ -6,6 +6,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.4.0](https://github.com/cloudflare/moq-rs/compare/moq-sub-v0.3.4...moq-sub-v0.4.0) - 2025-12-18 + +### Other + +- Add support for nested namespaces +- Log CID +- Print CID for clock sessions +- Add --qlog-dir CLI argument to QUIC configuration +- Merge branch 'main' into sub-catalog + ## [0.3.4](https://github.com/englishm/moq-rs/compare/moq-sub-v0.3.3...moq-sub-v0.3.4) - 2025-09-15 ### Other diff --git a/moq-sub/Cargo.toml b/moq-sub/Cargo.toml index e87deba6..5fc3514c 100644 --- a/moq-sub/Cargo.toml +++ b/moq-sub/Cargo.toml @@ -5,7 +5,7 @@ authors = [] repository = "https://github.com/englishm/moq-rs" license = "MIT OR Apache-2.0" -version = "0.3.4" +version = "0.4.0" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] @@ -14,8 +14,8 @@ categories = ["multimedia", "network-programming", "web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -moq-transport = { path = "../moq-transport", version = "0.11" } -moq-native-ietf = { path = "../moq-native-ietf", version = "0.5" } +moq-transport = { path = "../moq-transport", version = "0.12" } +moq-native-ietf = { path = "../moq-native-ietf", version = "0.6" } moq-catalog = { path = "../moq-catalog", version = "0.2" } url = "2" diff --git a/moq-transport/CHANGELOG.md b/moq-transport/CHANGELOG.md index 86b2fd8a..fb02df4b 100644 --- a/moq-transport/CHANGELOG.md +++ b/moq-transport/CHANGELOG.md @@ -6,6 +6,86 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.12.0](https://github.com/cloudflare/moq-rs/compare/moq-transport-v0.11.0...moq-transport-v0.12.0) - 2025-12-18 + +### Fixed + +- fix issues noticed by CoPilot + +### Other + +- - register for notification before checking map to avoid race +- If we receive a stream header with an unknown track alias, then wait for up to 1 second for SubscribeOk to arrive +- cargo fmt +- Improve error expressivity and safety +- Address PR feedback +- Use correlation IDs in errors +- Add error correlation ids +- Align error codes with draft-14 +- Consistently handle unimplemented features +- Add extension header support to datagrams +- Add support for Immutable Extension headers +- Add logging for immutable extension headers +- Fix last commit +- Fixup KVPs Parsing +- Merge pull request #108 from cloudflare/me/more-kvp-parsing-logging +- Merge pull request #102 from sgodin/datagram-logging +- -fix typo in error definition +- - enable trace level logging on fly +- cargo fmt +- Add support for nested namespaces +- Revert "Add support for namespace hierachies" +- Address PR feedback +- cargo fmt +- Add support for namespace hierachies +- Fix Datagram Support +- Wire Up Track Status Handling +- cargo fmt +- Add separators when printing multiple kv pairs +- Print max 16 bytes of BytesValues when debugging +- Cleanup mlog serialization +- Appease Copilot with more comments +- Improve err handling of push_and_wait_until_popped +- Fix comment typo +- Fix comment typo +- Fixup Subscribe Alias Handling +- -fix spelling errors found by CoPilot +- moq-transport variable renames and comments added +- Use FilterType::LargestObject for subscribe +- Fix param types to match draft-14 +- cargo fmt +- Add MoQT qlog events and TODOs for remainder +- cargo fmt +- cargo clippy --fix +- Add more qlog logging to 'mlog' session logs +- Add qlog events for generic logs +- Add some events for subgroup headers and objects +- Add more moqt qlog events +- Emit subscribe and subscribe_ok moqt qlog events +- Add more moqt qlog events +- Refactor mlog feature for better layering +- cargo fmt +- First pass of 'mlog' support +- Initial mlog scaffolding +- Add/bump serde for mlog in moq-transport +- Merge pull request #78 from sgodin/moq-interim-updates-2 +- cargo fmt +- Fix lint nit +- Fix lint nit +- Add extra logging +- cargo fmt +- - send track_alias in SubscribeOk to match what is sent in the stream header +- cargo fmt +- Appease linter +- -clock demo - task out reception of new streams so we don't need to wait for previous stream to end +- Tidy versions test fixture +- Tidy track namespace test fixture +- Tidy tuple test fixture +- Setup message test formatting +- Fix comment placement in Location test +- Fix comment placement in KeyValuePair tests +- VarInt tests - use binary literals for readability + ## [0.11.0](https://github.com/englishm/moq-rs/compare/moq-transport-v0.10.0...moq-transport-v0.11.0) - 2025-09-15 ### Other diff --git a/moq-transport/Cargo.toml b/moq-transport/Cargo.toml index 86756455..b499dad6 100644 --- a/moq-transport/Cargo.toml +++ b/moq-transport/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Luke Curley"] repository = "https://github.com/englishm/moq-rs" license = "MIT OR Apache-2.0" -version = "0.11.0" +version = "0.12.0" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] From b3a5c29871c79d7e5215fbdb8a745bd9166567c8 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 18 Dec 2025 18:34:58 +0000 Subject: [PATCH 5/6] chore: release --- Cargo.lock | 14 +++++++------- moq-api/CHANGELOG.md | 6 ++++++ moq-api/Cargo.toml | 2 +- moq-clock-ietf/CHANGELOG.md | 6 ++++++ moq-clock-ietf/Cargo.toml | 4 ++-- moq-native-ietf/CHANGELOG.md | 6 ++++++ moq-native-ietf/Cargo.toml | 2 +- moq-pub/CHANGELOG.md | 6 ++++++ moq-pub/Cargo.toml | 4 ++-- moq-relay-ietf/CHANGELOG.md | 28 ++++++++++++++++++++++++++++ moq-relay-ietf/Cargo.toml | 4 ++-- moq-sub/CHANGELOG.md | 6 ++++++ moq-sub/Cargo.toml | 4 ++-- moq-transport/CHANGELOG.md | 6 ++++++ moq-transport/Cargo.toml | 2 +- 15 files changed, 82 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a53c8b33..e45c4da1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1203,7 +1203,7 @@ dependencies = [ [[package]] name = "moq-api" -version = "0.2.5" +version = "0.2.6" dependencies = [ "axum", "clap", @@ -1228,7 +1228,7 @@ dependencies = [ [[package]] name = "moq-clock-ietf" -version = "0.6.6" +version = "0.6.7" dependencies = [ "anyhow", "chrono", @@ -1245,7 +1245,7 @@ dependencies = [ [[package]] name = "moq-native-ietf" -version = "0.6.0" +version = "0.7.0" dependencies = [ "anyhow", "clap", @@ -1268,7 +1268,7 @@ dependencies = [ [[package]] name = "moq-pub" -version = "0.8.6" +version = "0.8.7" dependencies = [ "anyhow", "bytes", @@ -1289,7 +1289,7 @@ dependencies = [ [[package]] name = "moq-relay-ietf" -version = "0.7.6" +version = "0.7.7" dependencies = [ "anyhow", "async-trait", @@ -1318,7 +1318,7 @@ dependencies = [ [[package]] name = "moq-sub" -version = "0.4.0" +version = "0.4.1" dependencies = [ "anyhow", "clap", @@ -1337,7 +1337,7 @@ dependencies = [ [[package]] name = "moq-transport" -version = "0.12.0" +version = "0.12.1" dependencies = [ "bytes", "futures", diff --git a/moq-api/CHANGELOG.md b/moq-api/CHANGELOG.md index 93bc6109..d4ba5ede 100644 --- a/moq-api/CHANGELOG.md +++ b/moq-api/CHANGELOG.md @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.2.6](https://github.com/cloudflare/moq-rs/compare/moq-api-v0.2.5...moq-api-v0.2.6) - 2025-12-18 + +### Other + +- update Cargo.lock dependencies + ## [0.2.5](https://github.com/cloudflare/moq-rs/compare/moq-api-v0.2.4...moq-api-v0.2.5) - 2025-12-18 ### Other diff --git a/moq-api/Cargo.toml b/moq-api/Cargo.toml index 7ea2747a..0e3a9a24 100644 --- a/moq-api/Cargo.toml +++ b/moq-api/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Luke Curley"] repository = "https://github.com/englishm/moq-rs" license = "MIT OR Apache-2.0" -version = "0.2.5" +version = "0.2.6" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] diff --git a/moq-clock-ietf/CHANGELOG.md b/moq-clock-ietf/CHANGELOG.md index 9b285228..ea65e0cf 100644 --- a/moq-clock-ietf/CHANGELOG.md +++ b/moq-clock-ietf/CHANGELOG.md @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.6.7](https://github.com/cloudflare/moq-rs/compare/moq-clock-ietf-v0.6.6...moq-clock-ietf-v0.6.7) - 2025-12-18 + +### Other + +- Merge pull request #118 from itzmanish/feat/multi-relay + ## [0.6.6](https://github.com/cloudflare/moq-rs/compare/moq-clock-ietf-v0.6.5...moq-clock-ietf-v0.6.6) - 2025-12-18 ### Other diff --git a/moq-clock-ietf/Cargo.toml b/moq-clock-ietf/Cargo.toml index 73a52b46..9614f43a 100644 --- a/moq-clock-ietf/Cargo.toml +++ b/moq-clock-ietf/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Luke Curley"] repository = "https://github.com/englishm/moq-rs" license = "MIT OR Apache-2.0" -version = "0.6.6" +version = "0.6.7" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] @@ -14,7 +14,7 @@ categories = ["multimedia", "network-programming", "web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -moq-native-ietf = { path = "../moq-native-ietf", version = "0.6" } +moq-native-ietf = { path = "../moq-native-ietf", version = "0.7" } moq-transport = { path = "../moq-transport", version = "0.12" } # QUIC diff --git a/moq-native-ietf/CHANGELOG.md b/moq-native-ietf/CHANGELOG.md index faa28e8d..1e3773dd 100644 --- a/moq-native-ietf/CHANGELOG.md +++ b/moq-native-ietf/CHANGELOG.md @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.7.0](https://github.com/cloudflare/moq-rs/compare/moq-native-ietf-v0.6.0...moq-native-ietf-v0.7.0) - 2025-12-18 + +### Other + +- Merge pull request #118 from itzmanish/feat/multi-relay + ## [0.6.0](https://github.com/cloudflare/moq-rs/compare/moq-native-ietf-v0.5.5...moq-native-ietf-v0.6.0) - 2025-12-18 ### Other diff --git a/moq-native-ietf/Cargo.toml b/moq-native-ietf/Cargo.toml index cdb22d35..39968516 100644 --- a/moq-native-ietf/Cargo.toml +++ b/moq-native-ietf/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Luke Curley"] repository = "https://github.com/englishm/moq-rs" license = "MIT OR Apache-2.0" -version = "0.6.0" +version = "0.7.0" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] diff --git a/moq-pub/CHANGELOG.md b/moq-pub/CHANGELOG.md index 5ececa26..4dbd3e9f 100644 --- a/moq-pub/CHANGELOG.md +++ b/moq-pub/CHANGELOG.md @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.8.7](https://github.com/cloudflare/moq-rs/compare/moq-pub-v0.8.6...moq-pub-v0.8.7) - 2025-12-18 + +### Other + +- Merge pull request #118 from itzmanish/feat/multi-relay + ## [0.8.6](https://github.com/cloudflare/moq-rs/compare/moq-pub-v0.8.5...moq-pub-v0.8.6) - 2025-12-18 ### Other diff --git a/moq-pub/Cargo.toml b/moq-pub/Cargo.toml index 24198d1e..cac56cc0 100644 --- a/moq-pub/Cargo.toml +++ b/moq-pub/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Mike English", "Luke Curley"] repository = "https://github.com/englishm/moq-rs" license = "MIT OR Apache-2.0" -version = "0.8.6" +version = "0.8.7" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] @@ -14,7 +14,7 @@ categories = ["multimedia", "network-programming", "web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -moq-native-ietf = { path = "../moq-native-ietf", version = "0.6" } +moq-native-ietf = { path = "../moq-native-ietf", version = "0.7" } moq-transport = { path = "../moq-transport", version = "0.12" } moq-catalog = { path = "../moq-catalog", version = "0.2" } diff --git a/moq-relay-ietf/CHANGELOG.md b/moq-relay-ietf/CHANGELOG.md index c0e14e1c..e1afb5fe 100644 --- a/moq-relay-ietf/CHANGELOG.md +++ b/moq-relay-ietf/CHANGELOG.md @@ -6,6 +6,34 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.7.7](https://github.com/cloudflare/moq-rs/compare/moq-relay-ietf-v0.7.6...moq-relay-ietf-v0.7.7) - 2025-12-18 + +### Added + +- add file-based coordinator and rewrote remote for handling remote streams + +### Fixed + +- ci +- linter +- seperate RemoteManager rewrite to different PR +- remove once_cell to pass the test +- clippy unused imports +- clippy warnings +- race and proper task shutdown +- if host is IpAddr construct socket addr else resolve dns +- update lookup signature to return owned Client instead of reference +- prevent file truncation when opening for read/write in FileCoordinator +- add lifetime parameter to lookup method signature for proper borrow checking +- return clients on lookup for coordinator and misc fix + +### Other + +- Merge pull request #118 from itzmanish/feat/multi-relay +- remove track registration from coordinator interface and file implementation +- clarify coordinator file usage in CLI help text and add FIXME for unregister_namespace +- restructure relay into lib/bin and add coordinator interface + ## [0.7.6](https://github.com/cloudflare/moq-rs/compare/moq-relay-ietf-v0.7.5...moq-relay-ietf-v0.7.6) - 2025-12-18 ### Other diff --git a/moq-relay-ietf/Cargo.toml b/moq-relay-ietf/Cargo.toml index e8a4406f..bfd0edcd 100644 --- a/moq-relay-ietf/Cargo.toml +++ b/moq-relay-ietf/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Luke Curley", "Manish Kumar Pandit"] repository = "https://github.com/cloudflare/moq-rs" license = "MIT OR Apache-2.0" -version = "0.7.6" +version = "0.7.7" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] @@ -21,7 +21,7 @@ path = "src/bin/moq-relay-ietf/main.rs" [dependencies] moq-transport = { path = "../moq-transport", version = "0.12" } -moq-native-ietf = { path = "../moq-native-ietf", version = "0.6" } +moq-native-ietf = { path = "../moq-native-ietf", version = "0.7" } moq-api = { path = "../moq-api", version = "0.2" } web-transport = { workspace = true } diff --git a/moq-sub/CHANGELOG.md b/moq-sub/CHANGELOG.md index 9eedc258..960e76e7 100644 --- a/moq-sub/CHANGELOG.md +++ b/moq-sub/CHANGELOG.md @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.4.1](https://github.com/cloudflare/moq-rs/compare/moq-sub-v0.4.0...moq-sub-v0.4.1) - 2025-12-18 + +### Other + +- Merge pull request #118 from itzmanish/feat/multi-relay + ## [0.4.0](https://github.com/cloudflare/moq-rs/compare/moq-sub-v0.3.4...moq-sub-v0.4.0) - 2025-12-18 ### Other diff --git a/moq-sub/Cargo.toml b/moq-sub/Cargo.toml index 5fc3514c..cb9f7d71 100644 --- a/moq-sub/Cargo.toml +++ b/moq-sub/Cargo.toml @@ -5,7 +5,7 @@ authors = [] repository = "https://github.com/englishm/moq-rs" license = "MIT OR Apache-2.0" -version = "0.4.0" +version = "0.4.1" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] @@ -15,7 +15,7 @@ categories = ["multimedia", "network-programming", "web-programming"] [dependencies] moq-transport = { path = "../moq-transport", version = "0.12" } -moq-native-ietf = { path = "../moq-native-ietf", version = "0.6" } +moq-native-ietf = { path = "../moq-native-ietf", version = "0.7" } moq-catalog = { path = "../moq-catalog", version = "0.2" } url = "2" diff --git a/moq-transport/CHANGELOG.md b/moq-transport/CHANGELOG.md index fb02df4b..91606bad 100644 --- a/moq-transport/CHANGELOG.md +++ b/moq-transport/CHANGELOG.md @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.12.1](https://github.com/cloudflare/moq-rs/compare/moq-transport-v0.12.0...moq-transport-v0.12.1) - 2025-12-18 + +### Other + +- Merge pull request #118 from itzmanish/feat/multi-relay + ## [0.12.0](https://github.com/cloudflare/moq-rs/compare/moq-transport-v0.11.0...moq-transport-v0.12.0) - 2025-12-18 ### Fixed diff --git a/moq-transport/Cargo.toml b/moq-transport/Cargo.toml index b499dad6..9685a2a1 100644 --- a/moq-transport/Cargo.toml +++ b/moq-transport/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Luke Curley"] repository = "https://github.com/englishm/moq-rs" license = "MIT OR Apache-2.0" -version = "0.12.0" +version = "0.12.1" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] From 8dcd496ad56927d4e6966e6d08fc637e121c38f4 Mon Sep 17 00:00:00 2001 From: Manish Date: Fri, 19 Dec 2025 17:20:04 +0530 Subject: [PATCH 6/6] add Mike in authors list --- moq-relay-ietf/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/moq-relay-ietf/Cargo.toml b/moq-relay-ietf/Cargo.toml index bfd0edcd..771d2d73 100644 --- a/moq-relay-ietf/Cargo.toml +++ b/moq-relay-ietf/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "moq-relay-ietf" description = "Media over QUIC" -authors = ["Luke Curley", "Manish Kumar Pandit"] +authors = ["Luke Curley", "Manish Kumar Pandit", "Mike English"] repository = "https://github.com/cloudflare/moq-rs" license = "MIT OR Apache-2.0"