Skip to content

Commit fdf7f74

Browse files
authored
Merge pull request #718 from 0xMimir/feat/port-scheduler
Ported scheduler and pnet layer
2 parents 55a9ae2 + 6a609e6 commit fdf7f74

26 files changed

+1049
-678
lines changed

node/src/action_kind.rs

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,12 @@ use crate::p2p::network::kad::stream::P2pNetworkKademliaStreamAction;
4545
use crate::p2p::network::kad::{P2pNetworkKadAction, P2pNetworkKademliaAction};
4646
use crate::p2p::network::noise::P2pNetworkNoiseAction;
4747
use crate::p2p::network::pnet::P2pNetworkPnetAction;
48+
use crate::p2p::network::pnet_effectful::P2pNetworkPnetEffectfulAction;
4849
use crate::p2p::network::pubsub::pubsub_effectful::P2pNetworkPubsubEffectfulAction;
4950
use crate::p2p::network::pubsub::P2pNetworkPubsubAction;
5051
use crate::p2p::network::rpc::P2pNetworkRpcAction;
5152
use crate::p2p::network::scheduler::P2pNetworkSchedulerAction;
53+
use crate::p2p::network::scheduler_effectful::P2pNetworkSchedulerEffectfulAction;
5254
use crate::p2p::network::select::P2pNetworkSelectAction;
5355
use crate::p2p::network::yamux::P2pNetworkYamuxAction;
5456
use crate::p2p::network::P2pNetworkAction;
@@ -306,6 +308,8 @@ pub enum ActionKind {
306308
P2pNetworkPnetOutgoingData,
307309
P2pNetworkPnetSetupNonce,
308310
P2pNetworkPnetTimeout,
311+
P2pNetworkPnetEffectfulOutgoingData,
312+
P2pNetworkPnetEffectfulSetupNonce,
309313
P2pNetworkPubsubBroadcast,
310314
P2pNetworkPubsubBroadcastSigned,
311315
P2pNetworkPubsubGraft,
@@ -331,7 +335,6 @@ pub enum ActionKind {
331335
P2pNetworkSchedulerDisconnect,
332336
P2pNetworkSchedulerDisconnected,
333337
P2pNetworkSchedulerError,
334-
P2pNetworkSchedulerIncomingConnectionIsReady,
335338
P2pNetworkSchedulerIncomingDataDidReceive,
336339
P2pNetworkSchedulerIncomingDataIsReady,
337340
P2pNetworkSchedulerIncomingDidAccept,
@@ -347,6 +350,16 @@ pub enum ActionKind {
347350
P2pNetworkSchedulerSelectDone,
348351
P2pNetworkSchedulerSelectError,
349352
P2pNetworkSchedulerYamuxDidInit,
353+
P2pNetworkSchedulerEffectfulDisconnect,
354+
P2pNetworkSchedulerEffectfulError,
355+
P2pNetworkSchedulerEffectfulIncomingConnectionIsReady,
356+
P2pNetworkSchedulerEffectfulIncomingDataIsReady,
357+
P2pNetworkSchedulerEffectfulIncomingDidAccept,
358+
P2pNetworkSchedulerEffectfulInterfaceDetected,
359+
P2pNetworkSchedulerEffectfulNoiseSelectDone,
360+
P2pNetworkSchedulerEffectfulOutgoingConnect,
361+
P2pNetworkSchedulerEffectfulOutgoingDidConnect,
362+
P2pNetworkSchedulerEffectfulSelectError,
350363
P2pNetworkSelectIncomingData,
351364
P2pNetworkSelectIncomingDataAuth,
352365
P2pNetworkSelectIncomingDataMux,
@@ -548,7 +561,7 @@ pub enum ActionKind {
548561
}
549562

550563
impl ActionKind {
551-
pub const COUNT: u16 = 454;
564+
pub const COUNT: u16 = 465;
552565
}
553566

554567
impl std::fmt::Display for ActionKind {
@@ -946,7 +959,9 @@ impl ActionKindGet for P2pNetworkAction {
946959
fn kind(&self) -> ActionKind {
947960
match self {
948961
Self::Scheduler(a) => a.kind(),
962+
Self::SchedulerEffectful(a) => a.kind(),
949963
Self::Pnet(a) => a.kind(),
964+
Self::PnetEffectful(a) => a.kind(),
950965
Self::Select(a) => a.kind(),
951966
Self::Noise(a) => a.kind(),
952967
Self::Yamux(a) => a.kind(),
@@ -1371,13 +1386,10 @@ impl ActionKindGet for P2pNetworkSchedulerAction {
13711386
Self::InterfaceExpired { .. } => ActionKind::P2pNetworkSchedulerInterfaceExpired,
13721387
Self::ListenerReady { .. } => ActionKind::P2pNetworkSchedulerListenerReady,
13731388
Self::ListenerError { .. } => ActionKind::P2pNetworkSchedulerListenerError,
1374-
Self::IncomingConnectionIsReady { .. } => {
1375-
ActionKind::P2pNetworkSchedulerIncomingConnectionIsReady
1376-
}
13771389
Self::IncomingDidAccept { .. } => ActionKind::P2pNetworkSchedulerIncomingDidAccept,
1390+
Self::IncomingDataIsReady { .. } => ActionKind::P2pNetworkSchedulerIncomingDataIsReady,
13781391
Self::OutgoingConnect { .. } => ActionKind::P2pNetworkSchedulerOutgoingConnect,
13791392
Self::OutgoingDidConnect { .. } => ActionKind::P2pNetworkSchedulerOutgoingDidConnect,
1380-
Self::IncomingDataIsReady { .. } => ActionKind::P2pNetworkSchedulerIncomingDataIsReady,
13811393
Self::IncomingDataDidReceive { .. } => {
13821394
ActionKind::P2pNetworkSchedulerIncomingDataDidReceive
13831395
}
@@ -1394,6 +1406,33 @@ impl ActionKindGet for P2pNetworkSchedulerAction {
13941406
}
13951407
}
13961408

1409+
impl ActionKindGet for P2pNetworkSchedulerEffectfulAction {
1410+
fn kind(&self) -> ActionKind {
1411+
match self {
1412+
Self::InterfaceDetected { .. } => {
1413+
ActionKind::P2pNetworkSchedulerEffectfulInterfaceDetected
1414+
}
1415+
Self::IncomingConnectionIsReady { .. } => {
1416+
ActionKind::P2pNetworkSchedulerEffectfulIncomingConnectionIsReady
1417+
}
1418+
Self::IncomingDidAccept { .. } => {
1419+
ActionKind::P2pNetworkSchedulerEffectfulIncomingDidAccept
1420+
}
1421+
Self::OutgoingConnect { .. } => ActionKind::P2pNetworkSchedulerEffectfulOutgoingConnect,
1422+
Self::OutgoingDidConnect { .. } => {
1423+
ActionKind::P2pNetworkSchedulerEffectfulOutgoingDidConnect
1424+
}
1425+
Self::IncomingDataIsReady { .. } => {
1426+
ActionKind::P2pNetworkSchedulerEffectfulIncomingDataIsReady
1427+
}
1428+
Self::NoiseSelectDone { .. } => ActionKind::P2pNetworkSchedulerEffectfulNoiseSelectDone,
1429+
Self::SelectError { .. } => ActionKind::P2pNetworkSchedulerEffectfulSelectError,
1430+
Self::Disconnect { .. } => ActionKind::P2pNetworkSchedulerEffectfulDisconnect,
1431+
Self::Error { .. } => ActionKind::P2pNetworkSchedulerEffectfulError,
1432+
}
1433+
}
1434+
}
1435+
13971436
impl ActionKindGet for P2pNetworkPnetAction {
13981437
fn kind(&self) -> ActionKind {
13991438
match self {
@@ -1405,6 +1444,15 @@ impl ActionKindGet for P2pNetworkPnetAction {
14051444
}
14061445
}
14071446

1447+
impl ActionKindGet for P2pNetworkPnetEffectfulAction {
1448+
fn kind(&self) -> ActionKind {
1449+
match self {
1450+
Self::OutgoingData { .. } => ActionKind::P2pNetworkPnetEffectfulOutgoingData,
1451+
Self::SetupNonce { .. } => ActionKind::P2pNetworkPnetEffectfulSetupNonce,
1452+
}
1453+
}
1454+
}
1455+
14081456
impl ActionKindGet for P2pNetworkSelectAction {
14091457
fn kind(&self) -> ActionKind {
14101458
match self {

node/src/event_source/event_source_effects.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use p2p::channels::snark::P2pChannelsSnarkAction;
22
use p2p::channels::streaming_rpc::P2pChannelsStreamingRpcAction;
33
use p2p::channels::transaction::P2pChannelsTransactionAction;
4+
use p2p::P2pNetworkSchedulerEffectfulAction;
45
use snark::user_command_verify::{SnarkUserCommandVerifyAction, SnarkUserCommandVerifyError};
56

67
use crate::action::CheckTimeoutsAction;
@@ -73,9 +74,11 @@ pub fn event_source_effects<S: Service>(store: &mut Store<S>, action: EventSourc
7374
.dispatch(P2pNetworkSchedulerAction::ListenerError { listener, error });
7475
}
7576
MioEvent::IncomingConnectionIsReady { listener } => {
76-
store.dispatch(P2pNetworkSchedulerAction::IncomingConnectionIsReady {
77-
listener,
78-
});
77+
store.dispatch(
78+
P2pNetworkSchedulerEffectfulAction::IncomingConnectionIsReady {
79+
listener,
80+
},
81+
);
7982
}
8083
MioEvent::IncomingConnectionDidAccept(addr, result) => {
8184
store.dispatch(P2pNetworkSchedulerAction::IncomingDidAccept {

node/src/logger/logger_effects.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ pub fn logger_effects<S: Service>(store: &Store<S>, action: ActionWithMetaRef<'_
6868
P2pAction::Peer(action) => action.action_event(&context),
6969
P2pAction::Network(action) => match action {
7070
P2pNetworkAction::Scheduler(action) => action.action_event(&context),
71+
P2pNetworkAction::SchedulerEffectful(action) => action.action_event(&context),
7172
P2pNetworkAction::Pnet(action) => action.action_event(&context),
73+
P2pNetworkAction::PnetEffectful(action) => action.action_event(&context),
7274
P2pNetworkAction::Select(action) => action.action_event(&context),
7375
P2pNetworkAction::Noise(action) => action.action_event(&context),
7476
P2pNetworkAction::Yamux(action) => action.action_event(&context),

node/src/p2p/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,5 +112,7 @@ impl_into_global_action!(p2p::P2pNetworkSelectAction);
112112
impl_into_global_action!(p2p::P2pNetworkPnetAction);
113113
impl_into_global_action!(p2p::P2pNetworkNoiseAction);
114114
impl_into_global_action!(p2p::P2pNetworkRpcAction);
115+
impl_into_global_action!(p2p::P2pNetworkSchedulerEffectfulAction);
116+
impl_into_global_action!(p2p::P2pNetworkPnetEffectfulAction);
115117

116118
impl p2p::P2pActionTrait<crate::State> for crate::Action {}

node/src/p2p/network/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,9 @@ impl redux::EnablingCondition<crate::State> for P2pNetworkPubsubAction {
7777
state.p2p.is_enabled(self, time)
7878
}
7979
}
80+
81+
impl redux::EnablingCondition<crate::State> for P2pNetworkSchedulerEffectfulAction {
82+
fn is_enabled(&self, state: &crate::State, time: redux::Timestamp) -> bool {
83+
state.p2p.is_enabled(self, time)
84+
}
85+
}

p2p/src/lib.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@ pub mod connection;
44
pub mod disconnection;
55
pub mod discovery;
66
pub mod identity;
7-
pub mod peer;
87
use bootstrap::P2pNetworkKadBootstrapState;
98
use channels::{
10-
rpc::P2pChannelsRpcAction, snark::P2pChannelsSnarkAction,
11-
transaction::P2pChannelsTransactionAction,
9+
best_tip::P2pChannelsBestTipAction, rpc::P2pChannelsRpcAction, snark::P2pChannelsSnarkAction,
10+
snark_job_commitment::P2pChannelsSnarkJobCommitmentAction,
11+
streaming_rpc::P2pChannelsStreamingRpcAction, transaction::P2pChannelsTransactionAction,
1212
};
1313
use connection::incoming::P2pConnectionIncomingAction;
1414
use disconnection::P2pDisconnectionAction;
@@ -27,6 +27,9 @@ pub mod identify;
2727
pub mod network;
2828
pub use self::network::*;
2929

30+
pub mod peer;
31+
pub use peer::*;
32+
3033
mod p2p_config;
3134
pub use p2p_config::*;
3235

@@ -106,6 +109,7 @@ pub trait P2pActionTrait<State>:
106109
+ From<P2pNetworkIdentifyStreamEffectfulAction>
107110
+ From<P2pNetworkSelectAction>
108111
+ From<P2pNetworkPnetAction>
112+
+ From<P2pNetworkPnetEffectfulAction>
109113
+ From<P2pNetworkNoiseAction>
110114
+ From<P2pConnectionIncomingAction>
111115
+ From<P2pNetworkPubsubAction>
@@ -115,5 +119,9 @@ pub trait P2pActionTrait<State>:
115119
+ From<P2pNetworkRpcAction>
116120
+ From<P2pChannelsRpcAction>
117121
+ From<P2pDisconnectionAction>
122+
+ From<P2pNetworkSchedulerEffectfulAction>
123+
+ From<P2pChannelsBestTipAction>
124+
+ From<P2pChannelsSnarkJobCommitmentAction>
125+
+ From<P2pChannelsStreamingRpcAction>
118126
{
119127
}

p2p/src/network/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,15 @@ mod p2p_network_effects;
2222
pub mod scheduler;
2323
pub use self::scheduler::*;
2424

25+
pub mod scheduler_effectful;
26+
pub use self::scheduler_effectful::*;
27+
2528
pub mod pnet;
2629
pub use self::pnet::*;
2730

31+
pub mod pnet_effectful;
32+
pub use self::pnet_effectful::*;
33+
2834
pub mod select;
2935
pub use self::select::*;
3036

p2p/src/network/p2p_network_actions.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,18 @@ use openmina_core::ActionEvent;
22
use serde::{Deserialize, Serialize};
33

44
use super::{
5-
identify::*, kad::*, noise::*, pnet::*, pubsub::*, rpc::*, scheduler::*, select::*, yamux::*,
5+
identify::*, kad::*, noise::*, pnet::*, pnet_effectful::*, pubsub::*, rpc::*, scheduler::*,
6+
select::*, yamux::*, P2pNetworkSchedulerEffectfulAction,
67
};
78

89
use crate::P2pState;
910

1011
#[derive(derive_more::From, Serialize, Deserialize, Debug, Clone, ActionEvent)]
1112
pub enum P2pNetworkAction {
1213
Scheduler(P2pNetworkSchedulerAction),
14+
SchedulerEffectful(P2pNetworkSchedulerEffectfulAction),
1315
Pnet(P2pNetworkPnetAction),
16+
PnetEffectful(P2pNetworkPnetEffectfulAction),
1417
Select(P2pNetworkSelectAction),
1518
Noise(P2pNetworkNoiseAction),
1619
Yamux(P2pNetworkYamuxAction),
@@ -25,7 +28,9 @@ impl redux::EnablingCondition<P2pState> for P2pNetworkAction {
2528
fn is_enabled(&self, state: &P2pState, time: redux::Timestamp) -> bool {
2629
match self {
2730
Self::Scheduler(v) => v.is_enabled(state, time),
31+
Self::SchedulerEffectful(v) => v.is_enabled(state, time),
2832
Self::Pnet(v) => v.is_enabled(state, time),
33+
Self::PnetEffectful(v) => v.is_enabled(state, time),
2934
Self::Select(v) => v.is_enabled(state, time),
3035
Self::Noise(v) => v.is_enabled(state, time),
3136
Self::Yamux(v) => v.is_enabled(state, time),

p2p/src/network/p2p_network_effects.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,23 @@ impl P2pNetworkAction {
99
Store::Service: P2pMioService + P2pCryptoService + P2pNetworkService,
1010
{
1111
match self {
12-
P2pNetworkAction::Scheduler(v) => v.effects(meta, store),
13-
P2pNetworkAction::Pnet(v) => v.effects(meta, store),
1412
P2pNetworkAction::Identify(v) => match v.effects(meta, store) {
1513
Ok(_) => {}
1614
Err(e) => error!(meta.time(); "error dispatching Identify stream action: {e}"),
1715
},
18-
P2pNetworkAction::Select(_)
16+
P2pNetworkAction::Pnet(_)
17+
| P2pNetworkAction::Select(_)
1918
| P2pNetworkAction::Noise(_)
2019
| P2pNetworkAction::Yamux(_)
2120
| P2pNetworkAction::Kad(_)
2221
| P2pNetworkAction::Pubsub(_)
23-
| P2pNetworkAction::Rpc(_) => {
22+
| P2pNetworkAction::Rpc(_)
23+
| P2pNetworkAction::Scheduler(_) => {
2424
// handled by reducer
2525
}
26+
P2pNetworkAction::SchedulerEffectful(v) => v.effects(meta, store),
2627
P2pNetworkAction::PubsubEffectful(v) => v.effects(meta, store),
28+
P2pNetworkAction::PnetEffectful(v) => v.effects(meta, store),
2729
}
2830
}
2931
}

p2p/src/network/p2p_network_reducer.rs

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,28 +6,24 @@ use super::*;
66

77
impl P2pNetworkState {
88
pub fn reducer<State, Action>(
9-
mut state_context: Substate<Action, State, Self>,
9+
state_context: Substate<Action, State, Self>,
1010
action: redux::ActionWithMeta<&P2pNetworkAction>,
1111
limits: &P2pLimits,
1212
) -> Result<(), String>
1313
where
1414
State: crate::P2pStateTrait,
1515
Action: crate::P2pActionTrait<State>,
1616
{
17-
let state = state_context.get_substate_mut()?;
18-
1917
let (action, meta) = action.split();
2018
match action {
21-
P2pNetworkAction::Scheduler(a) => {
22-
state.scheduler.reducer(meta.with_action(a));
23-
Ok(())
24-
}
25-
P2pNetworkAction::Pnet(a) => {
26-
if let Some(cn) = state.scheduler.connections.get_mut(a.addr()) {
27-
cn.pnet.reducer(meta.with_action(a))
28-
}
29-
Ok(())
30-
}
19+
P2pNetworkAction::Pnet(a) => P2pNetworkPnetState::reducer(
20+
Substate::from_compatible_substate(state_context),
21+
meta.with_action(a),
22+
),
23+
P2pNetworkAction::Scheduler(a) => P2pNetworkSchedulerState::reducer(
24+
Substate::from_compatible_substate(state_context),
25+
meta.with_action(a),
26+
),
3127
P2pNetworkAction::Select(a) => P2pNetworkSelectState::reducer(
3228
Substate::from_compatible_substate(state_context),
3329
meta.with_action(a),
@@ -54,15 +50,17 @@ impl P2pNetworkState {
5450
Substate::from_compatible_substate(state_context),
5551
meta.with_action(a),
5652
),
57-
P2pNetworkAction::PubsubEffectful(_) => {
58-
// Effectful action; no reducer
59-
Ok(())
60-
}
6153
P2pNetworkAction::Rpc(a) => P2pNetworkRpcState::reducer(
6254
Substate::from_compatible_substate(state_context),
6355
meta.with_action(a),
6456
limits,
6557
),
58+
P2pNetworkAction::PubsubEffectful(_)
59+
| P2pNetworkAction::SchedulerEffectful(_)
60+
| P2pNetworkAction::PnetEffectful(_) => {
61+
// Effectful action; no reducer
62+
Ok(())
63+
}
6664
}
6765
}
6866

0 commit comments

Comments
 (0)