diff --git a/src/net.rs b/src/net.rs
index 6098ec04..b32ad948 100644
--- a/src/net.rs
+++ b/src/net.rs
@@ -1357,10 +1357,13 @@ impl Dialer {
 mod test {
     use std::time::Duration;
 
+    use anyhow::{anyhow, bail};
     use bytes::Bytes;
     use futures_concurrency::future::TryJoin;
     use iroh::{protocol::Router, RelayMap, RelayMode, SecretKey};
+    use n0_future::{FuturesUnordered, StreamExt};
     use rand::Rng;
+    use testresult::TestResult;
     use tokio::{spawn, time::timeout};
     use tokio_util::sync::CancellationToken;
     use tracing::{info, instrument};
@@ -2022,4 +2025,265 @@ mod test {
 
         Ok(())
     }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn gossip_net_big() -> TestResult {
+        tracing_subscriber::fmt::try_init().ok();
+        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
+        let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
+        let dns = iroh::test_utils::DnsPkarrServer::run().await?;
+
+        // Scale is tunable via env vars, e.g.
+        // `NODE_COUNT=300 TIMEOUT=30000 cargo test --release gossip_net_big`.
+        let node_count: usize = std::env::var("NODE_COUNT")
+            .map(|x| x.parse().unwrap())
+            .unwrap_or(100);
+        let message_count: usize = std::env::var("MESSAGE_COUNT")
+            .map(|x| x.parse().unwrap())
+            .unwrap_or(2);
+
+        let warmup_sleep_s = std::env::var("WARMUP_SLEEP")
+            .map(|x| x.parse().unwrap())
+            .unwrap_or(1);
+
+        let send_interval_ms = std::env::var("SEND_INTERVAL")
+            .map(|x| x.parse().unwrap())
+            .unwrap_or(50);
+
+        let timeout_ms = std::env::var("TIMEOUT")
+            .map(|x| x.parse().unwrap())
+            .unwrap_or(10000);
+        let timeout = Duration::from_millis(timeout_ms);
+        info!("recv timeout: {timeout:?}");
+
+        // spawn nodes: one endpoint, gossip instance, and router each
+        info!("spawn {node_count} nodes");
+        let secret_keys = (0..node_count).map(|_i| SecretKey::generate(&mut rng));
+        let spawning = FuturesUnordered::from_iter(secret_keys.map(|secret_key| {
+            let relay_map = relay_map.clone();
+            let discovery = dns.discovery(secret_key.clone());
+            let dns_resolver = dns.dns_resolver();
+            task(async move {
+                let endpoint = Endpoint::builder()
+                    .secret_key(secret_key)
+                    .alpns(vec![GOSSIP_ALPN.to_vec()])
+                    .relay_mode(RelayMode::Custom(relay_map))
+                    .discovery(discovery)
+                    .dns_resolver(dns_resolver)
+                    .insecure_skip_relay_cert_verify(true)
+                    .bind()
+                    .await?;
+                let gossip = Gossip::builder().spawn(endpoint.clone()).await?;
+                let router = Router::builder(endpoint)
+                    .accept(GOSSIP_ALPN, gossip.clone())
+                    .spawn();
+                anyhow::Ok((router, gossip))
+            })
+        }));
+        let spawned: Vec<_> = spawning.try_collect().await?;
+        let (routers, gossips): (Vec<_>, Vec<_>) = spawned.into_iter().unzip();
+        info!("all spawned");
+
+        // wait until each node's record is published to the DNS discovery server
+        for router in routers.iter() {
+            let node_id = router.endpoint().node_id();
+            dns.on_node(&node_id, Duration::from_secs(1)).await?;
+        }
+
+        info!("all published to discovery");
+
+        // bootstrap
+        let topic_id = TopicId::from_bytes([0u8; 32]);
+
+        let bootstrap_node = routers[0].endpoint().node_id();
+
+        let mut senders = vec![];
+
+        let bootstrap_count = node_count.min(10).max(node_count / 50);
+        info!("start with {bootstrap_count} bootstrap nodes");
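+        // Join topology: node 0 subscribes with an empty bootstrap list,
+        // nodes 1..bootstrap_count bootstrap via node 0, and each later
+        // chunk bootstraps via one of the first `bootstrap_count` nodes,
+        // so join load is spread instead of every node dialing one peer.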
+        let mut joining = FuturesUnordered::new();
+        #[allow(clippy::needless_range_loop)]
+        for i in 0..bootstrap_count {
+            let bootstrap = if i == 0 { vec![] } else { vec![bootstrap_node] };
+            let (sender, mut receiver) = gossips[i].subscribe(topic_id, bootstrap)?.split();
+            let endpoint = routers[i].endpoint().clone();
+            senders.push((sender, endpoint.node_id()));
+            joining.push(
+                async move {
+                    receiver.joined().await?;
+                    Ok((receiver, endpoint))
+                }
+                .boxed(),
+            );
+        }
+
+        let joined: anyhow::Result<Vec<_>> = joining.try_collect().await;
+        let mut receivers = joined.context("failed to join all nodes")?;
+        info!("bootstrap nodes joined");
+
+        info!("sleep {warmup_sleep_s}s for swarm to stabilize");
+        tokio::time::sleep(Duration::from_secs(warmup_sleep_s)).await;
+
+        info!("join {} remaining nodes", node_count - bootstrap_count);
+        // div_ceil so a trailing partial chunk still joins when node_count
+        // is not a multiple of bootstrap_count (the `i >= node_count` guard
+        // below handles the overshoot)
+        let chunks = node_count.div_ceil(bootstrap_count);
+        for chunk in 1..chunks {
+            let mut joining = FuturesUnordered::new();
+            #[allow(clippy::needless_range_loop)]
+            for j in 0..bootstrap_count {
+                let i = (chunk * bootstrap_count) + j;
+                if i >= node_count {
+                    break;
+                }
+                let bootstrap = vec![routers[i % bootstrap_count].endpoint().node_id()];
+                let (sender, mut receiver) = gossips[i].subscribe(topic_id, bootstrap)?.split();
+                let endpoint = routers[i].endpoint().clone();
+                senders.push((sender, endpoint.node_id()));
+                joining.push(
+                    async move {
+                        receiver.joined().await?;
+                        Ok((receiver, endpoint))
+                    }
+                    .boxed(),
+                );
+            }
+
+            let joined: anyhow::Result<Vec<_>> = joining.try_collect().await;
+            receivers.extend(joined.context("failed to join all nodes")?);
+            info!("joined chunk {chunk} of {chunks} with {bootstrap_count} nodes");
+        }
+
+        info!("sleep {warmup_sleep_s}s for swarm to stabilize");
+        tokio::time::sleep(Duration::from_secs(warmup_sleep_s)).await;
+
+        info!("sending & receiving {message_count} messages on each node");
+        // spawn send tasks
+        let sending = senders.into_iter().enumerate().map(|(i, (sender, me))| {
+            task(async move {
+                for j in 0..message_count {
+                    let message = format!("{}:{}", me.fmt_short(), j);
+                    let message: Bytes = message.as_bytes().to_vec().into();
+                    sender.broadcast(message).await?;
+                    tokio::time::sleep(Duration::from_millis(send_interval_ms)).await
+                }
+                debug!("{i}: sent all");
+                anyhow::Ok((me, sender))
+            })
+        });
+        let sending = FuturesUnordered::from_iter(sending);
+
+        let all_messages: BTreeSet<Bytes> = routers
+            .iter()
+            .map(|r| r.endpoint().node_id())
+            .flat_map(|node_id| {
+                (0..message_count)
+                    .map(move |i| format!("{}:{}", node_id.fmt_short(), i).into_bytes().into())
+            })
+            .collect();
+        let all_messages = Arc::new(all_messages);
+
+        // closure building the set of messages a peer expects to receive:
+        // everything except its own broadcasts
+        let expected = move |all_messages: &BTreeSet<Bytes>, me: NodeId| -> BTreeSet<Bytes> {
+            let me = me.fmt_short();
+            all_messages
+                .iter()
+                .filter(|m| !m.starts_with(me.as_bytes()))
+                .cloned()
+                .collect()
+        };
+
+        // spawn recv tasks
+        let receiving = receivers.into_iter().map(|(mut receiver, endpoint)| {
+            let all_messages = Arc::clone(&all_messages);
+            let me = endpoint.node_id();
+            task(async move {
+                let mut missing = expected(&all_messages, endpoint.node_id());
+                let timeout = tokio::time::sleep(timeout);
+                tokio::pin!(timeout);
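+                // Race each gossip event against the single shared deadline:
+                // break Ok(()) once `missing` is drained, Err on stream
+                // close, stream error, duplicate/unknown message, or timeout.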
+                let res = loop {
+                    let event = tokio::select! {
+                        res = receiver.next() => {
+                            match res {
+                                None => break Err(anyhow!("receiver closed")),
+                                Some(Err(err)) => break Err(err.into()),
+                                Some(Ok(event)) => event,
+                            }
+                        },
+                        _ = &mut timeout => break Err(anyhow!("timeout"))
+                    };
+                    if let Event::Gossip(GossipEvent::Received(message)) = event {
+                        if !missing.remove(&message.content) {
+                            break Err(anyhow!(
+                                "duplicate message: {:?} delivered from {}",
+                                String::from_utf8_lossy(&message.content),
+                                message.delivered_from.fmt_short()
+                            ));
+                        }
+                        if missing.is_empty() {
+                            break Ok(());
+                        }
+                    }
+                };
+                (receiver, missing, res)
+            })
+            .map(move |res| (me, res))
+        });
+        let mut receiving = FuturesUnordered::from_iter(receiving);
+
+        let senders_fut = async move {
+            let senders: Vec<_> = sending.try_collect().await?;
+            anyhow::Ok(senders)
+        };
+        let expected_count = message_count * (node_count - 1);
+        let receivers_fut = task(async move {
+            let mut failed = 0;
+            let mut missing_total = 0;
+            let mut receivers = vec![];
+            while let Some(res) = receiving.next().await {
+                let (node_id, (receiver, missing, res)) = res;
+                receivers.push(receiver);
+                match res {
+                    Err(err) => {
+                        missing_total += missing.len();
+                        failed += 1;
+                        warn!(me=%node_id.fmt_short(), ?missing, "recv task failed: {err:#}");
+                        for m in missing {
+                            let hash = blake3::hash(&m);
+                            warn!(me=%node_id.fmt_short(), ?hash, "missing");
+                        }
+                    }
+                    Ok(()) => {
+                        assert!(missing.is_empty());
+                    }
+                }
+            }
+            if failed > 0 {
+                bail!("Receive side failed: {failed} nodes together missed {missing_total} messages of {expected_count}");
+            } else {
+                Ok(receivers)
+            }
+        });
+
+        let (senders, receivers) = (senders_fut, receivers_fut).try_join().await?;
+        info!("all done");
+        assert_eq!(senders.len(), node_count);
+        assert_eq!(receivers.len(), node_count);
+        drop(senders);
+        drop(receivers);
+        let _ = FuturesUnordered::from_iter(gossips.iter().map(|gossip| gossip.shutdown()))
+            .count()
+            .await;
+        let mut shutdown =
+            FuturesUnordered::from_iter(routers.into_iter().map(|router| async move {
+                (router.endpoint().node_id(), router.shutdown().await)
+            }));
+        while let Some((node_id, res)) = shutdown.next().await {
+            res.with_context(|| format!("shutdown failed for {}", node_id.fmt_short()))?;
+        }
+
+        Ok(())
+    }
+
+    /// Spawn a future onto a task and await it, unwrapping any panic.
+    async fn task<T: Send + 'static>(
+        fut: impl std::future::Future<Output = T> + Send + 'static,
+    ) -> T {
+        n0_future::task::spawn(fut).await.unwrap()
+    }
 }