From 2193f6a4d4634ec1b86991d211bd57a03cceba0b Mon Sep 17 00:00:00 2001
From: Lion - dapplion <35266934+dapplion@users.noreply.github.com>
Date: Wed, 5 Feb 2025 04:08:28 -0300
Subject: [PATCH] Add individual by_range sync requests (#6497)

Part of

- https://github.com/sigp/lighthouse/issues/6258

To address PeerDAS sync issues we need to make individual by_range requests within a batch retriable. We should adopt the same pattern used for lookup sync, where each request (block/blobs/columns) is tracked individually within a "meta" request that groups them all and handles retry logic.

- Building on https://github.com/sigp/lighthouse/pull/6398, the second step is to add individual request accumulators for `blocks_by_range`, `blobs_by_range`, and `data_columns_by_range`. This will allow each request to progress independently and be retried separately.

Most of the logic is just piping; excuse the large diff. This PR does not change the logic of how requests are handled or retried. That will be done in a future PR changing the logic of `RangeBlockComponentsRequest`.

### Before

- Sync manager receives block with `SyncRequestId::RangeBlockAndBlobs`
- Insert block into `SyncNetworkContext::range_block_components_requests`
- (If received stream terminators of all requests)
  - Return `Vec`, and insert into `range_sync`

### Now

- Sync manager receives block with `SyncRequestId::BlocksByRange`
- Insert block into `SyncNetworkContext::blocks_by_range_requests`
- (If received stream terminator of this request)
  - Return `Vec`, and insert into `SyncNetworkContext::components_by_range_requests`
- (If received a result for all requests)
  - Return `Vec`, and insert into `range_sync`

---
 .../lighthouse_network/src/rpc/methods.rs | 21 +
 .../src/rpc/self_limiter.rs | 22 +-
 .../src/service/api_types.rs | 58 +-
 beacon_node/network/src/router.rs | 2 +-
 .../src/sync/block_sidecar_coupling.rs | 170 +++---
 beacon_node/network/src/sync/manager.rs | 203 ++++---
 .../network/src/sync/network_context.rs | 565 ++++++++++--------
 .../src/sync/network_context/requests.rs | 11 +-
 .../requests/blobs_by_range.rs | 56 ++
 .../network_context/requests/blobs_by_root.rs | 2 +-
 .../requests/blocks_by_range.rs | 48 ++
 .../requests/data_columns_by_range.rs | 54 ++
 .../requests/data_columns_by_root.rs | 5 +-
 .../network/src/sync/range_sync/batch.rs | 32 +-
 beacon_node/network/src/sync/tests/range.rs | 29 +-
 15 files changed, 776 insertions(+), 502 deletions(-)
 create mode 100644 beacon_node/network/src/sync/network_context/requests/blobs_by_range.rs
 create mode 100644 beacon_node/network/src/sync/network_context/requests/blocks_by_range.rs
 create mode 100644 beacon_node/network/src/sync/network_context/requests/data_columns_by_range.rs

diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs
index ad6bea455ec..2f6200a836b 100644
--- a/beacon_node/lighthouse_network/src/rpc/methods.rs
+++ b/beacon_node/lighthouse_network/src/rpc/methods.rs
@@ -411,6 +411,27 @@ impl OldBlocksByRangeRequest {
     }
 }
 
+impl From<BlocksByRangeRequest> for OldBlocksByRangeRequest {
+    fn from(req: BlocksByRangeRequest) -> Self {
+        match req {
+            BlocksByRangeRequest::V1(ref req) => {
+                OldBlocksByRangeRequest::V1(OldBlocksByRangeRequestV1 {
+                    start_slot: req.start_slot,
+                    count: req.count,
+                    step: 1,
+                })
+            }
+            BlocksByRangeRequest::V2(ref req) => {
+                OldBlocksByRangeRequest::V2(OldBlocksByRangeRequestV2 {
+                    start_slot: req.start_slot,
+                    count: req.count,
+                    step: 1,
+                })
+            }
+        }
+    }
+}
+
 /// Request a number of beacon block bodies from a
peer. #[superstruct(variants(V1, V2), variant_attributes(derive(Clone, Debug, PartialEq)))] #[derive(Clone, Debug, PartialEq)] diff --git a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs index e0c8593f29f..ae63e5cdb5a 100644 --- a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs @@ -217,7 +217,7 @@ mod tests { use crate::rpc::rate_limiter::Quota; use crate::rpc::self_limiter::SelfRateLimiter; use crate::rpc::{Ping, Protocol, RequestType}; - use crate::service::api_types::{AppRequestId, RequestId, SyncRequestId}; + use crate::service::api_types::{AppRequestId, RequestId, SingleLookupReqId, SyncRequestId}; use libp2p::PeerId; use std::time::Duration; use types::{EthSpec, ForkContext, Hash256, MainnetEthSpec, Slot}; @@ -238,12 +238,16 @@ mod tests { let mut limiter: SelfRateLimiter = SelfRateLimiter::new(config, fork_context, log).unwrap(); let peer_id = PeerId::random(); + let lookup_id = 0; for i in 1..=5u32 { let _ = limiter.allows( peer_id, - RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { - id: i, + RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock { + id: SingleLookupReqId { + lookup_id, + req_id: i, + }, })), RequestType::Ping(Ping { data: i as u64 }), ); @@ -261,9 +265,9 @@ mod tests { for i in 2..=5u32 { assert!(matches!( iter.next().unwrap().request_id, - RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { - id, - })) if id == i + RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock { + id: SingleLookupReqId { req_id, .. }, + })) if req_id == i, )); } @@ -286,9 +290,9 @@ mod tests { for i in 3..=5 { assert!(matches!( iter.next().unwrap().request_id, - RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { - id - })) if id == i + RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock { + id: SingleLookupReqId { req_id, .. }, + })) if req_id == i, )); } diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 85fabbb0c3c..800d988d1a9 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use libp2p::swarm::ConnectionId; use types::{ - BlobSidecar, DataColumnSidecar, EthSpec, Hash256, LightClientBootstrap, + BlobSidecar, DataColumnSidecar, Epoch, EthSpec, Hash256, LightClientBootstrap, LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock, }; @@ -31,8 +31,12 @@ pub enum SyncRequestId { SingleBlob { id: SingleLookupReqId }, /// Request searching for a set of data columns given a hash and list of column indices. DataColumnsByRoot(DataColumnsByRootRequestId), - /// Range request that is composed by both a block range request and a blob range request. - RangeBlockAndBlobs { id: Id }, + /// Blocks by range request + BlocksByRange(BlocksByRangeRequestId), + /// Blobs by range request + BlobsByRange(BlobsByRangeRequestId), + /// Data columns by range request + DataColumnsByRange(DataColumnsByRangeRequestId), } /// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly. 
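// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch): how the nested by_range request
// IDs introduced in the next hunk compose. The type names mirror the patch,
// but the bodies are simplified, self-contained stand-ins (plain integers
// replace the Lighthouse `Id` and `Epoch` types). Each per-component request
// records the ID of its umbrella request, so a failed component can be retried
// under a fresh child `id` while responses still route to the same batch
// through `parent_request_id`.
// ---------------------------------------------------------------------------
#[allow(dead_code)]
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
enum RangeRequestId {
    RangeSync { chain_id: u32, batch_id: u64 },
    BackfillSync { batch_id: u64 },
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
struct ComponentsByRangeRequestId {
    id: u32,
    requester: RangeRequestId,
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
struct BlocksByRangeRequestId {
    id: u32,
    parent_request_id: ComponentsByRangeRequestId,
}

fn main() {
    let parent = ComponentsByRangeRequestId {
        id: 1,
        requester: RangeRequestId::RangeSync { chain_id: 7, batch_id: 42 },
    };
    // A retry mints a new child id (2 -> 3); the parent id stays stable, so
    // the eventual response still maps back to chain 7 / batch 42.
    let attempt_1 = BlocksByRangeRequestId { id: 2, parent_request_id: parent };
    let attempt_2 = BlocksByRangeRequestId { id: 3, parent_request_id: parent };
    assert_eq!(attempt_1.parent_request_id, attempt_2.parent_request_id);
}
// ---------------------------------------------------------------------------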
@@ -43,12 +47,60 @@ pub struct DataColumnsByRootRequestId { pub requester: DataColumnsByRootRequester, } +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct BlocksByRangeRequestId { + /// Id to identify this attempt at a blocks_by_range request for `parent_request_id` + pub id: Id, + /// The Id of the overall By Range request for block components. + pub parent_request_id: ComponentsByRangeRequestId, +} + +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct BlobsByRangeRequestId { + /// Id to identify this attempt at a blobs_by_range request for `parent_request_id` + pub id: Id, + /// The Id of the overall By Range request for block components. + pub parent_request_id: ComponentsByRangeRequestId, +} + +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct DataColumnsByRangeRequestId { + /// Id to identify this attempt at a data_columns_by_range request for `parent_request_id` + pub id: Id, + /// The Id of the overall By Range request for block components. + pub parent_request_id: ComponentsByRangeRequestId, +} + +/// Block components by range request for range sync. Includes an ID for downstream consumers to +/// handle retries and tie all their sub requests together. +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct ComponentsByRangeRequestId { + /// Each `RangeRequestId` may request the same data in a later retry. This Id identifies the + /// current attempt. + pub id: Id, + /// What sync component is issuing a components by range request and expecting data back + pub requester: RangeRequestId, +} + +/// Range sync chain or backfill batch +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub enum RangeRequestId { + RangeSync { chain_id: Id, batch_id: Epoch }, + BackfillSync { batch_id: Epoch }, +} + #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum DataColumnsByRootRequester { Sampling(SamplingId), Custody(CustodyId), } +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub enum RangeRequester { + RangeSync { chain_id: u64, batch_id: Epoch }, + BackfillSync { batch_id: Epoch }, +} + #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub struct SamplingId { pub id: SamplingRequester, diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 41b9f2c91ed..36e5c391e91 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -624,7 +624,7 @@ impl Router { ) { let request_id = match request_id { AppRequestId::Sync(sync_id) => match sync_id { - id @ SyncRequestId::RangeBlockAndBlobs { .. } => id, + id @ SyncRequestId::BlocksByRange { .. } => id, other => { crit!(self.log, "BlocksByRange response on incorrect request"; "request" => ?other); return; diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 70a3fe4f5aa..6c8a8eab633 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -1,7 +1,6 @@ use beacon_chain::{ block_verification_types::RpcBlock, data_column_verification::CustodyDataColumn, get_block_root, }; -use lighthouse_network::PeerId; use std::{ collections::{HashMap, VecDeque}, sync::Arc, @@ -29,9 +28,6 @@ pub struct RangeBlockComponentsRequest { /// Used to determine if the number of data columns stream termination this accumulator should /// wait for. This may be less than the number of `expects_custody_columns` due to request batching. 
num_custody_column_requests: Option, - /// The peers the request was made to. - pub(crate) peer_ids: Vec, - max_blobs_per_block: usize, } impl RangeBlockComponentsRequest { @@ -39,8 +35,6 @@ impl RangeBlockComponentsRequest { expects_blobs: bool, expects_custody_columns: Option>, num_custody_column_requests: Option, - peer_ids: Vec, - max_blobs_per_block: usize, ) -> Self { Self { blocks: <_>::default(), @@ -52,50 +46,42 @@ impl RangeBlockComponentsRequest { expects_blobs, expects_custody_columns, num_custody_column_requests, - peer_ids, - max_blobs_per_block, } } - // TODO: This function should be deprecated when simplying the retry mechanism of this range - // requests. - pub fn get_requirements(&self) -> (bool, Option>) { - (self.expects_blobs, self.expects_custody_columns.clone()) - } - - pub fn add_block_response(&mut self, block_opt: Option>>) { - match block_opt { - Some(block) => self.blocks.push_back(block), - None => self.is_blocks_stream_terminated = true, + pub fn add_blocks(&mut self, blocks: Vec>>) { + for block in blocks { + self.blocks.push_back(block); } + self.is_blocks_stream_terminated = true; } - pub fn add_sidecar_response(&mut self, sidecar_opt: Option>>) { - match sidecar_opt { - Some(sidecar) => self.blobs.push_back(sidecar), - None => self.is_sidecars_stream_terminated = true, + pub fn add_blobs(&mut self, blobs: Vec>>) { + for blob in blobs { + self.blobs.push_back(blob); } + self.is_sidecars_stream_terminated = true; } - pub fn add_data_column(&mut self, column_opt: Option>>) { - match column_opt { - Some(column) => self.data_columns.push_back(column), - // TODO(das): this mechanism is dangerous, if somehow there are two requests for the - // same column index it can terminate early. This struct should track that all requests - // for all custody columns terminate. - None => self.custody_columns_streams_terminated += 1, + pub fn add_custody_columns(&mut self, columns: Vec>>) { + for column in columns { + self.data_columns.push_back(column); } + // TODO(das): this mechanism is dangerous, if somehow there are two requests for the + // same column index it can terminate early. This struct should track that all requests + // for all custody columns terminate. + self.custody_columns_streams_terminated += 1; } pub fn into_responses(self, spec: &ChainSpec) -> Result>, String> { if let Some(expects_custody_columns) = self.expects_custody_columns.clone() { self.into_responses_with_custody_columns(expects_custody_columns, spec) } else { - self.into_responses_with_blobs() + self.into_responses_with_blobs(spec) } } - fn into_responses_with_blobs(self) -> Result>, String> { + fn into_responses_with_blobs(self, spec: &ChainSpec) -> Result>, String> { let RangeBlockComponentsRequest { blocks, blobs, .. } = self; // There can't be more more blobs than blocks. i.e. 
sending any blob (empty @@ -103,7 +89,8 @@ impl RangeBlockComponentsRequest { let mut responses = Vec::with_capacity(blocks.len()); let mut blob_iter = blobs.into_iter().peekable(); for block in blocks.into_iter() { - let mut blob_list = Vec::with_capacity(self.max_blobs_per_block); + let max_blobs_per_block = spec.max_blobs_per_block(block.epoch()) as usize; + let mut blob_list = Vec::with_capacity(max_blobs_per_block); while { let pair_next_blob = blob_iter .peek() @@ -114,7 +101,7 @@ impl RangeBlockComponentsRequest { blob_list.push(blob_iter.next().ok_or("Missing next blob".to_string())?); } - let mut blobs_buffer = vec![None; self.max_blobs_per_block]; + let mut blobs_buffer = vec![None; max_blobs_per_block]; for blob in blob_list { let blob_index = blob.index as usize; let Some(blob_opt) = blobs_buffer.get_mut(blob_index) else { @@ -128,7 +115,7 @@ impl RangeBlockComponentsRequest { } let blobs = RuntimeVariableList::new( blobs_buffer.into_iter().flatten().collect::>(), - self.max_blobs_per_block, + max_blobs_per_block, ) .map_err(|_| "Blobs returned exceeds max length".to_string())?; responses.push(RpcBlock::new(None, block, Some(blobs)).map_err(|e| format!("{e:?}"))?) @@ -246,30 +233,25 @@ mod tests { use beacon_chain::test_utils::{ generate_rand_block_and_blobs, generate_rand_block_and_data_columns, test_spec, NumBlobs, }; - use lighthouse_network::PeerId; use rand::SeedableRng; - use types::{test_utils::XorShiftRng, ForkName, MinimalEthSpec as E}; + use std::sync::Arc; + use types::{test_utils::XorShiftRng, ForkName, MinimalEthSpec as E, SignedBeaconBlock}; #[test] fn no_blobs_into_responses() { let spec = test_spec::(); - let peer_id = PeerId::random(); let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..4) .map(|_| { generate_rand_block_and_blobs::(ForkName::Base, NumBlobs::None, &mut rng, &spec) .0 + .into() }) - .collect::>(); - let max_len = spec.max_blobs_per_block(blocks.first().unwrap().epoch()) as usize; - let mut info = - RangeBlockComponentsRequest::::new(false, None, None, vec![peer_id], max_len); + .collect::>>>(); + let mut info = RangeBlockComponentsRequest::::new(false, None, None); // Send blocks and complete terminate response - for block in blocks { - info.add_block_response(Some(block.into())); - } - info.add_block_response(None); + info.add_blocks(blocks); // Assert response is finished and RpcBlocks can be constructed assert!(info.is_finished()); @@ -279,7 +261,6 @@ mod tests { #[test] fn empty_blobs_into_responses() { let spec = test_spec::(); - let peer_id = PeerId::random(); let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..4) .map(|_| { @@ -291,19 +272,15 @@ mod tests { &spec, ) .0 + .into() }) - .collect::>(); - let max_len = spec.max_blobs_per_block(blocks.first().unwrap().epoch()) as usize; - let mut info = - RangeBlockComponentsRequest::::new(true, None, None, vec![peer_id], max_len); + .collect::>>>(); + let mut info = RangeBlockComponentsRequest::::new(true, None, None); // Send blocks and complete terminate response - for block in blocks { - info.add_block_response(Some(block.into())); - } - info.add_block_response(None); + info.add_blocks(blocks); // Expect no blobs returned - info.add_sidecar_response(None); + info.add_blobs(vec![]); // Assert response is finished and RpcBlocks can be constructed, even if blobs weren't returned. // This makes sure we don't expect blobs here when they have expired. 
Checking this logic should @@ -316,7 +293,6 @@ mod tests { fn rpc_block_with_custody_columns() { let spec = test_spec::(); let expects_custody_columns = vec![1, 2, 3, 4]; - let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..4) .map(|_| { @@ -328,34 +304,24 @@ mod tests { ) }) .collect::>(); - let max_len = spec.max_blobs_per_block(blocks.first().unwrap().0.epoch()) as usize; let mut info = RangeBlockComponentsRequest::::new( false, Some(expects_custody_columns.clone()), Some(expects_custody_columns.len()), - vec![PeerId::random()], - max_len, ); // Send blocks and complete terminate response - for block in &blocks { - info.add_block_response(Some(block.0.clone().into())); - } - info.add_block_response(None); + info.add_blocks(blocks.iter().map(|b| b.0.clone().into()).collect()); // Assert response is not finished assert!(!info.is_finished()); - // Send data columns interleaved - for block in &blocks { - for column in &block.1 { - if expects_custody_columns.contains(&column.index) { - info.add_data_column(Some(column.clone())); - } - } - } - - // Terminate the requests - for (i, _column_index) in expects_custody_columns.iter().enumerate() { - info.add_data_column(None); + // Send data columns + for (i, &column_index) in expects_custody_columns.iter().enumerate() { + info.add_custody_columns( + blocks + .iter() + .flat_map(|b| b.1.iter().filter(|d| d.index == column_index).cloned()) + .collect(), + ); if i < expects_custody_columns.len() - 1 { assert!( @@ -377,8 +343,21 @@ mod tests { #[test] fn rpc_block_with_custody_columns_batched() { let spec = test_spec::(); - let expects_custody_columns = vec![1, 2, 3, 4]; - let num_of_data_column_requests = 2; + let batched_column_requests = [vec![1_u64, 2], vec![3, 4]]; + let expects_custody_columns = batched_column_requests + .iter() + .flatten() + .cloned() + .collect::>(); + let custody_column_request_ids = + (0..batched_column_requests.len() as u32).collect::>(); + let num_of_data_column_requests = custody_column_request_ids.len(); + + let mut info = RangeBlockComponentsRequest::::new( + false, + Some(expects_custody_columns.clone()), + Some(num_of_data_column_requests), + ); let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..4) @@ -391,34 +370,25 @@ mod tests { ) }) .collect::>(); - let max_len = spec.max_blobs_per_block(blocks.first().unwrap().0.epoch()) as usize; - let mut info = RangeBlockComponentsRequest::::new( - false, - Some(expects_custody_columns.clone()), - Some(num_of_data_column_requests), - vec![PeerId::random()], - max_len, - ); + // Send blocks and complete terminate response - for block in &blocks { - info.add_block_response(Some(block.0.clone().into())); - } - info.add_block_response(None); + info.add_blocks(blocks.iter().map(|b| b.0.clone().into()).collect()); // Assert response is not finished assert!(!info.is_finished()); - // Send data columns interleaved - for block in &blocks { - for column in &block.1 { - if expects_custody_columns.contains(&column.index) { - info.add_data_column(Some(column.clone())); - } - } - } + for (i, column_indices) in batched_column_requests.iter().enumerate() { + // Send the set of columns in the same batch request + info.add_custody_columns( + blocks + .iter() + .flat_map(|b| { + b.1.iter() + .filter(|d| column_indices.contains(&d.index)) + .cloned() + }) + .collect::>(), + ); - // Terminate the requests - for i in 0..num_of_data_column_requests { - info.add_data_column(None); if i < num_of_data_column_requests - 1 { assert!( !info.is_finished(), diff --git 
a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index fd91dc78b13..fc31e837277 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -36,7 +36,7 @@ use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; use super::block_lookups::BlockLookups; use super::network_context::{ - BlockOrBlob, CustodyByRootResult, RangeRequestId, RpcEvent, SyncNetworkContext, + CustodyByRootResult, RangeBlockComponent, RangeRequestId, RpcEvent, SyncNetworkContext, }; use super::peer_sampling::{Sampling, SamplingConfig, SamplingResult}; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; @@ -47,7 +47,6 @@ use crate::status::ToStatusMessage; use crate::sync::block_lookups::{ BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult, }; -use crate::sync::block_sidecar_coupling::RangeBlockComponentsRequest; use crate::sync::network_context::PeerGroup; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::validator_monitor::timestamp_now; @@ -57,8 +56,9 @@ use beacon_chain::{ use futures::StreamExt; use lighthouse_network::rpc::RPCError; use lighthouse_network::service::api_types::{ - CustodyRequester, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, SamplingId, - SamplingRequester, SingleLookupReqId, SyncRequestId, + BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, CustodyRequester, + DataColumnsByRangeRequestId, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, + SamplingId, SamplingRequester, SingleLookupReqId, SyncRequestId, }; use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::SyncInfo; @@ -491,36 +491,14 @@ impl SyncManager { SyncRequestId::DataColumnsByRoot(req_id) => { self.on_data_columns_by_root_response(req_id, peer_id, RpcEvent::RPCError(error)) } - SyncRequestId::RangeBlockAndBlobs { id } => { - if let Some(sender_id) = self.network.range_request_failed(id) { - match sender_id { - RangeRequestId::RangeSync { chain_id, batch_id } => { - self.range_sync.inject_error( - &mut self.network, - peer_id, - batch_id, - chain_id, - id, - ); - self.update_sync_state(); - } - RangeRequestId::BackfillSync { batch_id } => match self - .backfill_sync - .inject_error(&mut self.network, batch_id, &peer_id, id) - { - Ok(_) => {} - Err(_) => self.update_sync_state(), - }, - } - } else { - debug!( - self.log, - "RPC error for range request has no associated entry in network context, ungraceful disconnect"; - "peer_id" => %peer_id, - "request_id" => %id, - "error" => ?error, - ); - } + SyncRequestId::BlocksByRange(req_id) => { + self.on_blocks_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)) + } + SyncRequestId::BlobsByRange(req_id) => { + self.on_blobs_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)) + } + SyncRequestId::DataColumnsByRange(req_id) => { + self.on_data_columns_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)) } } } @@ -1051,14 +1029,13 @@ impl SyncManager { SyncRequestId::SingleBlock { id } => self.on_single_block_response( id, peer_id, - match block { - Some(block) => RpcEvent::Response(block, seen_timestamp), - None => RpcEvent::StreamTermination, - }, + RpcEvent::from_chunk(block, seen_timestamp), + ), + SyncRequestId::BlocksByRange(id) => self.on_blocks_by_range_response( + id, + peer_id, + RpcEvent::from_chunk(block, seen_timestamp), ), - SyncRequestId::RangeBlockAndBlobs { id } => { - self.range_block_and_blobs_response(id, peer_id, 
block.into()) - } _ => { crit!(self.log, "bad request id for block"; "peer_id" => %peer_id ); } @@ -1094,14 +1071,13 @@ impl SyncManager { SyncRequestId::SingleBlob { id } => self.on_single_blob_response( id, peer_id, - match blob { - Some(blob) => RpcEvent::Response(blob, seen_timestamp), - None => RpcEvent::StreamTermination, - }, + RpcEvent::from_chunk(blob, seen_timestamp), + ), + SyncRequestId::BlobsByRange(id) => self.on_blobs_by_range_response( + id, + peer_id, + RpcEvent::from_chunk(blob, seen_timestamp), ), - SyncRequestId::RangeBlockAndBlobs { id } => { - self.range_block_and_blobs_response(id, peer_id, blob.into()) - } _ => { crit!(self.log, "bad request id for blob"; "peer_id" => %peer_id); } @@ -1120,19 +1096,14 @@ impl SyncManager { self.on_data_columns_by_root_response( req_id, peer_id, - match data_column { - Some(data_column) => RpcEvent::Response(data_column, seen_timestamp), - None => RpcEvent::StreamTermination, - }, - ); - } - SyncRequestId::RangeBlockAndBlobs { id } => { - self.range_block_and_blobs_response( - id, - peer_id, - BlockOrBlob::CustodyColumns(data_column), + RpcEvent::from_chunk(data_column, seen_timestamp), ); } + SyncRequestId::DataColumnsByRange(id) => self.on_data_columns_by_range_response( + id, + peer_id, + RpcEvent::from_chunk(data_column, seen_timestamp), + ), _ => { crit!(self.log, "bad request id for data_column"; "peer_id" => %peer_id); } @@ -1188,6 +1159,54 @@ impl SyncManager { } } + fn on_blocks_by_range_response( + &mut self, + id: BlocksByRangeRequestId, + peer_id: PeerId, + block: RpcEvent>>, + ) { + if let Some(resp) = self.network.on_blocks_by_range_response(id, peer_id, block) { + self.on_range_components_response( + id.parent_request_id, + peer_id, + RangeBlockComponent::Block(resp), + ); + } + } + + fn on_blobs_by_range_response( + &mut self, + id: BlobsByRangeRequestId, + peer_id: PeerId, + blob: RpcEvent>>, + ) { + if let Some(resp) = self.network.on_blobs_by_range_response(id, peer_id, blob) { + self.on_range_components_response( + id.parent_request_id, + peer_id, + RangeBlockComponent::Blob(resp), + ); + } + } + + fn on_data_columns_by_range_response( + &mut self, + id: DataColumnsByRangeRequestId, + peer_id: PeerId, + data_column: RpcEvent>>, + ) { + if let Some(resp) = self + .network + .on_data_columns_by_range_response(id, peer_id, data_column) + { + self.on_range_components_response( + id.parent_request_id, + peer_id, + RangeBlockComponent::CustodyColumns(resp), + ); + } + } + fn on_custody_by_root_result( &mut self, requester: CustodyRequester, @@ -1230,27 +1249,26 @@ impl SyncManager { /// Handles receiving a response for a range sync request that should have both blocks and /// blobs. 
- fn range_block_and_blobs_response( + fn on_range_components_response( &mut self, - id: Id, + range_request_id: ComponentsByRangeRequestId, peer_id: PeerId, - block_or_blob: BlockOrBlob, + range_block_component: RangeBlockComponent, ) { if let Some(resp) = self .network - .range_block_and_blob_response(id, block_or_blob) + .range_block_component_response(range_request_id, range_block_component) { - let epoch = resp.sender_id.batch_id(); - match resp.responses { + match resp { Ok(blocks) => { - match resp.sender_id { + match range_request_id.requester { RangeRequestId::RangeSync { chain_id, batch_id } => { self.range_sync.blocks_by_range_response( &mut self.network, peer_id, chain_id, batch_id, - id, + range_request_id.id, blocks, ); self.update_sync_state(); @@ -1260,7 +1278,7 @@ impl SyncManager { &mut self.network, batch_id, &peer_id, - id, + range_request_id.id, blocks, ) { Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), @@ -1274,36 +1292,25 @@ impl SyncManager { } } } - Err(e) => { - // Re-insert the request so we can retry - self.network.insert_range_blocks_and_blobs_request( - id, - resp.sender_id, - RangeBlockComponentsRequest::new( - resp.expects_blobs, - resp.expects_custody_columns, - None, - vec![], - self.chain.spec.max_blobs_per_block(epoch) as usize, - ), - ); - // inform range that the request needs to be treated as failed - // With time we will want to downgrade this log - warn!( - self.log, - "Blocks and blobs request for range received invalid data"; - "peer_id" => %peer_id, - "sender_id" => ?resp.sender_id, - "error" => e.clone() - ); - let id = SyncRequestId::RangeBlockAndBlobs { id }; - self.network.report_peer( - peer_id, - PeerAction::MidToleranceError, - "block_blob_faulty_batch", - ); - self.inject_error(peer_id, id, RPCError::InvalidData(e)) - } + Err(_) => match range_request_id.requester { + RangeRequestId::RangeSync { chain_id, batch_id } => { + self.range_sync.inject_error( + &mut self.network, + peer_id, + batch_id, + chain_id, + range_request_id.id, + ); + self.update_sync_state(); + } + RangeRequestId::BackfillSync { batch_id } => match self + .backfill_sync + .inject_error(&mut self.network, batch_id, &peer_id, range_request_id.id) + { + Ok(_) => {} + Err(_) => self.update_sync_state(), + }, + }, } } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 4135f901b1c..0cd21de7f41 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -5,7 +5,7 @@ use self::custody::{ActiveCustodyRequest, Error as CustodyRequestError}; pub use self::requests::{BlocksByRootSingleRequest, DataColumnsByRootSingleBlockRequest}; use super::block_sidecar_coupling::RangeBlockComponentsRequest; use super::manager::BlockProcessType; -use super::range_sync::{BatchId, ByRangeRequestType, ChainId}; +use super::range_sync::ByRangeRequestType; use super::SyncMessage; use crate::metrics; use crate::network_beacon_processor::NetworkBeaconProcessor; @@ -17,13 +17,12 @@ use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState}; use custody::CustodyRequestResult; use fnv::FnvHashMap; -use lighthouse_network::rpc::methods::{ - BlobsByRangeRequest, DataColumnsByRangeRequest, OldBlocksByRangeRequest, - OldBlocksByRangeRequestV1, OldBlocksByRangeRequestV2, -}; +use lighthouse_network::rpc::methods::{BlobsByRangeRequest, DataColumnsByRangeRequest}; use 
lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError, RequestType}; +pub use lighthouse_network::service::api_types::RangeRequestId; use lighthouse_network::service::api_types::{ - AppRequestId, CustodyId, CustodyRequester, DataColumnsByRootRequestId, + AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, + CustodyId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, }; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; @@ -32,8 +31,8 @@ use rand::prelude::IteratorRandom; use rand::thread_rng; pub use requests::LookupVerifyError; use requests::{ - ActiveRequests, BlobsByRootRequestItems, BlocksByRootRequestItems, - DataColumnsByRootRequestItems, + ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems, + BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems, }; use slog::{debug, error, warn}; use std::collections::hash_map::Entry; @@ -50,33 +49,6 @@ use types::{ pub mod custody; mod requests; -pub struct BlocksAndBlobsByRangeResponse { - pub sender_id: RangeRequestId, - pub responses: Result>, String>, - pub expects_blobs: bool, - pub expects_custody_columns: Option>, -} - -#[derive(Debug, Clone, Copy)] -pub enum RangeRequestId { - RangeSync { - chain_id: ChainId, - batch_id: BatchId, - }, - BackfillSync { - batch_id: BatchId, - }, -} - -impl RangeRequestId { - pub fn batch_id(&self) -> BatchId { - match self { - RangeRequestId::RangeSync { batch_id, .. } => *batch_id, - RangeRequestId::BackfillSync { batch_id, .. } => *batch_id, - } - } -} - #[derive(Debug)] pub enum RpcEvent { StreamTermination, @@ -84,6 +56,15 @@ pub enum RpcEvent { RPCError(RPCError), } +impl RpcEvent { + pub fn from_chunk(chunk: Option, seen_timestamp: Duration) -> Self { + match chunk { + Some(item) => RpcEvent::Response(item, seen_timestamp), + None => RpcEvent::StreamTermination, + } + } +} + pub type RpcResponseResult = Result<(T, Duration), RpcResponseError>; pub type CustodyByRootResult = Result<(DataColumnSidecarList, PeerGroup), RpcResponseError>; @@ -93,6 +74,7 @@ pub enum RpcResponseError { RpcError(RPCError), VerifyError(LookupVerifyError), CustodyRequestError(CustodyRequestError), + BlockComponentCouplingError(String), } #[derive(Debug, PartialEq, Eq)] @@ -110,16 +92,6 @@ pub enum SendErrorProcessor { ProcessorNotAvailable, } -impl std::fmt::Display for RpcResponseError { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match self { - RpcResponseError::RpcError(e) => write!(f, "RPC Error: {:?}", e), - RpcResponseError::VerifyError(e) => write!(f, "Lookup Verify Error: {:?}", e), - RpcResponseError::CustodyRequestError(e) => write!(f, "Custody Request Error: {:?}", e), - } - } -} - impl From for RpcResponseError { fn from(e: RPCError) -> Self { RpcResponseError::RpcError(e) @@ -199,13 +171,22 @@ pub struct SyncNetworkContext { /// A mapping of active DataColumnsByRoot requests data_columns_by_root_requests: ActiveRequests>, + /// A mapping of active BlocksByRange requests + blocks_by_range_requests: + ActiveRequests>, + /// A mapping of active BlobsByRange requests + blobs_by_range_requests: + ActiveRequests>, + /// A mapping of active DataColumnsByRange requests + data_columns_by_range_requests: + ActiveRequests>, /// Mapping of active custody column requests for a block root custody_by_root_requests: FnvHashMap>, - /// BlocksByRange 
requests paired with BlobsByRange - range_block_components_requests: - FnvHashMap)>, + /// BlocksByRange requests paired with other ByRange requests for data components + components_by_range_requests: + FnvHashMap>, /// Whether the ee is online. If it's not, we don't allow access to the /// `beacon_processor_send`. @@ -223,22 +204,10 @@ pub struct SyncNetworkContext { } /// Small enumeration to make dealing with block and blob requests easier. -pub enum BlockOrBlob { - Block(Option>>), - Blob(Option>>), - CustodyColumns(Option>>), -} - -impl From>>> for BlockOrBlob { - fn from(block: Option>>) -> Self { - BlockOrBlob::Block(block) - } -} - -impl From>>> for BlockOrBlob { - fn from(blob: Option>>) -> Self { - BlockOrBlob::Blob(blob) - } +pub enum RangeBlockComponent { + Block(RpcResponseResult>>>), + Blob(RpcResponseResult>>>), + CustodyColumns(RpcResponseResult>>>), } impl SyncNetworkContext { @@ -256,8 +225,11 @@ impl SyncNetworkContext { blocks_by_root_requests: ActiveRequests::new("blocks_by_root"), blobs_by_root_requests: ActiveRequests::new("blobs_by_root"), data_columns_by_root_requests: ActiveRequests::new("data_columns_by_root"), + blocks_by_range_requests: ActiveRequests::new("blocks_by_range"), + blobs_by_range_requests: ActiveRequests::new("blobs_by_range"), + data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"), custody_by_root_requests: <_>::default(), - range_block_components_requests: FnvHashMap::default(), + components_by_range_requests: FnvHashMap::default(), network_beacon_processor, chain, fork_context, @@ -272,37 +244,60 @@ impl SyncNetworkContext { /// Returns the ids of all the requests made to the given peer_id. pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Vec { - let failed_range_ids = - self.range_block_components_requests - .iter() - .filter_map(|(id, request)| { - if request.1.peer_ids.contains(peer_id) { - Some(SyncRequestId::RangeBlockAndBlobs { id: *id }) - } else { - None - } - }); - - let failed_block_ids = self - .blocks_by_root_requests + // Note: using a destructuring pattern without a default case to make sure we don't forget to + // add new request types to this function. Otherwise, lookup sync can break and lookups + // will get stuck if a peer disconnects during an active request.
+ let Self { + network_send: _, + request_id: _, + blocks_by_root_requests, + blobs_by_root_requests, + data_columns_by_root_requests, + blocks_by_range_requests, + blobs_by_range_requests, + data_columns_by_range_requests, + // custody_by_root_requests is a meta request of data_columns_by_root_requests + custody_by_root_requests: _, + // components_by_range_requests is a meta request of various _by_range requests + components_by_range_requests: _, + execution_engine_state: _, + network_beacon_processor: _, + chain: _, + fork_context: _, + log: _, + } = self; + + let blocks_by_root_ids = blocks_by_root_requests .active_requests_of_peer(peer_id) .into_iter() .map(|id| SyncRequestId::SingleBlock { id: *id }); - let failed_blob_ids = self - .blobs_by_root_requests + let blobs_by_root_ids = blobs_by_root_requests .active_requests_of_peer(peer_id) .into_iter() .map(|id| SyncRequestId::SingleBlob { id: *id }); - let failed_data_column_by_root_ids = self - .data_columns_by_root_requests + let data_column_by_root_ids = data_columns_by_root_requests .active_requests_of_peer(peer_id) .into_iter() .map(|req_id| SyncRequestId::DataColumnsByRoot(*req_id)); - - failed_range_ids - .chain(failed_block_ids) - .chain(failed_blob_ids) - .chain(failed_data_column_by_root_ids) + let blocks_by_range_ids = blocks_by_range_requests + .active_requests_of_peer(peer_id) + .into_iter() + .map(|req_id| SyncRequestId::BlocksByRange(*req_id)); + let blobs_by_range_ids = blobs_by_range_requests + .active_requests_of_peer(peer_id) + .into_iter() + .map(|req_id| SyncRequestId::BlobsByRange(*req_id)); + let data_column_by_range_ids = data_columns_by_range_requests + .active_requests_of_peer(peer_id) + .into_iter() + .map(|req_id| SyncRequestId::DataColumnsByRange(*req_id)); + + blocks_by_root_ids + .chain(blobs_by_root_ids) + .chain(data_column_by_root_ids) + .chain(blocks_by_range_ids) + .chain(blobs_by_range_ids) + .chain(data_column_by_range_ids) .collect() } @@ -361,117 +356,62 @@ impl SyncNetworkContext { peer_id: PeerId, batch_type: ByRangeRequestType, request: BlocksByRangeRequest, - sender_id: RangeRequestId, + requester: RangeRequestId, ) -> Result { - let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); - let id = self.next_id(); - let mut requested_peers = vec![peer_id]; - debug!( - self.log, - "Sending BlocksByRange request"; - "method" => "BlocksByRange", - "count" => request.count(), - "epoch" => epoch, - "peer" => %peer_id, - "id" => id, - ); - let rpc_request = match request { - BlocksByRangeRequest::V1(ref req) => { - RequestType::BlocksByRange(OldBlocksByRangeRequest::V1(OldBlocksByRangeRequestV1 { - start_slot: req.start_slot, - count: req.count, - step: 1, - })) - } - BlocksByRangeRequest::V2(ref req) => { - RequestType::BlocksByRange(OldBlocksByRangeRequest::V2(OldBlocksByRangeRequestV2 { - start_slot: req.start_slot, - count: req.count, - step: 1, - })) - } + // Create the overall components_by_range request ID before its individual components + let id = ComponentsByRangeRequestId { + id: self.next_id(), + requester, }; - self.network_send - .send(NetworkMessage::SendRequest { - peer_id, - request: rpc_request, - request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), - }) - .map_err(|_| RpcRequestSendError::NetworkSendError)?; - let expected_blobs = if matches!(batch_type, ByRangeRequestType::BlocksAndBlobs) { - debug!( - self.log, - "Sending BlobsByRange requests"; - "method" => "BlobsByRange", - "count" => request.count(), - "epoch" => epoch, - "peer" 
=> %peer_id, - ); + let _blocks_req_id = self.send_blocks_by_range_request(peer_id, request.clone(), id)?; - // Create the blob request based on the blocks request. - self.network_send - .send(NetworkMessage::SendRequest { - peer_id, - request: RequestType::BlobsByRange(BlobsByRangeRequest { - start_slot: *request.start_slot(), - count: *request.count(), - }), - request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), - }) - .map_err(|_| RpcRequestSendError::NetworkSendError)?; - true + let blobs_req_id = if matches!(batch_type, ByRangeRequestType::BlocksAndBlobs) { + Some(self.send_blobs_by_range_request( + peer_id, + BlobsByRangeRequest { + start_slot: *request.start_slot(), + count: *request.count(), + }, + id, + )?) } else { - false + None }; - let (expects_columns, num_of_column_req) = + let (expects_columns, data_column_requests) = if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) { let column_indexes = self.network_globals().sampling_columns.clone(); - let mut num_of_custody_column_req = 0; - for (peer_id, columns_by_range_request) in - self.make_columns_by_range_requests(request, &column_indexes)? - { - requested_peers.push(peer_id); - - debug!( - self.log, - "Sending DataColumnsByRange requests"; - "method" => "DataColumnsByRange", - "count" => columns_by_range_request.count, - "epoch" => epoch, - "columns" => ?columns_by_range_request.columns, - "peer" => %peer_id, - "id" => id, - ); - - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request: RequestType::DataColumnsByRange(columns_by_range_request), - request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), + let data_column_requests = self + .make_columns_by_range_requests(request, &column_indexes)? + .into_iter() + .map(|(peer_id, columns_by_range_request)| { + self.send_data_columns_by_range_request( + peer_id, + columns_by_range_request, + id, + ) }) - .map_err(|_| RpcRequestSendError::NetworkSendError)?; + .collect::, _>>()?; - num_of_custody_column_req += 1; - } - - (Some(column_indexes), Some(num_of_custody_column_req)) + ( + Some(column_indexes.into_iter().collect::>()), + Some(data_column_requests), + ) } else { (None, None) }; - let max_blobs_len = self.chain.spec.max_blobs_per_block(epoch); + let expected_blobs = blobs_req_id.is_some(); let info = RangeBlockComponentsRequest::new( expected_blobs, - expects_columns.map(|c| c.into_iter().collect()), - num_of_column_req, - requested_peers, - max_blobs_len as usize, + expects_columns, + data_column_requests.map(|items| items.len()), ); - self.range_block_components_requests - .insert(id, (sender_id, info)); - Ok(id) + self.components_by_range_requests.insert(id, info); + + Ok(id.id) } fn make_columns_by_range_requests( @@ -508,54 +448,43 @@ impl SyncNetworkContext { Ok(peer_id_to_request_map) } - pub fn range_request_failed(&mut self, request_id: Id) -> Option { - let sender_id = self - .range_block_components_requests - .remove(&request_id) - .map(|(sender_id, _info)| sender_id); - if let Some(sender_id) = sender_id { - debug!( - self.log, - "Sync range request failed"; - "request_id" => request_id, - "sender_id" => ?sender_id - ); - Some(sender_id) - } else { - debug!(self.log, "Sync range request failed"; "request_id" => request_id); - None - } - } - /// Received a blocks by range or blobs by range response for a request that couples blocks ' /// and blobs. 
- pub fn range_block_and_blob_response( + pub fn range_block_component_response( &mut self, - request_id: Id, - block_or_blob: BlockOrBlob, - ) -> Option> { - let Entry::Occupied(mut entry) = self.range_block_components_requests.entry(request_id) - else { + id: ComponentsByRangeRequestId, + range_block_component: RangeBlockComponent, + ) -> Option>, RpcResponseError>> { + let Entry::Occupied(mut entry) = self.components_by_range_requests.entry(id) else { metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["range_blocks"]); return None; }; - let (_, info) = entry.get_mut(); - match block_or_blob { - BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block), - BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), - BlockOrBlob::CustodyColumns(column) => info.add_data_column(column), + if let Err(e) = { + let request = entry.get_mut(); + match range_block_component { + RangeBlockComponent::Block(resp) => resp.map(|(blocks, _)| { + request.add_blocks(blocks); + }), + RangeBlockComponent::Blob(resp) => resp.map(|(blobs, _)| { + request.add_blobs(blobs); + }), + RangeBlockComponent::CustodyColumns(resp) => resp.map(|(custody_columns, _)| { + request.add_custody_columns(custody_columns); + }), + } + } { + entry.remove(); + return Some(Err(e)); } - if info.is_finished() { + + if entry.get_mut().is_finished() { // If the request is finished, dequeue everything - let (sender_id, info) = entry.remove(); - let (expects_blobs, expects_custody_columns) = info.get_requirements(); - Some(BlocksAndBlobsByRangeResponse { - sender_id, - responses: info.into_responses(&self.chain.spec), - expects_blobs, - expects_custody_columns, - }) + let request = entry.remove(); + let blocks = request + .into_responses(&self.chain.spec) + .map_err(RpcResponseError::BlockComponentCouplingError); + Some(blocks) } else { None } @@ -831,6 +760,125 @@ impl SyncNetworkContext { } } + fn send_blocks_by_range_request( + &mut self, + peer_id: PeerId, + request: BlocksByRangeRequest, + parent_request_id: ComponentsByRangeRequestId, + ) -> Result { + let id = BlocksByRangeRequestId { + id: self.next_id(), + parent_request_id, + }; + debug!( + self.log, + "Sending BlocksByRange request"; + "method" => "BlocksByRange", + "count" => request.count(), + "epoch" => Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()), + "peer" => %peer_id, + "id" => ?id, + ); + self.network_send + .send(NetworkMessage::SendRequest { + peer_id, + request: RequestType::BlocksByRange(request.clone().into()), + request_id: AppRequestId::Sync(SyncRequestId::BlocksByRange(id)), + }) + .map_err(|_| RpcRequestSendError::NetworkSendError)?; + + self.blocks_by_range_requests.insert( + id, + peer_id, + // false = do not enforce max_requests are returned for *_by_range methods. We don't + // know if there are missed blocks. + false, + BlocksByRangeRequestItems::new(request), + ); + Ok(id) + } + + fn send_blobs_by_range_request( + &mut self, + peer_id: PeerId, + request: BlobsByRangeRequest, + parent_request_id: ComponentsByRangeRequestId, + ) -> Result { + let id = BlobsByRangeRequestId { + id: self.next_id(), + parent_request_id, + }; + let request_epoch = Slot::new(request.start_slot).epoch(T::EthSpec::slots_per_epoch()); + debug!( + self.log, + "Sending BlobsByRange requests"; + "method" => "BlobsByRange", + "count" => request.count, + "epoch" => request_epoch, + "peer" => %peer_id, + "id" => ?id, + ); + + // Create the blob request based on the blocks request. 
+ self.network_send + .send(NetworkMessage::SendRequest { + peer_id, + request: RequestType::BlobsByRange(request.clone()), + request_id: AppRequestId::Sync(SyncRequestId::BlobsByRange(id)), + }) + .map_err(|_| RpcRequestSendError::NetworkSendError)?; + + let max_blobs_per_block = self.chain.spec.max_blobs_per_block(request_epoch); + self.blobs_by_range_requests.insert( + id, + peer_id, + // false = do not enforce max_requests are returned for *_by_range methods. We don't + // know if there are missed blocks. + false, + BlobsByRangeRequestItems::new(request, max_blobs_per_block), + ); + Ok(id) + } + + fn send_data_columns_by_range_request( + &mut self, + peer_id: PeerId, + request: DataColumnsByRangeRequest, + parent_request_id: ComponentsByRangeRequestId, + ) -> Result { + let id = DataColumnsByRangeRequestId { + id: self.next_id(), + parent_request_id, + }; + debug!( + self.log, + "Sending DataColumnsByRange requests"; + "method" => "DataColumnsByRange", + "count" => request.count, + "epoch" => Slot::new(request.start_slot).epoch(T::EthSpec::slots_per_epoch()), + "columns" => ?request.columns, + "peer" => %peer_id, + "id" => ?id, + ); + + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: RequestType::DataColumnsByRange(request.clone()), + request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRange(id)), + }) + .map_err(|_| RpcRequestSendError::NetworkSendError)?; + + self.data_columns_by_range_requests.insert( + id, + peer_id, + // false = do not enforce max_requests are returned for *_by_range methods. We don't + // know if there are missed blocks. + false, + DataColumnsByRangeRequestItems::new(request), + ); + Ok(id) + } + pub fn is_execution_engine_online(&self) -> bool { self.execution_engine_state == EngineState::Online } @@ -929,16 +977,6 @@ impl SyncNetworkContext { } } - pub fn insert_range_blocks_and_blobs_request( - &mut self, - id: Id, - sender_id: RangeRequestId, - info: RangeBlockComponentsRequest, - ) { - self.range_block_components_requests - .insert(id, (sender_id, info)); - } - /// Attempt to make progress on all custody_by_root requests. Some request may be stale waiting /// for custody peers. Returns a Vec of results as zero or more requests may fail in this /// attempt. 
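// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch): the coupling flow behind
// `components_by_range_requests`. Each child by_range request now buffers its
// own items and hands over a complete `Vec` when its stream terminates; the
// umbrella accumulator reports finished once every expected component has
// delivered. This is a simplified stand-in for `RangeBlockComponentsRequest`,
// with `String` in place of real blocks and blobs.
// ---------------------------------------------------------------------------
#[derive(Default)]
struct ComponentsAccumulator {
    blocks: Vec<String>,
    blobs: Vec<String>,
    blocks_done: bool,
    blobs_done: bool,
}

impl ComponentsAccumulator {
    // Mirrors the new `add_blocks`: a child request only reports at stream
    // termination, so receiving its batch also marks the component done.
    fn add_blocks(&mut self, blocks: Vec<String>) {
        self.blocks.extend(blocks);
        self.blocks_done = true;
    }
    fn add_blobs(&mut self, blobs: Vec<String>) {
        self.blobs.extend(blobs);
        self.blobs_done = true;
    }
    fn is_finished(&self) -> bool {
        self.blocks_done && self.blobs_done
    }
}

fn main() {
    let mut acc = ComponentsAccumulator::default();
    acc.add_blocks(vec!["block_a".into(), "block_b".into()]);
    assert!(!acc.is_finished()); // still waiting on the blobs child request
    acc.add_blobs(vec![]); // an empty but terminated blob stream counts
    assert!(acc.is_finished());
}
// ---------------------------------------------------------------------------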
@@ -1037,6 +1075,41 @@ impl SyncNetworkContext { self.report_rpc_response_errors(resp, peer_id) } + #[allow(clippy::type_complexity)] + pub(crate) fn on_blocks_by_range_response( + &mut self, + id: BlocksByRangeRequestId, + peer_id: PeerId, + rpc_event: RpcEvent>>, + ) -> Option>>>> { + let resp = self.blocks_by_range_requests.on_response(id, rpc_event); + self.report_rpc_response_errors(resp, peer_id) + } + + #[allow(clippy::type_complexity)] + pub(crate) fn on_blobs_by_range_response( + &mut self, + id: BlobsByRangeRequestId, + peer_id: PeerId, + rpc_event: RpcEvent>>, + ) -> Option>>>> { + let resp = self.blobs_by_range_requests.on_response(id, rpc_event); + self.report_rpc_response_errors(resp, peer_id) + } + + #[allow(clippy::type_complexity)] + pub(crate) fn on_data_columns_by_range_response( + &mut self, + id: DataColumnsByRangeRequestId, + peer_id: PeerId, + rpc_event: RpcEvent>>, + ) -> Option>> { + let resp = self + .data_columns_by_range_requests + .on_response(id, rpc_event); + self.report_rpc_response_errors(resp, peer_id) + } + fn report_rpc_response_errors( &mut self, resp: Option>, @@ -1191,21 +1264,27 @@ impl SyncNetworkContext { } pub(crate) fn register_metrics(&self) { - metrics::set_gauge_vec( - &metrics::SYNC_ACTIVE_NETWORK_REQUESTS, - &["blocks_by_root"], - self.blocks_by_root_requests.len() as i64, - ); - metrics::set_gauge_vec( - &metrics::SYNC_ACTIVE_NETWORK_REQUESTS, - &["blobs_by_root"], - self.blobs_by_root_requests.len() as i64, - ); - metrics::set_gauge_vec( - &metrics::SYNC_ACTIVE_NETWORK_REQUESTS, - &["range_blocks"], - self.range_block_components_requests.len() as i64, - ); + for (id, count) in [ + ("blocks_by_root", self.blocks_by_root_requests.len()), + ("blobs_by_root", self.blobs_by_root_requests.len()), + ( + "data_columns_by_root", + self.data_columns_by_root_requests.len(), + ), + ("blocks_by_range", self.blocks_by_range_requests.len()), + ("blobs_by_range", self.blobs_by_range_requests.len()), + ( + "data_columns_by_range", + self.data_columns_by_range_requests.len(), + ), + ("custody_by_root", self.custody_by_root_requests.len()), + ( + "components_by_range", + self.components_by_range_requests.len(), + ), + ] { + metrics::set_gauge_vec(&metrics::SYNC_ACTIVE_NETWORK_REQUESTS, &[id], count as i64); + } } } diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 4a5a16459d3..c9b85e47b69 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -4,10 +4,13 @@ use beacon_chain::validator_monitor::timestamp_now; use fnv::FnvHashMap; use lighthouse_network::PeerId; use strum::IntoStaticStr; -use types::Hash256; +use types::{Hash256, Slot}; +pub use blobs_by_range::BlobsByRangeRequestItems; pub use blobs_by_root::{BlobsByRootRequestItems, BlobsByRootSingleBlockRequest}; +pub use blocks_by_range::BlocksByRangeRequestItems; pub use blocks_by_root::{BlocksByRootRequestItems, BlocksByRootSingleRequest}; +pub use data_columns_by_range::DataColumnsByRangeRequestItems; pub use data_columns_by_root::{ DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest, }; @@ -16,8 +19,11 @@ use crate::metrics; use super::{RpcEvent, RpcResponseResult}; +mod blobs_by_range; mod blobs_by_root; +mod blocks_by_range; mod blocks_by_root; +mod data_columns_by_range; mod data_columns_by_root; #[derive(Debug, PartialEq, Eq, IntoStaticStr)] @@ -26,8 +32,9 @@ pub enum LookupVerifyError { TooManyResponses, 
UnrequestedBlockRoot(Hash256), UnrequestedIndex(u64), + UnrequestedSlot(Slot), InvalidInclusionProof, - DuplicateData, + DuplicatedData(Slot, u64), InternalError(String), } diff --git a/beacon_node/network/src/sync/network_context/requests/blobs_by_range.rs b/beacon_node/network/src/sync/network_context/requests/blobs_by_range.rs new file mode 100644 index 00000000000..9c6f516199c --- /dev/null +++ b/beacon_node/network/src/sync/network_context/requests/blobs_by_range.rs @@ -0,0 +1,56 @@ +use super::{ActiveRequestItems, LookupVerifyError}; +use lighthouse_network::rpc::methods::BlobsByRangeRequest; +use std::sync::Arc; +use types::{BlobSidecar, EthSpec}; + +/// Accumulates results of a blobs_by_range request. Only returns items after receiving the +/// stream termination. +pub struct BlobsByRangeRequestItems { + request: BlobsByRangeRequest, + items: Vec>>, + max_blobs_per_block: u64, +} + +impl BlobsByRangeRequestItems { + pub fn new(request: BlobsByRangeRequest, max_blobs_per_block: u64) -> Self { + Self { + request, + items: vec![], + max_blobs_per_block, + } + } +} + +impl ActiveRequestItems for BlobsByRangeRequestItems { + type Item = Arc>; + + fn add(&mut self, blob: Self::Item) -> Result { + if blob.slot() < self.request.start_slot + || blob.slot() >= self.request.start_slot + self.request.count + { + return Err(LookupVerifyError::UnrequestedSlot(blob.slot())); + } + if blob.index >= self.max_blobs_per_block { + return Err(LookupVerifyError::UnrequestedIndex(blob.index)); + } + if !blob.verify_blob_sidecar_inclusion_proof() { + return Err(LookupVerifyError::InvalidInclusionProof); + } + if self + .items + .iter() + .any(|existing| existing.slot() == blob.slot() && existing.index == blob.index) + { + return Err(LookupVerifyError::DuplicatedData(blob.slot(), blob.index)); + } + + self.items.push(blob); + + // Skip check if blobs are ready as it's rare that all blocks have max blobs + Ok(false) + } + + fn consume(&mut self) -> Vec { + std::mem::take(&mut self.items) + } +} diff --git a/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs b/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs index a6702298845..547c51198e4 100644 --- a/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs +++ b/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs @@ -57,7 +57,7 @@ impl ActiveRequestItems for BlobsByRootRequestItems { return Err(LookupVerifyError::UnrequestedIndex(blob.index)); } if self.items.iter().any(|b| b.index == blob.index) { - return Err(LookupVerifyError::DuplicateData); + return Err(LookupVerifyError::DuplicatedData(blob.slot(), blob.index)); } self.items.push(blob); diff --git a/beacon_node/network/src/sync/network_context/requests/blocks_by_range.rs b/beacon_node/network/src/sync/network_context/requests/blocks_by_range.rs new file mode 100644 index 00000000000..c7d2dda01ea --- /dev/null +++ b/beacon_node/network/src/sync/network_context/requests/blocks_by_range.rs @@ -0,0 +1,48 @@ +use super::{ActiveRequestItems, LookupVerifyError}; +use lighthouse_network::rpc::BlocksByRangeRequest; +use std::sync::Arc; +use types::{EthSpec, SignedBeaconBlock}; + +/// Accumulates results of a blocks_by_range request. Only returns items after receiving the +/// stream termination. 
+pub struct BlocksByRangeRequestItems { + request: BlocksByRangeRequest, + items: Vec>>, +} + +impl BlocksByRangeRequestItems { + pub fn new(request: BlocksByRangeRequest) -> Self { + Self { + request, + items: vec![], + } + } +} + +impl ActiveRequestItems for BlocksByRangeRequestItems { + type Item = Arc>; + + fn add(&mut self, block: Self::Item) -> Result { + if block.slot().as_u64() < *self.request.start_slot() + || block.slot().as_u64() >= self.request.start_slot() + self.request.count() + { + return Err(LookupVerifyError::UnrequestedSlot(block.slot())); + } + if self + .items + .iter() + .any(|existing| existing.slot() == block.slot()) + { + // DuplicatedData is a common error for all components, default index to 0 + return Err(LookupVerifyError::DuplicatedData(block.slot(), 0)); + } + + self.items.push(block); + + Ok(self.items.len() >= *self.request.count() as usize) + } + + fn consume(&mut self) -> Vec { + std::mem::take(&mut self.items) + } +} diff --git a/beacon_node/network/src/sync/network_context/requests/data_columns_by_range.rs b/beacon_node/network/src/sync/network_context/requests/data_columns_by_range.rs new file mode 100644 index 00000000000..9dabb2defa0 --- /dev/null +++ b/beacon_node/network/src/sync/network_context/requests/data_columns_by_range.rs @@ -0,0 +1,54 @@ +use super::{ActiveRequestItems, LookupVerifyError}; +use lighthouse_network::rpc::methods::DataColumnsByRangeRequest; +use std::sync::Arc; +use types::{DataColumnSidecar, EthSpec}; + +/// Accumulates results of a data_columns_by_range request. Only returns items after receiving the +/// stream termination. +pub struct DataColumnsByRangeRequestItems { + request: DataColumnsByRangeRequest, + items: Vec>>, +} + +impl DataColumnsByRangeRequestItems { + pub fn new(request: DataColumnsByRangeRequest) -> Self { + Self { + request, + items: vec![], + } + } +} + +impl ActiveRequestItems for DataColumnsByRangeRequestItems { + type Item = Arc>; + + fn add(&mut self, data_column: Self::Item) -> Result { + if data_column.slot() < self.request.start_slot + || data_column.slot() >= self.request.start_slot + self.request.count + { + return Err(LookupVerifyError::UnrequestedSlot(data_column.slot())); + } + if !self.request.columns.contains(&data_column.index) { + return Err(LookupVerifyError::UnrequestedIndex(data_column.index)); + } + if !data_column.verify_inclusion_proof() { + return Err(LookupVerifyError::InvalidInclusionProof); + } + if self.items.iter().any(|existing| { + existing.slot() == data_column.slot() && existing.index == data_column.index + }) { + return Err(LookupVerifyError::DuplicatedData( + data_column.slot(), + data_column.index, + )); + } + + self.items.push(data_column); + + Ok(self.items.len() >= self.request.count as usize * self.request.columns.len()) + } + + fn consume(&mut self) -> Vec { + std::mem::take(&mut self.items) + } +} diff --git a/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs b/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs index 1b8d46ff072..4e02737f086 100644 --- a/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs +++ b/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs @@ -57,7 +57,10 @@ impl ActiveRequestItems for DataColumnsByRootRequestItems { return Err(LookupVerifyError::UnrequestedIndex(data_column.index)); } if self.items.iter().any(|d| d.index == data_column.index) { - return Err(LookupVerifyError::DuplicateData); + return 
diff --git a/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs b/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs
index 1b8d46ff072..4e02737f086 100644
--- a/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs
+++ b/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs
@@ -57,7 +57,10 @@ impl<E: EthSpec> ActiveRequestItems for DataColumnsByRootRequestItems<E> {
             return Err(LookupVerifyError::UnrequestedIndex(data_column.index));
         }
         if self.items.iter().any(|d| d.index == data_column.index) {
-            return Err(LookupVerifyError::DuplicateData);
+            return Err(LookupVerifyError::DuplicatedData(
+                data_column.slot(),
+                data_column.index,
+            ));
         }
 
         self.items.push(data_column);
diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs
index 53fb55b14da..818fde07b83 100644
--- a/beacon_node/network/src/sync/range_sync/batch.rs
+++ b/beacon_node/network/src/sync/range_sync/batch.rs
@@ -1,4 +1,4 @@
-use beacon_chain::block_verification_types::{AsBlock, RpcBlock};
+use beacon_chain::block_verification_types::RpcBlock;
 use lighthouse_network::rpc::methods::BlocksByRangeRequest;
 use lighthouse_network::service::api_types::Id;
 use lighthouse_network::PeerId;
@@ -277,36 +277,6 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
     > {
         match self.state.poison() {
             BatchState::Downloading(peer, _request_id) => {
-                // verify that blocks are in range
-                if let Some(last_slot) = blocks.last().map(|b| b.slot()) {
-                    // the batch is non-empty
-                    let first_slot = blocks[0].slot();
-
-                    let failed_range = if first_slot < self.start_slot {
-                        Some((self.start_slot, first_slot))
-                    } else if self.end_slot < last_slot {
-                        Some((self.end_slot, last_slot))
-                    } else {
-                        None
-                    };
-
-                    if let Some((expected, received)) = failed_range {
-                        // this is a failed download, register the attempt and check if the batch
-                        // can be tried again
-                        self.failed_download_attempts.push(peer);
-                        self.state = if self.failed_download_attempts.len()
-                            >= B::max_batch_download_attempts() as usize
-                        {
-                            BatchState::Failed
-                        } else {
-                            // drop the blocks
-                            BatchState::AwaitingDownload
-                        };
-
-                        return Err(Ok((expected, received, self.outcome())));
-                    }
-                }
-
                 let received = blocks.len();
                 self.state = BatchState::AwaitingProcessing(peer, blocks, Instant::now());
                 Ok(received)
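The range verification deleted from `BatchInfo` above is not lost; it moves to receipt time, where each by_range accumulator rejects out-of-window responses with `LookupVerifyError::UnrequestedSlot`, so only the offending request is failed and retried rather than the whole batch. A sketch of the equivalent predicate, not literal code from this diff:

```rust
// Each `add` implementation accepts an item only if its slot falls in
// [start_slot, start_slot + count).
fn is_requested_slot(slot: u64, start_slot: u64, count: u64) -> bool {
    slot >= start_slot && slot < start_slot + count
}

// A 32-slot request starting at slot 64 accepts slot 95 but rejects slot 96.
assert!(is_requested_slot(95, 64, 32));
assert!(!is_requested_slot(96, 64, 32));
```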
diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs
index cfd89f7b44e..f78b44308d1 100644
--- a/beacon_node/network/src/sync/tests/range.rs
+++ b/beacon_node/network/src/sync/tests/range.rs
@@ -11,7 +11,10 @@ use lighthouse_network::rpc::methods::{
     OldBlocksByRangeRequestV2,
 };
 use lighthouse_network::rpc::{RequestType, StatusMessage};
-use lighthouse_network::service::api_types::{AppRequestId, Id, SyncRequestId};
+use lighthouse_network::service::api_types::{
+    AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId,
+    SyncRequestId,
+};
 use lighthouse_network::{PeerId, SyncInfo};
 use std::time::Duration;
 use types::{
@@ -28,8 +31,8 @@ pub(crate) enum DataSidecars<E: EthSpec> {
 }
 
 enum ByRangeDataRequestIds {
     PreDeneb,
-    PrePeerDAS(Id, PeerId),
-    PostPeerDAS(Vec<(Id, PeerId)>),
+    PrePeerDAS(BlobsByRangeRequestId, PeerId),
+    PostPeerDAS(Vec<(DataColumnsByRangeRequestId, PeerId)>),
 }
 
 /// Sync tests are usually written in the form:
@@ -151,7 +154,7 @@ impl TestRig {
     fn find_blocks_by_range_request(
         &mut self,
         request_filter: RequestFilter,
-    ) -> ((Id, PeerId), ByRangeDataRequestIds) {
+    ) -> ((BlocksByRangeRequestId, PeerId), ByRangeDataRequestIds) {
         let filter_f = |peer: PeerId, start_slot: u64| {
             if let Some(expected_epoch) = request_filter.epoch {
                 let epoch = Slot::new(start_slot).epoch(E::slots_per_epoch()).as_u64();
@@ -175,7 +178,7 @@ impl TestRig {
                     RequestType::BlocksByRange(OldBlocksByRangeRequest::V2(
                         OldBlocksByRangeRequestV2 { start_slot, .. },
                     )),
-                request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }),
+                request_id: AppRequestId::Sync(SyncRequestId::BlocksByRange(id)),
             } if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)),
             _ => None,
         })
@@ -190,7 +193,7 @@ impl TestRig {
                     RequestType::DataColumnsByRange(DataColumnsByRangeRequest {
                         start_slot, ..
                     }),
-                request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }),
+                request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRange(id)),
             } if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)),
             _ => None,
         }) {
@@ -206,7 +209,7 @@ impl TestRig {
             NetworkMessage::SendRequest {
                 peer_id,
                 request: RequestType::BlobsByRange(BlobsByRangeRequest { start_slot, .. }),
-                request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }),
+                request_id: AppRequestId::Sync(SyncRequestId::BlobsByRange(id)),
             } if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)),
             _ => None,
         })
@@ -225,10 +228,10 @@
 
         // Complete the request with a single stream termination
         self.log(&format!(
-            "Completing BlocksByRange request {blocks_req_id} with empty stream"
+            "Completing BlocksByRange request {blocks_req_id:?} with empty stream"
         ));
         self.send_sync_message(SyncMessage::RpcBlock {
-            request_id: SyncRequestId::RangeBlockAndBlobs { id: blocks_req_id },
+            request_id: SyncRequestId::BlocksByRange(blocks_req_id),
            peer_id: block_peer,
             beacon_block: None,
             seen_timestamp: D,
@@ -239,10 +242,10 @@
             ByRangeDataRequestIds::PrePeerDAS(id, peer_id) => {
                 // Complete the request with a single stream termination
                 self.log(&format!(
-                    "Completing BlobsByRange request {id} with empty stream"
+                    "Completing BlobsByRange request {id:?} with empty stream"
                 ));
                 self.send_sync_message(SyncMessage::RpcBlob {
-                    request_id: SyncRequestId::RangeBlockAndBlobs { id },
+                    request_id: SyncRequestId::BlobsByRange(id),
                     peer_id,
                     blob_sidecar: None,
                     seen_timestamp: D,
@@ -252,10 +255,10 @@
                 // Complete the request with a single stream termination
                 for (id, peer_id) in data_column_req_ids {
                     self.log(&format!(
-                        "Completing DataColumnsByRange request {id} with empty stream"
+                        "Completing DataColumnsByRange request {id:?} with empty stream"
                     ));
                     self.send_sync_message(SyncMessage::RpcDataColumn {
-                        request_id: SyncRequestId::RangeBlockAndBlobs { id },
+                        request_id: SyncRequestId::DataColumnsByRange(id),
                         peer_id,
                         data_column: None,
                         seen_timestamp: D,