Skip to content

Commit

Permalink
Add individual by_range sync requests (#6497)
Browse files Browse the repository at this point in the history
Part of
- #6258

To address PeerDAS sync issues we need to make individual by_range requests within a batch retriable. We should adopt the same pattern for lookup sync where each request (block/blobs/columns) is tracked individually within a "meta" request that group them all and handles retry logic.


  - Building on #6398

second step is to add individual request accumulators for `blocks_by_range`, `blobs_by_range`, and `data_columns_by_range`. This will allow each request to progress independently and be retried separately.

Most of the logic is just piping, excuse the large diff. This PR does not change the logic of how requests are handled or retried. This will be done in a future PR changing the logic of `RangeBlockComponentsRequest`.

### Before

- Sync manager receives block with `SyncRequestId::RangeBlockAndBlobs`
- Insert block into `SyncNetworkContext::range_block_components_requests`
- (If received stream terminators of all requests)
- Return `Vec<RpcBlock>`, and insert into `range_sync`

### Now

- Sync manager receives block with `SyncRequestId::RangeBlockAndBlobs`
- Insert block into `SyncNetworkContext:: blocks_by_range_requests`
- (If received stream terminator of this request)
- Return `Vec<SignedBlock>`, and insert into `SyncNetworkContext::components_by_range_requests `
- (If received a result for all requests)
- Return `Vec<RpcBlock>`, and insert into `range_sync`
  • Loading branch information
dapplion authored Feb 5, 2025
1 parent 7bfdb33 commit 2193f6a
Show file tree
Hide file tree
Showing 15 changed files with 776 additions and 502 deletions.
21 changes: 21 additions & 0 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,27 @@ impl OldBlocksByRangeRequest {
}
}

impl From<BlocksByRangeRequest> for OldBlocksByRangeRequest {
fn from(req: BlocksByRangeRequest) -> Self {
match req {
BlocksByRangeRequest::V1(ref req) => {
OldBlocksByRangeRequest::V1(OldBlocksByRangeRequestV1 {
start_slot: req.start_slot,
count: req.count,
step: 1,
})
}
BlocksByRangeRequest::V2(ref req) => {
OldBlocksByRangeRequest::V2(OldBlocksByRangeRequestV2 {
start_slot: req.start_slot,
count: req.count,
step: 1,
})
}
}
}
}

/// Request a number of beacon block bodies from a peer.
#[superstruct(variants(V1, V2), variant_attributes(derive(Clone, Debug, PartialEq)))]
#[derive(Clone, Debug, PartialEq)]
Expand Down
22 changes: 13 additions & 9 deletions beacon_node/lighthouse_network/src/rpc/self_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ mod tests {
use crate::rpc::rate_limiter::Quota;
use crate::rpc::self_limiter::SelfRateLimiter;
use crate::rpc::{Ping, Protocol, RequestType};
use crate::service::api_types::{AppRequestId, RequestId, SyncRequestId};
use crate::service::api_types::{AppRequestId, RequestId, SingleLookupReqId, SyncRequestId};
use libp2p::PeerId;
use std::time::Duration;
use types::{EthSpec, ForkContext, Hash256, MainnetEthSpec, Slot};
Expand All @@ -238,12 +238,16 @@ mod tests {
let mut limiter: SelfRateLimiter<RequestId, MainnetEthSpec> =
SelfRateLimiter::new(config, fork_context, log).unwrap();
let peer_id = PeerId::random();
let lookup_id = 0;

for i in 1..=5u32 {
let _ = limiter.allows(
peer_id,
RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs {
id: i,
RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock {
id: SingleLookupReqId {
lookup_id,
req_id: i,
},
})),
RequestType::Ping(Ping { data: i as u64 }),
);
Expand All @@ -261,9 +265,9 @@ mod tests {
for i in 2..=5u32 {
assert!(matches!(
iter.next().unwrap().request_id,
RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs {
id,
})) if id == i
RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock {
id: SingleLookupReqId { req_id, .. },
})) if req_id == i,
));
}

Expand All @@ -286,9 +290,9 @@ mod tests {
for i in 3..=5 {
assert!(matches!(
iter.next().unwrap().request_id,
RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs {
id
})) if id == i
RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock {
id: SingleLookupReqId { req_id, .. },
})) if req_id == i,
));
}

Expand Down
58 changes: 55 additions & 3 deletions beacon_node/lighthouse_network/src/service/api_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use libp2p::swarm::ConnectionId;
use types::{
BlobSidecar, DataColumnSidecar, EthSpec, Hash256, LightClientBootstrap,
BlobSidecar, DataColumnSidecar, Epoch, EthSpec, Hash256, LightClientBootstrap,
LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock,
};

Expand Down Expand Up @@ -31,8 +31,12 @@ pub enum SyncRequestId {
SingleBlob { id: SingleLookupReqId },
/// Request searching for a set of data columns given a hash and list of column indices.
DataColumnsByRoot(DataColumnsByRootRequestId),
/// Range request that is composed by both a block range request and a blob range request.
RangeBlockAndBlobs { id: Id },
/// Blocks by range request
BlocksByRange(BlocksByRangeRequestId),
/// Blobs by range request
BlobsByRange(BlobsByRangeRequestId),
/// Data columns by range request
DataColumnsByRange(DataColumnsByRangeRequestId),
}

/// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly.
Expand All @@ -43,12 +47,60 @@ pub struct DataColumnsByRootRequestId {
pub requester: DataColumnsByRootRequester,
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct BlocksByRangeRequestId {
/// Id to identify this attempt at a blocks_by_range request for `parent_request_id`
pub id: Id,
/// The Id of the overall By Range request for block components.
pub parent_request_id: ComponentsByRangeRequestId,
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct BlobsByRangeRequestId {
/// Id to identify this attempt at a blobs_by_range request for `parent_request_id`
pub id: Id,
/// The Id of the overall By Range request for block components.
pub parent_request_id: ComponentsByRangeRequestId,
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct DataColumnsByRangeRequestId {
/// Id to identify this attempt at a data_columns_by_range request for `parent_request_id`
pub id: Id,
/// The Id of the overall By Range request for block components.
pub parent_request_id: ComponentsByRangeRequestId,
}

/// Block components by range request for range sync. Includes an ID for downstream consumers to
/// handle retries and tie all their sub requests together.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct ComponentsByRangeRequestId {
/// Each `RangeRequestId` may request the same data in a later retry. This Id identifies the
/// current attempt.
pub id: Id,
/// What sync component is issuing a components by range request and expecting data back
pub requester: RangeRequestId,
}

/// Range sync chain or backfill batch
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum RangeRequestId {
RangeSync { chain_id: Id, batch_id: Epoch },
BackfillSync { batch_id: Epoch },
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum DataColumnsByRootRequester {
Sampling(SamplingId),
Custody(CustodyId),
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum RangeRequester {
RangeSync { chain_id: u64, batch_id: Epoch },
BackfillSync { batch_id: Epoch },
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct SamplingId {
pub id: SamplingRequester,
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ impl<T: BeaconChainTypes> Router<T> {
) {
let request_id = match request_id {
AppRequestId::Sync(sync_id) => match sync_id {
id @ SyncRequestId::RangeBlockAndBlobs { .. } => id,
id @ SyncRequestId::BlocksByRange { .. } => id,
other => {
crit!(self.log, "BlocksByRange response on incorrect request"; "request" => ?other);
return;
Expand Down
Loading

0 comments on commit 2193f6a

Please sign in to comment.