src/net.rs: 264 additions, 0 deletions
@@ -1357,10 +1357,13 @@ impl Dialer {
mod test {
use std::time::Duration;

use anyhow::{anyhow, bail, Context};
use bytes::Bytes;
use futures_concurrency::future::TryJoin;
use iroh::{protocol::Router, RelayMap, RelayMode, SecretKey};
use n0_future::{FuturesUnordered, StreamExt};
use rand::{Rng, SeedableRng};
use testresult::TestResult;
use tokio::{spawn, time::timeout};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument, warn};
@@ -2022,4 +2025,265 @@ mod test {

Ok(())
}

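/// Large-swarm gossip integration test.
///
/// Setup: spawns `NODE_COUNT` nodes (default 100), each with its own endpoint,
/// all sharing a local relay server and local DNS discovery, on a single topic.
/// A small set of bootstrap nodes (at most 10, but at least `NODE_COUNT / 50`
/// for very large swarms) joins first, with the first node joining on an empty
/// bootstrap set. The remaining nodes then join in waves, each bootstrapping
/// off one of the initial bootstrap nodes. After each join phase the test
/// sleeps `WARMUP_SLEEP` seconds (default 1) to let the swarm stabilize.
///
/// Every node then broadcasts `MESSAGE_COUNT` messages (default 2), spaced
/// `SEND_INTERVAL` milliseconds apart (default 50).
///
/// Asserts: every node receives every message broadcast by every other node
/// exactly once (a duplicate delivery fails that node's receive task) within
/// `TIMEOUT` milliseconds (default 10000). All parameters are read from
/// environment variables of the same name.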
#[tokio::test(flavor = "multi_thread")]
async fn gossip_net_big() -> TestResult {
Contributor:
This is a large enough test to merit some docs on its own. Could you describe first the setup (topology, number of nodes, churn, loss, etc. - you know the deal - plus listing what the env parameters are and what they mean in the context of the test), and then what it's asserting happens or does not happen?

tracing_subscriber::fmt::try_init().ok();
let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
let dns = iroh::test_utils::DnsPkarrServer::run().await?;

let node_count: usize = std::env::var("NODE_COUNT")
.map(|x| x.parse().unwrap())
.unwrap_or(100);
let message_count: usize = std::env::var("MESSAGE_COUNT")
.map(|x| x.parse().unwrap())
.unwrap_or(2);

let warmup_sleep_s = std::env::var("WARMUP_SLEEP")
.map(|x| x.parse().unwrap())
.unwrap_or(1);

let send_interval_ms = std::env::var("SEND_INTERVAL")
.map(|x| x.parse().unwrap())
.unwrap_or(50);

let timeout_ms = std::env::var("TIMEOUT")
.map(|x| x.parse().unwrap())
.unwrap_or(10000);
let timeout = Duration::from_millis(timeout_ms);
info!("recv timeout: {timeout:?}");
Contributor:
Is this supposed to have logs? I don't see anything initializing the logger, and I ran this with RUST_LOG=debug and --nocapture too and got nothing.

Member Author:
I re-added the logging. I guess I removed it by accident in between.

// spawn
info!("spawn {node_count} nodes");
let secret_keys = (0..node_count).map(|_i| SecretKey::generate(&mut rng));
let spawning = FuturesUnordered::from_iter(secret_keys.map(|secret_key| {
let relay_map = relay_map.clone();
let discovery = dns.discovery(secret_key.clone());
let dns_resolver = dns.dns_resolver();
task(async move {
let endpoint = Endpoint::builder()
.secret_key(secret_key)
.alpns(vec![GOSSIP_ALPN.to_vec()])
.relay_mode(RelayMode::Custom(relay_map))
.discovery(discovery)
.dns_resolver(dns_resolver)
.insecure_skip_relay_cert_verify(true)
.bind()
.await?;
let gossip = Gossip::builder().spawn(endpoint.clone()).await?;
let router = Router::builder(endpoint)
.accept(GOSSIP_ALPN, gossip.clone())
.spawn();
anyhow::Ok((router, gossip))
})
}));
let spawned: Vec<_> = spawning.try_collect().await?;
let (routers, gossips): (Vec<_>, Vec<_>) = spawned.into_iter().unzip();
info!("all spawned");

// wait for all nodes to be visible on the router
for router in routers.iter() {
let node_id = router.endpoint().node_id();
dns.on_node(&node_id, Duration::from_secs(1)).await?;
}

info!("all published to discovery");

// bootstrap
let topic_id = TopicId::from_bytes([0u8; 32]);

let bootstrap_node = routers[0].endpoint().node_id();

let mut senders = vec![];

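// cap the bootstrap set at 10 nodes, but use at least 2% of the swarm
// (node_count / 50) when the node count grows very large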
let bootstrap_count = node_count.min(10).max(node_count / 50);
info!("start with {bootstrap_count} bootstrap nodes");
let mut joining = FuturesUnordered::new();
#[allow(clippy::needless_range_loop)]
for i in 0..bootstrap_count {
let bootstrap = if i == 0 { vec![] } else { vec![bootstrap_node] };
let (sender, mut receiver) = gossips[i].subscribe(topic_id, bootstrap)?.split();
let endpoint = routers[i].endpoint().clone();
senders.push((sender, endpoint.node_id()));
joining.push(
async move {
receiver.joined().await?;
Ok((receiver, endpoint))
}
.boxed(),
);
}

let joined: anyhow::Result<Vec<_>> = joining.try_collect().await;
let mut receivers = joined.context("failed to join all nodes")?;
info!("bootstrap nodes joined");

info!("sleep {warmup_sleep_s}s for swarm to stabilize");
tokio::time::sleep(Duration::from_secs(warmup_sleep_s)).await;

info!("join {} remaining nodes", node_count - bootstrap_count);
let chunks = node_count.div_ceil(bootstrap_count);
for chunk in 1..chunks {
let mut joining = FuturesUnordered::new();
#[allow(clippy::needless_range_loop)]
for j in 0..bootstrap_count {
let i = (chunk * bootstrap_count) + j;
if i >= node_count {
break;
}
let bootstrap = vec![routers[i % bootstrap_count].endpoint().node_id()];
let (sender, mut receiver) = gossips[i].subscribe(topic_id, bootstrap)?.split();
let endpoint = routers[i].endpoint().clone();
senders.push((sender, endpoint.node_id()));
joining.push(
async move {
receiver.joined().await?;
Ok((receiver, endpoint))
}
.boxed(),
);
}

let joined: anyhow::Result<Vec<_>> = joining.try_collect().await;
receivers.extend(joined.context("failed to join all nodes")?);
info!("joined chunk {chunk} of {chunks} with {bootstrap_count}");
}

info!("sleep {warmup_sleep_s}s for swarm to stabilize");
tokio::time::sleep(Duration::from_secs(warmup_sleep_s)).await;

info!("sending & receiving {message_count} messages on each node");
// spawn send tasks
let sending = senders.into_iter().enumerate().map(|(i, (sender, me))| {
task(async move {
for j in 0..message_count {
let message = format!("{}:{}", me.fmt_short(), j);
let message: Bytes = message.as_bytes().to_vec().into();
sender.broadcast(message).await?;
tokio::time::sleep(Duration::from_millis(send_interval_ms)).await
}
debug!("{i}: sent all");
anyhow::Ok((me, sender))
})
});
let sending = FuturesUnordered::from_iter(sending);

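// the full set of messages that will be broadcast across the swarm; each
// node below expects to receive all of them except its own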
let all_messages: BTreeSet<Bytes> = routers
.iter()
.map(|r| r.endpoint().node_id())
.flat_map(|node_id| {
(0..message_count)
.map(move |i| format!("{}:{}", node_id.fmt_short(), i).into_bytes().into())
})
.collect();
let all_messages = Arc::new(all_messages);

// closure to create a set of expected messages at a peer
let expected = move |all_messages: &BTreeSet<Bytes>, me: NodeId| -> BTreeSet<Bytes> {
let me = me.fmt_short();
all_messages
.iter()
.filter(|m| !m.starts_with(me.as_bytes()))
.cloned()
.collect()
};

// spawn recv tasks
let receiving = receivers.into_iter().map(|(mut receiver, endpoint)| {
let all_messages = Arc::clone(&all_messages);
let me = endpoint.node_id();
task(async move {
let mut missing = expected(&all_messages, endpoint.node_id());
let timeout = tokio::time::sleep(timeout);
tokio::pin!(timeout);
let res = loop {
let event = tokio::select! {
res = receiver.next() => {
match res {
None => break Err(anyhow!("receiver closed")),
Some(Err(err)) => break Err(err.into()),
Some(Ok(event)) => event,
}
},
_ = &mut timeout => break Err(anyhow!("timeout"))
};
if let Event::Gossip(GossipEvent::Received(message)) = event {
if !missing.remove(&message.content) {
break Err(anyhow!(
"duplicate message: {:?} delivered from {}",
String::from_utf8_lossy(&message.content),
message.delivered_from.fmt_short()
));
}
if missing.is_empty() {
break Ok(());
}
}
};
(receiver, missing, res)
})
.map(move |res| (me, res))
});
let mut receiving = FuturesUnordered::from_iter(receiving);

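// drive the send and receive sides concurrently; the receive side aggregates
// per-node failures instead of bailing on the first error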
let senders_fut = async move {
let senders: Vec<_> = sending.try_collect().await?;
anyhow::Ok(senders)
};
let expected_count = message_count * (node_count - 1);
let receivers_fut = task(async move {
let mut failed = 0;
let mut missing_total = 0;
let mut receivers = vec![];
while let Some(res) = receiving.next().await {
let (node_id, (receiver, missing, res)) = res;
receivers.push(receiver);
match res {
Err(err) => {
missing_total += missing.len();
failed += 1;
warn!(me=%node_id.fmt_short(), ?missing, "recv task failed: {err:#}");
for m in missing {
let hash = blake3::hash(&m);
warn!(me=%node_id.fmt_short(), ?hash, "missing");
}
}
Ok(()) => {
assert!(missing.is_empty());
}
}
}
if failed > 0 {
bail!("Receive side failed: {failed} nodes together missed {missing_total} messages of {expected_count}");
} else {
Ok(receivers)
}
});

let (senders, receivers) = (senders_fut, receivers_fut).try_join().await?;
info!("all done");
assert_eq!(senders.len(), node_count);
assert_eq!(receivers.len(), node_count);
drop(senders);
drop(receivers);
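// graceful teardown: shut down all gossip actors first, then the routers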
let _ = FuturesUnordered::from_iter(gossips.iter().map(|gossip| gossip.shutdown()))
.count()
.await;
let mut shutdown =
FuturesUnordered::from_iter(routers.into_iter().map(|router| async move {
(router.endpoint().node_id(), router.shutdown().await)
}));
while let Some((node_id, res)) = shutdown.next().await {
res.with_context(|| format!("shutdown failed for {}", node_id.fmt_short()))?;
}

Ok(())
}

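/// Spawns `fut` as a task and awaits its result, propagating any panic.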
async fn task<T: Send + 'static>(
fut: impl std::future::Future<Output = T> + Send + 'static,
) -> T {
n0_future::task::spawn(fut).await.unwrap()
}
}