diff --git a/crates/p2p/src/builder.rs b/crates/p2p/src/builder.rs index 8a78b7d923..1ddc0bdb05 100644 --- a/crates/p2p/src/builder.rs +++ b/crates/p2p/src/builder.rs @@ -132,6 +132,7 @@ impl Builder { impl ApplicationBehaviour for dummy::Behaviour { type Command = (); type Event = (); + type TestEvent = (); type State = (); async fn handle_command(&mut self, _: Self::Command, _: &mut Self::State) {} @@ -140,6 +141,7 @@ impl ApplicationBehaviour for dummy::Behaviour { _: ::ToSwarm, _: &mut Self::State, _: mpsc::UnboundedSender, + _: mpsc::UnboundedSender, ) { } fn domain() -> &'static str { diff --git a/crates/p2p/src/consensus.rs b/crates/p2p/src/consensus.rs index da492a83b9..1e671d76f1 100644 --- a/crates/p2p/src/consensus.rs +++ b/crates/p2p/src/consensus.rs @@ -2,7 +2,7 @@ //! network. use std::collections::HashMap; -use libp2p::gossipsub::PublishError; +use libp2p::gossipsub::{PublishError, TopicHash}; use libp2p::PeerId; use p2p_proto::consensus::{ProposalPart, Vote}; use pathfinder_common::ContractAddress; @@ -93,6 +93,17 @@ impl EventKind { } } +#[derive(Debug, Clone)] +pub struct TestEvent { + pub source: PeerId, + pub kind: TestEventKind, +} + +#[derive(Debug, Clone)] +pub enum TestEventKind { + Subscribed(TopicHash), +} + /// The state of the consensus P2P network. #[derive(Default, Debug)] pub struct State { @@ -229,8 +240,8 @@ pub fn handle_incoming_proposal_message( #[cfg(test)] mod tests { use std::collections::{HashMap, HashSet}; - use std::time::Duration; + use libp2p::gossipsub::Sha256Topic; use libp2p::identity::Keypair; use p2p_proto::common::{Address, Hash, L1DataAvailabilityMode}; use p2p_proto::consensus::{ @@ -242,15 +253,40 @@ mod tests { VoteType, }; use p2p_proto::transaction::L1HandlerV0; - use pathfinder_common::ChainId; use pathfinder_crypto::Felt; - use tokio::sync::mpsc; use super::*; use crate::consensus::{Command, EventKind}; - use crate::core::{self, Config}; - use crate::libp2p::Multiaddr; - use crate::{consensus, main_loop, new_consensus}; + use crate::core::Config; + use crate::test_utils::peer::{create_and_connect_pair, TestPeer, TestPeerBuilder}; + + type ConsensusTestPeer = TestPeer; + + fn create_peer() -> ConsensusTestPeer { + let keypair = Keypair::generate_ed25519(); + TestPeerBuilder::new() + .keypair(keypair.clone()) + .app_behaviour(Behaviour::new(keypair)) + .build(Config::for_test()) + } + + async fn create_peers() -> (ConsensusTestPeer, ConsensusTestPeer) { + create_and_connect_pair(create_peer).await + } + + async fn wait_for_subscribed( + peer: &mut ConsensusTestPeer, + expected_peer_id: PeerId, + topic: &str, + ) { + let topic_hash = Sha256Topic::new(topic).hash(); + peer.wait_for_app_test_event(|e| { + let TestEventKind::Subscribed(t) = e.kind; + (t == topic_hash && e.source == expected_peer_id).then_some(()) + }) + .await + .unwrap(); + } /// Tests creating an outgoing proposal message and updating the state. #[test] @@ -361,36 +397,13 @@ mod tests { /// streams and receives all proposals in the proper order. #[tokio::test] async fn test_proposal_stream() { - // Create two nodes with different identities - let (node1_client, _, node1_loop) = create_test_node().await; - let (node2_client, mut node2_events, node2_loop) = create_test_node().await; - - // Start the main loops - tokio::spawn(node1_loop.run()); - tokio::spawn(node2_loop.run()); - - // Start listening on node1 - let node1_addr = "/ip4/127.0.0.1/tcp/50003".parse::().unwrap(); - node1_client - .start_listening(node1_addr.clone()) - .await - .unwrap(); - - // Start listening on node2 - let node2_addr = "/ip4/127.0.0.1/tcp/50004".parse::().unwrap(); - node2_client - .start_listening(node2_addr.clone()) - .await - .unwrap(); - - // Dial node1 from node2 - node2_client - .dial(*node1_client.peer_id(), node1_addr.clone()) - .await - .unwrap(); - - // Wait for the nodes to connect - tokio::time::sleep(Duration::from_millis(500)).await; + // Create two connected nodes + let (mut server, mut client) = create_peers().await; + + // Wait for both peers to see each other subscribed to the proposals + // topic before sending — otherwise gossipsub would drop the messages. + wait_for_subscribed(&mut server, client.peer_id, TOPIC_PROPOSALS).await; + wait_for_subscribed(&mut client, server.peer_id, TOPIC_PROPOSALS).await; // Create a sequence of complete proposal streams let proposals = vec![ @@ -402,7 +415,8 @@ mod tests { // Send proposals for (height_round, proposal_stream) in &proposals { // Send the entire proposal stream with shuffle flag - node1_client + server + .client .send(Command::TestProposalStream( (*height_round).into(), proposal_stream.clone(), @@ -412,49 +426,51 @@ mod tests { .unwrap(); } - // Node 2 should receive all proposal streams + // Client should receive all proposal streams let mut received_proposals = HashMap::new(); let mut completed_proposals = HashSet::new(); while completed_proposals.len() < proposals.len() { - if let Some(Event { - kind: EventKind::Proposal(height_and_round, received_proposal), - .. - }) = node2_events.recv().await - { - // Get or create the vector for this height/round - let proposal_parts = received_proposals - .entry(height_and_round) - .or_insert_with(Vec::new); - - proposal_parts.push(received_proposal.clone()); - - // If we received a Fin message, verify the complete proposal - if let ProposalPart::Fin(_) = received_proposal { - // Find the matching proposal by height/round - let (_, expected_stream) = proposals - .iter() - .find(|((h, r), _)| { - *h == height_and_round.height() && *r == height_and_round.round() - }) - .expect("Received unknown proposal stream"); - - // Verify we have all parts and they match in order + let (height_and_round, received_proposal) = client + .wait_for_event(|e| match e.kind { + EventKind::Proposal(hnr, part) => Some((hnr, part)), + _ => None, + }) + .await + .unwrap(); + + // Get or create the vector for this height/round + let proposal_parts = received_proposals + .entry(height_and_round) + .or_insert_with(Vec::new); + + proposal_parts.push(received_proposal.clone()); + + // If we received a Fin message, verify the complete proposal + if let ProposalPart::Fin(_) = received_proposal { + // Find the matching proposal by height/round + let (_, expected_stream) = proposals + .iter() + .find(|((h, r), _)| { + *h == height_and_round.height() && *r == height_and_round.round() + }) + .expect("Received unknown proposal stream"); + + // Verify we have all parts and they match in order + assert_eq!( + proposal_parts.len(), + expected_stream.len(), + "Received wrong number of proposal parts" + ); + + for (received, expected) in proposal_parts.iter().zip(expected_stream.iter()) { assert_eq!( - proposal_parts.len(), - expected_stream.len(), - "Received wrong number of proposal parts" + received, expected, + "Proposal part content or order doesn't match" ); - - for (received, expected) in proposal_parts.iter().zip(expected_stream.iter()) { - assert_eq!( - received, expected, - "Proposal part content or order doesn't match" - ); - } - - completed_proposals.insert(height_and_round); } + + completed_proposals.insert(height_and_round); } } } @@ -466,36 +482,13 @@ mod tests { /// node correctly receives all vote messages. #[tokio::test] async fn test_vote_messages() { - // Create two nodes with different identities - let (node1_client, _, node1_loop) = create_test_node().await; - let (node2_client, mut node2_events, node2_loop) = create_test_node().await; - - // Start the main loops - tokio::spawn(node1_loop.run()); - tokio::spawn(node2_loop.run()); - - // Start listening on node1 - let node1_addr = "/ip4/127.0.0.1/tcp/50001".parse::().unwrap(); - node1_client - .start_listening(node1_addr.clone()) - .await - .unwrap(); - - // Start listening on node2 - let node2_addr = "/ip4/127.0.0.1/tcp/50002".parse::().unwrap(); - node2_client - .start_listening(node2_addr.clone()) - .await - .unwrap(); - - // Dial node1 from node2 - node2_client - .dial(*node1_client.peer_id(), node1_addr.clone()) - .await - .unwrap(); - - // Wait for the nodes to connect - tokio::time::sleep(Duration::from_millis(500)).await; + // Create two connected nodes + let (mut server, mut client) = create_peers().await; + + // Wait for both peers to see each other subscribed to the votes topic + // before sending — otherwise gossipsub would drop the messages. + wait_for_subscribed(&mut server, client.peer_id, TOPIC_VOTES).await; + wait_for_subscribed(&mut client, server.peer_id, TOPIC_VOTES).await; // Create a sequence of votes to send let votes = vec![ @@ -525,11 +518,12 @@ mod tests { }, ]; let mut rxs = Vec::new(); - // Send votes from node1 + // Send votes from the server for vote in &votes { let (tx, rx) = tokio::sync::mpsc::channel(1); rxs.push(rx); - node1_client + server + .client .send(Command::Vote { vote: vote.clone(), done_tx: tx, @@ -538,22 +532,24 @@ mod tests { .unwrap(); } - // Node 2 should receive all votes + // Client should receive all votes let mut received_votes = Vec::new(); let mut expected_votes = votes.clone(); while !expected_votes.is_empty() { - if let Some(Event { - kind: EventKind::Vote(received_vote), - .. - }) = node2_events.recv().await - { - received_votes.push(received_vote.clone()); - - // Find and remove the matching expected vote - if let Some(pos) = expected_votes.iter().position(|v| v == &received_vote) { - expected_votes.remove(pos); - } + let received_vote = client + .wait_for_event(|e| match e.kind { + EventKind::Vote(vote) => Some(vote), + _ => None, + }) + .await + .unwrap(); + + received_votes.push(received_vote.clone()); + + // Find and remove the matching expected vote + if let Some(pos) = expected_votes.iter().position(|v| v == &received_vote) { + expected_votes.remove(pos); } } @@ -573,18 +569,6 @@ mod tests { ); } - async fn create_test_node() -> ( - core::Client, - mpsc::UnboundedReceiver, - main_loop::MainLoop, - ) { - let keypair = Keypair::generate_ed25519(); - let core_config = Config::for_test(); - let chain_id = ChainId::MAINNET; - - new_consensus(keypair, core_config, chain_id) - } - fn create_proposal_stream( height: u64, round: u32, diff --git a/crates/p2p/src/consensus/behaviour.rs b/crates/p2p/src/consensus/behaviour.rs index 53daad071d..da1b4309c6 100644 --- a/crates/p2p/src/consensus/behaviour.rs +++ b/crates/p2p/src/consensus/behaviour.rs @@ -29,6 +29,7 @@ pub struct Behaviour { impl ApplicationBehaviour for Behaviour { type Command = consensus::Command; type Event = consensus::Event; + type TestEvent = consensus::TestEvent; type State = consensus::State; #[tracing::instrument(skip(self, state))] @@ -148,6 +149,7 @@ impl ApplicationBehaviour for Behaviour { event: BehaviourEvent, state: &mut Self::State, event_sender: mpsc::UnboundedSender, + test_event_sender: mpsc::UnboundedSender, ) { use gossipsub::Event::*; let BehaviourEvent::Gossipsub(e) = event; @@ -187,6 +189,17 @@ impl ApplicationBehaviour for Behaviour { } _ => {} }, + Subscribed { peer_id, topic } => { + tracing::debug!("Peer {} subscribed to topic {}", peer_id, topic); + + #[cfg(test)] + { + let _ = test_event_sender.send(consensus::TestEvent { + source: peer_id, + kind: consensus::TestEventKind::Subscribed(topic), + }); + } + } _ => { // TODO: Do we care about any other Gossipsub events? } diff --git a/crates/p2p/src/core/tests.rs b/crates/p2p/src/core/tests.rs index 16f8e77629..fe99f58a27 100644 --- a/crates/p2p/src/core/tests.rs +++ b/crates/p2p/src/core/tests.rs @@ -46,14 +46,14 @@ async fn disconnect() { peer2.client.disconnect(peer1.peer_id).await.unwrap(); peer1 - .wait_for_test_event(move |e| match e { + .wait_for_core_test_event(move |e| match e { TestEvent::ConnectionClosed { remote } if remote == peer2.peer_id => Some(()), _ => None, }) .await; peer2 - .wait_for_test_event(move |e| match e { + .wait_for_core_test_event(move |e| match e { TestEvent::ConnectionClosed { remote } if remote == peer1.peer_id => Some(()), _ => None, }) @@ -102,17 +102,17 @@ async fn periodic_bootstrap() { let peer_id2 = peer2.peer_id; - let peer2_added_to_dht_of_peer1 = peer1.wait_for_test_event(move |e| match e { + let peer2_added_to_dht_of_peer1 = peer1.wait_for_core_test_event(move |e| match e { TestEvent::PeerAddedToDHT { remote } if remote == peer_id2 => Some(()), _ => None, }); join(peer2_added_to_dht_of_peer1, async { peer2 - .wait_for_test_event(filter_kademlia_bootstrap_completed) + .wait_for_core_test_event(filter_kademlia_bootstrap_completed) .await; peer2 - .wait_for_test_event(filter_kademlia_bootstrap_completed) + .wait_for_core_test_event(filter_kademlia_bootstrap_completed) .await; }) .await; @@ -155,14 +155,14 @@ async fn reconnect_too_quickly() { .unwrap(); peer1 - .wait_for_test_event(move |e| match e { + .wait_for_core_test_event(move |e| match e { TestEvent::ConnectionEstablished { remote, .. } if remote == peer2.peer_id => Some(()), _ => None, }) .await; peer2 - .wait_for_test_event(move |e| match e { + .wait_for_core_test_event(move |e| match e { TestEvent::ConnectionEstablished { remote, .. } if remote == peer1.peer_id => Some(()), _ => None, }) @@ -178,14 +178,14 @@ async fn reconnect_too_quickly() { peer1.client.disconnect(peer2.peer_id).await.unwrap(); peer1 - .wait_for_test_event(move |e| match e { + .wait_for_core_test_event(move |e| match e { TestEvent::ConnectionClosed { remote } if remote == peer2.peer_id => Some(()), _ => None, }) .await; peer2 - .wait_for_test_event(move |e| match e { + .wait_for_core_test_event(move |e| match e { TestEvent::ConnectionClosed { remote } if remote == peer1.peer_id => Some(()), _ => None, }) @@ -202,14 +202,14 @@ async fn reconnect_too_quickly() { // The connection is established. peer1 - .wait_for_test_event(move |e| match e { + .wait_for_core_test_event(move |e| match e { TestEvent::ConnectionEstablished { remote, .. } if remote == peer2.peer_id => Some(()), _ => None, }) .await; peer2 - .wait_for_test_event(move |e| match e { + .wait_for_core_test_event(move |e| match e { TestEvent::ConnectionEstablished { remote, .. } if remote == peer1.peer_id => Some(()), _ => None, }) @@ -241,14 +241,14 @@ async fn duplicate_connection() { .unwrap(); peer1 - .wait_for_test_event(move |e| match e { + .wait_for_core_test_event(move |e| match e { TestEvent::ConnectionEstablished { remote, .. } if remote == peer2.peer_id => Some(()), _ => None, }) .await; peer2 - .wait_for_test_event(move |e| match e { + .wait_for_core_test_event(move |e| match e { TestEvent::ConnectionEstablished { remote, .. } if remote == peer1.peer_id => Some(()), _ => None, }) @@ -267,14 +267,14 @@ async fn duplicate_connection() { .unwrap(); peer1_copy - .wait_for_test_event(move |e| match e { + .wait_for_core_test_event(move |e| match e { TestEvent::ConnectionEstablished { remote, .. } if remote == peer2.peer_id => Some(()), _ => None, }) .await; peer1_copy - .wait_for_test_event(move |e| match e { + .wait_for_core_test_event(move |e| match e { TestEvent::ConnectionClosed { remote, .. } if remote == peer2.peer_id => Some(()), _ => None, }) @@ -364,7 +364,7 @@ async fn outbound_peer_eviction() { assert!(peers.contains_key(&inbound1.peer_id)); // Ensure that outbound1 actually got disconnected. - peer.wait_for_test_event(move |e| match e { + peer.wait_for_core_test_event(move |e| match e { TestEvent::ConnectionClosed { remote, .. } if remote == outbound1.peer_id => Some(()), _ => None, }) @@ -443,7 +443,7 @@ async fn inbound_peer_eviction() { // Ensure that a peer got disconnected. let disconnected = peer - .wait_for_test_event(|e| match e { + .wait_for_core_test_event(|e| match e { TestEvent::ConnectionClosed { remote, .. } if inbound_peers.iter().take(25).any(|p| p.peer_id == remote) => { @@ -496,7 +496,7 @@ async fn evicted_peer_reconnection() { // Check that peer2 got evicted. peer1 - .wait_for_test_event(|e| match e { + .wait_for_core_test_event(|e| match e { TestEvent::ConnectionClosed { remote, .. } if remote == peer2.peer_id => Some(()), _ => None, }) @@ -517,7 +517,7 @@ async fn evicted_peer_reconnection() { .await .unwrap(); peer2 - .wait_for_test_event(|e| match e { + .wait_for_core_test_event(|e| match e { TestEvent::ConnectionClosed { remote, .. } if remote == peer1.peer_id => Some(()), _ => None, }) @@ -529,7 +529,7 @@ async fn evicted_peer_reconnection() { // peer3 gets evicted. peer1 - .wait_for_test_event(|e| match e { + .wait_for_core_test_event(|e| match e { TestEvent::ConnectionClosed { remote, .. } if remote == peer3.peer_id => Some(()), _ => None, }) diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index 10fa0a3ddd..876f6cc08f 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -88,8 +88,9 @@ mod builder_phase { /// interacts with the network: /// - Commands: Actions requested by the application to be executed by the /// network -/// - Events: Notifications from the network that the application needs to -/// handle +/// - Events: Application-specific notifications from the network that the +/// application needs to handle +/// - TestEvents: Application-specific events that are only emitted in tests /// - State: Data needed to track ongoing operations /// /// This trait is implemented by application-specific network behaviors (like @@ -99,6 +100,9 @@ pub trait ApplicationBehaviour: NetworkBehaviour { type Command: std::fmt::Debug; /// The type of events that the p2p network can emit to the outside world. type Event; + /// The type of events that the p2p network can emit to the outside world in + /// tests only. + type TestEvent; /// State needed to track pending network operations and their responses. type State; @@ -115,6 +119,7 @@ pub trait ApplicationBehaviour: NetworkBehaviour { event: ::ToSwarm, state: &mut Self::State, event_sender: mpsc::UnboundedSender, + test_event_sender: mpsc::UnboundedSender, ) -> impl Future + Send; /// Returns the domain string used for marker file creation in integration diff --git a/crates/p2p/src/main_loop.rs b/crates/p2p/src/main_loop.rs index 695342390f..3280da3656 100644 --- a/crates/p2p/src/main_loop.rs +++ b/crates/p2p/src/main_loop.rs @@ -49,8 +49,11 @@ where swarm: libp2p::swarm::Swarm>, /// Receives commands from the outside world. command_receiver: mpsc::UnboundedReceiver::Command>>, - /// Sends events to the outside world. - event_sender: mpsc::UnboundedSender<::Event>, + /// Sends application-specific events to the outside world. + app_event_sender: mpsc::UnboundedSender<::Event>, + /// Sends application-specific test events to the outside world, no-op in + /// production. + _app_test_event_sender: mpsc::UnboundedSender<::TestEvent>, /// Keeps track of pending dials and allows us to notify the caller when a /// dial succeeds or fails. pending_dials: PendingDials, @@ -61,8 +64,16 @@ where data_directory: PathBuf, /// State of the application behaviour. state: State, - _test_event_sender: mpsc::UnboundedSender, - _test_event_receiver: Option>, + /// Used to send [`crate::core::TestEvent`]-s during tests, no-op in + /// production. + _core_test_event_sender: mpsc::UnboundedSender, + /// Used to receive: + /// - [`crate::core::TestEvent`]-s during tests, no-op in production. + /// - Application-specific test events during tests, no-op in production. + _test_event_receivers: Option<( + mpsc::UnboundedReceiver, + mpsc::UnboundedReceiver<::TestEvent>, + )>, _pending_test_queries: TestQueries, /// We keep a single command sender instance at all times so that receiver /// can be polled even without any client instance available without @@ -98,19 +109,21 @@ where /// /// * `swarm` - The libp2p swarm, including the network behaviour for this /// loop. - /// * `event_sender` - The sender for events to the outside world. + /// * `app_event_sender` - The sender for application-specific events to the + /// outside world. /// * `data_directory` - The data directory for Pathfinder. pub fn new( swarm: libp2p::swarm::Swarm>, - event_sender: mpsc::UnboundedSender<::Event>, + app_event_sender: mpsc::UnboundedSender<::Event>, data_directory: PathBuf, ) -> ( Self, mpsc::UnboundedSender::Command>>, ) { - // Test event buffer is not used outside tests, so it is safe to make it - // unbounded as it will never contain any items in production. - let (_test_event_sender, rx) = mpsc::unbounded_channel(); + // Test event buffers are not used outside tests, so it is safe to make them + // unbounded as they will never contain any items in production. + let (_core_test_event_sender, core_rx) = mpsc::unbounded_channel(); + let (_app_test_event_sender, app_rx) = mpsc::unbounded_channel(); let (command_sender, command_receiver) = mpsc::unbounded_channel(); @@ -118,13 +131,14 @@ where Self { swarm, command_receiver, - event_sender, + app_event_sender, + _app_test_event_sender, pending_dials: Default::default(), pending_queries: Default::default(), state: Default::default(), data_directory, - _test_event_sender, - _test_event_receiver: Some(rx), + _core_test_event_sender, + _test_event_receivers: Some((core_rx, app_rx)), _pending_test_queries: Default::default(), _command_sender: command_sender.clone(), }, @@ -207,7 +221,7 @@ where } send_test_event( - &self._test_event_sender, + &self._core_test_event_sender, TestEvent::ConnectionEstablished { outbound: endpoint.is_dialer(), remote: peer_id, @@ -249,7 +263,7 @@ where tracing::debug!(%peer_id, "Connection closed"); if num_established == 0 { send_test_event( - &self._test_event_sender, + &self._core_test_event_sender, TestEvent::ConnectionClosed { remote: peer_id }, ) .await; @@ -367,7 +381,7 @@ where } }; send_test_event( - &self._test_event_sender, + &self._core_test_event_sender, TestEvent::KademliaBootstrapCompleted(result), ) .await; @@ -407,7 +421,7 @@ where // https://github.com/libp2p/rust-libp2p/blob/d7beb55f672dce54017fa4b30f67ecb8d66b9810/protocols/kad/src/behaviour.rs#L1401). if step.count == NonZeroUsize::new(1).expect("1>0") { send_test_event( - &self._test_event_sender, + &self._core_test_event_sender, TestEvent::KademliaBootstrapStarted, ) .await; @@ -422,7 +436,7 @@ where } => { if is_new_peer { send_test_event( - &self._test_event_sender, + &self._core_test_event_sender, TestEvent::PeerAddedToDHT { remote: peer }, ) .await @@ -452,7 +466,8 @@ where .handle_event( application_event, &mut self.state, - self.event_sender.clone(), + self.app_event_sender.clone(), + self._app_test_event_sender.clone(), ) .await; } @@ -589,7 +604,7 @@ where /// No-op outside tests async fn handle_event_for_test(&mut self, _event: SwarmEvent>) { #[cfg(test)] - test_utils::main_loop::handle_event(&self._test_event_sender, _event).await + test_utils::main_loop::handle_event(&self._core_test_event_sender, _event).await } /// No-op outside tests @@ -608,7 +623,7 @@ where #[cfg(test)] test_utils::main_loop::query_completed( &mut self._pending_test_queries.inner, - &self._test_event_sender, + &self._core_test_event_sender, _id, _result, ) @@ -666,14 +681,19 @@ where } } +#[cfg(test)] impl MainLoop where B: ApplicationBehaviour, { - #[cfg(test)] - pub fn take_test_event_receiver(&mut self) -> mpsc::UnboundedReceiver { - Option::take(&mut self._test_event_receiver) - .expect("Test event receiver not to have been taken before") + pub fn take_test_event_receivers( + &mut self, + ) -> ( + mpsc::UnboundedReceiver, + mpsc::UnboundedReceiver<::TestEvent>, + ) { + Option::take(&mut self._test_event_receivers) + .expect("Core test event receiver not to have been taken before") } } diff --git a/crates/p2p/src/preconfirmed.rs b/crates/p2p/src/preconfirmed.rs index 8af102c6fe..97a4fb13d1 100644 --- a/crates/p2p/src/preconfirmed.rs +++ b/crates/p2p/src/preconfirmed.rs @@ -1,5 +1,6 @@ //! Preconfirmed behaviour and other related utilities for the preconfirmed p2p //! network. +use libp2p::gossipsub::TopicHash; use libp2p::PeerId; #[cfg(test)] use tokio::sync::mpsc::Sender; @@ -41,7 +42,17 @@ pub enum EventKind { // TODO this is a placeholder /// A batch of preconfirmed transactions. PreconfirmedTransactionsPlaceholder, - Subscribed, +} + +#[derive(Debug, Clone)] +pub struct TestEvent { + pub source: PeerId, + pub kind: TestEventKind, +} + +#[derive(Debug, Clone)] +pub enum TestEventKind { + Subscribed(TopicHash), } // TODO this is a placeholder @@ -57,6 +68,7 @@ impl State { #[cfg(test)] mod tests { + use libp2p::gossipsub::Sha256Topic; use libp2p::identity; use super::*; @@ -75,8 +87,10 @@ mod tests { } async fn wait_for_subscribed(peer: &mut PcTestPeer, expected_peer_id: PeerId) { - peer.wait_for_event(|e| { - (matches!(e.kind, EventKind::Subscribed) && expected_peer_id == e.source).then_some(()) + let topic_hash = Sha256Topic::new(TOPIC_PRECONFIRMED_TRANSACTIONS).hash(); + peer.wait_for_app_test_event(|e| { + let TestEventKind::Subscribed(t) = e.kind; + (t == topic_hash && e.source == expected_peer_id).then_some(()) }) .await .unwrap(); diff --git a/crates/p2p/src/preconfirmed/behaviour.rs b/crates/p2p/src/preconfirmed/behaviour.rs index a507800da6..4ca13726ba 100644 --- a/crates/p2p/src/preconfirmed/behaviour.rs +++ b/crates/p2p/src/preconfirmed/behaviour.rs @@ -16,6 +16,7 @@ pub struct Behaviour { impl ApplicationBehaviour for Behaviour { type Command = preconfirmed::Command; type Event = preconfirmed::Event; + type TestEvent = preconfirmed::TestEvent; type State = preconfirmed::State; #[tracing::instrument(skip(self, _state))] @@ -49,6 +50,7 @@ impl ApplicationBehaviour for Behaviour { event: BehaviourEvent, state: &mut Self::State, event_sender: mpsc::UnboundedSender, + test_event_sender: mpsc::UnboundedSender, ) { use gossipsub::Event::*; let BehaviourEvent::Gossipsub(e) = event; @@ -77,11 +79,16 @@ impl ApplicationBehaviour for Behaviour { } _ => {} }, - Subscribed { peer_id, topic } if topic == topic_hash => { - let _ = event_sender.send(Self::Event { - source: peer_id, - kind: EventKind::Subscribed, - }); + Subscribed { peer_id, topic } => { + tracing::debug!("Peer {} subscribed to topic {}", peer_id, topic); + + #[cfg(test)] + { + let _ = test_event_sender.send(preconfirmed::TestEvent { + source: peer_id, + kind: preconfirmed::TestEventKind::Subscribed(topic), + }); + } } _ => { // TODO: Do we care about any other Gossipsub events? diff --git a/crates/p2p/src/preconfirmed/client.rs b/crates/p2p/src/preconfirmed/client.rs index 84a701ee38..a0ab172b12 100644 --- a/crates/p2p/src/preconfirmed/client.rs +++ b/crates/p2p/src/preconfirmed/client.rs @@ -13,10 +13,12 @@ pub struct Client { } impl From<(PeerId, mpsc::UnboundedSender>)> for Client { - fn from((peer_id, sender): (PeerId, mpsc::UnboundedSender>)) -> Self { + fn from( + (local_peer_id, _sender): (PeerId, mpsc::UnboundedSender>), + ) -> Self { Self { - _sender: sender, - local_peer_id: peer_id, + _sender, + local_peer_id, } } } diff --git a/crates/p2p/src/sync/behaviour.rs b/crates/p2p/src/sync/behaviour.rs index 467a886a29..5389cb119d 100644 --- a/crates/p2p/src/sync/behaviour.rs +++ b/crates/p2p/src/sync/behaviour.rs @@ -32,6 +32,7 @@ impl Behaviour { impl ApplicationBehaviour for Behaviour { type Command = sync::Command; type Event = sync::Event; + type TestEvent = (); type State = sync::State; async fn handle_command(&mut self, command: Self::Command, state: &mut Self::State) { @@ -96,6 +97,7 @@ impl ApplicationBehaviour for Behaviour { event: BehaviourEvent, state: &mut Self::State, event_sender: mpsc::UnboundedSender, + _: mpsc::UnboundedSender, ) { use p2p_stream::Event as P2PStreamEvent; match event { diff --git a/crates/p2p/src/test_utils/peer.rs b/crates/p2p/src/test_utils/peer.rs index 5def742aff..e45acf87a0 100644 --- a/crates/p2p/src/test_utils/peer.rs +++ b/crates/p2p/src/test_utils/peer.rs @@ -58,7 +58,8 @@ where pub peer_id: PeerId, pub client: Client<::Command>, pub app_event_receiver: mpsc::UnboundedReceiver<::Event>, - pub test_event_receiver: mpsc::UnboundedReceiver, + pub app_test_event_receiver: mpsc::UnboundedReceiver<::TestEvent>, + pub core_test_event_receiver: mpsc::UnboundedReceiver, pub main_loop_jh: JoinHandle<()>, } @@ -107,6 +108,7 @@ where ::ToSwarm: Debug, ::Command: Debug + Send, ::Event: Send, + ::TestEvent: Send, ::State: Default + Send, { pub fn build(self, cfg: Config) -> TestPeer { @@ -131,7 +133,8 @@ where .app_behaviour(app_behaviour.expect("App behaviour to be set in this phase")) .build(); - let test_event_receiver = main_loop.take_test_event_receiver(); + let (core_test_event_receiver, app_test_event_receiver) = + main_loop.take_test_event_receivers(); let main_loop_jh = tokio::spawn(main_loop.run()); TestPeer { @@ -139,7 +142,8 @@ where peer_id, client, app_event_receiver, - test_event_receiver, + app_test_event_receiver, + core_test_event_receiver, main_loop_jh, } } @@ -183,10 +187,11 @@ where .await .context("Start listening failed")?; - let event = tokio::time::timeout(Duration::from_secs(1), self.test_event_receiver.recv()) - .await - .context("Timedout while waiting for new listen address")? - .context("Event channel closed")?; + let event = + tokio::time::timeout(Duration::from_secs(1), self.core_test_event_receiver.recv()) + .await + .context("Timedout while waiting for new listen address")? + .context("Event channel closed")?; let addr = match event { TestEvent::NewListenAddress(addr) => addr, @@ -206,8 +211,8 @@ where self.client.for_test().get_connected_peers().await } - /// Wait for a specific test event to happen. Extract data from the event - /// using the provided function `f`. + /// Wait for a specific application-specific event to happen. Extract data + /// from the event using the provided function `f`. pub async fn wait_for_event( &mut self, f: impl FnMut(::Event) -> Option, @@ -222,16 +227,33 @@ where .await } - /// Wait for a specific test event to happen. Extract data from the event - /// using the provided function `f`. - pub async fn wait_for_test_event( + /// Wait for a specific core test event to happen. Extract data from the + /// event using the provided function `f`. + pub async fn wait_for_core_test_event( &mut self, f: impl FnMut(TestEvent) -> Option, ) -> Option where Data: Debug + Send + 'static, { - Self::wait_for_event_impl::(&mut self.test_event_receiver, f).await + Self::wait_for_event_impl::(&mut self.core_test_event_receiver, f).await + } + + /// Wait for a specific application-specific test event to happen. Extract + /// data from the event using the provided function `f`. + pub async fn wait_for_app_test_event( + &mut self, + f: impl FnMut(::TestEvent) -> Option, + ) -> Option + where + ::TestEvent: Send + 'static, + Data: Debug + Send + 'static, + { + Self::wait_for_event_impl::<::TestEvent, Data>( + &mut self.app_test_event_receiver, + f, + ) + .await } async fn wait_for_event_impl(