Skip to content

Commit ba6a082

Browse files
committed
feat(p2p): preconfirmed p2p event consumer
Adds a task that reads preconfirmed P2P events and updates a watched value based on those events. For now, since the event itself is a placeholder, the watched value is a simple counter. Adds the receiver part of the watched value to the `RpcContext`.
1 parent 649cbf6 commit ba6a082

4 files changed

Lines changed: 91 additions & 6 deletions

File tree

crates/pathfinder/src/bin/pathfinder/main.rs

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@ use pathfinder_ethereum::EthereumClient;
1717
use pathfinder_gas_price::{L1GasPriceConfig, L1GasPriceProvider};
1818
#[cfg(feature = "p2p")]
1919
use pathfinder_lib::consensus::ConsensusTaskHandles;
20+
#[cfg(feature = "p2p")]
21+
use pathfinder_lib::preconfirmed::PreconfirmedTaskHandles;
2022
use pathfinder_lib::state::{sync_gas_prices, L1GasPriceSyncConfig, SyncContext};
2123
use pathfinder_lib::ConsensusChannels;
2224
#[cfg(feature = "p2p")]
23-
use pathfinder_lib::{config, consensus, monitoring, p2p_network, state};
25+
use pathfinder_lib::{config, consensus, monitoring, p2p_network, preconfirmed, state};
2426
#[cfg(not(feature = "p2p"))]
2527
use pathfinder_lib::{config, monitoring, p2p_network, state};
2628
use pathfinder_rpc::context::{EthContractAddresses, WebsocketContext};
@@ -323,7 +325,7 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst
323325
)
324326
.await;
325327

326-
let (preconfirmed_p2p_handle, _preconfirmed_p2p_client_and_event_rx) =
328+
let (preconfirmed_p2p_handle, preconfirmed_p2p_client_and_event_rx) =
327329
if config.preconfirmed_p2p.enable {
328330
p2p_network::preconfirmed::start(
329331
chain_id,
@@ -437,13 +439,41 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst
437439
)
438440
};
439441

440-
let context = if let Some(consensus_info_watch) = consensus_channels
442+
#[cfg(feature = "p2p")]
443+
let (preconfirmed_p2p_event_processing_handle, preconfirmed_watch_rx) = {
444+
let handles = if config.preconfirmed_p2p.enable {
445+
match preconfirmed_p2p_client_and_event_rx {
446+
Some((event_rx, _)) => preconfirmed::start(event_rx),
447+
None => PreconfirmedTaskHandles::pending(),
448+
}
449+
} else {
450+
PreconfirmedTaskHandles::pending()
451+
};
452+
(
453+
handles.preconfirmed_p2p_event_processing_handle,
454+
handles.preconfirmed_watch,
455+
)
456+
};
457+
458+
#[cfg(not(feature = "p2p"))]
459+
let (preconfirmed_p2p_event_processing_handle, preconfirmed_watch_rx) = {
460+
let _ = preconfirmed_p2p_client_and_event_rx;
461+
(
462+
tokio::task::spawn(std::future::pending::<anyhow::Result<()>>()),
463+
None,
464+
)
465+
};
466+
467+
let context = match consensus_channels
441468
.as_ref()
442469
.map(|cc| cc.consensus_info_watch.clone())
443470
{
444-
context.with_consensus_info_watch(consensus_info_watch)
445-
} else {
446-
context
471+
Some(consensus_info_watch) => context.with_consensus_info_watch(consensus_info_watch),
472+
None => context,
473+
};
474+
let context = match preconfirmed_watch_rx {
475+
Some(preconfirmed_watch) => context.with_preconfirmed_watch(preconfirmed_watch),
476+
None => context,
447477
};
448478

449479
let default_version = match config.rpc_root_version {
@@ -528,6 +558,7 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst
528558
result = consensus_p2p_handle => handle_critical_task_result("Consensus P2P network", result),
529559
result = preconfirmed_p2p_handle => handle_critical_task_result("Preconfirmed P2P network", result),
530560
result = consensus_p2p_event_processing_handle => handle_critical_task_result("Consensus P2P event processing", result),
561+
result = preconfirmed_p2p_event_processing_handle => handle_critical_task_result("Preconfirmed P2P event processing", result),
531562
result = consensus_engine_handle => handle_critical_task_result("Consensus engine", result),
532563
result = http_client_refresh_handle => handle_critical_task_result("HTTP client refresh", result),
533564
_ = term_signal.recv() => {

crates/pathfinder/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ pub mod consensus;
77
pub mod devnet;
88
pub mod monitoring;
99
pub mod p2p_network;
10+
#[cfg(feature = "p2p")]
11+
pub mod preconfirmed;
1012
pub mod state;
1113
pub mod sync;
1214
pub enum SyncMessageToConsensus {
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
use p2p::preconfirmed::Event;
2+
use tokio::sync::{mpsc, watch};
3+
4+
pub type PreconfirmedP2PEventProcessingTaskHandle = tokio::task::JoinHandle<anyhow::Result<()>>;
5+
6+
pub struct PreconfirmedTaskHandles {
7+
pub preconfirmed_p2p_event_processing_handle: PreconfirmedP2PEventProcessingTaskHandle,
8+
// Placeholder watched type, should be the preconfirmed block.
9+
pub preconfirmed_watch: Option<watch::Receiver<u32>>,
10+
}
11+
12+
impl PreconfirmedTaskHandles {
13+
pub fn pending() -> Self {
14+
Self {
15+
preconfirmed_p2p_event_processing_handle: tokio::task::spawn(std::future::pending()),
16+
preconfirmed_watch: None,
17+
}
18+
}
19+
}
20+
21+
pub fn start(p2p_event_rx: mpsc::UnboundedReceiver<Event>) -> PreconfirmedTaskHandles {
22+
let (watch_tx, watch_rx) = watch::channel(0);
23+
let jh = util::task::spawn(process_preconfirmed_p2p_events(p2p_event_rx, watch_tx));
24+
PreconfirmedTaskHandles {
25+
preconfirmed_p2p_event_processing_handle: jh,
26+
preconfirmed_watch: Some(watch_rx),
27+
}
28+
}
29+
30+
async fn process_preconfirmed_p2p_events(
31+
mut p2p_event_rx: mpsc::UnboundedReceiver<Event>,
32+
preconfirmed_watch_tx: watch::Sender<u32>,
33+
) -> anyhow::Result<()> {
34+
while let Some(event) = p2p_event_rx.recv().await {
35+
match event.kind {
36+
p2p::preconfirmed::EventKind::PreconfirmedTransactionsPlaceholder => {
37+
preconfirmed_watch_tx.send_modify(|x| *x += 1)
38+
}
39+
}
40+
}
41+
42+
Ok(())
43+
}

crates/rpc/src/context.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ pub struct RpcContext {
9999
pub config: RpcConfig,
100100
pub native_class_cache: Option<NativeClassCache>,
101101
pub consensus_info_watch: Option<watch::Receiver<consensus_info::ConsensusInfo>>,
102+
pub preconfirmed_watch: Option<watch::Receiver<u32>>,
102103
}
103104

104105
impl RpcContext {
@@ -144,6 +145,7 @@ impl RpcContext {
144145
config,
145146
native_class_cache,
146147
consensus_info_watch: None,
148+
preconfirmed_watch: None,
147149
}
148150
}
149151

@@ -180,6 +182,13 @@ impl RpcContext {
180182
}
181183
}
182184

185+
pub fn with_preconfirmed_watch(self, preconfirmed_watch: watch::Receiver<u32>) -> Self {
186+
Self {
187+
preconfirmed_watch: Some(preconfirmed_watch),
188+
..self
189+
}
190+
}
191+
183192
#[cfg(test)]
184193
pub fn with_notifications(self, notifications: Notifications) -> Self {
185194
Self {

0 commit comments

Comments
 (0)