From d5a03c9d86bf9e68cf6ef93c4cc427e3f8482178 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Mon, 10 Feb 2025 04:55:22 -0300 Subject: [PATCH] Add more range sync tests (#6872) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Currently we have very poor coverage of range sync with unit tests. With the event driven test framework we could cover much more ground and be confident when modifying the code. Add two basic cases: - Happy path, complete a finalized sync for 2 epochs - Post-PeerDAS case where we start without enough custody peers and later we find enough ⚠️ If you have ideas for more test cases, please let me know! I'll write them --- Cargo.lock | 2 + .../src/peer_manager/peerdb.rs | 2 +- beacon_node/network/Cargo.toml | 2 + beacon_node/network/src/sync/manager.rs | 15 +- .../src/sync/range_sync/chain_collection.rs | 2 +- .../network/src/sync/range_sync/range.rs | 5 + beacon_node/network/src/sync/tests/lookups.rs | 27 ++- beacon_node/network/src/sync/tests/mod.rs | 5 +- beacon_node/network/src/sync/tests/range.rs | 196 ++++++++++++++++-- 9 files changed, 223 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 219b6df0d90..70ff204b65d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6081,6 +6081,7 @@ dependencies = [ "hex", "igd-next 0.16.0", "itertools 0.10.5", + "k256 0.13.4", "kzg", "lighthouse_network", "logging", @@ -6090,6 +6091,7 @@ dependencies = [ "operation_pool", "parking_lot 0.12.3", "rand 0.8.5", + "rand_chacha 0.3.1", "serde_json", "slog", "slog-async", diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index 37cb5df6ea5..8e5d6121e04 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -689,8 +689,8 @@ impl PeerDB { &mut self, supernode: bool, spec: &ChainSpec, + enr_key: CombinedKey, ) -> PeerId { - let enr_key = CombinedKey::generate_secp256k1(); let mut enr = Enr::builder().build(&enr_key).unwrap(); let peer_id = enr.peer_id(); diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 09179c4a516..5071e247a32 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -10,8 +10,10 @@ eth2 = { workspace = true } eth2_network_config = { workspace = true } genesis = { workspace = true } gossipsub = { workspace = true } +k256 = "0.13.4" kzg = { workspace = true } matches = "0.1.8" +rand_chacha = "0.3.1" serde_json = { workspace = true } slog-async = { workspace = true } slog-term = { workspace = true } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index fc31e837277..14702d3536c 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -344,6 +344,16 @@ impl SyncManager { self.range_sync.state() } + #[cfg(test)] + pub(crate) fn range_sync_state(&self) -> super::range_sync::SyncChainStatus { + self.range_sync.state() + } + + #[cfg(test)] + pub(crate) fn __range_failed_chains(&mut self) -> Vec { + self.range_sync.__failed_chains() + } + #[cfg(test)] pub(crate) fn get_failed_chains(&mut self) -> Vec { self.block_lookups.get_failed_chains() @@ -368,11 +378,6 @@ impl SyncManager { self.sampling.get_request_status(block_root, index) } - #[cfg(test)] - pub(crate) fn range_sync_state(&self) -> super::range_sync::SyncChainStatus { - self.range_sync.state() - } - #[cfg(test)] pub(crate) fn update_execution_engine_state(&mut self, state: EngineState) { self.handle_new_execution_engine_state(state); diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 15bdf85e203..40285309469 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -477,7 +477,7 @@ impl ChainCollection { .find(|(_, chain)| chain.has_same_target(target_head_slot, target_head_root)) { Some((&id, chain)) => { - debug!(self.log, "Adding peer to known chain"; "peer_id" => %peer, "sync_type" => ?sync_type, &chain); + debug!(self.log, "Adding peer to known chain"; "peer_id" => %peer, "sync_type" => ?sync_type, "id" => id); debug_assert_eq!(chain.target_head_root, target_head_root); debug_assert_eq!(chain.target_head_slot, target_head_slot); if let Err(remove_reason) = chain.add_peer(network, peer) { diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 78679403bb4..38b032136cb 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -94,6 +94,11 @@ where } } + #[cfg(test)] + pub(crate) fn __failed_chains(&mut self) -> Vec { + self.failed_chains.keys().copied().collect() + } + pub fn state(&self) -> SyncChainStatus { self.chains.state() } diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 9ab581950c9..ea20141df63 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -27,6 +27,7 @@ use beacon_chain::{ PayloadVerificationOutcome, PayloadVerificationStatus, }; use beacon_processor::WorkEvent; +use lighthouse_network::discovery::CombinedKey; use lighthouse_network::{ rpc::{RPCError, RequestType, RpcErrorResponse}, service::api_types::{ @@ -39,18 +40,16 @@ use lighthouse_network::{ use slog::info; use slot_clock::{SlotClock, TestingSlotClock}; use tokio::sync::mpsc; -use types::ForkContext; use types::{ data_column_sidecar::ColumnIndex, test_utils::{SeedableRng, TestRandom, XorShiftRng}, - BeaconState, BeaconStateBase, BlobSidecar, DataColumnSidecar, EthSpec, ForkName, Hash256, - MinimalEthSpec as E, SignedBeaconBlock, Slot, + BeaconState, BeaconStateBase, BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, ForkName, + Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot, }; const D: Duration = Duration::new(0, 0); const PARENT_FAIL_TOLERANCE: u8 = SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS; const SAMPLING_REQUIRED_SUCCESSES: usize = 2; - type DCByRootIds = Vec; type DCByRootId = (SyncRequestId, Vec); @@ -117,7 +116,9 @@ impl TestRig { let spec = chain.spec.clone(); - let rng = XorShiftRng::from_seed([42; 16]); + // deterministic seed + let rng = ChaCha20Rng::from_seed([0u8; 32]); + TestRig { beacon_processor_rx, beacon_processor_rx_queue: vec![], @@ -154,7 +155,7 @@ impl TestRig { } } - fn test_setup_after_fulu() -> Option { + pub fn test_setup_after_fulu() -> Option { let r = Self::test_setup(); if r.fork_name.fulu_enabled() { Some(r) @@ -369,20 +370,26 @@ impl TestRig { } pub fn new_connected_peer(&mut self) -> PeerId { + let key = self.determinstic_key(); self.network_globals .peers .write() - .__add_connected_peer_testing_only(false, &self.harness.spec) + .__add_connected_peer_testing_only(false, &self.harness.spec, key) } pub fn new_connected_supernode_peer(&mut self) -> PeerId { + let key = self.determinstic_key(); self.network_globals .peers .write() - .__add_connected_peer_testing_only(true, &self.harness.spec) + .__add_connected_peer_testing_only(true, &self.harness.spec, key) + } + + fn determinstic_key(&mut self) -> CombinedKey { + k256::ecdsa::SigningKey::random(&mut self.rng).into() } - fn new_connected_peers_for_peerdas(&mut self) { + pub fn new_connected_peers_for_peerdas(&mut self) { // Enough sampling peers with few columns for _ in 0..100 { self.new_connected_peer(); @@ -1113,7 +1120,7 @@ impl TestRig { } #[track_caller] - fn expect_empty_network(&mut self) { + pub fn expect_empty_network(&mut self) { self.drain_network_rx(); if !self.network_rx_queue.is_empty() { let n = self.network_rx_queue.len(); diff --git a/beacon_node/network/src/sync/tests/mod.rs b/beacon_node/network/src/sync/tests/mod.rs index 6ed5c7f8fab..ef2bec80b80 100644 --- a/beacon_node/network/src/sync/tests/mod.rs +++ b/beacon_node/network/src/sync/tests/mod.rs @@ -7,12 +7,13 @@ use beacon_chain::eth1_chain::CachingEth1Backend; use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType}; use beacon_processor::WorkEvent; use lighthouse_network::NetworkGlobals; +use rand_chacha::ChaCha20Rng; use slog::Logger; use slot_clock::ManualSlotClock; use std::sync::Arc; use store::MemoryStore; use tokio::sync::mpsc; -use types::{test_utils::XorShiftRng, ChainSpec, ForkName, MinimalEthSpec as E}; +use types::{ChainSpec, ForkName, MinimalEthSpec as E}; mod lookups; mod range; @@ -61,7 +62,7 @@ struct TestRig { /// Beacon chain harness harness: BeaconChainHarness>, /// `rng` for generating test blocks and blobs. - rng: XorShiftRng, + rng: ChaCha20Rng, fork_name: ForkName, log: Logger, spec: Arc, diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index f78b44308d1..ddd4626cce6 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -1,11 +1,14 @@ use super::*; +use crate::network_beacon_processor::ChainSegmentProcessId; use crate::status::ToStatusMessage; use crate::sync::manager::SLOT_IMPORT_TOLERANCE; +use crate::sync::network_context::RangeRequestId; use crate::sync::range_sync::RangeSyncType; use crate::sync::SyncMessage; use beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::test_utils::{AttestationStrategy, BlockStrategy}; use beacon_chain::{block_verification_types::RpcBlock, EngineState, NotifyExecutionLayer}; +use beacon_processor::WorkType; use lighthouse_network::rpc::methods::{ BlobsByRangeRequest, DataColumnsByRangeRequest, OldBlocksByRangeRequest, OldBlocksByRangeRequestV2, @@ -18,8 +21,8 @@ use lighthouse_network::service::api_types::{ use lighthouse_network::{PeerId, SyncInfo}; use std::time::Duration; use types::{ - BlobSidecarList, BlockImportSource, EthSpec, Hash256, MinimalEthSpec as E, SignedBeaconBlock, - SignedBeaconBlockHash, Slot, + BlobSidecarList, BlockImportSource, Epoch, EthSpec, Hash256, MinimalEthSpec as E, + SignedBeaconBlock, SignedBeaconBlockHash, Slot, }; const D: Duration = Duration::new(0, 0); @@ -43,7 +46,7 @@ enum ByRangeDataRequestIds { /// To make writting tests succint, the machinery in this testing rig automatically identifies /// _which_ request to complete. Picking the right request is critical for tests to pass, so this /// filter allows better expressivity on the criteria to identify the right request. -#[derive(Default)] +#[derive(Default, Debug, Clone)] struct RequestFilter { peer: Option, epoch: Option, @@ -74,7 +77,7 @@ impl TestRig { /// Produce a head peer with an advanced head fn add_head_peer_with_root(&mut self, head_root: Hash256) -> PeerId { let local_info = self.local_info(); - self.add_peer(SyncInfo { + self.add_random_peer(SyncInfo { head_root, head_slot: local_info.head_slot + 1 + Slot::new(SLOT_IMPORT_TOLERANCE as u64), ..local_info @@ -90,7 +93,7 @@ impl TestRig { fn add_finalized_peer_with_root(&mut self, finalized_root: Hash256) -> PeerId { let local_info = self.local_info(); let finalized_epoch = local_info.finalized_epoch + 2; - self.add_peer(SyncInfo { + self.add_random_peer(SyncInfo { finalized_epoch, finalized_root, head_slot: finalized_epoch.start_slot(E::slots_per_epoch()), @@ -98,6 +101,17 @@ impl TestRig { }) } + fn finalized_remote_info_advanced_by(&self, advanced_epochs: Epoch) -> SyncInfo { + let local_info = self.local_info(); + let finalized_epoch = local_info.finalized_epoch + advanced_epochs; + SyncInfo { + finalized_epoch, + finalized_root: Hash256::random(), + head_slot: finalized_epoch.start_slot(E::slots_per_epoch()), + head_root: Hash256::random(), + } + } + fn local_info(&self) -> SyncInfo { let StatusMessage { fork_digest: _, @@ -114,28 +128,59 @@ impl TestRig { } } - fn add_peer(&mut self, remote_info: SyncInfo) -> PeerId { + fn add_random_peer_not_supernode(&mut self, remote_info: SyncInfo) -> PeerId { + let peer_id = self.new_connected_peer(); + self.send_sync_message(SyncMessage::AddPeer(peer_id, remote_info)); + peer_id + } + + fn add_random_peer(&mut self, remote_info: SyncInfo) -> PeerId { // Create valid peer known to network globals // TODO(fulu): Using supernode peers to ensure we have peer across all column // subnets for syncing. Should add tests connecting to full node peers. let peer_id = self.new_connected_supernode_peer(); // Send peer to sync - self.send_sync_message(SyncMessage::AddPeer(peer_id, remote_info.clone())); + self.send_sync_message(SyncMessage::AddPeer(peer_id, remote_info)); peer_id } + fn add_random_peers(&mut self, remote_info: SyncInfo, count: usize) { + for _ in 0..count { + let peer = self.new_connected_peer(); + self.add_peer(peer, remote_info.clone()); + } + } + + fn add_peer(&mut self, peer: PeerId, remote_info: SyncInfo) { + self.send_sync_message(SyncMessage::AddPeer(peer, remote_info)); + } + fn assert_state(&self, state: RangeSyncType) { assert_eq!( self.sync_manager .range_sync_state() .expect("State is ok") - .expect("Range should be syncing") + .expect("Range should be syncing, there are no chains") .0, state, "not expected range sync state" ); } + fn assert_no_chains_exist(&self) { + if let Some(chain) = self.sync_manager.get_range_sync_chains().unwrap() { + panic!("There still exists a chain {chain:?}"); + } + } + + fn assert_no_failed_chains(&mut self) { + assert_eq!( + self.sync_manager.__range_failed_chains(), + Vec::::new(), + "Expected no failed chains" + ) + } + #[track_caller] fn expect_chain_segments(&mut self, count: usize) { for i in 0..count { @@ -170,7 +215,7 @@ impl TestRig { true }; - let block_req_id = self + let block_req = self .pop_received_network_event(|ev| match ev { NetworkMessage::SendRequest { peer_id, @@ -182,7 +227,9 @@ impl TestRig { } if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)), _ => None, }) - .expect("Should have a blocks by range request"); + .unwrap_or_else(|e| { + panic!("Should have a BlocksByRange request, filter {request_filter:?}: {e:?}") + }); let by_range_data_requests = if self.after_fulu() { let mut data_columns_requests = vec![]; @@ -200,7 +247,7 @@ impl TestRig { data_columns_requests.push(data_columns_request); } if data_columns_requests.is_empty() { - panic!("Found zero DataColumnsByRange requests"); + panic!("Found zero DataColumnsByRange requests, filter {request_filter:?}"); } ByRangeDataRequestIds::PostPeerDAS(data_columns_requests) } else if self.after_deneb() { @@ -213,16 +260,21 @@ impl TestRig { } if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)), _ => None, }) - .expect("Should have a blobs by range request"); + .unwrap_or_else(|e| { + panic!("Should have a blobs by range request, filter {request_filter:?}: {e:?}") + }); ByRangeDataRequestIds::PrePeerDAS(id, peer) } else { ByRangeDataRequestIds::PreDeneb }; - (block_req_id, by_range_data_requests) + (block_req, by_range_data_requests) } - fn find_and_complete_blocks_by_range_request(&mut self, request_filter: RequestFilter) { + fn find_and_complete_blocks_by_range_request( + &mut self, + request_filter: RequestFilter, + ) -> RangeRequestId { let ((blocks_req_id, block_peer), by_range_data_request_ids) = self.find_blocks_by_range_request(request_filter); @@ -266,6 +318,60 @@ impl TestRig { } } } + + blocks_req_id.parent_request_id.requester + } + + fn find_and_complete_processing_chain_segment(&mut self, id: ChainSegmentProcessId) { + self.pop_received_processor_event(|ev| { + (ev.work_type() == WorkType::ChainSegment).then_some(()) + }) + .unwrap_or_else(|e| panic!("Expected chain segment work event: {e}")); + + self.log(&format!( + "Completing ChainSegment processing work {id:?} with success" + )); + self.send_sync_message(SyncMessage::BatchProcessed { + sync_type: id, + result: crate::sync::BatchProcessResult::Success { + sent_blocks: 8, + imported_blocks: 8, + }, + }); + } + + fn complete_and_process_range_sync_until( + &mut self, + last_epoch: u64, + request_filter: RequestFilter, + ) { + for epoch in 0..last_epoch { + // Note: In this test we can't predict the block peer + let id = + self.find_and_complete_blocks_by_range_request(request_filter.clone().epoch(epoch)); + if let RangeRequestId::RangeSync { batch_id, .. } = id { + assert_eq!(batch_id.as_u64(), epoch, "Unexpected batch_id"); + } else { + panic!("unexpected RangeRequestId {id:?}"); + } + + let id = match id { + RangeRequestId::RangeSync { chain_id, batch_id } => { + ChainSegmentProcessId::RangeBatchId(chain_id, batch_id) + } + RangeRequestId::BackfillSync { batch_id } => { + ChainSegmentProcessId::BackSyncBatchId(batch_id) + } + }; + + self.find_and_complete_processing_chain_segment(id); + if epoch < last_epoch - 1 { + self.assert_state(RangeSyncType::Finalized); + } else { + self.assert_no_chains_exist(); + self.assert_no_failed_chains(); + } + } } async fn create_canonical_block(&mut self) -> (SignedBeaconBlock, Option>) { @@ -442,3 +548,65 @@ fn pause_and_resume_on_ee_offline() { // The head chain and finalized chain (2) should be in the processing queue rig.expect_chain_segments(2); } + +/// To attempt to finalize the peer's status finalized checkpoint we synced to its finalized epoch + +/// 2 epochs + 1 slot. +const EXTRA_SYNCED_EPOCHS: u64 = 2 + 1; + +#[test] +fn finalized_sync_enough_global_custody_peers_few_chain_peers() { + // Run for all forks + let mut r = TestRig::test_setup(); + // This test creates enough global custody peers to satisfy column queries but only adds few + // peers to the chain + r.new_connected_peers_for_peerdas(); + + let advanced_epochs: u64 = 2; + let remote_info = r.finalized_remote_info_advanced_by(advanced_epochs.into()); + + // Current priorization only sends batches to idle peers, so we need enough peers for each batch + // TODO: Test this with a single peer in the chain, it should still work + r.add_random_peers( + remote_info, + (advanced_epochs + EXTRA_SYNCED_EPOCHS) as usize, + ); + r.assert_state(RangeSyncType::Finalized); + + let last_epoch = advanced_epochs + EXTRA_SYNCED_EPOCHS; + r.complete_and_process_range_sync_until(last_epoch, filter()); +} + +#[test] +fn finalized_sync_not_enough_custody_peers_on_start() { + let mut r = TestRig::test_setup(); + // Only run post-PeerDAS + if !r.fork_name.fulu_enabled() { + return; + } + + let advanced_epochs: u64 = 2; + let remote_info = r.finalized_remote_info_advanced_by(advanced_epochs.into()); + + // Unikely that the single peer we added has enough columns for us. Tests are determinstic and + // this error should never be hit + r.add_random_peer_not_supernode(remote_info.clone()); + r.assert_state(RangeSyncType::Finalized); + + // Because we don't have enough peers on all columns we haven't sent any request. + // NOTE: There's a small chance that this single peer happens to custody exactly the set we + // expect, in that case the test will fail. Find a way to make the test deterministic. + r.expect_empty_network(); + + // Generate enough peers and supernodes to cover all custody columns + r.new_connected_peers_for_peerdas(); + // Note: not necessary to add this peers to the chain, as we draw from the global pool + // We still need to add enough peers to trigger batch downloads with idle peers. Same issue as + // the test above. + r.add_random_peers( + remote_info, + (advanced_epochs + EXTRA_SYNCED_EPOCHS - 1) as usize, + ); + + let last_epoch = advanced_epochs + EXTRA_SYNCED_EPOCHS; + r.complete_and_process_range_sync_until(last_epoch, filter()); +}