Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
02b232a
refactor: restructure relay into lib/bin and add coordinator interface
itzmanish Nov 26, 2025
492309a
feat: add file-based coordinator and rewrote remote for handling remo…
itzmanish Nov 28, 2025
767097f
refactor: simplify QUIC config initialization using new Config::new c…
itzmanish Dec 1, 2025
02bf324
fix: return clients on lookup for coordinator and misc fix
itzmanish Dec 2, 2025
9d4e76d
docs: clarify coordinator file usage in CLI help text and add FIXME f…
itzmanish Dec 2, 2025
838b5ce
fix: add lifetime parameter to lookup method signature for proper bor…
itzmanish Dec 2, 2025
9b91003
fix: prevent file truncation when opening for read/write in FileCoord…
itzmanish Dec 2, 2025
ecc63ee
refactor: remove track registration from coordinator interface and fi…
itzmanish Dec 2, 2025
f690c31
fix: update lookup signature to return owned Client instead of reference
itzmanish Dec 2, 2025
395ef6a
style: format track namespace test by removing unnecessary line breaks
itzmanish Dec 2, 2025
b556a97
fix: if host is IpAddr construct socket addr else resolve dns
itzmanish Dec 8, 2025
37b0c5d
fix: race and proper task shutdown
itzmanish Dec 18, 2025
af96cd6
fix: clippy warnings
itzmanish Dec 18, 2025
778d6f4
fix: clippy unused imports
itzmanish Dec 18, 2025
bd89b4a
fix: remove once_cell to pass the test
itzmanish Dec 18, 2025
88e413c
fix: seperate RemoteManager rewrite to different PR
itzmanish Dec 18, 2025
f331f23
fix: linter
itzmanish Dec 18, 2025
afc9cb0
fix: ci
itzmanish Dec 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

182 changes: 167 additions & 15 deletions moq-native-ietf/src/quic.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
fmt,
fs::File,
io::BufWriter,
net,
Expand All @@ -17,6 +18,25 @@ use futures::future::BoxFuture;
use futures::stream::{FuturesUnordered, StreamExt};
use futures::FutureExt;

/// Represents the address family of the local QUIC socket.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AddressFamily {
Ipv4,
Ipv6,
/// IPv6 with dual-stack support (Linux)
Ipv6DualStack,
}

impl fmt::Display for AddressFamily {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
AddressFamily::Ipv4 => write!(f, "IPv4"),
AddressFamily::Ipv6 => write!(f, "IPv6"),
AddressFamily::Ipv6DualStack => write!(f, "IPv6 (dual stack)"),
}
}
}

/// Build a TransportConfig with our standard settings
///
/// This is used both for the base endpoint config and when creating
Expand Down Expand Up @@ -57,20 +77,43 @@ impl Default for Args {
impl Args {
pub fn load(&self) -> anyhow::Result<Config> {
let tls = self.tls.load()?;
Ok(Config {
bind: self.bind,
qlog_dir: self.qlog_dir.clone(),
tls,
})
Ok(Config::new(self.bind, self.qlog_dir.clone(), tls))
}
}

pub struct Config {
pub bind: net::SocketAddr,
pub bind: Option<net::SocketAddr>,
pub socket: net::UdpSocket,
pub qlog_dir: Option<PathBuf>,
pub tls: tls::Config,
}

impl Config {
pub fn new(bind: net::SocketAddr, qlog_dir: Option<PathBuf>, tls: tls::Config) -> Self {
Self {
bind: Some(bind),
socket: net::UdpSocket::bind(bind)
.context("failed to bind socket")
.unwrap(),
qlog_dir,
tls,
}
}

pub fn with_socket(
socket: net::UdpSocket,
qlog_dir: Option<PathBuf>,
tls: tls::Config,
) -> Self {
Self {
bind: None,
socket,
qlog_dir,
tls,
}
}
}

pub struct Endpoint {
pub client: Client,
pub server: Option<Server>,
Expand Down Expand Up @@ -111,13 +154,13 @@ impl Endpoint {
// There's a bit more boilerplate to make a generic endpoint.
let runtime = quinn::default_runtime().context("no async runtime")?;
let endpoint_config = quinn::EndpointConfig::default();
let socket = std::net::UdpSocket::bind(config.bind).context("failed to bind UDP socket")?;
let socket = config.socket;

// Create the generic QUIC endpoint.
let quic = quinn::Endpoint::new(endpoint_config, server_config.clone(), socket, runtime)
.context("failed to create QUIC endpoint")?;

let server = server_config.clone().map(|base_server_config| Server {
let server = server_config.map(|base_server_config| Server {
quic: quic.clone(),
accept: Default::default(),
qlog_dir: config.qlog_dir.map(Arc::new),
Expand Down Expand Up @@ -270,7 +313,34 @@ pub struct Client {
}

impl Client {
pub async fn connect(&self, url: &Url) -> anyhow::Result<(web_transport::Session, String)> {
/// Returns the local address of the QUIC socket.
pub fn local_addr(&self) -> anyhow::Result<net::SocketAddr> {
self.quic
.local_addr()
.context("failed to get local address")
}

/// Returns the address family of the local QUIC socket.
pub fn address_family(&self) -> anyhow::Result<AddressFamily> {
let local_addr = self
.quic
.local_addr()
.context("failed to get local socket address")?;

if local_addr.is_ipv4() {
Ok(AddressFamily::Ipv4)
} else if cfg!(target_os = "linux") {
Ok(AddressFamily::Ipv6DualStack)
} else {
Ok(AddressFamily::Ipv6)
}
}

pub async fn connect(
&self,
url: &Url,
socket_addr: Option<net::SocketAddr>,
) -> anyhow::Result<(web_transport::Session, String)> {
let mut config = self.config.clone();

// TODO support connecting to both ALPNs at the same time
Expand Down Expand Up @@ -303,12 +373,15 @@ impl Client {
let host = url.host().context("invalid DNS name")?.to_string();
let port = url.port().unwrap_or(443);

// Look up the DNS entry.
let addr = tokio::net::lookup_host((host.clone(), port))
.await
.context("failed DNS lookup")?
.next()
.context("no DNS entries")?;
// Look up the DNS entry and filter by socket address family.
let addr = match socket_addr {
Some(addr) => addr,
None => {
// Default DNS resolution logic
self.resolve_dns(&host, port, self.address_family()?)
.await?
}
};

let connection = self.quic.connect_with(config, addr, &host)?.await?;

Expand All @@ -328,4 +401,83 @@ impl Client {

Ok((session.into(), connection_id_hex))
}

/// Default DNS resolution logic that filters results by address family.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool - this appears to accomplish the same things I did in this PR: #111

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Sorry I noticed this later after doing the PR. But yeah this now covers for dns resolution by considering address family. Also I am planning to pass custom dns resolution logic in a seperate PR later.

async fn resolve_dns(
&self,
host: &str,
port: u16,
address_family: AddressFamily,
) -> anyhow::Result<net::SocketAddr> {
let local_addr = self.local_addr()?;

// Collect all DNS results
let addrs: Vec<net::SocketAddr> = tokio::net::lookup_host((host, port))
.await
.context("failed DNS lookup")?
.collect();

if addrs.is_empty() {
anyhow::bail!("DNS lookup for host '{}' returned no addresses", host);
}

// Log all DNS results for debugging
log::debug!(
"DNS lookup for {}, family {:?}: found {} results",
host,
address_family,
addrs.len()
);
for (i, addr) in addrs.iter().enumerate() {
log::debug!(
" DNS[{}]: {} ({})",
i,
addr,
if addr.is_ipv4() { "IPv4" } else { "IPv6" }
);
}

// Filter DNS results to match our local socket's address family
let compatible_addr = match address_family {
AddressFamily::Ipv4 => {
// IPv4 socket: filter to IPv4 addresses
addrs
.iter()
.find(|a| a.is_ipv4())
.cloned()
.context(format!(
"No IPv4 address found for host '{}' (local socket is IPv4: {})",
host, local_addr
))?
}
AddressFamily::Ipv6DualStack => {
// IPv6 socket on Linux: dual-stack, use first result
log::debug!(
"Using first DNS result (Linux IPv6 dual-stack): {}",
addrs[0]
);
addrs[0]
}
AddressFamily::Ipv6 => {
// IPv6 socket non-Linux: filter to IPv6 addresses
addrs
.iter()
.find(|a| a.is_ipv6())
.cloned()
.context(format!(
"No IPv6 address found for host '{}' (local socket is IPv6: {})",
host, local_addr
))?
}
};

log::debug!(
"Connecting from {} to {} (selected from {} DNS results)",
local_addr,
compatible_addr,
addrs.len()
);

Ok(compatible_addr)
}
}
20 changes: 18 additions & 2 deletions moq-relay-ietf/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[package]
name = "moq-relay-ietf"
description = "Media over QUIC"
authors = ["Luke Curley"]
repository = "https://github.com/englishm/moq-rs"
authors = ["Luke Curley", "Manish Kumar Pandit"]
repository = "https://github.com/cloudflare/moq-rs"
license = "MIT OR Apache-2.0"

version = "0.7.5"
Expand All @@ -11,6 +11,14 @@ edition = "2021"
keywords = ["quic", "http3", "webtransport", "media", "live"]
categories = ["multimedia", "network-programming", "web-programming"]

[lib]
name = "moq_relay_ietf"
path = "src/lib.rs"

[[bin]]
name = "moq-relay-ietf"
path = "src/bin/moq-relay-ietf/main.rs"

[dependencies]
moq-transport = { path = "../moq-transport", version = "0.11" }
moq-native-ietf = { path = "../moq-native-ietf", version = "0.5" }
Expand All @@ -22,6 +30,7 @@ url = "2"
# Async stuff
tokio = { version = "1", features = ["full"] }
futures = "0.3"
async-trait = "0.1"

# Web server to serve the fingerprint
axum = { version = "0.7", features = ["tokio"] }
Expand All @@ -31,6 +40,13 @@ hyper-serve = { version = "0.6", features = [
tower-http = { version = "0.5", features = ["cors"] }
hex = "0.4"

# Serialization
serde = { version = "1", features = ["derive"] }
serde_json = "1"

# File locking
fs2 = "0.4"

# Error handling
anyhow = { version = "1", features = ["backtrace"] }

Expand Down
Loading
Loading