Skip to content

Commit 0c30eb0

Browse files
committed
refactor(p2p/consensus/tests): use TestPeer, remove sleeps
1 parent 1324bc4 commit 0c30eb0

4 files changed

Lines changed: 148 additions & 137 deletions

File tree

crates/p2p/src/consensus.rs

Lines changed: 102 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,8 @@ pub fn handle_incoming_proposal_message(
240240
#[cfg(test)]
241241
mod tests {
242242
use std::collections::{HashMap, HashSet};
243-
use std::time::Duration;
244243

244+
use libp2p::gossipsub::Sha256Topic;
245245
use libp2p::identity::Keypair;
246246
use p2p_proto::common::{Address, Hash, L1DataAvailabilityMode};
247247
use p2p_proto::consensus::{
@@ -253,15 +253,40 @@ mod tests {
253253
VoteType,
254254
};
255255
use p2p_proto::transaction::L1HandlerV0;
256-
use pathfinder_common::ChainId;
257256
use pathfinder_crypto::Felt;
258-
use tokio::sync::mpsc;
259257

260258
use super::*;
261259
use crate::consensus::{Command, EventKind};
262-
use crate::core::{self, Config};
263-
use crate::libp2p::Multiaddr;
264-
use crate::{consensus, main_loop, new_consensus};
260+
use crate::core::Config;
261+
use crate::test_utils::peer::{create_and_connect_pair, TestPeer, TestPeerBuilder};
262+
263+
type ConsensusTestPeer = TestPeer<Behaviour>;
264+
265+
fn create_peer() -> ConsensusTestPeer {
266+
let keypair = Keypair::generate_ed25519();
267+
TestPeerBuilder::new()
268+
.keypair(keypair.clone())
269+
.app_behaviour(Behaviour::new(keypair))
270+
.build(Config::for_test())
271+
}
272+
273+
async fn create_peers() -> (ConsensusTestPeer, ConsensusTestPeer) {
274+
create_and_connect_pair(create_peer).await
275+
}
276+
277+
async fn wait_for_subscribed(
278+
peer: &mut ConsensusTestPeer,
279+
expected_peer_id: PeerId,
280+
topic: &str,
281+
) {
282+
let topic_hash = Sha256Topic::new(topic).hash();
283+
peer.wait_for_app_test_event(|e| {
284+
let TestEventKind::Subscribed(t) = e.kind;
285+
(t == topic_hash && e.source == expected_peer_id).then_some(())
286+
})
287+
.await
288+
.unwrap();
289+
}
265290

266291
/// Tests creating an outgoing proposal message and updating the state.
267292
#[test]
@@ -372,36 +397,13 @@ mod tests {
372397
/// streams and receives all proposals in the proper order.
373398
#[tokio::test]
374399
async fn test_proposal_stream() {
375-
// Create two nodes with different identities
376-
let (node1_client, _, node1_loop) = create_test_node().await;
377-
let (node2_client, mut node2_events, node2_loop) = create_test_node().await;
378-
379-
// Start the main loops
380-
tokio::spawn(node1_loop.run());
381-
tokio::spawn(node2_loop.run());
382-
383-
// Start listening on node1
384-
let node1_addr = "/ip4/127.0.0.1/tcp/50003".parse::<Multiaddr>().unwrap();
385-
node1_client
386-
.start_listening(node1_addr.clone())
387-
.await
388-
.unwrap();
389-
390-
// Start listening on node2
391-
let node2_addr = "/ip4/127.0.0.1/tcp/50004".parse::<Multiaddr>().unwrap();
392-
node2_client
393-
.start_listening(node2_addr.clone())
394-
.await
395-
.unwrap();
396-
397-
// Dial node1 from node2
398-
node2_client
399-
.dial(*node1_client.peer_id(), node1_addr.clone())
400-
.await
401-
.unwrap();
402-
403-
// Wait for the nodes to connect
404-
tokio::time::sleep(Duration::from_millis(500)).await;
400+
// Create two connected nodes
401+
let (mut server, mut client) = create_peers().await;
402+
403+
// Wait for both peers to see each other subscribed to the proposals
404+
// topic before sending — otherwise gossipsub would drop the messages.
405+
wait_for_subscribed(&mut server, client.peer_id, TOPIC_PROPOSALS).await;
406+
wait_for_subscribed(&mut client, server.peer_id, TOPIC_PROPOSALS).await;
405407

406408
// Create a sequence of complete proposal streams
407409
let proposals = vec![
@@ -413,7 +415,8 @@ mod tests {
413415
// Send proposals
414416
for (height_round, proposal_stream) in &proposals {
415417
// Send the entire proposal stream with shuffle flag
416-
node1_client
418+
server
419+
.client
417420
.send(Command::TestProposalStream(
418421
(*height_round).into(),
419422
proposal_stream.clone(),
@@ -423,49 +426,51 @@ mod tests {
423426
.unwrap();
424427
}
425428

426-
// Node 2 should receive all proposal streams
429+
// Client should receive all proposal streams
427430
let mut received_proposals = HashMap::new();
428431
let mut completed_proposals = HashSet::new();
429432

430433
while completed_proposals.len() < proposals.len() {
431-
if let Some(Event {
432-
kind: EventKind::Proposal(height_and_round, received_proposal),
433-
..
434-
}) = node2_events.recv().await
435-
{
436-
// Get or create the vector for this height/round
437-
let proposal_parts = received_proposals
438-
.entry(height_and_round)
439-
.or_insert_with(Vec::new);
440-
441-
proposal_parts.push(received_proposal.clone());
442-
443-
// If we received a Fin message, verify the complete proposal
444-
if let ProposalPart::Fin(_) = received_proposal {
445-
// Find the matching proposal by height/round
446-
let (_, expected_stream) = proposals
447-
.iter()
448-
.find(|((h, r), _)| {
449-
*h == height_and_round.height() && *r == height_and_round.round()
450-
})
451-
.expect("Received unknown proposal stream");
452-
453-
// Verify we have all parts and they match in order
434+
let (height_and_round, received_proposal) = client
435+
.wait_for_event(|e| match e.kind {
436+
EventKind::Proposal(hnr, part) => Some((hnr, part)),
437+
_ => None,
438+
})
439+
.await
440+
.unwrap();
441+
442+
// Get or create the vector for this height/round
443+
let proposal_parts = received_proposals
444+
.entry(height_and_round)
445+
.or_insert_with(Vec::new);
446+
447+
proposal_parts.push(received_proposal.clone());
448+
449+
// If we received a Fin message, verify the complete proposal
450+
if let ProposalPart::Fin(_) = received_proposal {
451+
// Find the matching proposal by height/round
452+
let (_, expected_stream) = proposals
453+
.iter()
454+
.find(|((h, r), _)| {
455+
*h == height_and_round.height() && *r == height_and_round.round()
456+
})
457+
.expect("Received unknown proposal stream");
458+
459+
// Verify we have all parts and they match in order
460+
assert_eq!(
461+
proposal_parts.len(),
462+
expected_stream.len(),
463+
"Received wrong number of proposal parts"
464+
);
465+
466+
for (received, expected) in proposal_parts.iter().zip(expected_stream.iter()) {
454467
assert_eq!(
455-
proposal_parts.len(),
456-
expected_stream.len(),
457-
"Received wrong number of proposal parts"
468+
received, expected,
469+
"Proposal part content or order doesn't match"
458470
);
459-
460-
for (received, expected) in proposal_parts.iter().zip(expected_stream.iter()) {
461-
assert_eq!(
462-
received, expected,
463-
"Proposal part content or order doesn't match"
464-
);
465-
}
466-
467-
completed_proposals.insert(height_and_round);
468471
}
472+
473+
completed_proposals.insert(height_and_round);
469474
}
470475
}
471476
}
@@ -477,36 +482,13 @@ mod tests {
477482
/// node correctly receives all vote messages.
478483
#[tokio::test]
479484
async fn test_vote_messages() {
480-
// Create two nodes with different identities
481-
let (node1_client, _, node1_loop) = create_test_node().await;
482-
let (node2_client, mut node2_events, node2_loop) = create_test_node().await;
483-
484-
// Start the main loops
485-
tokio::spawn(node1_loop.run());
486-
tokio::spawn(node2_loop.run());
487-
488-
// Start listening on node1
489-
let node1_addr = "/ip4/127.0.0.1/tcp/50001".parse::<Multiaddr>().unwrap();
490-
node1_client
491-
.start_listening(node1_addr.clone())
492-
.await
493-
.unwrap();
494-
495-
// Start listening on node2
496-
let node2_addr = "/ip4/127.0.0.1/tcp/50002".parse::<Multiaddr>().unwrap();
497-
node2_client
498-
.start_listening(node2_addr.clone())
499-
.await
500-
.unwrap();
501-
502-
// Dial node1 from node2
503-
node2_client
504-
.dial(*node1_client.peer_id(), node1_addr.clone())
505-
.await
506-
.unwrap();
507-
508-
// Wait for the nodes to connect
509-
tokio::time::sleep(Duration::from_millis(500)).await;
485+
// Create two connected nodes
486+
let (mut server, mut client) = create_peers().await;
487+
488+
// Wait for both peers to see each other subscribed to the votes topic
489+
// before sending — otherwise gossipsub would drop the messages.
490+
wait_for_subscribed(&mut server, client.peer_id, TOPIC_VOTES).await;
491+
wait_for_subscribed(&mut client, server.peer_id, TOPIC_VOTES).await;
510492

511493
// Create a sequence of votes to send
512494
let votes = vec![
@@ -536,11 +518,12 @@ mod tests {
536518
},
537519
];
538520
let mut rxs = Vec::new();
539-
// Send votes from node1
521+
// Send votes from the server
540522
for vote in &votes {
541523
let (tx, rx) = tokio::sync::mpsc::channel(1);
542524
rxs.push(rx);
543-
node1_client
525+
server
526+
.client
544527
.send(Command::Vote {
545528
vote: vote.clone(),
546529
done_tx: tx,
@@ -549,22 +532,24 @@ mod tests {
549532
.unwrap();
550533
}
551534

552-
// Node 2 should receive all votes
535+
// Client should receive all votes
553536
let mut received_votes = Vec::new();
554537
let mut expected_votes = votes.clone();
555538

556539
while !expected_votes.is_empty() {
557-
if let Some(Event {
558-
kind: EventKind::Vote(received_vote),
559-
..
560-
}) = node2_events.recv().await
561-
{
562-
received_votes.push(received_vote.clone());
563-
564-
// Find and remove the matching expected vote
565-
if let Some(pos) = expected_votes.iter().position(|v| v == &received_vote) {
566-
expected_votes.remove(pos);
567-
}
540+
let received_vote = client
541+
.wait_for_event(|e| match e.kind {
542+
EventKind::Vote(vote) => Some(vote),
543+
_ => None,
544+
})
545+
.await
546+
.unwrap();
547+
548+
received_votes.push(received_vote.clone());
549+
550+
// Find and remove the matching expected vote
551+
if let Some(pos) = expected_votes.iter().position(|v| v == &received_vote) {
552+
expected_votes.remove(pos);
568553
}
569554
}
570555

@@ -584,18 +569,6 @@ mod tests {
584569
);
585570
}
586571

587-
async fn create_test_node() -> (
588-
core::Client<consensus::Command>,
589-
mpsc::UnboundedReceiver<consensus::Event>,
590-
main_loop::MainLoop<consensus::Behaviour>,
591-
) {
592-
let keypair = Keypair::generate_ed25519();
593-
let core_config = Config::for_test();
594-
let chain_id = ChainId::MAINNET;
595-
596-
new_consensus(keypair, core_config, chain_id)
597-
}
598-
599572
fn create_proposal_stream(
600573
height: u64,
601574
round: u32,

crates/p2p/src/preconfirmed.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
//! Preconfirmed behaviour and other related utilities for the preconfirmed p2p
22
//! network.
3+
use libp2p::gossipsub::TopicHash;
34
use libp2p::PeerId;
45
#[cfg(test)]
56
use tokio::sync::mpsc::Sender;
@@ -41,7 +42,17 @@ pub enum EventKind {
4142
// TODO this is a placeholder
4243
/// A batch of preconfirmed transactions.
4344
PreconfirmedTransactionsPlaceholder,
44-
Subscribed,
45+
}
46+
47+
#[derive(Debug, Clone)]
48+
pub struct TestEvent {
49+
pub source: PeerId,
50+
pub kind: TestEventKind,
51+
}
52+
53+
#[derive(Debug, Clone)]
54+
pub enum TestEventKind {
55+
Subscribed(TopicHash),
4556
}
4657

4758
// TODO this is a placeholder
@@ -57,6 +68,7 @@ impl State {
5768

5869
#[cfg(test)]
5970
mod tests {
71+
use libp2p::gossipsub::Sha256Topic;
6072
use libp2p::identity;
6173

6274
use super::*;
@@ -75,8 +87,10 @@ mod tests {
7587
}
7688

7789
async fn wait_for_subscribed(peer: &mut PcTestPeer, expected_peer_id: PeerId) {
78-
peer.wait_for_event(|e| {
79-
(matches!(e.kind, EventKind::Subscribed) && expected_peer_id == e.source).then_some(())
90+
let topic_hash = Sha256Topic::new(TOPIC_PRECONFIRMED_TRANSACTIONS).hash();
91+
peer.wait_for_app_test_event(|e| {
92+
let TestEventKind::Subscribed(t) = e.kind;
93+
(t == topic_hash && e.source == expected_peer_id).then_some(())
8094
})
8195
.await
8296
.unwrap();

crates/p2p/src/preconfirmed/behaviour.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub struct Behaviour {
1616
impl ApplicationBehaviour for Behaviour {
1717
type Command = preconfirmed::Command;
1818
type Event = preconfirmed::Event;
19+
type TestEvent = preconfirmed::TestEvent;
1920
type State = preconfirmed::State;
2021

2122
#[tracing::instrument(skip(self, _state))]
@@ -49,6 +50,7 @@ impl ApplicationBehaviour for Behaviour {
4950
event: BehaviourEvent,
5051
state: &mut Self::State,
5152
event_sender: mpsc::UnboundedSender<Self::Event>,
53+
test_event_sender: mpsc::UnboundedSender<Self::TestEvent>,
5254
) {
5355
use gossipsub::Event::*;
5456
let BehaviourEvent::Gossipsub(e) = event;
@@ -77,11 +79,16 @@ impl ApplicationBehaviour for Behaviour {
7779
}
7880
_ => {}
7981
},
80-
Subscribed { peer_id, topic } if topic == topic_hash => {
81-
let _ = event_sender.send(Self::Event {
82-
source: peer_id,
83-
kind: EventKind::Subscribed,
84-
});
82+
Subscribed { peer_id, topic } => {
83+
tracing::debug!("Peer {} subscribed to topic {}", peer_id, topic);
84+
85+
#[cfg(test)]
86+
{
87+
let _ = test_event_sender.send(preconfirmed::TestEvent {
88+
source: peer_id,
89+
kind: preconfirmed::TestEventKind::Subscribed(topic),
90+
});
91+
}
8592
}
8693
_ => {
8794
// TODO: Do we care about any other Gossipsub events?

0 commit comments

Comments
 (0)