Skip to content

Commit 727725e

Browse files
committed
feat(p2p/test_utils): add support for app specific test events
1 parent dcd00a3 commit 727725e

8 files changed

Lines changed: 118 additions & 60 deletions

File tree

crates/p2p/src/builder.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ impl<B> Builder<B, AppBehaviourSet> {
132132
impl ApplicationBehaviour for dummy::Behaviour {
133133
type Command = ();
134134
type Event = ();
135+
type TestEvent = ();
135136
type State = ();
136137

137138
async fn handle_command(&mut self, _: Self::Command, _: &mut Self::State) {}
@@ -140,6 +141,7 @@ impl ApplicationBehaviour for dummy::Behaviour {
140141
_: <Self as NetworkBehaviour>::ToSwarm,
141142
_: &mut Self::State,
142143
_: mpsc::UnboundedSender<Self::Event>,
144+
_: mpsc::UnboundedSender<Self::TestEvent>,
143145
) {
144146
}
145147
fn domain() -> &'static str {

crates/p2p/src/consensus.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//! network.
33
use std::collections::HashMap;
44

5-
use libp2p::gossipsub::PublishError;
5+
use libp2p::gossipsub::{PublishError, TopicHash};
66
use libp2p::PeerId;
77
use p2p_proto::consensus::{ProposalPart, Vote};
88
use pathfinder_common::ContractAddress;
@@ -93,6 +93,17 @@ impl EventKind {
9393
}
9494
}
9595

96+
#[derive(Debug, Clone)]
97+
pub struct TestEvent {
98+
pub source: PeerId,
99+
pub kind: TestEventKind,
100+
}
101+
102+
#[derive(Debug, Clone)]
103+
pub enum TestEventKind {
104+
Subscribed(TopicHash),
105+
}
106+
96107
/// The state of the consensus P2P network.
97108
#[derive(Default, Debug)]
98109
pub struct State {

crates/p2p/src/consensus/behaviour.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ pub struct Behaviour {
2929
impl ApplicationBehaviour for Behaviour {
3030
type Command = consensus::Command;
3131
type Event = consensus::Event;
32+
type TestEvent = consensus::TestEvent;
3233
type State = consensus::State;
3334

3435
#[tracing::instrument(skip(self, state))]
@@ -148,6 +149,7 @@ impl ApplicationBehaviour for Behaviour {
148149
event: BehaviourEvent,
149150
state: &mut Self::State,
150151
event_sender: mpsc::UnboundedSender<Self::Event>,
152+
test_event_sender: mpsc::UnboundedSender<Self::TestEvent>,
151153
) {
152154
use gossipsub::Event::*;
153155
let BehaviourEvent::Gossipsub(e) = event;
@@ -187,6 +189,17 @@ impl ApplicationBehaviour for Behaviour {
187189
}
188190
_ => {}
189191
},
192+
Subscribed { peer_id, topic } => {
193+
tracing::debug!("Peer {} subscribed to topic {}", peer_id, topic);
194+
195+
#[cfg(test)]
196+
{
197+
let _ = test_event_sender.send(consensus::TestEvent {
198+
source: peer_id,
199+
kind: consensus::TestEventKind::Subscribed(topic),
200+
});
201+
}
202+
}
190203
_ => {
191204
// TODO: Do we care about any other Gossipsub events?
192205
}

crates/p2p/src/core/tests.rs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,14 @@ async fn disconnect() {
4646
peer2.client.disconnect(peer1.peer_id).await.unwrap();
4747

4848
peer1
49-
.wait_for_test_event(move |e| match e {
49+
.wait_for_core_test_event(move |e| match e {
5050
TestEvent::ConnectionClosed { remote } if remote == peer2.peer_id => Some(()),
5151
_ => None,
5252
})
5353
.await;
5454

5555
peer2
56-
.wait_for_test_event(move |e| match e {
56+
.wait_for_core_test_event(move |e| match e {
5757
TestEvent::ConnectionClosed { remote } if remote == peer1.peer_id => Some(()),
5858
_ => None,
5959
})
@@ -102,17 +102,17 @@ async fn periodic_bootstrap() {
102102

103103
let peer_id2 = peer2.peer_id;
104104

105-
let peer2_added_to_dht_of_peer1 = peer1.wait_for_test_event(move |e| match e {
105+
let peer2_added_to_dht_of_peer1 = peer1.wait_for_core_test_event(move |e| match e {
106106
TestEvent::PeerAddedToDHT { remote } if remote == peer_id2 => Some(()),
107107
_ => None,
108108
});
109109

110110
join(peer2_added_to_dht_of_peer1, async {
111111
peer2
112-
.wait_for_test_event(filter_kademlia_bootstrap_completed)
112+
.wait_for_core_test_event(filter_kademlia_bootstrap_completed)
113113
.await;
114114
peer2
115-
.wait_for_test_event(filter_kademlia_bootstrap_completed)
115+
.wait_for_core_test_event(filter_kademlia_bootstrap_completed)
116116
.await;
117117
})
118118
.await;
@@ -155,14 +155,14 @@ async fn reconnect_too_quickly() {
155155
.unwrap();
156156

157157
peer1
158-
.wait_for_test_event(move |e| match e {
158+
.wait_for_core_test_event(move |e| match e {
159159
TestEvent::ConnectionEstablished { remote, .. } if remote == peer2.peer_id => Some(()),
160160
_ => None,
161161
})
162162
.await;
163163

164164
peer2
165-
.wait_for_test_event(move |e| match e {
165+
.wait_for_core_test_event(move |e| match e {
166166
TestEvent::ConnectionEstablished { remote, .. } if remote == peer1.peer_id => Some(()),
167167
_ => None,
168168
})
@@ -178,14 +178,14 @@ async fn reconnect_too_quickly() {
178178
peer1.client.disconnect(peer2.peer_id).await.unwrap();
179179

180180
peer1
181-
.wait_for_test_event(move |e| match e {
181+
.wait_for_core_test_event(move |e| match e {
182182
TestEvent::ConnectionClosed { remote } if remote == peer2.peer_id => Some(()),
183183
_ => None,
184184
})
185185
.await;
186186

187187
peer2
188-
.wait_for_test_event(move |e| match e {
188+
.wait_for_core_test_event(move |e| match e {
189189
TestEvent::ConnectionClosed { remote } if remote == peer1.peer_id => Some(()),
190190
_ => None,
191191
})
@@ -202,14 +202,14 @@ async fn reconnect_too_quickly() {
202202

203203
// The connection is established.
204204
peer1
205-
.wait_for_test_event(move |e| match e {
205+
.wait_for_core_test_event(move |e| match e {
206206
TestEvent::ConnectionEstablished { remote, .. } if remote == peer2.peer_id => Some(()),
207207
_ => None,
208208
})
209209
.await;
210210

211211
peer2
212-
.wait_for_test_event(move |e| match e {
212+
.wait_for_core_test_event(move |e| match e {
213213
TestEvent::ConnectionEstablished { remote, .. } if remote == peer1.peer_id => Some(()),
214214
_ => None,
215215
})
@@ -241,14 +241,14 @@ async fn duplicate_connection() {
241241
.unwrap();
242242

243243
peer1
244-
.wait_for_test_event(move |e| match e {
244+
.wait_for_core_test_event(move |e| match e {
245245
TestEvent::ConnectionEstablished { remote, .. } if remote == peer2.peer_id => Some(()),
246246
_ => None,
247247
})
248248
.await;
249249

250250
peer2
251-
.wait_for_test_event(move |e| match e {
251+
.wait_for_core_test_event(move |e| match e {
252252
TestEvent::ConnectionEstablished { remote, .. } if remote == peer1.peer_id => Some(()),
253253
_ => None,
254254
})
@@ -267,14 +267,14 @@ async fn duplicate_connection() {
267267
.unwrap();
268268

269269
peer1_copy
270-
.wait_for_test_event(move |e| match e {
270+
.wait_for_core_test_event(move |e| match e {
271271
TestEvent::ConnectionEstablished { remote, .. } if remote == peer2.peer_id => Some(()),
272272
_ => None,
273273
})
274274
.await;
275275

276276
peer1_copy
277-
.wait_for_test_event(move |e| match e {
277+
.wait_for_core_test_event(move |e| match e {
278278
TestEvent::ConnectionClosed { remote, .. } if remote == peer2.peer_id => Some(()),
279279
_ => None,
280280
})
@@ -364,7 +364,7 @@ async fn outbound_peer_eviction() {
364364
assert!(peers.contains_key(&inbound1.peer_id));
365365

366366
// Ensure that outbound1 actually got disconnected.
367-
peer.wait_for_test_event(move |e| match e {
367+
peer.wait_for_core_test_event(move |e| match e {
368368
TestEvent::ConnectionClosed { remote, .. } if remote == outbound1.peer_id => Some(()),
369369
_ => None,
370370
})
@@ -443,7 +443,7 @@ async fn inbound_peer_eviction() {
443443

444444
// Ensure that a peer got disconnected.
445445
let disconnected = peer
446-
.wait_for_test_event(|e| match e {
446+
.wait_for_core_test_event(|e| match e {
447447
TestEvent::ConnectionClosed { remote, .. }
448448
if inbound_peers.iter().take(25).any(|p| p.peer_id == remote) =>
449449
{
@@ -496,7 +496,7 @@ async fn evicted_peer_reconnection() {
496496

497497
// Check that peer2 got evicted.
498498
peer1
499-
.wait_for_test_event(|e| match e {
499+
.wait_for_core_test_event(|e| match e {
500500
TestEvent::ConnectionClosed { remote, .. } if remote == peer2.peer_id => Some(()),
501501
_ => None,
502502
})
@@ -517,7 +517,7 @@ async fn evicted_peer_reconnection() {
517517
.await
518518
.unwrap();
519519
peer2
520-
.wait_for_test_event(|e| match e {
520+
.wait_for_core_test_event(|e| match e {
521521
TestEvent::ConnectionClosed { remote, .. } if remote == peer1.peer_id => Some(()),
522522
_ => None,
523523
})
@@ -529,7 +529,7 @@ async fn evicted_peer_reconnection() {
529529

530530
// peer3 gets evicted.
531531
peer1
532-
.wait_for_test_event(|e| match e {
532+
.wait_for_core_test_event(|e| match e {
533533
TestEvent::ConnectionClosed { remote, .. } if remote == peer3.peer_id => Some(()),
534534
_ => None,
535535
})

crates/p2p/src/lib.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,9 @@ mod builder_phase {
8888
/// interacts with the network:
8989
/// - Commands: Actions requested by the application to be executed by the
9090
/// network
91-
/// - Events: Notifications from the network that the application needs to
92-
/// handle
91+
/// - Events: Application-specific notifications from the network that the
92+
/// application needs to handle
93+
/// - TestEvents: Application-specific events that are only emitted in tests
9394
/// - State: Data needed to track ongoing operations
9495
///
9596
/// This trait is implemented by application-specific network behaviors (like
@@ -99,6 +100,9 @@ pub trait ApplicationBehaviour: NetworkBehaviour {
99100
type Command: std::fmt::Debug;
100101
/// The type of events that the p2p network can emit to the outside world.
101102
type Event;
103+
/// The type of events that the p2p network can emit to the outside world in
104+
/// tests only.
105+
type TestEvent;
102106
/// State needed to track pending network operations and their responses.
103107
type State;
104108

@@ -115,6 +119,7 @@ pub trait ApplicationBehaviour: NetworkBehaviour {
115119
event: <Self as NetworkBehaviour>::ToSwarm,
116120
state: &mut Self::State,
117121
event_sender: mpsc::UnboundedSender<Self::Event>,
122+
test_event_sender: mpsc::UnboundedSender<Self::TestEvent>,
118123
) -> impl Future<Output = ()> + Send;
119124

120125
/// Returns the domain string used for marker file creation in integration

0 commit comments

Comments
 (0)