Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/p2p/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ impl<B> Builder<B, AppBehaviourSet> {
impl ApplicationBehaviour for dummy::Behaviour {
type Command = ();
type Event = ();
type TestEvent = ();
type State = ();

async fn handle_command(&mut self, _: Self::Command, _: &mut Self::State) {}
Expand All @@ -140,6 +141,7 @@ impl ApplicationBehaviour for dummy::Behaviour {
_: <Self as NetworkBehaviour>::ToSwarm,
_: &mut Self::State,
_: mpsc::UnboundedSender<Self::Event>,
_: mpsc::UnboundedSender<Self::TestEvent>,
) {
}
fn domain() -> &'static str {
Expand Down
244 changes: 114 additions & 130 deletions crates/p2p/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! network.
use std::collections::HashMap;

use libp2p::gossipsub::PublishError;
use libp2p::gossipsub::{PublishError, TopicHash};
use libp2p::PeerId;
use p2p_proto::consensus::{ProposalPart, Vote};
use pathfinder_common::ContractAddress;
Expand Down Expand Up @@ -93,6 +93,17 @@ impl EventKind {
}
}

#[derive(Debug, Clone)]
pub struct TestEvent {
pub source: PeerId,
pub kind: TestEventKind,
}

#[derive(Debug, Clone)]
pub enum TestEventKind {
Subscribed(TopicHash),
}

/// The state of the consensus P2P network.
#[derive(Default, Debug)]
pub struct State {
Expand Down Expand Up @@ -229,8 +240,8 @@ pub fn handle_incoming_proposal_message(
#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use std::time::Duration;

use libp2p::gossipsub::Sha256Topic;
use libp2p::identity::Keypair;
use p2p_proto::common::{Address, Hash, L1DataAvailabilityMode};
use p2p_proto::consensus::{
Expand All @@ -242,15 +253,40 @@ mod tests {
VoteType,
};
use p2p_proto::transaction::L1HandlerV0;
use pathfinder_common::ChainId;
use pathfinder_crypto::Felt;
use tokio::sync::mpsc;

use super::*;
use crate::consensus::{Command, EventKind};
use crate::core::{self, Config};
use crate::libp2p::Multiaddr;
use crate::{consensus, main_loop, new_consensus};
use crate::core::Config;
use crate::test_utils::peer::{create_and_connect_pair, TestPeer, TestPeerBuilder};

type ConsensusTestPeer = TestPeer<Behaviour>;

fn create_peer() -> ConsensusTestPeer {
let keypair = Keypair::generate_ed25519();
TestPeerBuilder::new()
.keypair(keypair.clone())
.app_behaviour(Behaviour::new(keypair))
.build(Config::for_test())
}

async fn create_peers() -> (ConsensusTestPeer, ConsensusTestPeer) {
create_and_connect_pair(create_peer).await
}

async fn wait_for_subscribed(
peer: &mut ConsensusTestPeer,
expected_peer_id: PeerId,
topic: &str,
) {
let topic_hash = Sha256Topic::new(topic).hash();
peer.wait_for_app_test_event(|e| {
let TestEventKind::Subscribed(t) = e.kind;
(t == topic_hash && e.source == expected_peer_id).then_some(())
})
.await
.unwrap();
}

/// Tests creating an outgoing proposal message and updating the state.
#[test]
Expand Down Expand Up @@ -361,36 +397,13 @@ mod tests {
/// streams and receives all proposals in the proper order.
#[tokio::test]
async fn test_proposal_stream() {
// Create two nodes with different identities
let (node1_client, _, node1_loop) = create_test_node().await;
let (node2_client, mut node2_events, node2_loop) = create_test_node().await;

// Start the main loops
tokio::spawn(node1_loop.run());
tokio::spawn(node2_loop.run());

// Start listening on node1
let node1_addr = "/ip4/127.0.0.1/tcp/50003".parse::<Multiaddr>().unwrap();
node1_client
.start_listening(node1_addr.clone())
.await
.unwrap();

// Start listening on node2
let node2_addr = "/ip4/127.0.0.1/tcp/50004".parse::<Multiaddr>().unwrap();
node2_client
.start_listening(node2_addr.clone())
.await
.unwrap();

// Dial node1 from node2
node2_client
.dial(*node1_client.peer_id(), node1_addr.clone())
.await
.unwrap();

// Wait for the nodes to connect
tokio::time::sleep(Duration::from_millis(500)).await;
// Create two connected nodes
let (mut server, mut client) = create_peers().await;

// Wait for both peers to see each other subscribed to the proposals
// topic before sending — otherwise gossipsub would drop the messages.
wait_for_subscribed(&mut server, client.peer_id, TOPIC_PROPOSALS).await;
wait_for_subscribed(&mut client, server.peer_id, TOPIC_PROPOSALS).await;

// Create a sequence of complete proposal streams
let proposals = vec![
Expand All @@ -402,7 +415,8 @@ mod tests {
// Send proposals
for (height_round, proposal_stream) in &proposals {
// Send the entire proposal stream with shuffle flag
node1_client
server
.client
.send(Command::TestProposalStream(
(*height_round).into(),
proposal_stream.clone(),
Expand All @@ -412,49 +426,51 @@ mod tests {
.unwrap();
}

// Node 2 should receive all proposal streams
// Client should receive all proposal streams
let mut received_proposals = HashMap::new();
let mut completed_proposals = HashSet::new();

while completed_proposals.len() < proposals.len() {
if let Some(Event {
kind: EventKind::Proposal(height_and_round, received_proposal),
..
}) = node2_events.recv().await
{
// Get or create the vector for this height/round
let proposal_parts = received_proposals
.entry(height_and_round)
.or_insert_with(Vec::new);

proposal_parts.push(received_proposal.clone());

// If we received a Fin message, verify the complete proposal
if let ProposalPart::Fin(_) = received_proposal {
// Find the matching proposal by height/round
let (_, expected_stream) = proposals
.iter()
.find(|((h, r), _)| {
*h == height_and_round.height() && *r == height_and_round.round()
})
.expect("Received unknown proposal stream");

// Verify we have all parts and they match in order
let (height_and_round, received_proposal) = client
.wait_for_event(|e| match e.kind {
EventKind::Proposal(hnr, part) => Some((hnr, part)),
_ => None,
})
.await
.unwrap();

// Get or create the vector for this height/round
let proposal_parts = received_proposals
.entry(height_and_round)
.or_insert_with(Vec::new);

proposal_parts.push(received_proposal.clone());

// If we received a Fin message, verify the complete proposal
if let ProposalPart::Fin(_) = received_proposal {
// Find the matching proposal by height/round
let (_, expected_stream) = proposals
.iter()
.find(|((h, r), _)| {
*h == height_and_round.height() && *r == height_and_round.round()
})
.expect("Received unknown proposal stream");

// Verify we have all parts and they match in order
assert_eq!(
proposal_parts.len(),
expected_stream.len(),
"Received wrong number of proposal parts"
);

for (received, expected) in proposal_parts.iter().zip(expected_stream.iter()) {
assert_eq!(
proposal_parts.len(),
expected_stream.len(),
"Received wrong number of proposal parts"
received, expected,
"Proposal part content or order doesn't match"
);

for (received, expected) in proposal_parts.iter().zip(expected_stream.iter()) {
assert_eq!(
received, expected,
"Proposal part content or order doesn't match"
);
}

completed_proposals.insert(height_and_round);
}

completed_proposals.insert(height_and_round);
}
}
}
Expand All @@ -466,36 +482,13 @@ mod tests {
/// node correctly receives all vote messages.
#[tokio::test]
async fn test_vote_messages() {
// Create two nodes with different identities
let (node1_client, _, node1_loop) = create_test_node().await;
let (node2_client, mut node2_events, node2_loop) = create_test_node().await;

// Start the main loops
tokio::spawn(node1_loop.run());
tokio::spawn(node2_loop.run());

// Start listening on node1
let node1_addr = "/ip4/127.0.0.1/tcp/50001".parse::<Multiaddr>().unwrap();
node1_client
.start_listening(node1_addr.clone())
.await
.unwrap();

// Start listening on node2
let node2_addr = "/ip4/127.0.0.1/tcp/50002".parse::<Multiaddr>().unwrap();
node2_client
.start_listening(node2_addr.clone())
.await
.unwrap();

// Dial node1 from node2
node2_client
.dial(*node1_client.peer_id(), node1_addr.clone())
.await
.unwrap();

// Wait for the nodes to connect
tokio::time::sleep(Duration::from_millis(500)).await;
// Create two connected nodes
let (mut server, mut client) = create_peers().await;

// Wait for both peers to see each other subscribed to the votes topic
// before sending — otherwise gossipsub would drop the messages.
wait_for_subscribed(&mut server, client.peer_id, TOPIC_VOTES).await;
wait_for_subscribed(&mut client, server.peer_id, TOPIC_VOTES).await;

// Create a sequence of votes to send
let votes = vec![
Expand Down Expand Up @@ -525,11 +518,12 @@ mod tests {
},
];
let mut rxs = Vec::new();
// Send votes from node1
// Send votes from the server
for vote in &votes {
let (tx, rx) = tokio::sync::mpsc::channel(1);
rxs.push(rx);
node1_client
server
.client
.send(Command::Vote {
vote: vote.clone(),
done_tx: tx,
Expand All @@ -538,22 +532,24 @@ mod tests {
.unwrap();
}

// Node 2 should receive all votes
// Client should receive all votes
let mut received_votes = Vec::new();
let mut expected_votes = votes.clone();

while !expected_votes.is_empty() {
if let Some(Event {
kind: EventKind::Vote(received_vote),
..
}) = node2_events.recv().await
{
received_votes.push(received_vote.clone());

// Find and remove the matching expected vote
if let Some(pos) = expected_votes.iter().position(|v| v == &received_vote) {
expected_votes.remove(pos);
}
let received_vote = client
.wait_for_event(|e| match e.kind {
EventKind::Vote(vote) => Some(vote),
_ => None,
})
.await
.unwrap();

received_votes.push(received_vote.clone());

// Find and remove the matching expected vote
if let Some(pos) = expected_votes.iter().position(|v| v == &received_vote) {
expected_votes.remove(pos);
}
}

Expand All @@ -573,18 +569,6 @@ mod tests {
);
}

async fn create_test_node() -> (
core::Client<consensus::Command>,
mpsc::UnboundedReceiver<consensus::Event>,
main_loop::MainLoop<consensus::Behaviour>,
) {
let keypair = Keypair::generate_ed25519();
let core_config = Config::for_test();
let chain_id = ChainId::MAINNET;

new_consensus(keypair, core_config, chain_id)
}

fn create_proposal_stream(
height: u64,
round: u32,
Expand Down
13 changes: 13 additions & 0 deletions crates/p2p/src/consensus/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub struct Behaviour {
impl ApplicationBehaviour for Behaviour {
type Command = consensus::Command;
type Event = consensus::Event;
type TestEvent = consensus::TestEvent;
type State = consensus::State;

#[tracing::instrument(skip(self, state))]
Expand Down Expand Up @@ -148,6 +149,7 @@ impl ApplicationBehaviour for Behaviour {
event: BehaviourEvent,
state: &mut Self::State,
event_sender: mpsc::UnboundedSender<Self::Event>,
test_event_sender: mpsc::UnboundedSender<Self::TestEvent>,
) {
use gossipsub::Event::*;
let BehaviourEvent::Gossipsub(e) = event;
Expand Down Expand Up @@ -187,6 +189,17 @@ impl ApplicationBehaviour for Behaviour {
}
_ => {}
},
Subscribed { peer_id, topic } => {
tracing::debug!("Peer {} subscribed to topic {}", peer_id, topic);

#[cfg(test)]
{
let _ = test_event_sender.send(consensus::TestEvent {
source: peer_id,
kind: consensus::TestEventKind::Subscribed(topic),
});
}
}
_ => {
// TODO: Do we care about any other Gossipsub events?
}
Expand Down
Loading
Loading