diff --git a/.idea/workspace.xml b/.idea/workspace.xml
new file mode 100644
index 00000000..359cff88
--- /dev/null
+++ b/.idea/workspace.xml
@@ -0,0 +1,237 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ {
+ "associatedIndex": 6
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 1772062719090
+
+
+ 1772062719090
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/Cargo.lock b/Cargo.lock
index 8de6eb4c..ec77f290 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -746,7 +746,7 @@ dependencies = [
"futures-core",
"futures-sink",
"http",
- "indexmap 2.2.6",
+ "indexmap 2.13.0",
"slab",
"tokio",
"tokio-util",
@@ -761,9 +761,9 @@ checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "hashbrown"
-version = "0.14.5"
+version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
+checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100"
[[package]]
name = "heck"
@@ -972,13 +972,14 @@ dependencies = [
[[package]]
name = "indexmap"
-version = "2.2.6"
+version = "2.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26"
+checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017"
dependencies = [
"equivalent",
- "hashbrown 0.14.5",
+ "hashbrown 0.16.1",
"serde",
+ "serde_core",
]
[[package]]
@@ -1041,9 +1042,9 @@ dependencies = [
[[package]]
name = "js-sys"
-version = "0.3.81"
+version = "0.3.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ec48937a97411dcb524a265206ccd4c90bb711fca92b2792c407f268825b9305"
+checksum = "b49715b7073f385ba4bc528e5747d02e66cb39c6146efb66b781f131f0fb399c"
dependencies = [
"once_cell",
"wasm-bindgen",
@@ -1188,6 +1189,7 @@ dependencies = [
"chrono",
"clap",
"env_logger",
+ "futures",
"log",
"moq-native-ietf",
"moq-transport",
@@ -1228,6 +1230,7 @@ dependencies = [
"bytes",
"clap",
"env_logger",
+ "futures",
"log",
"moq-catalog",
"moq-native-ietf",
@@ -1580,6 +1583,7 @@ version = "0.11.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31"
dependencies = [
+ "aws-lc-rs",
"bytes",
"fastbloom",
"getrandom 0.3.3",
@@ -2125,7 +2129,7 @@ version = "1.0.145"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c"
dependencies = [
- "indexmap 2.2.6",
+ "indexmap 2.13.0",
"itoa",
"memchr",
"ryu",
@@ -2165,7 +2169,7 @@ dependencies = [
"chrono",
"hex",
"indexmap 1.9.3",
- "indexmap 2.2.6",
+ "indexmap 2.13.0",
"schemars 0.9.0",
"schemars 1.0.4",
"serde",
@@ -2187,6 +2191,17 @@ dependencies = [
"syn",
]
+[[package]]
+name = "sfv"
+version = "0.14.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0d471eaefb14f4b30032525bdb124b36e55ba9cb1292080e06f1a236cd10fe87"
+dependencies = [
+ "base64",
+ "indexmap 2.13.0",
+ "ref-cast",
+]
+
[[package]]
name = "sha1_smol"
version = "1.0.0"
@@ -2670,9 +2685,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen"
-version = "0.2.104"
+version = "0.2.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c1da10c01ae9f1ae40cbfac0bac3b1e724b320abfcf52229f80b547c0d250e2d"
+checksum = "6532f9a5c1ece3798cb1c2cfdba640b9b3ba884f5db45973a6f442510a87d38e"
dependencies = [
"cfg-if",
"once_cell",
@@ -2681,20 +2696,6 @@ dependencies = [
"wasm-bindgen-shared",
]
-[[package]]
-name = "wasm-bindgen-backend"
-version = "0.2.104"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "671c9a5a66f49d8a47345ab942e2cb93c7d1d0339065d4f8139c486121b43b19"
-dependencies = [
- "bumpalo",
- "log",
- "proc-macro2",
- "quote",
- "syn",
- "wasm-bindgen-shared",
-]
-
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.42"
@@ -2709,9 +2710,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro"
-version = "0.2.104"
+version = "0.2.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7ca60477e4c59f5f2986c50191cd972e3a50d8a95603bc9434501cf156a9a119"
+checksum = "18a2d50fcf105fb33bb15f00e7a77b772945a2ee45dcf454961fd843e74c18e6"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
@@ -2719,31 +2720,43 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
-version = "0.2.104"
+version = "0.2.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9f07d2f20d4da7b26400c9f4a0511e6e0345b040694e8a75bd41d578fa4421d7"
+checksum = "03ce4caeaac547cdf713d280eda22a730824dd11e6b8c3ca9e42247b25c631e3"
dependencies = [
+ "bumpalo",
"proc-macro2",
"quote",
"syn",
- "wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
-version = "0.2.104"
+version = "0.2.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bad67dc8b2a1a6e5448428adec4c3e84c43e561d8c9ee8a9e5aabeb193ec41d1"
+checksum = "75a326b8c223ee17883a4251907455a2431acc2791c98c26279376490c378c16"
dependencies = [
"unicode-ident",
]
+[[package]]
+name = "web-streams"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "48465a648c14f53f6d8319b95bc336a44627f6aa6bd94270463777af8ed65deb"
+dependencies = [
+ "thiserror 2.0.17",
+ "wasm-bindgen",
+ "wasm-bindgen-futures",
+ "web-sys",
+]
+
[[package]]
name = "web-sys"
-version = "0.3.69"
+version = "0.3.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "77afa9a11836342370f4817622a2f0f418b134426d91a82dfb48f532d2ec13ef"
+checksum = "854ba17bb104abfb26ba36da9729addc7ce7f06f5c0f90f3c391f8461cca21f9"
dependencies = [
"js-sys",
"wasm-bindgen",
@@ -2761,56 +2774,73 @@ dependencies = [
[[package]]
name = "web-transport"
-version = "0.3.0"
+version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5793aee9b4cf993212042c6b1656d877de9ad32b9eb3281d7bc95f4dce3f6591"
+checksum = "23c3f78eca5afa10eb7b8ab64b4e5e521a006f0cbd88de09e44d55ef37e8855a"
dependencies = [
"bytes",
- "thiserror 1.0.61",
+ "thiserror 2.0.17",
+ "url",
"web-transport-quinn",
"web-transport-wasm",
]
[[package]]
name = "web-transport-proto"
-version = "0.2.0"
+version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "df0922f754c890ceb9741c00a0f5c730aaa4b52fe8772934a0ad19a03daee0ca"
+checksum = "0225d295c8ac00a2e9a498aefeaf3f3c6186da12a251c938189b15b82ea22808"
dependencies = [
"bytes",
"http",
- "thiserror 1.0.61",
+ "sfv",
+ "thiserror 2.0.17",
+ "tokio",
"url",
]
[[package]]
name = "web-transport-quinn"
-version = "0.3.0"
+version = "0.11.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d248fb83873166e1fce7e91370deb15bd5213cf4352242e32ccd4abc8aeb2cef"
+checksum = "82e77c81fe4cf56c1049e07c6ed9c00862a967010fe9da4f5e02dc7f4d71fdac"
dependencies = [
"bytes",
"futures",
"http",
- "log",
"quinn",
- "quinn-proto",
- "thiserror 1.0.61",
+ "rustls 0.23.31",
+ "rustls-native-certs 0.8.1",
+ "thiserror 2.0.17",
"tokio",
+ "tracing",
"url",
"web-transport-proto",
+ "web-transport-trait",
+]
+
+[[package]]
+name = "web-transport-trait"
+version = "0.3.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cb67841c4a481ca3c1412ee4c9f463987401991e1ddc000903df2124f3dc85e9"
+dependencies = [
+ "bytes",
]
[[package]]
name = "web-transport-wasm"
-version = "0.1.0"
+version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "64be28348e18cb1f44e4c8733dc2bd9520d782be840b2b978724dfd1b1bdefa3"
+checksum = "6816176def6e8df1636c8fc2c401f37add41ccad1518705e209d9a7ada3d144c"
dependencies = [
"bytes",
"js-sys",
+ "thiserror 2.0.17",
+ "url",
"wasm-bindgen",
"wasm-bindgen-futures",
+ "web-streams",
"web-sys",
]
diff --git a/Cargo.toml b/Cargo.toml
index c903feaa..93663988 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,7 +13,7 @@ members = [
resolver = "2"
[workspace.dependencies]
-web-transport = "0.3"
+web-transport = "0.10"
env_logger = "0.11"
log = { version = "0.4", features = ["std"] }
diff --git a/moq-clock-ietf/Cargo.toml b/moq-clock-ietf/Cargo.toml
index b717051b..854687a7 100644
--- a/moq-clock-ietf/Cargo.toml
+++ b/moq-clock-ietf/Cargo.toml
@@ -22,6 +22,7 @@ url = "2"
# Async stuff
tokio = { version = "1", features = ["full"] }
+futures = "0.3"
# CLI, logging, error handling
clap = { version = "4", features = ["derive"] }
diff --git a/moq-clock-ietf/src/clock.rs b/moq-clock-ietf/src/clock.rs
index a96863ee..ada2bbd6 100644
--- a/moq-clock-ietf/src/clock.rs
+++ b/moq-clock-ietf/src/clock.rs
@@ -45,6 +45,7 @@ impl Publisher {
group_id: next_group_id as u64,
subgroup_id: 0,
priority: 0,
+ header_type: None,
})
.context("failed to create minute segment")?;
@@ -66,6 +67,7 @@ impl Publisher {
priority: 127,
payload: time_str.clone().into_bytes().into(),
extension_headers: Default::default(),
+ status: None,
})
.context("failed to write datagram")?;
diff --git a/moq-clock-ietf/src/main.rs b/moq-clock-ietf/src/main.rs
index 7d0f6951..eab274b7 100644
--- a/moq-clock-ietf/src/main.rs
+++ b/moq-clock-ietf/src/main.rs
@@ -1,6 +1,7 @@
use moq_native_ietf::quic;
use anyhow::Context;
+use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
mod cli;
mod clock;
@@ -10,11 +11,36 @@ use cli::Cli;
use moq_transport::{
coding::TrackNamespace,
- serve,
- session::{Publisher, Subscriber},
+ serve::{self, TracksReader},
+ session::{Publisher, SessionError, Subscriber},
};
-/// The main entry point for the MoQ Clock IETF example.
+async fn serve_subscriptions(
+ mut publisher: Publisher,
+ tracks: TracksReader,
+) -> Result<(), SessionError> {
+ let mut tasks: FuturesUnordered> =
+ FuturesUnordered::new();
+
+ loop {
+ tokio::select! {
+ Some(subscribed) = publisher.subscribed() => {
+ let info = subscribed.info.clone();
+ let tracks = tracks.clone();
+ log::info!("serving subscribe: {:?}", info);
+
+ tasks.push(async move {
+ if let Err(err) = Publisher::serve_subscribe(subscribed, tracks).await {
+ log::warn!("failed serving subscribe: {:?}, error: {}", info, err);
+ }
+ }.boxed());
+ }
+ _ = tasks.next(), if !tasks.is_empty() => {}
+ else => return Ok(()),
+ }
+ }
+}
+
#[tokio::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();
@@ -59,10 +85,16 @@ async fn main() -> anyhow::Result<()> {
let track_writer = tracks_writer.create(&config.track).unwrap();
let clock_publisher = clock::Publisher::new_datagram(track_writer.datagrams()?);
+ let publish_ns = publisher
+ .publish_namespace(tracks_reader.namespace.clone())
+ .await
+ .context("failed to register namespace")?;
+
tokio::select! {
res = session.run() => res.context("session error")?,
res = clock_publisher.run() => res.context("clock error")?,
- res = publisher.announce(tracks_reader) => res.context("failed to serve tracks")?,
+ res = serve_subscriptions(publisher, tracks_reader) => res.context("failed to serve tracks")?,
+ res = publish_ns.closed() => res.context("namespace closed")?,
}
} else {
log::info!("publishing clock via streams");
@@ -75,10 +107,16 @@ async fn main() -> anyhow::Result<()> {
let track_writer = tracks_writer.create(&config.track).unwrap();
let clock_publisher = clock::Publisher::new(track_writer.subgroups()?);
+ let publish_ns = publisher
+ .publish_namespace(tracks_reader.namespace.clone())
+ .await
+ .context("failed to register namespace")?;
+
tokio::select! {
res = session.run() => res.context("session error")?,
res = clock_publisher.run() => res.context("clock error")?,
- res = publisher.announce(tracks_reader) => res.context("failed to serve tracks")?,
+ res = serve_subscriptions(publisher, tracks_reader) => res.context("failed to serve tracks")?,
+ res = publish_ns.closed() => res.context("namespace closed")?,
}
}
} else {
diff --git a/moq-native-ietf/Cargo.toml b/moq-native-ietf/Cargo.toml
index adc5b302..41eeb1c9 100644
--- a/moq-native-ietf/Cargo.toml
+++ b/moq-native-ietf/Cargo.toml
@@ -14,7 +14,7 @@ categories = ["multimedia", "network-programming", "web-programming"]
[dependencies]
moq-transport = { path = "../moq-transport", version = "0.12" }
web-transport = { workspace = true }
-web-transport-quinn = "0.3"
+web-transport-quinn = { version = "0.11", default-features = false, features = ["ring"] }
rustls = { version = "0.23", features = ["ring"] }
rustls-pemfile = "2"
diff --git a/moq-native-ietf/src/quic.rs b/moq-native-ietf/src/quic.rs
index bed04d83..e5d6ac8d 100644
--- a/moq-native-ietf/src/quic.rs
+++ b/moq-native-ietf/src/quic.rs
@@ -160,7 +160,7 @@ impl Endpoint {
if let Some(mut config) = config.tls.server {
config.alpn_protocols = vec![
- web_transport_quinn::ALPN.to_vec(),
+ web_transport_quinn::ALPN.as_bytes().to_vec(),
moq_transport::setup::ALPN.to_vec(),
];
config.key_log = Arc::new(rustls::KeyLogFile::new());
@@ -305,22 +305,24 @@ impl Server {
server_name,
);
- let session = match alpn.as_bytes() {
- web_transport_quinn::ALPN => {
- // Wait for the CONNECT request.
- let request = web_transport_quinn::accept(conn)
- .await
- .context("failed to receive WebTransport request")?;
-
- // Accept the CONNECT request.
- request
- .ok()
- .await
- .context("failed to respond to WebTransport request")?
- }
- // A bit of a hack to pretend like we're a WebTransport session
- moq_transport::setup::ALPN => conn.into(),
- _ => anyhow::bail!("unsupported ALPN: {}", alpn),
+ let alpn_bytes = alpn.as_bytes();
+ let session = if alpn_bytes == web_transport_quinn::ALPN.as_bytes() {
+ // Wait for the WebTransport CONNECT request (includes H3 SETTINGS exchange).
+ let request = web_transport_quinn::Request::accept(conn)
+ .await
+ .context("failed to receive WebTransport request")?;
+
+ // Accept the CONNECT request.
+ request
+ .ok()
+ .await
+ .context("failed to respond to WebTransport request")?
+ } else if alpn_bytes == moq_transport::setup::ALPN {
+ // Raw QUIC mode — create a session with no H3 framing.
+ let request = url::Url::parse("moqt://localhost").unwrap();
+ web_transport_quinn::Session::raw(conn, request, web_transport_quinn::proto::ConnectResponse::default())
+ } else {
+ anyhow::bail!("unsupported ALPN: {}", alpn)
};
Ok((session.into(), connection_id_hex))
@@ -373,7 +375,7 @@ impl Client {
// TODO support connecting to both ALPNs at the same time
config.alpn_protocols = vec![match url.scheme() {
- "https" => web_transport_quinn::ALPN.to_vec(),
+ "https" => web_transport_quinn::ALPN.as_bytes().to_vec(),
"moqt" => moq_transport::setup::ALPN.to_vec(),
_ => anyhow::bail!("url scheme must be 'https' or 'moqt'"),
}];
@@ -426,8 +428,15 @@ impl Client {
.to_string();
let session = match url.scheme() {
- "https" => web_transport_quinn::connect_with(connection, url).await?,
- "moqt" => connection.into(),
+ "https" => {
+ // Build a ConnectRequest with the MoQT version as the WebTransport subprotocol.
+ // Per draft-15+, version negotiation uses ALPN (raw QUIC) or
+ // wt-available-protocols (WebTransport) instead of CLIENT_SETUP versions.
+ let request = web_transport_quinn::proto::ConnectRequest::new(url.clone())
+ .with_protocol(std::str::from_utf8(moq_transport::setup::ALPN).unwrap());
+ web_transport_quinn::Session::connect(connection, request).await?
+ }
+ "moqt" => web_transport_quinn::Session::raw(connection, url.clone(), web_transport_quinn::proto::ConnectResponse::default()),
_ => unreachable!(),
};
diff --git a/moq-pub/Cargo.toml b/moq-pub/Cargo.toml
index 13085ca3..d37ebbaf 100644
--- a/moq-pub/Cargo.toml
+++ b/moq-pub/Cargo.toml
@@ -23,6 +23,7 @@ bytes = "1"
# Async stuff
tokio = { version = "1", features = ["full"] }
+futures = "0.3"
# CLI, logging, error handling
clap = { version = "4", features = ["derive"] }
diff --git a/moq-pub/src/main.rs b/moq-pub/src/main.rs
index cd9350fc..c259b440 100644
--- a/moq-pub/src/main.rs
+++ b/moq-pub/src/main.rs
@@ -4,11 +4,16 @@ use url::Url;
use anyhow::Context;
use clap::Parser;
+use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use tokio::io::AsyncReadExt;
use moq_native_ietf::quic;
use moq_pub::Media;
-use moq_transport::{coding::TrackNamespace, serve, session::Publisher};
+use moq_transport::{
+ coding::TrackNamespace,
+ serve::{self, TracksReader},
+ session::{Publisher, SessionError},
+};
#[derive(Parser, Clone)]
pub struct Cli {
@@ -39,6 +44,32 @@ pub struct Cli {
pub tls: moq_native_ietf::tls::Args,
}
+async fn serve_subscriptions(
+ mut publisher: Publisher,
+ tracks: TracksReader,
+) -> Result<(), SessionError> {
+ let mut tasks: FuturesUnordered> =
+ FuturesUnordered::new();
+
+ loop {
+ tokio::select! {
+ Some(subscribed) = publisher.subscribed() => {
+ let info = subscribed.info.clone();
+ let tracks = tracks.clone();
+ log::info!("serving subscribe: {:?}", info);
+
+ tasks.push(async move {
+ if let Err(err) = Publisher::serve_subscribe(subscribed, tracks).await {
+ log::warn!("failed serving subscribe: {:?}, error: {}", info, err);
+ }
+ }.boxed());
+ }
+ _ = tasks.next(), if !tasks.is_empty() => {}
+ else => return Ok(()),
+ }
+ }
+}
+
#[tokio::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();
@@ -71,16 +102,25 @@ async fn main() -> anyhow::Result<()> {
connection_id
);
- let (session, mut publisher) = Publisher::connect(session)
+ let (session, publisher) = Publisher::connect(session)
.await
.context("failed to create MoQ Transport publisher")?;
+ let namespace = reader.namespace.clone();
+
+ let publish_ns = publisher
+ .clone()
+ .publish_namespace(namespace)
+ .await
+ .context("failed to register namespace")?;
+
+ log::info!("namespace registered, starting media and subscription handling");
+
tokio::select! {
res = session.run() => res.context("session error")?,
- res = run_media(media) => {
- res.context("media error")?
- },
- res = publisher.announce(reader) => res.context("publisher error")?,
+ res = run_media(media) => res.context("media error")?,
+ res = serve_subscriptions(publisher, reader) => res.context("publisher error")?,
+ res = publish_ns.closed() => res.context("publisher error")?,
}
Ok(())
diff --git a/moq-pub/src/media.rs b/moq-pub/src/media.rs
index b46f5473..f1952781 100644
--- a/moq-pub/src/media.rs
+++ b/moq-pub/src/media.rs
@@ -384,7 +384,12 @@ impl Track {
}
pub fn end_group(&mut self) {
- self.current = None;
+ // Send EndOfGroup marker before dropping the writer
+ if let Some(mut current) = self.current.take() {
+ if let Err(e) = current.end_of_group() {
+ log::warn!("failed to send EndOfGroup marker: {}", e);
+ }
+ }
}
}
diff --git a/moq-relay-ietf/src/consumer.rs b/moq-relay-ietf/src/consumer.rs
index 8d636912..d0641740 100644
--- a/moq-relay-ietf/src/consumer.rs
+++ b/moq-relay-ietf/src/consumer.rs
@@ -3,11 +3,13 @@ use std::sync::Arc;
use anyhow::Context;
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use moq_transport::{
- serve::Tracks,
- session::{Announced, SessionError, Subscriber},
+ coding::KeyValuePairs,
+ message::PublishOk,
+ serve::{ServeError, Tracks},
+ session::{PublishNamespaceReceived, PublishReceived, SessionError, Subscriber},
};
-use crate::{Coordinator, Locals, Producer};
+use crate::{Coordinator, Locals, Producer, SubscriberRegistry};
/// Consumer of tracks from a remote Publisher
#[derive(Clone)]
@@ -16,6 +18,8 @@ pub struct Consumer {
locals: Locals,
coordinator: Arc,
forward: Option, // Forward all announcements to this subscriber
+ subscriber_registry: Option,
+ session_id: u64,
}
impl Consumer {
@@ -30,28 +34,63 @@ impl Consumer {
locals,
coordinator,
forward,
+ subscriber_registry: None,
+ session_id: 0,
}
}
- /// Run the consumer to serve announce requests.
- pub async fn run(mut self) -> Result<(), SessionError> {
- let mut tasks = FuturesUnordered::new();
+ /// Creates a consumer with a subscriber registry for PUBLISH notifications.
+ pub fn with_registry(
+ subscriber: Subscriber,
+ locals: Locals,
+ coordinator: Arc,
+ forward: Option,
+ subscriber_registry: SubscriberRegistry,
+ session_id: u64,
+ ) -> Self {
+ Self {
+ subscriber,
+ locals,
+ coordinator,
+ forward,
+ subscriber_registry: Some(subscriber_registry),
+ session_id,
+ }
+ }
+
+ /// Run the consumer to serve announce requests and track-level publish messages.
+ pub async fn run(self) -> Result<(), SessionError> {
+ let mut tasks: FuturesUnordered> =
+ FuturesUnordered::new();
loop {
+ let mut subscriber_ns = self.subscriber.clone();
+ let mut subscriber_publish = self.subscriber.clone();
+
tokio::select! {
- // Handle a new announce request
- Some(announce) = self.subscriber.announced() => {
+ Some(publish_ns) = subscriber_ns.publish_ns_recvd() => {
let this = self.clone();
tasks.push(async move {
- let info = announce.clone();
- log::info!("serving announce: {:?}", info);
+ let info = publish_ns.clone();
+ log::info!("serving publish_namespace: {:?}", info);
- // Serve the announce request
- if let Err(err) = this.serve(announce).await {
- log::warn!("failed serving announce: {:?}, error: {}", info, err)
+ if let Err(err) = this.serve_publish_namespace(publish_ns).await {
+ log::warn!("failed serving publish_namespace: {:?}, error: {}", info, err)
}
- });
+ }.boxed());
+ },
+ Some(publish) = subscriber_publish.publish_received() => {
+ let this = self.clone();
+
+ tasks.push(async move {
+ let info = publish.info.clone();
+ log::info!("serving publish (track-level): {:?}", info);
+
+ if let Err(err) = this.serve_publish(publish).await {
+ log::warn!("failed serving publish: {:?}, error: {}", info, err)
+ }
+ }.boxed());
},
_ = tasks.next(), if !tasks.is_empty() => {},
else => return Ok(()),
@@ -59,12 +98,14 @@ impl Consumer {
}
}
- /// Serve an announce request.
- async fn serve(mut self, mut announce: Announced) -> Result<(), anyhow::Error> {
- let mut tasks = FuturesUnordered::new();
+ async fn serve_publish_namespace(
+ mut self,
+ mut publish_ns: PublishNamespaceReceived,
+ ) -> Result<(), anyhow::Error> {
+ let mut tasks: FuturesUnordered>> =
+ FuturesUnordered::new();
- // Produce the tracks for this announce and return the reader
- let (_, mut request, reader) = Tracks::new(announce.namespace.clone()).produce();
+ let (writer, mut request, reader) = Tracks::new(publish_ns.namespace.clone()).produce();
// NOTE(mpandit): once the track is pulled from origin, internally it will be relayed
// from this metal only, because now coordinator will have entry for the namespace.
@@ -78,30 +119,54 @@ impl Consumer {
.await?;
// Register the local tracks, unregister on drop
- let _register = self.locals.register(reader.clone()).await?;
-
- // Accept the announce with an OK response
- announce.ok()?;
-
- // Forward the announce, if needed
- if let Some(mut forward) = self.forward {
- tasks.push(
- async move {
- log::info!("forwarding announce: {:?}", reader.info);
- forward
- .announce(reader)
- .await
- .context("failed forwarding announce")
- }
- .boxed(),
- );
+ let _register = self.locals.register(reader.clone(), writer).await?;
+
+ publish_ns.ok()?;
+
+ // Notify subscriber registry of the new PUBLISH_NAMESPACE
+ // This will trigger forwarding to matching SUBSCRIBE_NAMESPACE subscriptions
+ // Uses session_id for self-exclusion
+ if let Some(ref registry) = self.subscriber_registry {
+ let notified = registry.notify_publish_namespace(&publish_ns.namespace, self.session_id);
+ if notified > 0 {
+ log::info!(
+ "notified {} SUBSCRIBE_NAMESPACE subscriptions of PUBLISH_NAMESPACE {:?}",
+ notified,
+ publish_ns.namespace
+ );
+ }
}
+ // Forward publish_namespace upstream - keep handle alive in this scope
+ let _forwarded_publish_ns = if let Some(mut forward) = self.forward.clone() {
+ let reader_clone = reader.clone();
+ log::info!("forwarding publish_namespace: {:?}", reader_clone.info);
+ match forward.publish_namespace(reader_clone).await {
+ Ok(publish_ns) => {
+ if let Err(e) = publish_ns.ok().await {
+ log::warn!("publish_namespace not accepted: {}", e);
+ None
+ } else {
+ log::info!(
+ "publish_namespace forwarded and accepted: {:?}",
+ publish_ns.info.namespace
+ );
+ Some(publish_ns)
+ }
+ }
+ Err(e) => {
+ log::warn!("failed forwarding publish_namespace: {}", e);
+ None
+ }
+ }
+ } else {
+ None
+ };
+
// Serve subscribe requests
loop {
tokio::select! {
- // If the announce is closed, return the error
- Err(err) = announce.closed() => return Err(err.into()),
+ Err(err) = publish_ns.closed() => return Err(err.into()),
// Wait for the next subscriber and serve the track.
Some(track) = request.next() => {
@@ -125,4 +190,144 @@ impl Consumer {
}
}
}
+
+ async fn serve_publish(mut self, publish: PublishReceived) -> Result<(), anyhow::Error> {
+ let namespace = publish.info.track_namespace.clone();
+ let track_name = publish.info.track_name.clone();
+ let track_alias = publish.info.track_alias;
+ let initial_forward = publish.info.forward;
+ let publish_request_id = publish.info.id;
+ let track_extensions = publish.info.track_extensions.clone();
+
+ log::info!(
+ "received PUBLISH for track: {}/{} (forward={}, extensions={:?})",
+ namespace,
+ track_name,
+ initial_forward,
+ track_extensions
+ );
+
+ // Use auto-register variant to support SUBSCRIBE_NAMESPACE flow
+ // where PUBLISH can arrive without prior PUBLISH_NAMESPACE
+ let track_info = self
+ .locals
+ .get_or_create_track_info_auto_register(&namespace, &track_name);
+
+ let writer = match track_info.publish_arrived() {
+ Ok(w) => w,
+ Err(ServeError::Uninterested) => {
+ log::info!(
+ "PUBLISH rejected: already subscribed to {}/{}",
+ namespace,
+ track_name
+ );
+ publish.reject(ServeError::Uninterested.code(), "Already subscribed")?;
+ return Err(ServeError::Uninterested.into());
+ }
+ Err(ServeError::Duplicate) => {
+ log::info!(
+ "PUBLISH rejected: already publishing {}/{}",
+ namespace,
+ track_name
+ );
+ publish.reject(ServeError::Duplicate.code(), "Already publishing")?;
+ return Err(ServeError::Duplicate.into());
+ }
+ Err(e) => {
+ publish.reject(e.code(), &e.to_string())?;
+ return Err(e.into());
+ }
+ };
+
+ let reader = track_info.get_reader();
+
+ self.locals
+ .insert_track(&namespace, reader)
+ .context("failed to insert track into namespace")?;
+
+ // Store publish info for forward state management
+ track_info.set_publish_info(publish_request_id, initial_forward);
+
+ // Store track extensions for forwarding to subscribers
+ track_info.set_track_extensions(track_extensions);
+
+ // Include forward=1 in PUBLISH_OK to tell publisher to start sending immediately
+ let mut params = KeyValuePairs::default();
+ params.set_intvalue(0x10, 1); // Forward = 1
+
+ let msg = PublishOk {
+ id: publish.info.id,
+ params,
+ };
+
+ publish.accept(writer, msg)?;
+
+ log::info!(
+ "PUBLISH accepted, track {}/{} now in Publishing state (forward={})",
+ namespace,
+ track_name,
+ initial_forward
+ );
+
+ // Notify subscriber registry of the new PUBLISH
+ // This will trigger forwarding to matching SUBSCRIBE_NAMESPACE subscriptions
+ // Uses session_id for self-exclusion (don't notify the same session that sent the PUBLISH)
+ if let Some(ref registry) = self.subscriber_registry {
+ let notified = registry.notify_publish(&namespace, &track_name, track_alias, self.session_id);
+ if notified > 0 {
+ log::info!(
+ "notified {} SUBSCRIBE_NAMESPACE subscriptions of PUBLISH {}/{}",
+ notified,
+ namespace,
+ track_name
+ );
+ }
+ }
+
+ // If forward=0 (paused), wait for subscribers to request forwarding
+ // When forward state changes to 1, send REQUEST_UPDATE to publisher
+ if !initial_forward {
+ let forward_rx = track_info.forward_receiver();
+ if let Some(mut rx) = forward_rx {
+ log::info!(
+ "track {}/{} is paused (forward=0), waiting for subscriber to request forwarding",
+ namespace,
+ track_name
+ );
+
+ // Wait for forward state to change to true
+ loop {
+ rx.changed().await.ok();
+ if *rx.borrow() {
+ // Forward state changed to true, send REQUEST_UPDATE
+ log::info!(
+ "subscriber arrived for paused track {}/{}, sending REQUEST_UPDATE with forward=1",
+ namespace,
+ track_name
+ );
+
+ let mut params = KeyValuePairs::default();
+ params.set_intvalue(0x10, 1); // Forward = 1
+
+ let request_update = moq_transport::message::SubscribeUpdate {
+ id: self.subscriber.get_next_request_id(),
+ existing_request_id: publish_request_id,
+ params,
+ };
+
+ self.subscriber.send_message(request_update);
+ log::info!(
+ "sent REQUEST_UPDATE for track {}/{} (existing_request_id={})",
+ namespace,
+ track_name,
+ publish_request_id
+ );
+ break;
+ }
+ }
+ }
+ }
+
+ Ok(())
+ }
}
diff --git a/moq-relay-ietf/src/lib.rs b/moq-relay-ietf/src/lib.rs
index aac39326..11a9456f 100644
--- a/moq-relay-ietf/src/lib.rs
+++ b/moq-relay-ietf/src/lib.rs
@@ -36,6 +36,7 @@ mod producer;
mod relay;
mod remote;
mod session;
+mod subscriber_registry;
mod web;
pub use api::*;
@@ -46,4 +47,5 @@ pub use producer::*;
pub use relay::*;
pub use remote::*;
pub use session::*;
+pub use subscriber_registry::*;
pub use web::*;
diff --git a/moq-relay-ietf/src/local.rs b/moq-relay-ietf/src/local.rs
index 406e6650..e56211b3 100644
--- a/moq-relay-ietf/src/local.rs
+++ b/moq-relay-ietf/src/local.rs
@@ -1,17 +1,252 @@
use std::collections::hash_map;
use std::collections::HashMap;
-
-use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
+use std::sync::{Arc, Mutex, OnceLock};
use moq_transport::{
coding::TrackNamespace,
- serve::{ServeError, TracksReader},
+ data::ExtensionHeaders,
+ serve::{ServeError, Track, TrackReader, TrackWriter, TracksReader, TracksWriter},
};
+use tokio::sync::watch;
+
+#[repr(u8)]
+#[derive(Debug, Copy, Clone, Eq, PartialEq)]
+pub enum TrackState {
+ Pending = 0,
+ Subscribing = 1,
+ Subscribed = 2,
+ Publishing = 3,
+ Closed = 4,
+}
+
+impl TrackState {
+ fn from_u8(v: u8) -> Self {
+ match v {
+ 0 => TrackState::Pending,
+ 1 => TrackState::Subscribing,
+ 2 => TrackState::Subscribed,
+ 3 => TrackState::Publishing,
+ _ => TrackState::Closed,
+ }
+ }
+}
+
+pub struct TrackInfo {
+ pub namespace: TrackNamespace,
+ pub name: String,
+
+ state: AtomicU8,
+ track_reader: OnceLock,
+ track_writer: Mutex