Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion moq-relay-ietf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ url = "2"

# Async stuff
tokio = { version = "1", features = ["full"] }
# tokio-util = "0.7"
tokio-util = "0.7"
futures = "0.3"
async-trait = "0.1"

Expand Down
61 changes: 27 additions & 34 deletions moq-relay-ietf/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ use moq_transport::{

use crate::{
metrics::{GaugeGuard, TimingGuard},
Locals, RemotesConsumer,
Locals, RemoteManager,
};

/// Producer of tracks to a remote Subscriber
#[derive(Clone)]
pub struct Producer {
publisher: Publisher,
locals: Locals,
remotes: Option<RemotesConsumer>,
remotes: RemoteManager,
/// The resolved scope identity for this session, if any.
/// Produced by `Coordinator::resolve_scope()` from the connection path.
/// Passed to locals/remotes to isolate namespace lookups.
Expand All @@ -28,7 +28,7 @@ impl Producer {
pub fn new(
publisher: Publisher,
locals: Locals,
remotes: Option<RemotesConsumer>,
remotes: RemoteManager,
scope: Option<String>,
) -> Self {
Self {
Expand Down Expand Up @@ -122,40 +122,33 @@ impl Producer {
}
}

if let Some(remotes) = self.remotes {
// Check remote tracks second, and serve from remote if possible
match remotes.route(self.scope.as_deref(), &namespace).await {
Ok(remote) => {
if let Some(remote) = remote {
if let Some(track) = remote.subscribe(&namespace, &track_name)? {
let ns = namespace.to_utf8_path();
tracing::info!(namespace = %ns, track = %track_name, source = "remote", "serving subscribe from remote: {:?}", track.info);
// Update label to indicate remote source, timing recorded on drop
timing_guard.set_label("source", "remote");
// Track active tracks - decrements when serve completes
let _track_guard = GaugeGuard::new("moq_relay_active_tracks");
return Ok(subscribed.serve(track.reader).await?);
}
}
}
Err(e) => {
// Route error = infrastructure failure (couldn't reach coordinator/upstream)
// This is different from "not found" - we don't know if the track exists
match self
.remotes
.subscribe(self.scope.as_deref(), &namespace, &track_name)
.await
{
Ok(track) => {
if let Some(track) = track {
let ns = namespace.to_utf8_path();
tracing::error!(namespace = %ns, track = %track_name, error = %e, "failed to route to remote: {}", e);
timing_guard.set_label("source", "route_error");
metrics::counter!("moq_relay_subscribe_route_errors_total").increment(1);

// Return an internal error rather than "not found" since we couldn't check
// TODO: Consider returning a more specific error to the subscriber
let err = ServeError::internal_ctx(format!(
"route error for namespace '{}': {}",
namespace, e
));
subscribed.close(err.clone())?;
return Err(err.into());
tracing::info!(namespace = %ns, track = %track_name, source = "remote", "serving subscribe from remote: {:?}", track.info);
timing_guard.set_label("source", "remote");
let _track_guard = GaugeGuard::new("moq_relay_active_tracks");
return Ok(subscribed.serve(track).await?);
}
}
Err(e) => {
let ns = namespace.to_utf8_path();
tracing::error!(namespace = %ns, track = %track_name, error = %e, "failed to route to remote: {}", e);
timing_guard.set_label("source", "route_error");
metrics::counter!("moq_relay_subscribe_route_errors_total").increment(1);

let err = ServeError::internal_ctx(format!(
"route error for namespace '{}': {}",
namespace, e
));
subscribed.close(err.clone())?;
return Err(err.into());
}
}

// Track not found - we checked all sources and the track doesn't exist
Expand Down
20 changes: 5 additions & 15 deletions moq-relay-ietf/src/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use moq_native_ietf::quic::{self, Endpoint};
use url::Url;

use crate::{
metrics::GaugeGuard, Consumer, Coordinator, Locals, Producer, Remotes, RemotesConsumer,
RemotesProducer, Session,
};
use crate::{metrics::GaugeGuard, Consumer, Coordinator, Locals, Producer, RemoteManager, Session};

// A type alias for boxed future
type ServerFuture = Pin<
Expand Down Expand Up @@ -64,7 +61,7 @@ pub struct Relay {
announce_url: Option<Url>,
mlog_dir: Option<PathBuf>,
locals: Locals,
remotes: Option<(RemotesProducer, RemotesConsumer)>,
remotes: RemoteManager,
coordinator: Arc<dyn Coordinator>,
}

Expand Down Expand Up @@ -109,18 +106,14 @@ impl Relay {
.collect::<Vec<_>>();

// Create remote manager - uses coordinator for namespace lookups
let remotes = Remotes {
coordinator: config.coordinator.clone(),
quic: remote_clients[0].clone(),
}
.produce();
let remotes = RemoteManager::new(config.coordinator.clone(), remote_clients);

Ok(Self {
quic_endpoints: endpoints,
announce_url: config.announce,
mlog_dir: config.mlog_dir,
locals,
remotes: Some(remotes),
remotes,
coordinator: config.coordinator,
})
}
Expand All @@ -130,10 +123,7 @@ impl Relay {
let mut tasks = FuturesUnordered::new();

// Split remotes producer/consumer and spawn producer task
let remotes = self.remotes.map(|(producer, consumer)| {
tasks.push(producer.run().boxed());
consumer
});
let remotes = self.remotes;

// Start the forwarder, if any
let forward_producer = if let Some(url) = &self.announce_url {
Expand Down
Loading
Loading