Skip to content

Commit

Permalink
Updateeth2_libp2p & move blob count validations of received request…
Browse files Browse the repository at this point in the history
…s to `eth2_libp2p`
  • Loading branch information
Tumas committed Jan 13, 2025
1 parent aadb443 commit eb84363
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 44 deletions.
26 changes: 24 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 20 additions & 41 deletions p2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,9 @@ impl<P: Preset> Network<P> {
self.request_blobs_by_range(request_id, peer_id, start_slot, count);
}
SyncToP2p::RequestBlobsByRoot(request_id, peer_id, identifiers) => {
self.request_blobs_by_root(request_id, peer_id, identifiers);
if let Err(error) = self.request_blobs_by_root(request_id, peer_id, identifiers) {
warn!("cannot request blobs by root: {error:?}");
}
}
SyncToP2p::RequestBlocksByRange(request_id, peer_id, start_slot, count) => {
self.request_blocks_by_range(request_id, peer_id, start_slot, count);
Expand Down Expand Up @@ -926,7 +928,8 @@ impl<P: Preset> Network<P> {
self.handle_blobs_by_range_request(peer_id, peer_request_id, request_id, request)
}
RequestType::BlobsByRoot(request) => {
self.handle_blobs_by_root_request(peer_id, peer_request_id, request_id, request)
self.handle_blobs_by_root_request(peer_id, peer_request_id, request_id, request);
Ok(())
}
RequestType::Goodbye(goodbye_reason) => {
debug!("received GoodBye request (peer_id: {peer_id}, reason: {goodbye_reason:?})");
Expand Down Expand Up @@ -1046,19 +1049,8 @@ impl<P: Preset> Network<P> {
debug!("received BlobSidecarsByRange request (peer_id: {peer_id}, request: {request:?})");

let BlobsByRangeRequest { start_slot, count } = request;
let chain_config = self.controller.chain_config();
let phase = chain_config.phase_at_slot::<P>(start_slot);
let Some(max_request_blob_sidecars) = chain_config.max_request_blob_sidecars(phase) else {
return Err(Error::InvalidPhaseRequest {
phase,
protocol: "blob_sidecars_by_range".to_owned(),
}
.into());
};

let difference = count
.min(max_request_blob_sidecars)
.min(MAX_FOR_DOS_PREVENTION);
let difference = count.min(MAX_FOR_DOS_PREVENTION);

let end_slot = start_slot
.checked_add(difference)
Expand Down Expand Up @@ -1116,35 +1108,21 @@ impl<P: Preset> Network<P> {
peer_request_id: PeerRequestId,
request_id: IncomingRequestId,
request: BlobsByRootRequest,
) -> Result<()> {
) {
debug!("received BlobsByRootRequest request (peer_id: {peer_id}, request: {request:?})");

// TODO(feature/deneb): MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS
let BlobsByRootRequest { blob_ids } = request;
let phase = self.controller.phase();
let Some(max_request_blob_sidecars) = self
.controller
.chain_config()
.max_request_blob_sidecars(phase)
else {
return Err(Error::InvalidPhaseRequest {
phase,
protocol: "blob_sidecars_by_root".to_owned(),
}
.into());
};

let controller = self.controller.clone_arc();
let network_to_service_tx = self.network_to_service_tx.clone();

self.dedicated_executor
.spawn(async move {
// > Clients MAY limit the number of blocks and sidecars in the response.
let blob_ids = blob_ids.into_iter().take(
MAX_FOR_DOS_PREVENTION
.min(max_request_blob_sidecars)
.try_into()?,
);
let blob_ids = blob_ids
.into_iter()
.take(MAX_FOR_DOS_PREVENTION.try_into()?);

let blob_sidecars = controller.blob_sidecars_by_ids(blob_ids)?;

Expand Down Expand Up @@ -1179,9 +1157,7 @@ impl<P: Preset> Network<P> {

Ok::<_, anyhow::Error>(())
})
.detach();

Ok(())
.detach()
}

fn handle_blocks_by_root_request(
Expand Down Expand Up @@ -1778,7 +1754,7 @@ impl<P: Preset> Network<P> {
request_id: RequestId,
peer_id: PeerId,
blob_identifiers: Vec<BlobIdentifier>,
) {
) -> Result<()> {
let blob_identifiers = blob_identifiers
.into_iter()
.filter(|blob_identifier| !self.received_blob_sidecars.contains_key(blob_identifier))
Expand All @@ -1789,18 +1765,23 @@ impl<P: Preset> Network<P> {
"cannot request BlobSidecarsByRoot: all requested blob sidecars have been received",
);

return;
return Ok(());
}

let request =
BlobsByRootRequest::new(self.controller.chain_config(), blob_identifiers.into_iter());
let request = BlobsByRootRequest::new(
self.controller.chain_config(),
self.controller.phase(),
blob_identifiers.into_iter(),
)?;

debug!(
"sending BlobSidecarsByRoot request (request_id: {request_id}, peer_id: {peer_id}, \
request: {request:?})",
);

self.request(peer_id, request_id, RequestType::BlobsByRoot(request));

Ok(())
}

fn request_blocks_by_range(
Expand Down Expand Up @@ -1982,8 +1963,6 @@ impl<P: Preset> Network<P> {
enum Error {
#[error("end slot overflowed ({start_slot} + {difference})")]
EndSlotOverflow { start_slot: u64, difference: u64 },
#[error("cannot request {protocol} at {phase}")]
InvalidPhaseRequest { protocol: String, phase: Phase },
}

fn fork_digest(fork_context: &ForkContext) -> ForkDigest {
Expand Down

0 comments on commit eb84363

Please sign in to comment.