From 3d06bc26d159536645d994121dcfffba1801074a Mon Sep 17 00:00:00 2001 From: chonghe <44791194+chong-he@users.noreply.github.com> Date: Tue, 4 Feb 2025 14:43:37 +0800 Subject: [PATCH] Add test to beacon node fallback feature (#6568) --- Cargo.lock | 67 +++++++ Cargo.toml | 5 + testing/validator_test_rig/Cargo.toml | 14 ++ testing/validator_test_rig/src/lib.rs | 1 + .../src/mock_beacon_node.rs | 132 +++++++++++++ validator_client/Cargo.toml | 3 - .../beacon_node_fallback/Cargo.toml | 4 + .../beacon_node_fallback/src/lib.rs | 176 +++++++++++++++++- validator_client/src/lib.rs | 16 +- 9 files changed, 405 insertions(+), 13 deletions(-) create mode 100644 testing/validator_test_rig/Cargo.toml create mode 100644 testing/validator_test_rig/src/lib.rs create mode 100644 testing/validator_test_rig/src/mock_beacon_node.rs diff --git a/Cargo.lock b/Cargo.lock index 5707d219536..4581fb9ce05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -543,6 +543,16 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "155a5a185e42c6b77ac7b88a15143d930a9e9727a5b7b77eed417404ab15c247" +[[package]] +name = "assert-json-diff" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "async-channel" version = "1.9.0" @@ -889,6 +899,7 @@ dependencies = [ "eth2", "futures", "itertools 0.10.5", + "logging", "serde", "slog", "slot_clock", @@ -896,6 +907,7 @@ dependencies = [ "tokio", "types", "validator_metrics", + "validator_test_rig", ] [[package]] @@ -1485,6 +1497,16 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +[[package]] +name = "colored" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c" +dependencies = [ + "lazy_static", + "windows-sys 0.59.0", +] + [[package]] name = "compare_fields" version = "0.2.0" @@ -5794,6 +5816,30 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9366861eb2a2c436c20b12c8dbec5f798cea6b47ad99216be0282942e2c81ea0" +[[package]] +name = "mockito" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "652cd6d169a36eaf9d1e6bce1a221130439a966d7f27858af66a33a66e9c4ee2" +dependencies = [ + "assert-json-diff", + "bytes", + "colored", + "futures-util", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.6.0", + "hyper-util", + "log", + "rand 0.8.5", + "regex", + "serde_json", + "serde_urlencoded", + "similar", + "tokio", +] + [[package]] name = "moka" version = "0.12.10" @@ -8175,6 +8221,12 @@ dependencies = [ "validator_metrics", ] +[[package]] +name = "similar" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1de1d4f81173b03af4c0cbed3c898f6bff5b870e4a7f5d6f4057d62a7a4b686e" + [[package]] name = "simple_asn1" version = "0.6.3" @@ -9090,6 +9142,7 @@ dependencies = [ "bytes", "libc", "mio", + "parking_lot 0.12.3", "pin-project-lite", "signal-hook-registry", "socket2", @@ -9868,6 +9921,20 @@ dependencies = [ "validator_metrics", ] +[[package]] +name = "validator_test_rig" +version = "0.1.0" +dependencies = [ + "eth2", + "logging", + "mockito", + "regex", + "sensitive_url", + "serde_json", + "slog", + "types", +] + [[package]] name = "valuable" version = "0.1.1" diff --git a/Cargo.toml b/Cargo.toml index 86186da17d2..73912f60822 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -86,8 +86,10 @@ members = [ "testing/simulator", "testing/state_transition_vectors", "testing/test-test_logger", + "testing/validator_test_rig", "testing/web3signer_tests", + "validator_client", "validator_client/beacon_node_fallback", "validator_client/doppelganger_service", @@ -155,6 +157,7 @@ log = "0.4" lru = "0.12" maplit = "1" milhouse = "0.3" +mockito = "1.5.0" num_cpus = "1" parking_lot = "0.12" paste = "1" @@ -261,6 +264,7 @@ malloc_utils = { path = "common/malloc_utils" } merkle_proof = { path = "consensus/merkle_proof" } monitoring_api = { path = "common/monitoring_api" } network = { path = "beacon_node/network" } +node_test_rig = { path = "testing/node_test_rig" } operation_pool = { path = "beacon_node/operation_pool" } pretty_reqwest_error = { path = "common/pretty_reqwest_error" } proto_array = { path = "consensus/proto_array" } @@ -283,6 +287,7 @@ validator_http_api = { path = "validator_client/http_api" } validator_http_metrics = { path = "validator_client/http_metrics" } validator_metrics = { path = "validator_client/validator_metrics" } validator_store = { path = "validator_client/validator_store" } +validator_test_rig = { path = "testing/validator_test_rig" } warp_utils = { path = "common/warp_utils" } xdelta3 = { git = "http://github.com/sigp/xdelta3-rs", rev = "50d63cdf1878e5cf3538e9aae5eed34a22c64e4a" } zstd = "0.13" diff --git a/testing/validator_test_rig/Cargo.toml b/testing/validator_test_rig/Cargo.toml new file mode 100644 index 00000000000..76560b8afc5 --- /dev/null +++ b/testing/validator_test_rig/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "validator_test_rig" +version = "0.1.0" +edition = { workspace = true } + +[dependencies] +eth2 = { workspace = true } +logging = { workspace = true } +mockito = { workspace = true } +regex = { workspace = true } +sensitive_url = { workspace = true } +serde_json = { workspace = true } +slog = { workspace = true } +types = { workspace = true } diff --git a/testing/validator_test_rig/src/lib.rs b/testing/validator_test_rig/src/lib.rs new file mode 100644 index 00000000000..a0a979dfc88 --- /dev/null +++ b/testing/validator_test_rig/src/lib.rs @@ -0,0 +1 @@ +pub mod mock_beacon_node; diff --git a/testing/validator_test_rig/src/mock_beacon_node.rs b/testing/validator_test_rig/src/mock_beacon_node.rs new file mode 100644 index 00000000000..f875116155a --- /dev/null +++ b/testing/validator_test_rig/src/mock_beacon_node.rs @@ -0,0 +1,132 @@ +use eth2::types::{GenericResponse, SyncingData}; +use eth2::{BeaconNodeHttpClient, StatusCode, Timeouts}; +use logging::test_logger; +use mockito::{Matcher, Mock, Server, ServerGuard}; +use regex::Regex; +use sensitive_url::SensitiveUrl; +use slog::{info, Logger}; +use std::marker::PhantomData; +use std::str::FromStr; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use types::{ChainSpec, ConfigAndPreset, EthSpec, SignedBlindedBeaconBlock}; + +pub struct MockBeaconNode { + server: ServerGuard, + pub beacon_api_client: BeaconNodeHttpClient, + log: Logger, + _phantom: PhantomData, + pub received_blocks: Arc>>>, +} + +impl MockBeaconNode { + pub async fn new() -> Self { + // mock server logging + let server = Server::new_async().await; + let beacon_api_client = BeaconNodeHttpClient::new( + SensitiveUrl::from_str(&server.url()).unwrap(), + Timeouts::set_all(Duration::from_secs(1)), + ); + let log = test_logger(); + Self { + server, + beacon_api_client, + log, + _phantom: PhantomData, + received_blocks: Arc::new(Mutex::new(Vec::new())), + } + } + + /// Resets all mocks + #[allow(dead_code)] + pub fn reset_mocks(&mut self) { + self.server.reset(); + } + + pub fn mock_config_spec(&mut self, spec: &ChainSpec) { + let path_pattern = Regex::new(r"^/eth/v1/config/spec$").unwrap(); + let config_and_preset = ConfigAndPreset::from_chain_spec::(spec, None); + let data = GenericResponse::from(config_and_preset); + self.server + .mock("GET", Matcher::Regex(path_pattern.to_string())) + .with_status(200) + .with_body(serde_json::to_string(&data).unwrap()) + .create(); + } + + pub fn mock_get_node_syncing(&mut self, response: SyncingData) { + let path_pattern = Regex::new(r"^/eth/v1/node/syncing$").unwrap(); + + let data = GenericResponse::from(response); + + self.server + .mock("GET", Matcher::Regex(path_pattern.to_string())) + .with_status(200) + .with_body(serde_json::to_string(&data).unwrap()) + .create(); + } + + /// Mocks the `post_beacon_blinded_blocks_v2_ssz` response with an optional `delay`. + pub fn mock_post_beacon_blinded_blocks_v2_ssz(&mut self, delay: Duration) -> Mock { + let path_pattern = Regex::new(r"^/eth/v2/beacon/blinded_blocks$").unwrap(); + let log = self.log.clone(); + let url = self.server.url(); + + let received_blocks = Arc::clone(&self.received_blocks); + + self.server + .mock("POST", Matcher::Regex(path_pattern.to_string())) + .match_header("content-type", "application/octet-stream") + .with_status(200) + .with_body_from_request(move |request| { + info!( + log, + "{}", + format!( + "Received published block request on server {} with delay {} s", + url, + delay.as_secs(), + ) + ); + + let body = request.body().expect("Failed to get request body"); + let block: SignedBlindedBeaconBlock = + SignedBlindedBeaconBlock::any_from_ssz_bytes(body) + .expect("Failed to deserialize body as SignedBlindedBeaconBlock"); + + received_blocks.lock().unwrap().push(block); + + std::thread::sleep(delay); + vec![] + }) + .create() + } + + pub fn mock_offline_node(&mut self) -> Mock { + let path_pattern = Regex::new(r"^/eth/v1/node/version$").unwrap(); + + self.server + .mock("GET", Matcher::Regex(path_pattern.to_string())) + .with_status(StatusCode::INTERNAL_SERVER_ERROR.as_u16() as usize) + .with_header("content-type", "application/json") + .with_body(r#"{"message":"Internal Server Error"}"#) + .create() + } + + pub fn mock_online_node(&mut self) -> Mock { + let path_pattern = Regex::new(r"^/eth/v1/node/version$").unwrap(); + + self.server + .mock("GET", Matcher::Regex(path_pattern.to_string())) + .with_status(200) + .with_header("content-type", "application/json") + .with_body( + r#"{ + "data": { + "version": "lighthouse-mock" + } + }"#, + ) + .create() + } +} diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index 504d96ae1c1..fb6007b00a6 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -8,9 +8,6 @@ edition = { workspace = true } name = "validator_client" path = "src/lib.rs" -[dev-dependencies] -tokio = { workspace = true } - [dependencies] account_utils = { workspace = true } beacon_node_fallback = { workspace = true } diff --git a/validator_client/beacon_node_fallback/Cargo.toml b/validator_client/beacon_node_fallback/Cargo.toml index 2c30290110f..598020d1379 100644 --- a/validator_client/beacon_node_fallback/Cargo.toml +++ b/validator_client/beacon_node_fallback/Cargo.toml @@ -21,3 +21,7 @@ strum = { workspace = true } tokio = { workspace = true } types = { workspace = true } validator_metrics = { workspace = true } + +[dev-dependencies] +logging = { workspace = true } +validator_test_rig = { workspace = true } diff --git a/validator_client/beacon_node_fallback/src/lib.rs b/validator_client/beacon_node_fallback/src/lib.rs index 839e60d0116..abcf74a1a62 100644 --- a/validator_client/beacon_node_fallback/src/lib.rs +++ b/validator_client/beacon_node_fallback/src/lib.rs @@ -752,8 +752,12 @@ mod tests { use crate::beacon_node_health::BeaconNodeHealthTier; use eth2::SensitiveUrl; use eth2::Timeouts; + use logging::test_logger; + use slot_clock::TestingSlotClock; use strum::VariantNames; - use types::{MainnetEthSpec, Slot}; + use types::{BeaconBlockDeneb, MainnetEthSpec, Slot}; + use types::{EmptyBlock, Signature, SignedBeaconBlockDeneb, SignedBlindedBeaconBlock}; + use validator_test_rig::mock_beacon_node::MockBeaconNode; type E = MainnetEthSpec; @@ -772,7 +776,7 @@ mod tests { #[tokio::test] async fn check_candidate_order() { - // These fields is irrelvant for sorting. They are set to arbitrary values. + // These fields are irrelevant for sorting. They are set to arbitrary values. let head = Slot::new(99); let optimistic_status = IsOptimistic::No; let execution_status = ExecutionEngineHealth::Healthy; @@ -880,4 +884,172 @@ mod tests { assert_eq!(candidates, expected_candidates); } + + async fn new_mock_beacon_node( + index: usize, + spec: &ChainSpec, + ) -> (MockBeaconNode, CandidateBeaconNode) { + let mut mock_beacon_node = MockBeaconNode::::new().await; + mock_beacon_node.mock_config_spec(spec); + + let beacon_node = + CandidateBeaconNode::::new(mock_beacon_node.beacon_api_client.clone(), index); + + (mock_beacon_node, beacon_node) + } + + fn create_beacon_node_fallback( + candidates: Vec>, + topics: Vec, + spec: Arc, + log: Logger, + ) -> BeaconNodeFallback { + let mut beacon_node_fallback = + BeaconNodeFallback::new(candidates, Config::default(), topics, spec, log); + + beacon_node_fallback.set_slot_clock(TestingSlotClock::new( + Slot::new(1), + Duration::from_secs(0), + Duration::from_secs(12), + )); + + beacon_node_fallback + } + + #[tokio::test] + async fn update_all_candidates_should_update_sync_status() { + let spec = Arc::new(MainnetEthSpec::default_spec()); + let (mut mock_beacon_node_1, beacon_node_1) = new_mock_beacon_node(0, &spec).await; + let (mut mock_beacon_node_2, beacon_node_2) = new_mock_beacon_node(1, &spec).await; + let (mut mock_beacon_node_3, beacon_node_3) = new_mock_beacon_node(2, &spec).await; + + let beacon_node_fallback = create_beacon_node_fallback( + // Put this out of order to be sorted later + vec![ + beacon_node_2.clone(), + beacon_node_3.clone(), + beacon_node_1.clone(), + ], + vec![], + spec.clone(), + test_logger(), + ); + + // BeaconNodeHealthTier 1 + mock_beacon_node_1.mock_get_node_syncing(eth2::types::SyncingData { + is_syncing: false, + is_optimistic: false, + el_offline: false, + head_slot: Slot::new(1), + sync_distance: Slot::new(0), + }); + // BeaconNodeHealthTier 3 + mock_beacon_node_2.mock_get_node_syncing(eth2::types::SyncingData { + is_syncing: false, + is_optimistic: false, + el_offline: true, + head_slot: Slot::new(1), + sync_distance: Slot::new(0), + }); + // BeaconNodeHealthTier 5 + mock_beacon_node_3.mock_get_node_syncing(eth2::types::SyncingData { + is_syncing: false, + is_optimistic: true, + el_offline: false, + head_slot: Slot::new(1), + sync_distance: Slot::new(0), + }); + + beacon_node_fallback.update_all_candidates().await; + + let candidates = beacon_node_fallback.candidates.read().await; + assert_eq!( + vec![beacon_node_1, beacon_node_2, beacon_node_3], + *candidates + ); + } + + #[tokio::test] + async fn broadcast_should_send_to_all_bns() { + let spec = Arc::new(MainnetEthSpec::default_spec()); + let (mut mock_beacon_node_1, beacon_node_1) = new_mock_beacon_node(0, &spec).await; + let (mut mock_beacon_node_2, beacon_node_2) = new_mock_beacon_node(1, &spec).await; + + let beacon_node_fallback = create_beacon_node_fallback( + vec![beacon_node_1, beacon_node_2], + vec![ApiTopic::Blocks], + spec.clone(), + test_logger(), + ); + + mock_beacon_node_1.mock_post_beacon_blinded_blocks_v2_ssz(Duration::from_secs(0)); + mock_beacon_node_2.mock_post_beacon_blinded_blocks_v2_ssz(Duration::from_secs(0)); + + let signed_block = SignedBlindedBeaconBlock::::Deneb(SignedBeaconBlockDeneb { + message: BeaconBlockDeneb::empty(&spec), + signature: Signature::empty(), + }); + + // trigger broadcast to `post_beacon_blinded_blocks_v2` + let result = beacon_node_fallback + .broadcast(|client| { + let signed_block_cloned = signed_block.clone(); + async move { + client + .post_beacon_blinded_blocks_v2_ssz(&signed_block_cloned, None) + .await + } + }) + .await; + + assert!(result.is_ok()); + + let received_blocks_from_bn_1 = mock_beacon_node_1.received_blocks.lock().unwrap(); + let received_blocks_from_bn_2 = mock_beacon_node_2.received_blocks.lock().unwrap(); + assert_eq!(received_blocks_from_bn_1.len(), 1); + assert_eq!(received_blocks_from_bn_2.len(), 1); + } + + #[tokio::test] + async fn first_success_should_try_nodes_in_order() { + let spec = Arc::new(MainnetEthSpec::default_spec()); + let (mut mock_beacon_node_1, beacon_node_1) = new_mock_beacon_node(0, &spec).await; + let (mut mock_beacon_node_2, beacon_node_2) = new_mock_beacon_node(1, &spec).await; + let (mut mock_beacon_node_3, beacon_node_3) = new_mock_beacon_node(2, &spec).await; + + let beacon_node_fallback = create_beacon_node_fallback( + vec![beacon_node_1, beacon_node_2, beacon_node_3], + vec![], + spec.clone(), + test_logger(), + ); + + let mock1 = mock_beacon_node_1.mock_offline_node(); + let mock2 = mock_beacon_node_2.mock_offline_node(); + let mock3 = mock_beacon_node_3.mock_online_node(); + + let result_success = beacon_node_fallback + .first_success(|client| async move { client.get_node_version().await }) + .await; + + // mock3 expects to be called once since it is online in the first pass + mock3.expect(1).assert(); + assert!(result_success.is_ok()); + + // make all beacon node offline and the result should error + let _mock3 = mock_beacon_node_3.mock_offline_node(); + + let result_failure = beacon_node_fallback + .first_success(|client| async move { client.get_node_version().await }) + .await; + + assert!(result_failure.is_err()); + + // Both mock1 and mock2 should be called 3 times: + // - the first time is for the result_success case, + // - the second time is when it calls all 3 mock beacon nodes and all fails in the first pass, + // - which gives the third call because the function gives a second pass if no candidates succeeded in the first pass + mock1.expect(3).assert(); + mock2.expect(3).assert(); + } } diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 5f69bf125e2..70236d6a3cc 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -203,15 +203,15 @@ impl ProductionValidatorClient { config.initialized_validators.clone(), log.clone(), ) - .await - .map_err(|e| { - match e { - UnableToOpenVotingKeystore(err) => { - format!("Unable to initialize validators: {:?}. If you have recently moved the location of your data directory \ + .await + .map_err(|e| { + match e { + UnableToOpenVotingKeystore(err) => { + format!("Unable to initialize validators: {:?}. If you have recently moved the location of your data directory \ make sure to update the location of voting_keystore_path in your validator_definitions.yml", err) - }, - err => { - format!("Unable to initialize validators: {:?}", err)} + }, + err => { + format!("Unable to initialize validators: {:?}", err)} } })?;