Skip to content

Commit

Permalink
POST /eth/v2/beacon/pool/attestations bugfixes (#6867)
Browse files Browse the repository at this point in the history
  • Loading branch information
eserilev authored Jan 31, 2025
1 parent d47b3e3 commit 276eda3
Show file tree
Hide file tree
Showing 15 changed files with 156 additions and 59 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/src/attestation_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,8 @@ impl<T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'_, T> {

pub fn single_attestation(&self) -> Option<SingleAttestation> {
Some(SingleAttestation {
committee_index: self.attestation.committee_index()? as usize,
attester_index: self.validator_index,
committee_index: self.attestation.committee_index()?,
attester_index: self.validator_index as u64,
data: self.attestation.data().clone(),
signature: self.attestation.signature().clone(),
})
Expand Down
6 changes: 3 additions & 3 deletions beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1131,15 +1131,15 @@ where
.unwrap();

let single_attestation =
attestation.to_single_attestation_with_attester_index(attester_index)?;
attestation.to_single_attestation_with_attester_index(attester_index as u64)?;

let attestation: Attestation<E> = single_attestation.to_attestation(committee.committee)?;

assert_eq!(
single_attestation.committee_index,
attestation.committee_index().unwrap() as usize
attestation.committee_index().unwrap()
);
assert_eq!(single_attestation.attester_index, validator_index);
assert_eq!(single_attestation.attester_index, validator_index as u64);
Ok(single_attestation)
}

Expand Down
1 change: 1 addition & 0 deletions beacon_node/http_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ rand = { workspace = true }
safe_arith = { workspace = true }
sensitive_url = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
slog = { workspace = true }
slot_clock = { workspace = true }
state_processing = { workspace = true }
Expand Down
58 changes: 39 additions & 19 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ mod validator;
mod validator_inclusion;
mod validators;
mod version;

use crate::light_client::{get_light_client_bootstrap, get_light_client_updates};
use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3};
use crate::version::fork_versioned_response;
Expand Down Expand Up @@ -63,6 +62,7 @@ pub use publish_blocks::{
publish_blinded_block, publish_block, reconstruct_block, ProvenancedBlock,
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use slog::{crit, debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use ssz::Encode;
Expand All @@ -83,14 +83,13 @@ use tokio_stream::{
wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
StreamExt,
};
use types::ChainSpec;
use types::{
fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId,
AttesterSlashing, BeaconStateError, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName,
ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch,
SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit,
SingleAttestation, Slot, SyncCommitteeMessage, SyncContributionData,
AttesterSlashing, BeaconStateError, ChainSpec, CommitteeCache, ConfigAndPreset, Epoch, EthSpec,
ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing,
RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot,
SyncCommitteeMessage, SyncContributionData,
};
use validator::pubkey_to_validator_index;
use version::{
Expand Down Expand Up @@ -1279,6 +1278,9 @@ pub fn serve<T: BeaconChainTypes>(
let consensus_version_header_filter =
warp::header::header::<ForkName>(CONSENSUS_VERSION_HEADER);

let optional_consensus_version_header_filter =
warp::header::optional::<ForkName>(CONSENSUS_VERSION_HEADER);

// POST beacon/blocks
let post_beacon_blocks = eth_v1
.and(warp::path("beacon"))
Expand Down Expand Up @@ -1829,20 +1831,19 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone());

let beacon_pool_path_any = any_version
let beacon_pool_path_v2 = eth_v2
.and(warp::path("beacon"))
.and(warp::path("pool"))
.and(task_spawner_filter.clone())
.and(chain_filter.clone());

let beacon_pool_path_v2 = eth_v2
let beacon_pool_path_any = any_version
.and(warp::path("beacon"))
.and(warp::path("pool"))
.and(task_spawner_filter.clone())
.and(chain_filter.clone());

// POST beacon/pool/attestations
let post_beacon_pool_attestations = beacon_pool_path
let post_beacon_pool_attestations_v1 = beacon_pool_path
.clone()
.and(warp::path("attestations"))
.and(warp::path::end())
Expand All @@ -1851,9 +1852,6 @@ pub fn serve<T: BeaconChainTypes>(
.and(reprocess_send_filter.clone())
.and(log_filter.clone())
.then(
// V1 and V2 are identical except V2 has a consensus version header in the request.
// We only require this header for SSZ deserialization, which isn't supported for
// this endpoint presently.
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
attestations: Vec<Attestation<T::EthSpec>>,
Expand All @@ -1879,18 +1877,40 @@ pub fn serve<T: BeaconChainTypes>(
.clone()
.and(warp::path("attestations"))
.and(warp::path::end())
.and(warp_utils::json::json())
.and(warp_utils::json::json::<Value>())
.and(optional_consensus_version_header_filter)
.and(network_tx_filter.clone())
.and(reprocess_send_filter)
.and(reprocess_send_filter.clone())
.and(log_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
attestations: Vec<SingleAttestation>,
payload: Value,
fork_name: Option<ForkName>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
reprocess_tx: Option<Sender<ReprocessQueueMessage>>,
log: Logger| async move {
let attestations = attestations.into_iter().map(Either::Right).collect();
let attestations =
match crate::publish_attestations::deserialize_attestation_payload::<T>(
payload, fork_name, &log,
) {
Ok(attestations) => attestations,
Err(err) => {
warn!(
log,
"Unable to deserialize attestation POST request";
"error" => ?err
);
return warp::reply::with_status(
warp::reply::json(
&"Unable to deserialize request body".to_string(),
),
eth2::StatusCode::BAD_REQUEST,
)
.into_response();
}
};

let result = crate::publish_attestations::publish_attestations(
task_spawner,
chain,
Expand Down Expand Up @@ -4765,7 +4785,7 @@ pub fn serve<T: BeaconChainTypes>(
.uor(post_beacon_blinded_blocks)
.uor(post_beacon_blocks_v2)
.uor(post_beacon_blinded_blocks_v2)
.uor(post_beacon_pool_attestations)
.uor(post_beacon_pool_attestations_v1)
.uor(post_beacon_pool_attestations_v2)
.uor(post_beacon_pool_attester_slashings)
.uor(post_beacon_pool_proposer_slashings)
Expand Down
42 changes: 37 additions & 5 deletions beacon_node/http_api/src/publish_attestations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use either::Either;
use eth2::types::Failure;
use lighthouse_network::PubsubMessage;
use network::NetworkMessage;
use serde_json::Value;
use slog::{debug, error, warn, Logger};
use std::borrow::Cow;
use std::sync::Arc;
Expand All @@ -52,18 +53,19 @@ use tokio::sync::{
mpsc::{Sender, UnboundedSender},
oneshot,
};
use types::{Attestation, EthSpec, SingleAttestation};
use types::{Attestation, EthSpec, ForkName, SingleAttestation};

// Error variants are only used in `Debug` and considered `dead_code` by the compiler.
#[derive(Debug)]
enum Error {
pub enum Error {
Validation(AttestationError),
Publication,
ForkChoice(#[allow(dead_code)] BeaconChainError),
AggregationPool(#[allow(dead_code)] AttestationError),
ReprocessDisabled,
ReprocessFull,
ReprocessTimeout,
InvalidJson(#[allow(dead_code)] serde_json::Error),
FailedConversion(#[allow(dead_code)] BeaconChainError),
}

Expand All @@ -74,6 +76,36 @@ enum PublishAttestationResult {
Failure(Error),
}

#[allow(clippy::type_complexity)]
pub fn deserialize_attestation_payload<T: BeaconChainTypes>(
payload: Value,
fork_name: Option<ForkName>,
log: &Logger,
) -> Result<Vec<Either<Attestation<T::EthSpec>, SingleAttestation>>, Error> {
if fork_name.is_some_and(|fork_name| fork_name.electra_enabled()) || fork_name.is_none() {
if fork_name.is_none() {
warn!(
log,
"No Consensus Version header specified.";
);
}

Ok(serde_json::from_value::<Vec<SingleAttestation>>(payload)
.map_err(Error::InvalidJson)?
.into_iter()
.map(Either::Right)
.collect())
} else {
Ok(
serde_json::from_value::<Vec<Attestation<T::EthSpec>>>(payload)
.map_err(Error::InvalidJson)?
.into_iter()
.map(Either::Left)
.collect(),
)
}
}

fn verify_and_publish_attestation<T: BeaconChainTypes>(
chain: &Arc<BeaconChain<T>>,
either_attestation: &Either<Attestation<T::EthSpec>, SingleAttestation>,
Expand Down Expand Up @@ -163,12 +195,12 @@ fn convert_to_attestation<'a, T: BeaconChainTypes>(
|committee_cache, _| {
let Some(committee) = committee_cache.get_beacon_committee(
single_attestation.data.slot,
single_attestation.committee_index as u64,
single_attestation.committee_index,
) else {
return Err(BeaconChainError::AttestationError(
types::AttestationError::NoCommitteeForSlotAndIndex {
slot: single_attestation.data.slot,
index: single_attestation.committee_index as u64,
index: single_attestation.committee_index,
},
));
};
Expand Down Expand Up @@ -199,7 +231,7 @@ pub async fn publish_attestations<T: BeaconChainTypes>(
.iter()
.map(|att| match att {
Either::Left(att) => (att.data().slot, att.committee_index()),
Either::Right(att) => (att.data.slot, Some(att.committee_index as u64)),
Either::Right(att) => (att.data.slot, Some(att.committee_index)),
})
.collect::<Vec<_>>();

Expand Down
5 changes: 4 additions & 1 deletion beacon_node/http_api/tests/interactive_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use beacon_chain::{
ChainConfig,
};
use beacon_processor::work_reprocessing_queue::ReprocessQueueMessage;
use either::Either;
use eth2::types::ProduceBlockV3Response;
use eth2::types::{DepositContractData, StateId};
use execution_layer::{ForkchoiceState, PayloadAttributes};
Expand Down Expand Up @@ -906,9 +907,11 @@ async fn queue_attestations_from_http() {
.flat_map(|attestations| attestations.into_iter().map(|(att, _subnet)| att))
.collect::<Vec<_>>();

let attestations = Either::Right(single_attestations);

tokio::spawn(async move {
client
.post_beacon_pool_attestations_v2(&single_attestations, fork_name)
.post_beacon_pool_attestations_v2::<E>(attestations, fork_name)
.await
.expect("attestations should be processed successfully")
})
Expand Down
32 changes: 24 additions & 8 deletions beacon_node/http_api/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use beacon_chain::{
test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType},
BeaconChain, ChainConfig, StateSkipConfig, WhenSlotSkipped,
};
use either::Either;
use eth2::{
mixin::{RequestAccept, ResponseForkName, ResponseOptional},
reqwest::RequestBuilder,
Expand Down Expand Up @@ -1810,12 +1811,25 @@ impl ApiTester {
self
}

pub async fn test_post_beacon_pool_attestations_valid_v1(mut self) -> Self {
pub async fn test_post_beacon_pool_attestations_valid(mut self) -> Self {
self.client
.post_beacon_pool_attestations_v1(self.attestations.as_slice())
.await
.unwrap();

let fork_name = self
.attestations
.first()
.map(|att| self.chain.spec.fork_name_at_slot::<E>(att.data().slot))
.unwrap();

let attestations = Either::Left(self.attestations.clone());

self.client
.post_beacon_pool_attestations_v2::<E>(attestations, fork_name)
.await
.unwrap();

assert!(
self.network_rx.network_recv.recv().await.is_some(),
"valid attestation should be sent to network"
Expand All @@ -1833,8 +1847,10 @@ impl ApiTester {
.first()
.map(|att| self.chain.spec.fork_name_at_slot::<E>(att.data.slot))
.unwrap();

let attestations = Either::Right(self.single_attestations.clone());
self.client
.post_beacon_pool_attestations_v2(self.single_attestations.as_slice(), fork_name)
.post_beacon_pool_attestations_v2::<E>(attestations, fork_name)
.await
.unwrap();
assert!(
Expand Down Expand Up @@ -1900,10 +1916,10 @@ impl ApiTester {
.first()
.map(|att| self.chain.spec.fork_name_at_slot::<E>(att.data().slot))
.unwrap();

let attestations = Either::Right(attestations);
let err_v2 = self
.client
.post_beacon_pool_attestations_v2(attestations.as_slice(), fork_name)
.post_beacon_pool_attestations_v2::<E>(attestations, fork_name)
.await
.unwrap_err();

Expand Down Expand Up @@ -6054,9 +6070,9 @@ impl ApiTester {
.chain
.spec
.fork_name_at_slot::<E>(self.chain.slot().unwrap());

let attestations = Either::Right(self.single_attestations.clone());
self.client
.post_beacon_pool_attestations_v2(&self.single_attestations, fork_name)
.post_beacon_pool_attestations_v2::<E>(attestations, fork_name)
.await
.unwrap();

Expand Down Expand Up @@ -6375,10 +6391,10 @@ async fn post_beacon_blocks_duplicate() {
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn beacon_pools_post_attestations_valid_v1() {
async fn beacon_pools_post_attestations_valid() {
ApiTester::new()
.await
.test_post_beacon_pool_attestations_valid_v1()
.test_post_beacon_pool_attestations_valid()
.await;
}

Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|committee_cache, _| {
let Some(committee) = committee_cache.get_beacon_committee(
single_attestation.data.slot,
single_attestation.committee_index as u64,
single_attestation.committee_index,
) else {
warn!(
self.log,
Expand Down
Loading

0 comments on commit 276eda3

Please sign in to comment.