Skip to content

Commit f49581c

Browse files
authored
Merge pull request #638 from openmina/fix/p2p/crash-resilience
Improve crash resilience in p2p
2 parents 4a1bd73 + b8f4103 commit f49581c

File tree

17 files changed

+438
-235
lines changed

17 files changed

+438
-235
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node/common/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ tracing-wasm = "0.2"
3232
[target.'cfg(not(target_family = "wasm"))'.dependencies]
3333
redux = { workspace = true, features=["serializable_callbacks"] }
3434
tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter"] }
35+
libp2p-identity = { version = "=0.2.7", features = ["ed25519", "rand", "serde"] }
3536

3637
[features]
3738
p2p-webrtc = ["node/p2p-webrtc"]

node/common/src/service/p2p.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,14 @@ impl P2pCryptoService for NodeService {
8787
.sign(&msg)
8888
.expect("unable to create signature")
8989
}
90+
91+
fn verify_publication(
92+
&mut self,
93+
pk: &libp2p_identity::PublicKey,
94+
publication: &[u8],
95+
sig: &[u8],
96+
) -> bool {
97+
let msg: Vec<u8> = [b"libp2p-pubsub:", publication].concat();
98+
pk.verify(&msg, sig)
99+
}
90100
}

node/src/action_kind.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,11 +307,14 @@ pub enum ActionKind {
307307
P2pNetworkPubsubBroadcastSigned,
308308
P2pNetworkPubsubGraft,
309309
P2pNetworkPubsubIncomingData,
310+
P2pNetworkPubsubIncomingMessage,
310311
P2pNetworkPubsubNewStream,
311312
P2pNetworkPubsubOutgoingData,
312313
P2pNetworkPubsubOutgoingMessage,
314+
P2pNetworkPubsubOutgoingMessageError,
313315
P2pNetworkPubsubPrune,
314316
P2pNetworkPubsubSign,
317+
P2pNetworkPubsubSignError,
315318
P2pNetworkRpcHeartbeatSend,
316319
P2pNetworkRpcIncomingData,
317320
P2pNetworkRpcIncomingMessage,
@@ -538,7 +541,7 @@ pub enum ActionKind {
538541
}
539542

540543
impl ActionKind {
541-
pub const COUNT: u16 = 446;
544+
pub const COUNT: u16 = 449;
542545
}
543546

544547
impl std::fmt::Display for ActionKind {
@@ -1462,12 +1465,15 @@ impl ActionKindGet for P2pNetworkPubsubAction {
14621465
match self {
14631466
Self::NewStream { .. } => ActionKind::P2pNetworkPubsubNewStream,
14641467
Self::IncomingData { .. } => ActionKind::P2pNetworkPubsubIncomingData,
1468+
Self::IncomingMessage { .. } => ActionKind::P2pNetworkPubsubIncomingMessage,
14651469
Self::Graft { .. } => ActionKind::P2pNetworkPubsubGraft,
14661470
Self::Prune { .. } => ActionKind::P2pNetworkPubsubPrune,
14671471
Self::Broadcast { .. } => ActionKind::P2pNetworkPubsubBroadcast,
14681472
Self::Sign { .. } => ActionKind::P2pNetworkPubsubSign,
1473+
Self::SignError { .. } => ActionKind::P2pNetworkPubsubSignError,
14691474
Self::BroadcastSigned { .. } => ActionKind::P2pNetworkPubsubBroadcastSigned,
14701475
Self::OutgoingMessage { .. } => ActionKind::P2pNetworkPubsubOutgoingMessage,
1476+
Self::OutgoingMessageError { .. } => ActionKind::P2pNetworkPubsubOutgoingMessageError,
14711477
Self::OutgoingData { .. } => ActionKind::P2pNetworkPubsubOutgoingData,
14721478
}
14731479
}

node/testing/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ hex = "0.4.3"
5555

5656
[target.'cfg(not(target_family = "wasm"))'.dependencies]
5757
redux = { workspace = true, features=["serializable_callbacks"] }
58+
libp2p-identity = { version = "=0.2.7", features = ["ed25519", "rand", "serde"] }
5859

5960
[features]
6061
default = ["p2p-libp2p", "scenario-generators"]

node/testing/src/service/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,15 @@ impl P2pCryptoService for NodeTestingService {
296296
fn sign_publication(&mut self, publication: &[u8]) -> Vec<u8> {
297297
self.real.sign_publication(publication)
298298
}
299+
300+
fn verify_publication(
301+
&mut self,
302+
pk: &libp2p_identity::PublicKey,
303+
publication: &[u8],
304+
sig: &[u8],
305+
) -> bool {
306+
self.real.verify_publication(pk, publication, sig)
307+
}
299308
}
300309

301310
impl node::ledger::LedgerService for NodeTestingService {

p2p/src/network/noise/p2p_network_noise_effects.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,14 @@ impl P2pNetworkNoiseAction {
5252
} else {
5353
None
5454
};
55-
let handshake_error = if let Some(P2pNetworkNoiseStateInner::Error(error)) = &state.inner {
56-
Some(error)
57-
} else {
58-
None
59-
};
55+
56+
if let Some(P2pNetworkNoiseStateInner::Error(error)) = &state.inner {
57+
store.dispatch(P2pNetworkSchedulerAction::Error {
58+
addr: *self.addr(),
59+
error: error.clone().into(),
60+
});
61+
return;
62+
}
6063

6164
let middle_initiator =
6265
matches!(&state.inner, Some(P2pNetworkNoiseStateInner::Initiator(..)))
@@ -93,14 +96,6 @@ impl P2pNetworkNoiseAction {
9396
}
9497
}
9598
P2pNetworkNoiseAction::IncomingChunk { addr, .. } => {
96-
if let Some(error) = handshake_error {
97-
store.dispatch(P2pNetworkSchedulerAction::Error {
98-
addr,
99-
error: error.clone().into(),
100-
});
101-
return;
102-
}
103-
10499
if let Some((peer_id, true)) = handshake_done {
105100
let addr = *self.addr();
106101
store.dispatch(P2pConnectionIncomingAction::FinalizePendingLibp2p {

p2p/src/network/noise/p2p_network_noise_reducer.rs

Lines changed: 80 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
use chacha20poly1305::{aead::generic_array::GenericArray, AeadInPlace, ChaCha20Poly1305, KeyInit};
2+
use crypto_bigint::consts::U12;
23

34
use self::p2p_network_noise_state::ResponderConsumeOutput;
45

56
use super::*;
67

78
use super::p2p_network_noise_state::{
8-
NoiseError, NoiseState, P2pNetworkNoiseState, P2pNetworkNoiseStateInitiator,
9+
InitiatorOutput, NoiseError, NoiseState, P2pNetworkNoiseState, P2pNetworkNoiseStateInitiator,
910
P2pNetworkNoiseStateInner, P2pNetworkNoiseStateResponder, ResponderOutput,
1011
};
1112

13+
const MAX_CHUNK_SIZE: usize = u16::MAX as usize - 19;
14+
1215
impl P2pNetworkNoiseState {
1316
pub fn reducer(&mut self, action: redux::ActionWithMeta<&P2pNetworkNoiseAction>) {
1417
match action.action() {
@@ -68,13 +71,16 @@ impl P2pNetworkNoiseState {
6871
let mut offset = 0;
6972
loop {
7073
let buf = &self.buffer[offset..];
71-
if buf.len() >= 2 {
72-
let len = u16::from_be_bytes(buf[..2].try_into().expect("cannot fail"));
74+
// TODO: add bug_condition
75+
let len = buf
76+
.get(..2)
77+
.and_then(|buf| Some(u16::from_be_bytes(buf.try_into().ok()?)));
78+
79+
if let Some(len) = len {
7380
let full_len = 2 + len as usize;
7481
if buf.len() >= full_len {
7582
self.incoming_chunks.push_back(buf[..full_len].to_vec());
7683
offset += full_len;
77-
7884
continue;
7985
}
8086
}
@@ -187,56 +193,73 @@ impl P2pNetworkNoiseState {
187193
..
188194
} => {
189195
let aead = ChaCha20Poly1305::new(&send_key.0.into());
190-
let chunk_max_size = u16::MAX as usize - 19;
191-
let chunks = data
192-
.chunks(chunk_max_size)
193-
.map(|data| {
194-
let mut chunk = Vec::with_capacity(18 + data.len());
195-
chunk.extend_from_slice(&((data.len() + 16) as u16).to_be_bytes());
196-
chunk.extend_from_slice(data);
196+
let mut chunks = vec![];
197+
198+
for data_chunk in data.chunks(MAX_CHUNK_SIZE) {
199+
let mut chunk = Vec::with_capacity(18 + data_chunk.len());
200+
chunk
201+
.extend_from_slice(&((data_chunk.len() + 16) as u16).to_be_bytes());
202+
chunk.extend_from_slice(data_chunk);
197203

198-
let mut nonce = GenericArray::default();
199-
nonce[4..].clone_from_slice(&send_nonce.to_le_bytes());
200-
*send_nonce += 1;
204+
let mut nonce: GenericArray<u8, U12> = GenericArray::default();
205+
nonce[4..].clone_from_slice(&send_nonce.to_le_bytes());
206+
*send_nonce += 1;
201207

202-
let tag = aead
203-
.encrypt_in_place_detached(
204-
&nonce,
205-
&[],
206-
&mut chunk[2..(2 + data.len())],
207-
)
208-
.expect("cannot fail");
209-
chunk.extend_from_slice(&tag);
210-
chunk.into()
211-
})
212-
.collect();
208+
let tag = aead.encrypt_in_place_detached(
209+
&nonce,
210+
&[],
211+
&mut chunk[2..(2 + data_chunk.len())],
212+
);
213+
214+
let tag = match tag {
215+
Ok(tag) => tag,
216+
Err(_) => {
217+
*state =
218+
P2pNetworkNoiseStateInner::Error(NoiseError::Encryption);
219+
return;
220+
}
221+
};
222+
223+
chunk.extend_from_slice(&tag);
224+
chunks.push(chunk.into());
225+
}
213226
self.outgoing_chunks.push_back(chunks);
214227
}
215228
P2pNetworkNoiseStateInner::Initiator(i) => {
216-
if let (Some((chunk, (send_key, recv_key))), Some(remote_pk)) =
217-
(i.generate(data), i.remote_pk.clone())
218-
{
219-
self.outgoing_chunks.push_back(vec![chunk.into()]);
220-
let remote_peer_id = remote_pk.peer_id();
221-
222-
if self
223-
.expected_peer_id
224-
.is_some_and(|expected_per_id| expected_per_id != remote_peer_id)
225-
{
226-
*state = P2pNetworkNoiseStateInner::Error(dbg!(
227-
NoiseError::RemotePeerIdMismatch
228-
));
229-
} else {
230-
*state = P2pNetworkNoiseStateInner::Done {
231-
incoming: false,
229+
match (i.generate(data), i.remote_pk.clone()) {
230+
(
231+
Ok(Some(InitiatorOutput {
232232
send_key,
233233
recv_key,
234-
recv_nonce: 0,
235-
send_nonce: 0,
236-
remote_pk,
237-
remote_peer_id,
238-
};
234+
chunk,
235+
})),
236+
Some(remote_pk),
237+
) => {
238+
self.outgoing_chunks.push_back(vec![chunk.into()]);
239+
let remote_peer_id = remote_pk.peer_id();
240+
241+
if self.expected_peer_id.is_some_and(|expected_per_id| {
242+
expected_per_id != remote_peer_id
243+
}) {
244+
*state = P2pNetworkNoiseStateInner::Error(dbg!(
245+
NoiseError::RemotePeerIdMismatch
246+
));
247+
} else {
248+
*state = P2pNetworkNoiseStateInner::Done {
249+
incoming: false,
250+
send_key,
251+
recv_key,
252+
recv_nonce: 0,
253+
send_nonce: 0,
254+
remote_pk,
255+
remote_peer_id,
256+
};
257+
}
258+
}
259+
(Err(error), Some(_)) => {
260+
*state = P2pNetworkNoiseStateInner::Error(error);
239261
}
262+
_ => (),
240263
}
241264
}
242265
P2pNetworkNoiseStateInner::Responder(r) => {
@@ -268,9 +291,17 @@ impl P2pNetworkNoiseState {
268291
nonce[4..].clone_from_slice(&send_nonce.to_le_bytes());
269292
*send_nonce += 1;
270293

271-
let tag = aead
272-
.encrypt_in_place_detached(&nonce, &[], &mut chunk[2..(2 + data.len())])
273-
.expect("cannot fail");
294+
let tag = match aead.encrypt_in_place_detached(
295+
&nonce,
296+
&[],
297+
&mut chunk[2..(2 + data.len())],
298+
) {
299+
Ok(tag) => tag,
300+
Err(_) => {
301+
self.inner = Some(P2pNetworkNoiseStateInner::Error(NoiseError::Encryption));
302+
return;
303+
}
304+
};
274305
chunk.extend_from_slice(&tag);
275306

276307
self.outgoing_chunks.push_back(vec![chunk.into()]);

0 commit comments

Comments
 (0)