From 3bf68da50cf789bff299475f33129b8bdff3c9c4 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Fri, 24 Apr 2026 22:06:14 +0200 Subject: [PATCH 1/4] test(p2p/sync): import in place of fully qualified name --- crates/p2p/src/sync/tests.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/p2p/src/sync/tests.rs b/crates/p2p/src/sync/tests.rs index e9fce54591..d9b1394c82 100644 --- a/crates/p2p/src/sync/tests.rs +++ b/crates/p2p/src/sync/tests.rs @@ -11,10 +11,10 @@ use crate::sync::behaviour::Behaviour; use crate::sync::client::Client; use crate::sync::protocol::codec; use crate::sync::{Config, Event}; -use crate::test_utils::peer::TestPeerBuilder; +use crate::test_utils::peer::{TestPeer, TestPeerBuilder}; use crate::test_utils::{consume_all_events_forever, filter_events}; -type SyncTestPeer = crate::test_utils::peer::TestPeer; +type SyncTestPeer = TestPeer; fn create_peer() -> SyncTestPeer { TestPeerBuilder::new() From 804488cbeb2b459598d93d0edb00efbb5b570e39 Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Fri, 24 Apr 2026 22:41:34 +0200 Subject: [PATCH 2/4] test(p2p): use unbounded channel for test events --- crates/p2p/src/core/tests.rs | 31 ++------------------------ crates/p2p/src/main_loop.rs | 22 ++++++------------ crates/p2p/src/sync/tests.rs | 8 +------ crates/p2p/src/test_utils.rs | 26 ++------------------- crates/p2p/src/test_utils/main_loop.rs | 7 +++--- crates/p2p/src/test_utils/peer.rs | 15 +++---------- 6 files changed, 18 insertions(+), 91 deletions(-) diff --git a/crates/p2p/src/core/tests.rs b/crates/p2p/src/core/tests.rs index 3fd17d3664..e2067265da 100644 --- a/crates/p2p/src/core/tests.rs +++ b/crates/p2p/src/core/tests.rs @@ -8,12 +8,12 @@ use super::TestEvent; use crate::core::config::RateLimit; use crate::core::Config; use crate::test_utils::peer::TestPeer; -use crate::test_utils::{consume_accumulated_events, consume_all_events_forever, wait_for_event}; +use crate::test_utils::wait_for_event; #[test_log::test(tokio::test)] async fn dial() { // tokio::time::pause() does not make a difference - let mut peer1 = TestPeer::default(); + let peer1 = TestPeer::default(); let mut peer2 = TestPeer::default(); let addr2 = peer2.start_listening().await.unwrap(); @@ -21,8 +21,6 @@ async fn dial() { peer1.client.dial(peer2.peer_id, addr2).await.unwrap(); - consume_accumulated_events(&mut peer1.test_event_receiver).await; - let peers_of1: Vec<_> = peer1.connected().await.into_keys().collect(); let peers_of2: Vec<_> = peer2.connected().await.into_keys().collect(); @@ -40,8 +38,6 @@ async fn disconnect() { peer1.client.dial(peer2.peer_id, addr2).await.unwrap(); - consume_accumulated_events(&mut peer1.test_event_receiver).await; - let peers_of1: Vec<_> = peer1.connected().await.into_keys().collect(); let peers_of2: Vec<_> = peer2.connected().await.into_keys().collect(); @@ -103,8 +99,6 @@ async fn periodic_bootstrap() { _ => None, }; - consume_all_events_forever(boot.test_event_receiver); - let peer_id2 = peer2.peer_id; let peer2_added_to_dht_of_peer1 = @@ -127,8 +121,6 @@ async fn periodic_bootstrap() { }) .await; - consume_all_events_forever(peer1.test_event_receiver); - assert_eq!( boot.client.for_test().get_peers_from_dht().await, [peer1.peer_id, peer2.peer_id].into() @@ -316,12 +308,6 @@ async fn outbound_peer_eviction() { let outbound_addr4 = outbound4.start_listening().await.unwrap(); tracing::info!(%outbound4.peer_id, %outbound_addr4); - consume_all_events_forever(outbound1.test_event_receiver); - consume_all_events_forever(outbound2.test_event_receiver); - consume_all_events_forever(outbound3.test_event_receiver); - consume_all_events_forever(outbound4.test_event_receiver); - consume_all_events_forever(inbound1.test_event_receiver); - // Open one inbound connection. This connection is never touched. inbound1 .client @@ -339,8 +325,6 @@ async fn outbound_peer_eviction() { .await .unwrap(); - consume_accumulated_events(&mut peer.test_event_receiver).await; - // Trying to open another one fails, because no peers are marked as not useful, // and hence no peer can be evicted. let result = peer @@ -442,8 +426,6 @@ async fn inbound_peer_eviction() { assert_eq!(connected.len(), 26); assert!(connected.contains_key(&outbound1.peer_id)); - consume_accumulated_events(&mut peer.test_event_receiver).await; - // Trying to open another one causes an eviction. inbound_peers .last() @@ -519,8 +501,6 @@ async fn evicted_peer_reconnection() { let result = peer1.client.dial(peer2.peer_id, addr2.clone()).await; assert!(result.is_err()); - consume_accumulated_events(&mut peer2.test_event_receiver).await; - // In this case there is no peer ID when connecting, so the connection gets // closed after being established. peer2 @@ -559,8 +539,6 @@ async fn ip_whitelist() { let addr1 = peer1.start_listening().await.unwrap(); tracing::info!(%peer1.peer_id, %addr1); - consume_all_events_forever(peer2.test_event_receiver); - // Can't open the connection because peer2 is bound to 127.0.0.1 and peer1 only // allows 127.0.0.2. let result = peer2.client.dial(peer1.peer_id, addr1.clone()).await; @@ -602,11 +580,6 @@ async fn rate_limit() { let addr1 = peer1.start_listening().await.unwrap(); tracing::info!(%peer1.peer_id, %addr1); - consume_all_events_forever(peer1.test_event_receiver); - consume_all_events_forever(peer2.test_event_receiver); - consume_all_events_forever(peer3.test_event_receiver); - consume_all_events_forever(peer4.test_event_receiver); - // Two connections can be opened, but the third one is rate limited. peer2 diff --git a/crates/p2p/src/main_loop.rs b/crates/p2p/src/main_loop.rs index dee8fdcea7..695342390f 100644 --- a/crates/p2p/src/main_loop.rs +++ b/crates/p2p/src/main_loop.rs @@ -41,10 +41,6 @@ const COMMAND_CHANNEL_SIZE_LIMIT: usize = 1024; /// free however we must ensure that there is rate limiting employed on the /// network layer side so that the event channel does not actually grow /// indefinitely in some situations. -/// -/// TODO Determine a safe maximum size for the channels using stress tests with -/// network layer rate limiting in place and replace them with fixed size -/// channels of sufficient size. pub struct MainLoop where B: ApplicationBehaviour, @@ -65,8 +61,8 @@ where data_directory: PathBuf, /// State of the application behaviour. state: State, - _test_event_sender: mpsc::Sender, - _test_event_receiver: Option>, + _test_event_sender: mpsc::UnboundedSender, + _test_event_receiver: Option>, _pending_test_queries: TestQueries, /// We keep a single command sender instance at all times so that receiver /// can be polled even without any client instance available without @@ -112,13 +108,9 @@ where Self, mpsc::UnboundedSender::Command>>, ) { - // Test event buffer is not used outside tests, so we can make it as small as - // possible - #[cfg(not(test))] - const TEST_EVENT_BUFFER_SIZE: usize = 1; - #[cfg(test)] - const TEST_EVENT_BUFFER_SIZE: usize = 1000; - let (_test_event_sender, rx) = mpsc::channel(TEST_EVENT_BUFFER_SIZE); + // Test event buffer is not used outside tests, so it is safe to make it + // unbounded as it will never contain any items in production. + let (_test_event_sender, rx) = mpsc::unbounded_channel(); let (command_sender, command_receiver) = mpsc::unbounded_channel(); @@ -679,14 +671,14 @@ where B: ApplicationBehaviour, { #[cfg(test)] - pub fn take_test_event_receiver(&mut self) -> mpsc::Receiver { + pub fn take_test_event_receiver(&mut self) -> mpsc::UnboundedReceiver { Option::take(&mut self._test_event_receiver) .expect("Test event receiver not to have been taken before") } } /// No-op outside tests -async fn send_test_event(_event_sender: &mpsc::Sender, _event: TestEvent) { +async fn send_test_event(_event_sender: &mpsc::UnboundedSender, _event: TestEvent) { #[cfg(test)] test_utils::main_loop::send_event(_event_sender, _event).await } diff --git a/crates/p2p/src/sync/tests.rs b/crates/p2p/src/sync/tests.rs index d9b1394c82..02897ed09f 100644 --- a/crates/p2p/src/sync/tests.rs +++ b/crates/p2p/src/sync/tests.rs @@ -11,8 +11,8 @@ use crate::sync::behaviour::Behaviour; use crate::sync::client::Client; use crate::sync::protocol::codec; use crate::sync::{Config, Event}; +use crate::test_utils::filter_events; use crate::test_utils::peer::{TestPeer, TestPeerBuilder}; -use crate::test_utils::{consume_all_events_forever, filter_events}; type SyncTestPeer = TestPeer; @@ -89,9 +89,6 @@ mod successful_sync { _ => None, }); - // This is to keep peer2's event loop going - consume_all_events_forever(peer2.app_event_receiver); - // Peer2 sends the request to peer1, and waits for the response receiver let mut rx = Client::from(peer2.client.as_pair()) .$req_fn(peer1.peer_id, expected_request) @@ -304,9 +301,6 @@ mod propagate_codec_errors_to_caller { _ => None, }); - // This is to keep peer2's event loop going - consume_all_events_forever(peer2.app_event_receiver); - // Peer2 sends the request to peer1, and waits for the response receiver let mut rx = Client::from(peer2.client.as_pair()) .$req_fn(peer1.peer_id, expected_request) diff --git a/crates/p2p/src/test_utils.rs b/crates/p2p/src/test_utils.rs index b3dfa98df9..4079ca103c 100644 --- a/crates/p2p/src/test_utils.rs +++ b/crates/p2p/src/test_utils.rs @@ -14,7 +14,7 @@ use tokio::sync::mpsc; /// `f` should return `None`. This function returns a receiver to the filtered /// events' data channel. pub(crate) fn filter_events( - mut event_receiver: mpsc::Receiver, + mut event_receiver: mpsc::UnboundedReceiver, f: impl FnOnce(Event) -> Option + Copy + Send + 'static, ) -> mpsc::Receiver where @@ -36,7 +36,7 @@ where /// Wait for a specific event to happen. pub(crate) async fn wait_for_event( - event_receiver: &mut mpsc::Receiver, + event_receiver: &mut mpsc::UnboundedReceiver, mut f: impl FnMut(Event) -> Option, ) -> Option where @@ -50,25 +50,3 @@ where } None } - -/// Consume all events that have accumulated for the peer so far. You don't care -/// about any of those events in the queue __right now__, but later you may do -/// something that triggers new events for this peer, which you may care for. -pub(crate) async fn consume_accumulated_events(event_receiver: &mut mpsc::Receiver) -where - Event: Send + 'static, -{ - while event_receiver.try_recv().is_ok() {} -} - -/// Consume all events from a peer to keep its main loop going. You don't care -/// about any of those events. -/// -/// [`MainLoop`](p2p::MainLoop)'s event channel size is 1, so we need to consume -/// all events as soon as they're sent otherwise the main loop will stall -pub(crate) fn consume_all_events_forever(mut event_receiver: mpsc::Receiver) -where - Event: Send + 'static, -{ - tokio::spawn(async move { while (event_receiver.recv().await).is_some() {} }); -} diff --git a/crates/p2p/src/test_utils/main_loop.rs b/crates/p2p/src/test_utils/main_loop.rs index 43b26fee7f..caab85d7cd 100644 --- a/crates/p2p/src/test_utils/main_loop.rs +++ b/crates/p2p/src/test_utils/main_loop.rs @@ -7,7 +7,7 @@ use tokio::sync::mpsc; use crate::core::{Behaviour, Event, TestCommand, TestEvent}; pub async fn handle_event( - event_sender: &mpsc::Sender, + event_sender: &mpsc::UnboundedSender, event: SwarmEvent>, ) { if let SwarmEvent::NewListenAddr { address, .. } = event { @@ -58,16 +58,15 @@ pub async fn handle_command( } } -pub async fn send_event(event_sender: &mpsc::Sender, event: TestEvent) { +pub async fn send_event(event_sender: &mpsc::UnboundedSender, event: TestEvent) { event_sender .send(event) - .await .expect("Event receiver not to be dropped"); } pub async fn query_completed( _pending_test_queries: &mut PendingQueries, - _event_sender: &mpsc::Sender, + _event_sender: &mpsc::UnboundedSender, _id: QueryId, _result: QueryResult, ) { diff --git a/crates/p2p/src/test_utils/peer.rs b/crates/p2p/src/test_utils/peer.rs index 47f9606d9c..21bab5abf1 100644 --- a/crates/p2p/src/test_utils/peer.rs +++ b/crates/p2p/src/test_utils/peer.rs @@ -27,8 +27,8 @@ where pub keypair: Keypair, pub peer_id: PeerId, pub client: Client<::Command>, - pub app_event_receiver: mpsc::Receiver<::Event>, - pub test_event_receiver: mpsc::Receiver, + pub app_event_receiver: mpsc::UnboundedReceiver<::Event>, + pub test_event_receiver: mpsc::UnboundedReceiver, pub main_loop_jh: JoinHandle<()>, } @@ -97,19 +97,10 @@ where p2p_builder.disable_kademlia_for_test() }; - let (client, mut event_receiver, mut main_loop) = p2p_builder + let (client, app_event_receiver, mut main_loop) = p2p_builder .app_behaviour(app_behaviour.expect("App behaviour to be set in this phase")) .build(); - // Ensure that the channel keeps being polled to move the main loop forward. - // Store the polled events into a buffered channel instead. - let (buf_sender, app_event_receiver) = tokio::sync::mpsc::channel(1024); - tokio::spawn(async move { - while let Some(event) = event_receiver.recv().await { - buf_sender.send(event).await.unwrap(); - } - }); - let test_event_receiver = main_loop.take_test_event_receiver(); let main_loop_jh = tokio::spawn(main_loop.run()); From d47719feade571204ce914b90fe37d3785ef619d Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Fri, 24 Apr 2026 22:57:53 +0200 Subject: [PATCH 3/4] test(p2p): move wait_for_events into TestPeer --- crates/p2p/src/core/tests.rs | 214 ++++++++++++++++-------------- crates/p2p/src/test_utils.rs | 17 --- crates/p2p/src/test_utils/peer.rs | 17 +++ 3 files changed, 129 insertions(+), 119 deletions(-) diff --git a/crates/p2p/src/core/tests.rs b/crates/p2p/src/core/tests.rs index e2067265da..16f8e77629 100644 --- a/crates/p2p/src/core/tests.rs +++ b/crates/p2p/src/core/tests.rs @@ -8,7 +8,6 @@ use super::TestEvent; use crate::core::config::RateLimit; use crate::core::Config; use crate::test_utils::peer::TestPeer; -use crate::test_utils::wait_for_event; #[test_log::test(tokio::test)] async fn dial() { @@ -46,17 +45,19 @@ async fn disconnect() { peer2.client.disconnect(peer1.peer_id).await.unwrap(); - wait_for_event(&mut peer1.test_event_receiver, move |event| match event { - TestEvent::ConnectionClosed { remote } if remote == peer2.peer_id => Some(()), - _ => None, - }) - .await; + peer1 + .wait_for_test_event(move |e| match e { + TestEvent::ConnectionClosed { remote } if remote == peer2.peer_id => Some(()), + _ => None, + }) + .await; - wait_for_event(&mut peer2.test_event_receiver, move |event| match event { - TestEvent::ConnectionClosed { remote } if remote == peer1.peer_id => Some(()), - _ => None, - }) - .await; + peer2 + .wait_for_test_event(move |e| match e { + TestEvent::ConnectionClosed { remote } if remote == peer1.peer_id => Some(()), + _ => None, + }) + .await; assert!(peer1.connected().await.is_empty()); assert!(peer2.connected().await.is_empty()); @@ -101,23 +102,18 @@ async fn periodic_bootstrap() { let peer_id2 = peer2.peer_id; - let peer2_added_to_dht_of_peer1 = - wait_for_event(&mut peer1.test_event_receiver, move |event| match event { - TestEvent::PeerAddedToDHT { remote } if remote == peer_id2 => Some(()), - _ => None, - }); + let peer2_added_to_dht_of_peer1 = peer1.wait_for_test_event(move |e| match e { + TestEvent::PeerAddedToDHT { remote } if remote == peer_id2 => Some(()), + _ => None, + }); join(peer2_added_to_dht_of_peer1, async { - wait_for_event( - &mut peer2.test_event_receiver, - filter_kademlia_bootstrap_completed, - ) - .await; - wait_for_event( - &mut peer2.test_event_receiver, - filter_kademlia_bootstrap_completed, - ) - .await; + peer2 + .wait_for_test_event(filter_kademlia_bootstrap_completed) + .await; + peer2 + .wait_for_test_event(filter_kademlia_bootstrap_completed) + .await; }) .await; @@ -158,17 +154,19 @@ async fn reconnect_too_quickly() { .await .unwrap(); - wait_for_event(&mut peer1.test_event_receiver, |event| match event { - TestEvent::ConnectionEstablished { remote, .. } if remote == peer2.peer_id => Some(()), - _ => None, - }) - .await; + peer1 + .wait_for_test_event(move |e| match e { + TestEvent::ConnectionEstablished { remote, .. } if remote == peer2.peer_id => Some(()), + _ => None, + }) + .await; - wait_for_event(&mut peer2.test_event_receiver, |event| match event { - TestEvent::ConnectionEstablished { remote, .. } if remote == peer1.peer_id => Some(()), - _ => None, - }) - .await; + peer2 + .wait_for_test_event(move |e| match e { + TestEvent::ConnectionEstablished { remote, .. } if remote == peer1.peer_id => Some(()), + _ => None, + }) + .await; let peers_of1: Vec<_> = peer1.connected().await.into_keys().collect(); let peers_of2: Vec<_> = peer2.connected().await.into_keys().collect(); @@ -179,17 +177,19 @@ async fn reconnect_too_quickly() { // Close the connection. peer1.client.disconnect(peer2.peer_id).await.unwrap(); - wait_for_event(&mut peer1.test_event_receiver, |event| match event { - TestEvent::ConnectionClosed { remote } if remote == peer2.peer_id => Some(()), - _ => None, - }) - .await; + peer1 + .wait_for_test_event(move |e| match e { + TestEvent::ConnectionClosed { remote } if remote == peer2.peer_id => Some(()), + _ => None, + }) + .await; - wait_for_event(&mut peer2.test_event_receiver, |event| match event { - TestEvent::ConnectionClosed { remote } if remote == peer1.peer_id => Some(()), - _ => None, - }) - .await; + peer2 + .wait_for_test_event(move |e| match e { + TestEvent::ConnectionClosed { remote } if remote == peer1.peer_id => Some(()), + _ => None, + }) + .await; // Attempt to immediately reconnect. let result = peer1.client.dial(peer2.peer_id, addr2.clone()).await; @@ -201,17 +201,19 @@ async fn reconnect_too_quickly() { assert!(result.is_ok()); // The connection is established. - wait_for_event(&mut peer1.test_event_receiver, |event| match event { - TestEvent::ConnectionEstablished { remote, .. } if remote == peer2.peer_id => Some(()), - _ => None, - }) - .await; + peer1 + .wait_for_test_event(move |e| match e { + TestEvent::ConnectionEstablished { remote, .. } if remote == peer2.peer_id => Some(()), + _ => None, + }) + .await; - wait_for_event(&mut peer2.test_event_receiver, |event| match event { - TestEvent::ConnectionEstablished { remote, .. } if remote == peer1.peer_id => Some(()), - _ => None, - }) - .await; + peer2 + .wait_for_test_event(move |e| match e { + TestEvent::ConnectionEstablished { remote, .. } if remote == peer1.peer_id => Some(()), + _ => None, + }) + .await; } /// Test that each peer accepts at most one connection from any other peer, and @@ -238,17 +240,19 @@ async fn duplicate_connection() { .await .unwrap(); - wait_for_event(&mut peer1.test_event_receiver, |event| match event { - TestEvent::ConnectionEstablished { remote, .. } if remote == peer2.peer_id => Some(()), - _ => None, - }) - .await; + peer1 + .wait_for_test_event(move |e| match e { + TestEvent::ConnectionEstablished { remote, .. } if remote == peer2.peer_id => Some(()), + _ => None, + }) + .await; - wait_for_event(&mut peer2.test_event_receiver, |event| match event { - TestEvent::ConnectionEstablished { remote, .. } if remote == peer1.peer_id => Some(()), - _ => None, - }) - .await; + peer2 + .wait_for_test_event(move |e| match e { + TestEvent::ConnectionEstablished { remote, .. } if remote == peer1.peer_id => Some(()), + _ => None, + }) + .await; // Ensure that the connection timeout has passed, so this is not the reason why // the connection is getting closed. @@ -262,17 +266,19 @@ async fn duplicate_connection() { .await .unwrap(); - wait_for_event(&mut peer1_copy.test_event_receiver, |event| match event { - TestEvent::ConnectionEstablished { remote, .. } if remote == peer2.peer_id => Some(()), - _ => None, - }) - .await; + peer1_copy + .wait_for_test_event(move |e| match e { + TestEvent::ConnectionEstablished { remote, .. } if remote == peer2.peer_id => Some(()), + _ => None, + }) + .await; - wait_for_event(&mut peer1_copy.test_event_receiver, |event| match event { - TestEvent::ConnectionClosed { remote, .. } if remote == peer2.peer_id => Some(()), - _ => None, - }) - .await; + peer1_copy + .wait_for_test_event(move |e| match e { + TestEvent::ConnectionClosed { remote, .. } if remote == peer2.peer_id => Some(()), + _ => None, + }) + .await; assert!(peer1_copy.connected().await.is_empty()); assert!(peer1.connected().await.contains_key(&peer2.peer_id)); @@ -358,7 +364,7 @@ async fn outbound_peer_eviction() { assert!(peers.contains_key(&inbound1.peer_id)); // Ensure that outbound1 actually got disconnected. - wait_for_event(&mut peer.test_event_receiver, |event| match event { + peer.wait_for_test_event(move |e| match e { TestEvent::ConnectionClosed { remote, .. } if remote == outbound1.peer_id => Some(()), _ => None, }) @@ -436,16 +442,17 @@ async fn inbound_peer_eviction() { .unwrap(); // Ensure that a peer got disconnected. - let disconnected = wait_for_event(&mut peer.test_event_receiver, |event| match event { - TestEvent::ConnectionClosed { remote, .. } - if inbound_peers.iter().take(25).any(|p| p.peer_id == remote) => - { - Some(remote) - } - _ => None, - }) - .await - .unwrap(); + let disconnected = peer + .wait_for_test_event(|e| match e { + TestEvent::ConnectionClosed { remote, .. } + if inbound_peers.iter().take(25).any(|p| p.peer_id == remote) => + { + Some(remote) + } + _ => None, + }) + .await + .unwrap(); let connected = peer.connected().await; // 25 inbound and 1 outbound peer. @@ -488,11 +495,12 @@ async fn evicted_peer_reconnection() { peer1.client.dial(peer3.peer_id, addr3).await.unwrap(); // Check that peer2 got evicted. - wait_for_event(&mut peer1.test_event_receiver, |event| match event { - TestEvent::ConnectionClosed { remote, .. } if remote == peer2.peer_id => Some(()), - _ => None, - }) - .await; + peer1 + .wait_for_test_event(|e| match e { + TestEvent::ConnectionClosed { remote, .. } if remote == peer2.peer_id => Some(()), + _ => None, + }) + .await; // Mark peer3 as not useful, and hence a candidate for eviction. peer1.client.not_useful(peer3.peer_id).await; @@ -508,22 +516,24 @@ async fn evicted_peer_reconnection() { .dial(peer1.peer_id, addr1.clone()) .await .unwrap(); - wait_for_event(&mut peer2.test_event_receiver, |event| match event { - TestEvent::ConnectionClosed { remote, .. } if remote == peer1.peer_id => Some(()), - _ => None, - }) - .await; + peer2 + .wait_for_test_event(|e| match e { + TestEvent::ConnectionClosed { remote, .. } if remote == peer1.peer_id => Some(()), + _ => None, + }) + .await; // peer2 can be reconnected after a timeout. tokio::time::sleep(Duration::from_millis(500)).await; peer1.client.dial(peer2.peer_id, addr2).await.unwrap(); // peer3 gets evicted. - wait_for_event(&mut peer1.test_event_receiver, |event| match event { - TestEvent::ConnectionClosed { remote, .. } if remote == peer3.peer_id => Some(()), - _ => None, - }) - .await; + peer1 + .wait_for_test_event(|e| match e { + TestEvent::ConnectionClosed { remote, .. } if remote == peer3.peer_id => Some(()), + _ => None, + }) + .await; } /// Test that peers can only connect if they are whitelisted. diff --git a/crates/p2p/src/test_utils.rs b/crates/p2p/src/test_utils.rs index 4079ca103c..bf95122bd9 100644 --- a/crates/p2p/src/test_utils.rs +++ b/crates/p2p/src/test_utils.rs @@ -33,20 +33,3 @@ where rx } - -/// Wait for a specific event to happen. -pub(crate) async fn wait_for_event( - event_receiver: &mut mpsc::UnboundedReceiver, - mut f: impl FnMut(Event) -> Option, -) -> Option -where - Event: Send + 'static, - Data: Debug + Send + 'static, -{ - while let Some(event) = event_receiver.recv().await { - if let Some(data) = f(event) { - return Some(data); - } - } - None -} diff --git a/crates/p2p/src/test_utils/peer.rs b/crates/p2p/src/test_utils/peer.rs index 21bab5abf1..37cf7374ce 100644 --- a/crates/p2p/src/test_utils/peer.rs +++ b/crates/p2p/src/test_utils/peer.rs @@ -132,6 +132,23 @@ impl TestPeer { pub fn with_keypair(keypair: Keypair, cfg: Config) -> Self { Self::builder().keypair(keypair).build(cfg) } + + /// Wait for a specific test event to happen. Extract data from the event + /// using the provided function `f`. + pub async fn wait_for_test_event( + &mut self, + mut f: impl FnMut(TestEvent) -> Option, + ) -> Option + where + Data: Debug + Send + 'static, + { + while let Some(event) = self.test_event_receiver.recv().await { + if let Some(data) = f(event) { + return Some(data); + } + } + None + } } impl TestPeer From 6950607c19ba5eaa34746d6049d24e868cdb7f4b Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Fri, 24 Apr 2026 23:15:31 +0200 Subject: [PATCH 4/4] test(p2p): remove filter_events --- crates/p2p/src/sync/tests.rs | 79 +++++++++++++++---------------- crates/p2p/src/test_utils.rs | 31 ------------ crates/p2p/src/test_utils/peer.rs | 61 +++++++++++++++++------- 3 files changed, 81 insertions(+), 90 deletions(-) diff --git a/crates/p2p/src/sync/tests.rs b/crates/p2p/src/sync/tests.rs index 02897ed09f..d68e2cdfa6 100644 --- a/crates/p2p/src/sync/tests.rs +++ b/crates/p2p/src/sync/tests.rs @@ -11,7 +11,6 @@ use crate::sync::behaviour::Behaviour; use crate::sync::client::Client; use crate::sync::protocol::codec; use crate::sync::{Config, Event}; -use crate::test_utils::filter_events; use crate::test_utils::peer::{TestPeer, TestPeerBuilder}; type SyncTestPeer = TestPeer; @@ -66,15 +65,24 @@ mod successful_sync { #[case::client_to_server(client_to_server().await)] #[test_log::test(tokio::test)] async fn $test_name(#[case] peers: (SyncTestPeer, SyncTestPeer)) { - let (peer1, peer2) = peers; + let (mut peer1, peer2) = peers; // Fake some request for peer2 to send to peer1 let expected_request = Faker.fake::<$req_type>(); + // Peer2 sends the request to peer1, and waits for the response receiver + let mut rx = Client::from(peer2.client.as_pair()) + .$req_fn(peer1.peer_id, expected_request) + .await + .expect(&format!( + "sending request using: {}, line: {}", + std::stringify!($req_fn), + line!() + )); + // Filter peer1's events to fish out the request from peer2 and the channel that // peer1 will use to send the responses - // This is also to keep peer1's event loop going - let mut tx_ready = - filter_events(peer1.app_event_receiver, move |event| match event { + let mut tx = peer1 + .wait_for_event(|e| match e { Event::$event_variant { from, channel, @@ -87,24 +95,13 @@ mod successful_sync { Some(channel) } _ => None, - }); - - // Peer2 sends the request to peer1, and waits for the response receiver - let mut rx = Client::from(peer2.client.as_pair()) - .$req_fn(peer1.peer_id, expected_request) + }) .await .expect(&format!( - "sending request using: {}, line: {}", - std::stringify!($req_fn), + "waiting for response channel to be ready, line: {}", line!() )); - // Peer1 waits for response channel to be ready - let mut tx = tx_ready.recv().await.expect(&format!( - "waiting for response channel to be ready, line: {}", - line!() - )); - // Peer1 sends a random number of responses to Peer2 for _ in 0usize..(1..100).fake() { let expected_response = Faker.fake::<$res_type>(); @@ -278,29 +275,11 @@ mod propagate_codec_errors_to_caller { #[case::client_to_server(client_to_bad_server($bad_codec).await)] #[test_log::test(tokio::test)] async fn $test_name(#[case] peers: (SyncTestPeer, SyncTestPeer)) { - let (peer1, peer2) = peers; + let (mut peer1, peer2) = peers; // Fake some request for peer2 to send to peer1 let expected_request = Faker.fake::<$req_type>(); - // Filter peer1's events to fish out the request from peer2 and the channel that - // peer1 will use to send the responses - // This is also to keep peer1's event loop going - let mut tx_ready = filter_events(peer1.app_event_receiver, move |event| match event { - Event::$event_variant { - from, - channel, - request: actual_request, - } => { - // Peer 1 should receive the request from peer2 - assert_eq!(from, peer2.peer_id); - // Received request should match what peer2 sent - assert_eq!(expected_request, actual_request); - Some(channel) - } - _ => None, - }); - // Peer2 sends the request to peer1, and waits for the response receiver let mut rx = Client::from(peer2.client.as_pair()) .$req_fn(peer1.peer_id, expected_request) @@ -313,13 +292,29 @@ mod propagate_codec_errors_to_caller { ) }); - // Peer1 waits for response channel to be ready - let mut tx = tx_ready.recv().await.unwrap_or_else(|| { - panic!( + // Filter peer1's events to fish out the request from peer2 and the channel that + // peer1 will use to send the responses + let mut tx = peer1 + .wait_for_event(|e| match e { + Event::$event_variant { + from, + channel, + request: actual_request, + } => { + // Peer 1 should receive the request from peer2 + assert_eq!(from, peer2.peer_id); + // Received request should match what peer2 sent + assert_eq!(expected_request, actual_request); + Some(channel) + } + _ => None, + }) + .await + .expect(&format!( "waiting for response channel to be ready, line: {}", line!() - ) - }); + )); + let expected_response = Faker.fake::<$res_type>(); // Peer1 sends 1 response, but peer2's codec is mocked to fail upon reception diff --git a/crates/p2p/src/test_utils.rs b/crates/p2p/src/test_utils.rs index bf95122bd9..7c89f18cb1 100644 --- a/crates/p2p/src/test_utils.rs +++ b/crates/p2p/src/test_utils.rs @@ -2,34 +2,3 @@ pub mod core; pub mod main_loop; pub mod peer; pub mod sync; - -use std::fmt::Debug; - -use tokio::sync::mpsc; - -/// [`MainLoop`](p2p::MainLoop)'s event channel size is 1, so we need to consume -/// all events as soon as they're sent otherwise the main loop will stall. -/// `f` should return `Some(data)` where `data` is extracted from -/// the event type of interest. For other events that should be ignored -/// `f` should return `None`. This function returns a receiver to the filtered -/// events' data channel. -pub(crate) fn filter_events( - mut event_receiver: mpsc::UnboundedReceiver, - f: impl FnOnce(Event) -> Option + Copy + Send + 'static, -) -> mpsc::Receiver -where - Event: Send + 'static, - Data: Debug + Send + 'static, -{ - let (tx, rx) = mpsc::channel(1000); - - tokio::spawn(async move { - while let Some(event) = event_receiver.recv().await { - if let Some(data) = f(event) { - tx.try_send(data).unwrap(); - } - } - }); - - rx -} diff --git a/crates/p2p/src/test_utils/peer.rs b/crates/p2p/src/test_utils/peer.rs index 37cf7374ce..4264113eec 100644 --- a/crates/p2p/src/test_utils/peer.rs +++ b/crates/p2p/src/test_utils/peer.rs @@ -132,23 +132,6 @@ impl TestPeer { pub fn with_keypair(keypair: Keypair, cfg: Config) -> Self { Self::builder().keypair(keypair).build(cfg) } - - /// Wait for a specific test event to happen. Extract data from the event - /// using the provided function `f`. - pub async fn wait_for_test_event( - &mut self, - mut f: impl FnMut(TestEvent) -> Option, - ) -> Option - where - Data: Debug + Send + 'static, - { - while let Some(event) = self.test_event_receiver.recv().await { - if let Some(data) = f(event) { - return Some(data); - } - } - None - } } impl TestPeer @@ -192,6 +175,50 @@ where pub async fn connected(&self) -> HashMap { self.client.for_test().get_connected_peers().await } + + /// Wait for a specific test event to happen. Extract data from the event + /// using the provided function `f`. + pub async fn wait_for_event( + &mut self, + f: impl FnMut(::Event) -> Option, + ) -> Option + where + Data: Debug + Send + 'static, + { + Self::wait_for_event_impl::<::Event, Data>( + &mut self.app_event_receiver, + f, + ) + .await + } + + /// Wait for a specific test event to happen. Extract data from the event + /// using the provided function `f`. + pub async fn wait_for_test_event( + &mut self, + f: impl FnMut(TestEvent) -> Option, + ) -> Option + where + Data: Debug + Send + 'static, + { + Self::wait_for_event_impl::(&mut self.test_event_receiver, f).await + } + + async fn wait_for_event_impl( + receiver: &mut mpsc::UnboundedReceiver, + mut f: impl FnMut(Event) -> Option, + ) -> Option + where + Event: Send + 'static, + Data: Debug + Send + 'static, + { + while let Some(event) = receiver.recv().await { + if let Some(data) = f(event) { + return Some(data); + } + } + None + } } impl Default for TestPeer {