Skip to content

Commit e858f85

Browse files
sanityclaudegithub-actions[bot]iduartgomez
authored
fix: immediate peer discovery by sending FindOptimalPeer over network (#1926)
Co-authored-by: Claude <[email protected]> Co-authored-by: claude[bot] <41898282+claude[bot]@users.noreply.github.com> Co-authored-by: nacho.d.g <[email protected]>
1 parent 312c423 commit e858f85

File tree

7 files changed

+141
-51
lines changed

7 files changed

+141
-51
lines changed

crates/core/src/message.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,11 @@ pub(crate) enum NodeEvent {
334334
key: ContractKey,
335335
subscribed: bool,
336336
},
337+
/// Send a message to a peer over the network
338+
SendMessage {
339+
target: PeerId,
340+
msg: Box<NetMessage>,
341+
},
337342
}
338343

339344
#[derive(Debug, Clone)]
@@ -410,6 +415,9 @@ impl Display for NodeEvent {
410415
"Local subscribe complete (tx: {tx}, key: {key}, subscribed: {subscribed})"
411416
)
412417
}
418+
NodeEvent::SendMessage { target, msg } => {
419+
write!(f, "SendMessage (to {target}, tx: {})", msg.id())
420+
}
413421
}
414422
}
415423
}

crates/core/src/node/network_bridge/p2p_protoc.rs

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -308,12 +308,13 @@ impl P2pConnManager {
308308
ChannelCloseReason::Handshake
309309
| ChannelCloseReason::Bridge
310310
| ChannelCloseReason::Controller => {
311-
// Critical internal channels closed - perform cleanup and shutdown gracefully
311+
// All ClosedChannel events are critical - the transport is unable to establish
312+
// more connections, rendering this peer useless. Perform cleanup and shutdown.
312313
tracing::error!(
313314
?reason,
314315
is_gateway = self.bridge.op_manager.ring.is_gateway(),
315316
num_connections = self.connections.len(),
316-
"🔴 CRITICAL CHANNEL CLOSED - performing cleanup and shutting down"
317+
"Critical channel closed - performing cleanup and shutting down"
317318
);
318319

319320
// Clean up all active connections
@@ -401,6 +402,15 @@ impl P2pConnManager {
401402
)
402403
.await?;
403404
}
405+
NodeEvent::SendMessage { target, msg } => {
406+
// Send the message to the target peer over the network
407+
tracing::debug!(
408+
tx = %msg.id(),
409+
%target,
410+
"SendMessage event: sending message to peer via network bridge"
411+
);
412+
self.bridge.send(&target, *msg).await?;
413+
}
404414
NodeEvent::QueryConnections { callback } => {
405415
let connections = self.connections.keys().cloned().collect();
406416
timeout(
@@ -718,14 +728,9 @@ impl P2pConnManager {
718728
self.handle_handshake_action(event, state, handshake_handler_msg).await?;
719729
Ok(EventResult::Continue)
720730
}
721-
Err(HandshakeError::ChannelClosed) => {
722-
tracing::error!(
723-
"🔴 HANDSHAKE CHANNEL CLOSED - handshake handler's channel has closed"
724-
);
725-
Ok(EventResult::Event(
726-
ConnEvent::ClosedChannel(ChannelCloseReason::Handshake).into(),
727-
))
728-
}
731+
Err(HandshakeError::ChannelClosed) => Ok(EventResult::Event(
732+
ConnEvent::ClosedChannel(ChannelCloseReason::Handshake).into(),
733+
)),
729734
Err(e) => {
730735
tracing::warn!("Handshake error: {:?}", e);
731736
Ok(EventResult::Continue)
@@ -1161,20 +1166,14 @@ impl P2pConnManager {
11611166
match msg {
11621167
Some(Left((_, msg))) => EventResult::Event(ConnEvent::OutboundMessage(*msg).into()),
11631168
Some(Right(action)) => EventResult::Event(ConnEvent::NodeAction(action).into()),
1164-
None => {
1165-
tracing::error!("🔴 BRIDGE CHANNEL CLOSED - P2P bridge channel has closed");
1166-
EventResult::Event(ConnEvent::ClosedChannel(ChannelCloseReason::Bridge).into())
1167-
}
1169+
None => EventResult::Event(ConnEvent::ClosedChannel(ChannelCloseReason::Bridge).into()),
11681170
}
11691171
}
11701172

11711173
fn handle_node_controller_msg(&self, msg: Option<NodeEvent>) -> EventResult {
11721174
match msg {
11731175
Some(msg) => EventResult::Event(ConnEvent::NodeAction(msg).into()),
11741176
None => {
1175-
tracing::error!(
1176-
"🔴 CONTROLLER CHANNEL CLOSED - node controller channel has closed"
1177-
);
11781177
EventResult::Event(ConnEvent::ClosedChannel(ChannelCloseReason::Controller).into())
11791178
}
11801179
}

crates/core/src/node/testing_impl.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -935,6 +935,11 @@ where
935935
NodeEvent::QueryNodeDiagnostics { .. } => {
936936
unimplemented!()
937937
}
938+
NodeEvent::SendMessage { target, msg } => {
939+
tracing::debug!(tx = %msg.id(), %target, "SendMessage event in testing_impl");
940+
conn_manager.send(&target, *msg).await?;
941+
continue;
942+
}
938943
},
939944
Err(err) => {
940945
super::report_result(

crates/core/src/operations/connect.rs

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,11 @@ impl Operation for ConnectOp {
180180
};
181181
let mut skip_connections = skip_connections.clone();
182182
let mut skip_forwards = skip_forwards.clone();
183-
skip_connections.extend([this_peer.clone(), query_target.peer.clone()]);
183+
skip_connections.extend([
184+
this_peer.clone(),
185+
query_target.peer.clone(),
186+
joiner.peer.clone(),
187+
]);
184188
skip_forwards.extend([this_peer.clone(), query_target.peer.clone()]);
185189
if this_peer == &query_target.peer {
186190
// this peer should be the original target queries
@@ -222,9 +226,21 @@ impl Operation for ConnectOp {
222226
tx = %id,
223227
query_target = %query_target.peer,
224228
joiner = %joiner.peer,
225-
"Gateway has no desirable peer to offer to joiner",
229+
"Gateway found no suitable peers to forward CheckConnectivity request",
226230
);
227-
return_msg = None;
231+
// Send a negative response back to the joiner to inform them
232+
// that no suitable peers are currently available
233+
let response = ConnectResponse::AcceptedBy {
234+
accepted: false,
235+
acceptor: own_loc.clone(),
236+
joiner: joiner.peer.clone(),
237+
};
238+
return_msg = Some(ConnectMsg::Response {
239+
id: *id,
240+
sender: own_loc.clone(),
241+
target: joiner.clone(),
242+
msg: response,
243+
});
228244
new_state = None;
229245
}
230246
} else {
@@ -269,17 +285,27 @@ impl Operation for ConnectOp {
269285
},
270286
..
271287
} => {
288+
let this_peer = op_manager.ring.connection_manager.own_location();
272289
if sender.peer == joiner.peer {
273290
tracing::error!(
274291
tx = %id,
275292
sender = %sender.peer,
276293
joiner = %joiner.peer,
277-
at = %op_manager.ring.connection_manager.own_location().peer,
278-
"Connectivity check from self, aborting"
294+
at = %this_peer.peer,
295+
"Connectivity check from self (sender == joiner), rejecting operation"
279296
);
280-
std::process::exit(1);
297+
return Err(OpError::UnexpectedOpState);
298+
}
299+
if this_peer.peer == joiner.peer {
300+
tracing::error!(
301+
tx = %id,
302+
this_peer = %this_peer.peer,
303+
joiner = %joiner.peer,
304+
sender = %sender.peer,
305+
"Received CheckConnectivity where this peer is the joiner (self-connection attempt), rejecting operation"
306+
);
307+
return Err(OpError::UnexpectedOpState);
281308
}
282-
let this_peer = op_manager.ring.connection_manager.own_location();
283309
let joiner_loc = joiner
284310
.location
285311
.expect("should be already set at the p2p bridge level");
@@ -978,16 +1004,19 @@ async fn connect_request(
9781004
None,
9791005
);
9801006

981-
// Push the new operation and send the message
1007+
// Push the new operation
9821008
op_manager
9831009
.push(new_tx_id, OpEnum::Connect(Box::new(new_op)))
9841010
.await?;
9851011

1012+
// Send the FindOptimalPeer message to the gateway over the network
1013+
// We use notify_node_event with a SendMessage event to ensure it goes through
1014+
// the proper network channel, not just local processing
9861015
op_manager
987-
.notify_op_change(
988-
NetMessage::from(msg),
989-
OpEnum::Connect(Box::new(ConnectOp::new(new_tx_id, None, None, None))),
990-
)
1016+
.notify_node_event(NodeEvent::SendMessage {
1017+
target: gateway.peer.clone(),
1018+
msg: Box::new(NetMessage::from(msg)),
1019+
})
9911020
.await?;
9921021
Ok(())
9931022
}
@@ -1171,13 +1200,17 @@ fn select_forward_target(
11711200
left_htl: usize,
11721201
skip_forwards: &HashSet<PeerId>,
11731202
) -> Option<PeerKeyLocation> {
1203+
// Create an extended skip list that includes the joiner to prevent forwarding to the joiner
1204+
let mut extended_skip = skip_forwards.clone();
1205+
extended_skip.insert(joiner.peer.clone());
1206+
11741207
if left_htl >= connection_manager.rnd_if_htl_above {
11751208
tracing::debug!(
11761209
tx = %id,
11771210
joiner = %joiner.peer,
11781211
"Randomly selecting peer to forward connect request",
11791212
);
1180-
connection_manager.random_peer(|p| !skip_forwards.contains(p))
1213+
connection_manager.random_peer(|p| !extended_skip.contains(p))
11811214
} else {
11821215
tracing::debug!(
11831216
tx = %id,
@@ -1188,7 +1221,7 @@ fn select_forward_target(
11881221
.routing(
11891222
joiner.location.unwrap(),
11901223
Some(&request_peer.peer),
1191-
skip_forwards,
1224+
&extended_skip,
11921225
router,
11931226
)
11941227
.and_then(|pkl| (pkl.peer != joiner.peer).then_some(pkl))

crates/core/src/operations/put.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -938,6 +938,14 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re
938938
.ring
939939
.closest_potentially_caching(&key, [&own_location.peer].as_slice());
940940

941+
tracing::debug!(
942+
tx = %id,
943+
%key,
944+
target_found = target.is_some(),
945+
target_peer = ?target.as_ref().map(|t| t.peer.to_string()),
946+
"Determined PUT routing target"
947+
);
948+
941949
// No other peers found - handle locally
942950
if target.is_none() {
943951
tracing::debug!(tx = %id, %key, "No other peers available, handling put operation locally");
@@ -1017,6 +1025,16 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re
10171025
}
10181026

10191027
// At least one peer found - forward to network
1028+
let target_peer = target.unwrap();
1029+
1030+
tracing::debug!(
1031+
tx = %id,
1032+
%key,
1033+
target_peer = %target_peer.peer,
1034+
target_location = ?target_peer.location,
1035+
"Forwarding PUT to target peer"
1036+
);
1037+
10201038
put_op.state = Some(PutState::AwaitingResponse {
10211039
key,
10221040
upstream: None,
@@ -1032,9 +1050,15 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re
10321050
related_contracts,
10331051
value,
10341052
htl,
1035-
target: target.unwrap(),
1053+
target: target_peer,
10361054
};
10371055

1056+
tracing::debug!(
1057+
tx = %id,
1058+
%key,
1059+
"Calling notify_op_change to send PUT message to network"
1060+
);
1061+
10381062
// Use notify_op_change to trigger the operation processing
10391063
// This will cause the operation to be processed through process_message for network propagation
10401064
op_manager

crates/core/tests/connectivity.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,9 @@ async fn test_three_node_network_connectivity() -> TestResult {
493493
let peer1_ws_port = peer1_ws_socket.local_addr()?.port();
494494
let peer2_ws_port = peer2_ws_socket.local_addr()?.port();
495495

496+
// Generate a single consistent location for the gateway
497+
let gateway_location = RNG.lock().unwrap().random();
498+
496499
let gateway_config = ConfigArgs {
497500
ws_api: WebsocketApiArgs {
498501
address: Some(Ipv4Addr::LOCALHOST.into()),
@@ -504,7 +507,7 @@ async fn test_three_node_network_connectivity() -> TestResult {
504507
is_gateway: true,
505508
skip_load_from_network: true,
506509
gateways: Some(vec![]),
507-
location: Some(RNG.lock().unwrap().random()),
510+
location: Some(gateway_location),
508511
ignore_protocol_checking: true,
509512
address: Some(Ipv4Addr::LOCALHOST.into()),
510513
network_port: Some(gateway_port),
@@ -522,10 +525,10 @@ async fn test_three_node_network_connectivity() -> TestResult {
522525
..Default::default()
523526
};
524527

525-
// Gateway info for peers
528+
// Gateway info for peers - use the SAME location as gateway config
526529
let gateway_info = InlineGwConfig {
527530
address: (Ipv4Addr::LOCALHOST, gateway_port).into(),
528-
location: Some(RNG.lock().unwrap().random()),
531+
location: Some(gateway_location),
529532
public_key_path: temp_dir_gw.path().join("public.pem"),
530533
};
531534

0 commit comments

Comments
 (0)