Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
fe2804d
Fixed the AddRouteResultMessage panic problem too
dnwiebe Aug 15, 2025
81333fe
Preemptive review issues
dnwiebe Aug 15, 2025
02d46c3
Formatting
dnwiebe Aug 15, 2025
442e63c
example.com -> www.example.com
dnwiebe Aug 18, 2025
ac4c096
Formatting
dnwiebe Aug 18, 2025
1234f5c
Added some debug logging to track RRI
dnwiebe Aug 26, 2025
70049a9
Formatting
dnwiebe Aug 26, 2025
30f832b
Clippy
dnwiebe Aug 26, 2025
b8c3894
Merge branch 'master' into GH-818
dnwiebe Aug 27, 2025
7c3fa52
Made RRI logs more consistent and increased straggler timeout to 30s
dnwiebe Aug 27, 2025
f9cab9a
Typo corrections
dnwiebe Aug 28, 2025
5a89e1a
Interim commit
dnwiebe Sep 1, 2025
8f774b6
Tests pass
dnwiebe Sep 6, 2025
f79d84d
Unit tests pass
dnwiebe Sep 15, 2025
b5df05c
Interim commit: a real mess
dnwiebe Sep 16, 2025
828ad89
Proxy Server tests passing; some cleanup yet to do
dnwiebe Sep 24, 2025
814bdef
Unit tests are all passing
dnwiebe Sep 25, 2025
bdec124
Unit and multinode tests pass
dnwiebe Sep 25, 2025
3798177
All the tests pass now
dnwiebe Oct 1, 2025
d6432d1
Tests passing except one in proxy_client
dnwiebe Oct 9, 2025
7791bb3
RRIDs are completely gone; hostnames are mandatory.
dnwiebe Oct 10, 2025
393d324
Tests all pass
dnwiebe Oct 11, 2025
42b055a
MOre review issues
dnwiebe Oct 11, 2025
e019807
Review issues
dnwiebe Oct 14, 2025
c5ba669
Test is passing now, and code is cleaner
dnwiebe Oct 16, 2025
05adecf
Cleaned up some code
dnwiebe Oct 16, 2025
723c8b0
More review issues
dnwiebe Oct 21, 2025
91193af
Formatting
dnwiebe Oct 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
**/*.rs.bk
.idea/azure/
.idea/inspectionProfiles/Project_Default.xml
.idea/copilot.data.migration.*

### Node
node_modules
Expand Down
73 changes: 28 additions & 45 deletions multinode_integration_tests/tests/connection_termination_test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) 2019, MASQ (https://masq.ai) and/or its affiliates. All rights reserved.

use masq_lib::blockchains::chains::Chain;
use masq_lib::constants::HTTP_PORT;
use masq_lib::test_utils::utils::TEST_DEFAULT_MULTINODE_CHAIN;
use masq_lib::utils::find_free_port;
use multinode_integration_tests_lib::masq_mock_node::MASQMockNode;
Expand Down Expand Up @@ -86,13 +87,12 @@ fn reported_server_drop() {
let (_, _, lcp) = mock_node
.wait_for_package(&masquerader, Duration::from_secs(2))
.unwrap();
let (stream_key, return_route_id) =
context_from_request_lcp(lcp, real_node.main_cryptde_null().unwrap(), &exit_cryptde);
let stream_key = stream_key_from_request_lcp(lcp, &exit_cryptde);

mock_node
.transmit_package(
mock_node.port_list()[0],
create_server_drop_report(&mock_node, &real_node, stream_key, return_route_id),
create_server_drop_report(&mock_node, &real_node, stream_key),
&masquerader,
real_node.main_public_key(),
real_node.socket_addr(PortSelector::First),
Expand All @@ -115,7 +115,7 @@ fn actual_server_drop() {
let server_port = find_free_port();
let mut server = real_node.make_server(server_port);
let masquerader = JsonMasquerader::new();
let (stream_key, return_route_id) = arbitrary_context();
let stream_key = arbitrary_stream_key();
let index: u64 = 0;
request_server_payload(
index,
Expand All @@ -125,7 +125,6 @@ fn actual_server_drop() {
&mut server,
&masquerader,
stream_key,
return_route_id,
);
let index: u64 = 1;
request_server_payload(
Expand All @@ -136,7 +135,6 @@ fn actual_server_drop() {
&mut server,
&masquerader,
stream_key,
return_route_id,
);

server.shutdown();
Expand Down Expand Up @@ -174,7 +172,6 @@ fn request_server_payload(
server: &mut MASQNodeServer,
masquerader: &JsonMasquerader,
stream_key: StreamKey,
return_route_id: u32,
) {
mock_node
.transmit_package(
Expand All @@ -184,7 +181,6 @@ fn request_server_payload(
&mock_node,
&real_node,
stream_key,
return_route_id,
&server,
cluster.chain,
),
Expand Down Expand Up @@ -212,7 +208,7 @@ fn reported_client_drop() {
let server_port = find_free_port();
let mut server = real_node.make_server(server_port);
let masquerader = JsonMasquerader::new();
let (stream_key, return_route_id) = arbitrary_context();
let stream_key = arbitrary_stream_key();
let index: u64 = 0;
mock_node
.transmit_package(
Expand All @@ -222,7 +218,6 @@ fn reported_client_drop() {
&mock_node,
&real_node,
stream_key,
return_route_id,
&server,
cluster.chain,
),
Expand All @@ -240,7 +235,7 @@ fn reported_client_drop() {
mock_node
.transmit_package(
mock_node.port_list()[0],
create_client_drop_report(&mock_node, &real_node, stream_key, return_route_id),
create_client_drop_report(&mock_node, &real_node, stream_key),
&masquerader,
real_node.main_public_key(),
real_node.socket_addr(PortSelector::First),
Expand Down Expand Up @@ -322,40 +317,32 @@ fn full_neighbor(one: &mut NodeRecord, another: &mut NodeRecord) {
.unwrap();
}

fn context_from_request_lcp(
lcp: LiveCoresPackage,
originating_cryptde: &dyn CryptDE,
exit_cryptde: &dyn CryptDE,
) -> (StreamKey, u32) {
fn stream_key_from_request_lcp(lcp: LiveCoresPackage, exit_cryptde: &dyn CryptDE) -> StreamKey {
let payload = match decodex::<MessageType>(exit_cryptde, &lcp.payload).unwrap() {
MessageType::ClientRequest(vd) => vd
.extract(&node_lib::sub_lib::migrations::client_request_payload::MIGRATIONS)
.unwrap(),
mt => panic!("Unexpected: {:?}", mt),
};
let stream_key = payload.stream_key;
let return_route_id = decodex::<u32>(originating_cryptde, &lcp.route.hops[6]).unwrap();
(stream_key, return_route_id)
stream_key
}

fn arbitrary_context() -> (StreamKey, u32) {
(
StreamKey::make_meaningful_stream_key("arbitrary_context"),
12345678,
)
fn arbitrary_stream_key() -> StreamKey {
StreamKey::make_meaningful_stream_key("arbitrary_context")
}

fn create_request_icp(
index: u64,
originating_node: &MASQMockNode,
exit_node: &MASQRealNode,
stream_key: StreamKey,
return_route_id: u32,
server: &MASQNodeServer,
chain: Chain,
) -> IncipientCoresPackage {
let originating_main_cryptde = originating_node.main_cryptde_null().unwrap();
IncipientCoresPackage::new(
originating_node.main_cryptde_null().unwrap(),
originating_main_cryptde,
Route::round_trip(
RouteSegment::new(
vec![
Expand All @@ -371,9 +358,8 @@ fn create_request_icp(
],
Component::ProxyServer,
),
originating_node.main_cryptde_null().unwrap(),
originating_main_cryptde,
originating_node.consuming_wallet(),
return_route_id,
Some(chain.rec().contract),
)
.unwrap(),
Expand All @@ -382,7 +368,7 @@ fn create_request_icp(
&ClientRequestPayload_0v1 {
stream_key,
sequenced_packet: SequencedPacket::new(Vec::from(HTTP_REQUEST), index, false),
target_hostname: Some(format!("{}", server.local_addr().ip())),
target_hostname: format!("{}", server.local_addr().ip()),
target_port: server.local_addr().port(),
protocol: ProxyProtocol::HTTP,
originator_public_key: originating_node.main_public_key().clone(),
Expand All @@ -400,8 +386,9 @@ fn create_meaningless_icp(
let socket_addr = SocketAddr::from_str("3.2.1.0:7654").unwrap();
let stream_key =
StreamKey::make_meaningful_stream_key("Chancellor on brink of second bailout for banks");
let main_cryptde = originating_node.main_cryptde_null().unwrap();
IncipientCoresPackage::new(
originating_node.main_cryptde_null().unwrap(),
main_cryptde,
Route::round_trip(
RouteSegment::new(
vec![
Expand All @@ -417,9 +404,8 @@ fn create_meaningless_icp(
],
Component::ProxyServer,
),
originating_node.main_cryptde_null().unwrap(),
main_cryptde,
originating_node.consuming_wallet(),
1357,
Some(TEST_DEFAULT_MULTINODE_CHAIN.rec().contract),
)
.unwrap(),
Expand All @@ -428,7 +414,7 @@ fn create_meaningless_icp(
&ClientRequestPayload_0v1 {
stream_key,
sequenced_packet: SequencedPacket::new(Vec::from(HTTP_REQUEST), 0, false),
target_hostname: Some(format!("nowhere.com")),
target_hostname: "nowhere.com".to_string(),
target_port: socket_addr.port(),
protocol: ProxyProtocol::HTTP,
originator_public_key: originating_node.main_public_key().clone(),
Expand All @@ -443,8 +429,9 @@ fn create_server_drop_report(
exit_node: &MASQMockNode,
originating_node: &MASQRealNode,
stream_key: StreamKey,
return_route_id: u32,
) -> IncipientCoresPackage {
let exit_main_cryptde = exit_node.main_cryptde_null().unwrap();
let originating_main_cryptde = originating_node.main_cryptde_null().unwrap();
let mut route = Route::round_trip(
RouteSegment::new(
vec![
Expand All @@ -460,15 +447,12 @@ fn create_server_drop_report(
],
Component::ProxyServer,
),
originating_node.main_cryptde_null().unwrap(),
originating_main_cryptde,
originating_node.consuming_wallet(),
return_route_id,
Some(TEST_DEFAULT_MULTINODE_CHAIN.rec().contract),
)
.unwrap();
route
.shift(originating_node.main_cryptde_null().unwrap())
.unwrap();
route.shift(originating_main_cryptde).unwrap();
let payload = MessageType::ClientResponse(VersionedData::new(
&node_lib::sub_lib::migrations::client_response_payload::MIGRATIONS,
&ClientResponsePayload_0v1 {
Expand All @@ -478,7 +462,7 @@ fn create_server_drop_report(
));

IncipientCoresPackage::new(
exit_node.main_cryptde_null().unwrap(),
exit_main_cryptde,
route,
payload,
originating_node.alias_public_key(),
Expand All @@ -490,8 +474,8 @@ fn create_client_drop_report(
originating_node: &MASQMockNode,
exit_node: &MASQRealNode,
stream_key: StreamKey,
return_route_id: u32,
) -> IncipientCoresPackage {
let originating_main_cryptde = originating_node.main_cryptde_null().unwrap();
let route = Route::round_trip(
RouteSegment::new(
vec![
Expand All @@ -507,9 +491,8 @@ fn create_client_drop_report(
],
Component::ProxyServer,
),
originating_node.main_cryptde_null().unwrap(),
originating_main_cryptde,
originating_node.consuming_wallet(),
return_route_id,
Some(TEST_DEFAULT_MULTINODE_CHAIN.rec().contract),
)
.unwrap();
Expand All @@ -518,15 +501,15 @@ fn create_client_drop_report(
&ClientRequestPayload_0v1 {
stream_key,
sequenced_packet: SequencedPacket::new(vec![], 1, true),
target_hostname: Some(String::from("doesnt.matter.com")),
target_port: 80,
target_hostname: String::from("doesnt.matter.com"),
target_port: HTTP_PORT,
protocol: ProxyProtocol::HTTP,
originator_public_key: originating_node.main_public_key().clone(),
},
));

IncipientCoresPackage::new(
originating_node.main_cryptde_null().unwrap(),
originating_main_cryptde,
route,
payload,
exit_node.main_public_key(),
Expand Down
11 changes: 7 additions & 4 deletions multinode_integration_tests/tests/self_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use node_lib::sub_lib::dispatcher::Component;
use node_lib::sub_lib::hopper::IncipientCoresPackage;
use node_lib::sub_lib::route::Route;
use node_lib::sub_lib::route::RouteSegment;
use node_lib::sub_lib::stream_key::StreamKey;
use node_lib::test_utils::{make_meaningless_message_type, make_paying_wallet};
use std::collections::HashSet;
use std::io::ErrorKind;
Expand Down Expand Up @@ -68,6 +69,7 @@ fn server_relays_cores_package() {
let masquerader = JsonMasquerader::new();
let server = MASQCoresServer::new(cluster.chain);
let cryptde = server.main_cryptde();
let stream_key = StreamKey::make_meaningless_stream_key();
let mut client = MASQCoresClient::new(server.local_addr(), cryptde);
let mut route = Route::one_way(
RouteSegment::new(
Expand All @@ -82,7 +84,7 @@ fn server_relays_cores_package() {
let incipient = IncipientCoresPackage::new(
cryptde,
route.clone(),
make_meaningless_message_type(),
make_meaningless_message_type(stream_key),
&cryptde.public_key(),
)
.unwrap();
Expand All @@ -99,7 +101,7 @@ fn server_relays_cores_package() {

route.shift(cryptde).unwrap();
assert_eq!(expired.remaining_route, route);
assert_eq!(expired.payload, make_meaningless_message_type());
assert_eq!(expired.payload, make_meaningless_message_type(stream_key));
}

#[test]
Expand All @@ -111,6 +113,7 @@ fn one_mock_node_talks_to_another() {
let mock_node_1 = cluster.get_mock_node_by_name("mock_node_1").unwrap();
let mock_node_2 = cluster.get_mock_node_by_name("mock_node_2").unwrap();
let cryptde = CryptDENull::new(TEST_DEFAULT_CHAIN);
let stream_key = StreamKey::make_meaningless_stream_key();
let route = Route::one_way(
RouteSegment::new(
vec![
Expand All @@ -127,7 +130,7 @@ fn one_mock_node_talks_to_another() {
let incipient_cores_package = IncipientCoresPackage::new(
&cryptde,
route,
make_meaningless_message_type(),
make_meaningless_message_type(stream_key),
&mock_node_2.main_public_key(),
)
.unwrap();
Expand Down Expand Up @@ -156,7 +159,7 @@ fn one_mock_node_talks_to_another() {
assert_eq!(package_to, mock_node_2.socket_addr(PortSelector::First));
assert_eq!(
expired_cores_package.payload,
make_meaningless_message_type()
make_meaningless_message_type(stream_key)
);
}

Expand Down
Loading
Loading