34 commits:
7914128  Implement Cell Dissemination (dknopik, Oct 28, 2025)
424dc79  Merge branch 'unstable' into partial-columns (dknopik, Oct 28, 2025)
303c638  fix compilation (dknopik, Oct 29, 2025)
ad85059  fix some lints (dknopik, Oct 29, 2025)
7b8cb0d  impl get_blobs_v2 fallback (dknopik, Oct 29, 2025)
efa9366  update gossipsub (dknopik, Oct 29, 2025)
56f7a1f  remove redundant log (dknopik, Oct 29, 2025)
2a721d7  try to upgrade column using block immediately (dknopik, Oct 29, 2025)
5709408  publish data after block availability (dknopik, Oct 29, 2025)
02194da  fix remaining lints and release test compilation (dknopik, Oct 29, 2025)
bd8000d  implement caching of partial messages (dknopik, Oct 31, 2025)
bb575ad  update gossipsub (dknopik, Oct 31, 2025)
eeaf03b  add CLI flags for disable request and disable support (dknopik, Oct 31, 2025)
76eb32e  Merge branch 'unstable' into partial-columns (dknopik, Oct 31, 2025)
47a4119  update CLI docs (dknopik, Oct 31, 2025)
65c236a  do not error for column on failed observation check (dknopik, Oct 31, 2025)
ae9f493  Fix get blobs v2 tests (dknopik, Oct 31, 2025)
8442bf3  Reintroduce sanity check for get blobs v2 (dknopik, Oct 31, 2025)
7c68ba4  Only count actually received blobs (dknopik, Oct 31, 2025)
09e220b  Log full vs partial (dknopik, Oct 31, 2025)
7cbb5c6  fix and improve logging (dknopik, Oct 31, 2025)
b6626f6  update gossipsub (dknopik, Nov 3, 2025)
fb956b9  Merge branch 'refs/heads/unstable' into partial-columns (dknopik, Nov 3, 2025)
f0b7d8f  look for block in data availability checker (dknopik, Nov 3, 2025)
99d3568  re-enable SSE (dknopik, Nov 3, 2025)
646fef5  Properly handly observing in new_for_block_publishing (dknopik, Nov 3, 2025)
6794e5f  Fix cache and better logging (dknopik, Nov 5, 2025)
e98dce0  Fix invalid `UnexpectedDataColumn` error (dknopik, Nov 5, 2025)
5e1f05d  remove custom cache in favour of reprocessing queue (dknopik, Nov 5, 2025)
7067c7a  Merge branch 'unstable' into partial-columns (dknopik, Nov 5, 2025)
695c648  publish partials on http API block (dknopik, Nov 5, 2025)
6fb3721  experimentally full eager publish for API blocks (dknopik, Nov 5, 2025)
870fa0d  experimental libp2p changes (dknopik, Nov 10, 2025)
995344e  joaos libp2p changes (dknopik, Nov 12, 2025)
151 changes: 72 additions & 79 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -161,7 +161,7 @@ fs2 = "0.4"
futures = "0.3"
genesis = { path = "beacon_node/genesis" }
# This is tracking the sigp-gossipsub branch on sigp/rust-libp2p commit: Aug 20 2025
-gossipsub = { package = "libp2p-gossipsub", git = "https://github.com/sigp/rust-libp2p.git", rev = "5acdf89a65d64098f9346efa5769e57bcd19dea9", "features" = ["metrics"] }
+gossipsub = { package = "libp2p-gossipsub", git = "https://github.com/jxs/rust-libp2p.git", branch = "gossipsub-partial-messages", features = ["partial_messages", "metrics"] }
graffiti_file = { path = "validator_client/graffiti_file" }
hashlink = "0.9.0"
health_metrics = { path = "common/health_metrics" }
83 changes: 59 additions & 24 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -24,7 +24,7 @@ use crate::chain_config::ChainConfig;
use crate::custody_context::CustodyContextSsz;
use crate::data_availability_checker::{
Availability, AvailabilityCheckError, AvailableBlock, AvailableBlockData,
-DataAvailabilityChecker, DataColumnReconstructionResult,
+DataAvailabilityChecker, DataColumnReconstructionResult, MergedData,
};
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::early_attester_cache::EarlyAttesterCache;
@@ -130,7 +130,9 @@ use tokio_stream::Stream;
use tracing::{Span, debug, debug_span, error, info, info_span, instrument, trace, warn};
use tree_hash::TreeHash;
use types::blob_sidecar::FixedBlobSidecarList;
+use types::das_column::DasColumn;
use types::data_column_sidecar::ColumnIndex;
+use types::partial_data_column_sidecar::VerifiablePartialDataColumn;
use types::payload::BlockProductionVersion;
use types::*;

@@ -2200,14 +2202,31 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
data_column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
subnet_id: DataColumnSubnetId,
-) -> Result<GossipVerifiedDataColumn<T>, GossipDataColumnError> {
+) -> Result<GossipVerifiedDataColumn<T, DataColumnSidecar<T::EthSpec>>, GossipDataColumnError>
+{
metrics::inc_counter(&metrics::DATA_COLUMN_SIDECAR_PROCESSING_REQUESTS);
let _timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES);
GossipVerifiedDataColumn::new(data_column_sidecar, subnet_id, self).inspect(|_| {
metrics::inc_counter(&metrics::DATA_COLUMN_SIDECAR_PROCESSING_SUCCESSES);
})
}

+#[instrument(skip_all, level = "trace")]
+pub fn verify_partial_data_column_sidecar_for_gossip(
+self: &Arc<Self>,
+data_column_sidecar: Arc<VerifiablePartialDataColumn<T::EthSpec>>,
+) -> Result<
+GossipVerifiedDataColumn<T, VerifiablePartialDataColumn<T::EthSpec>>,
+GossipDataColumnError,
+> {
+metrics::inc_counter(&metrics::PARTIAL_DATA_COLUMN_SIDECAR_PROCESSING_REQUESTS);
+let _timer =
+metrics::start_timer(&metrics::PARTIAL_DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES);
+GossipVerifiedDataColumn::new_partial(data_column_sidecar, self).inspect(|_| {
+metrics::inc_counter(&metrics::PARTIAL_DATA_COLUMN_SIDECAR_PROCESSING_SUCCESSES);
+})
+}

#[instrument(skip_all, level = "trace")]
pub fn verify_blob_sidecar_for_gossip(
self: &Arc<Self>,
@@ -2916,6 +2935,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
notify_execution_layer,
BlockImportSource::RangeSync,
|| Ok(()),
+|_| (),
)
.await
{
@@ -3048,10 +3068,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Cache the data columns in the processing cache, process them, then evict them from the cache
/// once they are imported or an error occurs.
#[instrument(skip_all, level = "debug")]
-pub async fn process_gossip_data_columns(
+pub async fn process_gossip_data_columns<C: DasColumn<T::EthSpec>>(
self: &Arc<Self>,
-data_columns: Vec<GossipVerifiedDataColumn<T>>,
+data_columns: Vec<GossipVerifiedDataColumn<T, C>>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
+data_publish_fn: impl FnOnce(MergedData<T::EthSpec>),
) -> Result<AvailabilityProcessingStatus, BlockError> {
let Ok((slot, block_root)) = data_columns
.iter()
@@ -3084,6 +3105,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block_root,
data_columns,
publish_fn,
+data_publish_fn,
)
.await
}
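
For readers following the new `data_publish_fn` parameter: a minimal sketch of how a caller might drive this extended signature. The function name and the closure bodies below are illustrative, not part of the PR; the real callers live in the beacon processor.

```rust
use std::sync::Arc;

// Sketch only. `BeaconChainTypes`, `DasColumn`, `GossipVerifiedDataColumn`,
// `MergedData`, etc. are the types referenced in this diff; the wrapper
// function itself is hypothetical.
async fn import_verified_columns<T: BeaconChainTypes, C: DasColumn<T::EthSpec>>(
    chain: Arc<BeaconChain<T>>,
    verified: Vec<GossipVerifiedDataColumn<T, C>>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
    chain
        .process_gossip_data_columns(
            verified,
            // publish_fn: gates block publication, as before.
            || Ok(()),
            // data_publish_fn (new): invoked with the merged data if these
            // columns complete a block, e.g. to publish full columns built
            // from partial messages.
            |_merged| {},
        )
        .await
}
```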
@@ -3182,26 +3204,30 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

-fn emit_sse_data_column_sidecar_events<'a, I>(
+fn emit_sse_data_column_sidecar_events<'a, I, C>(
self: &Arc<Self>,
block_root: &Hash256,
data_columns_iter: I,
) where
-I: Iterator<Item = &'a DataColumnSidecar<T::EthSpec>>,
+I: Iterator<Item = &'a C>,
+C: DasColumn<T::EthSpec> + 'a,
{
if let Some(event_handler) = self.event_handler.as_ref()
&& event_handler.has_data_column_sidecar_subscribers()
{
let imported_data_columns = self
.data_availability_checker
-.cached_data_column_indexes(block_root)
+.get_data_columns(*block_root)
.unwrap_or_default();
-let new_data_columns =
-data_columns_iter.filter(|b| !imported_data_columns.contains(&b.index));
-for data_column in new_data_columns {
+let new_data_columns_indices = data_columns_iter.map(|c| c.index()).collect::<Vec<_>>();
+for data_column in imported_data_columns
+.into_iter()
+.filter(|c| new_data_columns_indices.contains(&c.index))
+{
event_handler.register(EventKind::DataColumnSidecar(
-SseDataColumnSidecar::from_data_column_sidecar(data_column),
+SseDataColumnSidecar::from_data_column_sidecar(data_column.as_ref()),
));
}
}
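
The `DasColumn` bound above abstracts over full sidecars and partial columns. Its definition is not part of this diff; judging from the call sites (`index()`, and `signed_block_header()` used with `filter_map`), it presumably looks roughly like this hypothetical sketch:

```rust
use types::data_column_sidecar::ColumnIndex;
use types::{EthSpec, SignedBeaconBlockHeader};

// Hypothetical reconstruction of `types::das_column::DasColumn`; the real
// trait may differ.
pub trait DasColumn<E: EthSpec> {
    // Index of this column in the extended blob matrix.
    fn index(&self) -> ColumnIndex;

    // Full sidecars carry a signed block header, while partial columns may
    // not, which is why callers use `filter_map` rather than `map`.
    fn signed_block_header(&self) -> Option<&SignedBeaconBlockHeader>;
}
```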
@@ -3343,6 +3369,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
notify_execution_layer: NotifyExecutionLayer,
block_source: BlockImportSource,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
+data_publish_fn: impl FnOnce(MergedData<T::EthSpec>),
) -> Result<AvailabilityProcessingStatus, BlockError> {
let block_slot = unverified_block.block().slot();

@@ -3412,7 +3439,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.import_available_block(Box::new(block)).await
}
ExecutedBlock::AvailabilityPending(block) => {
-self.check_block_availability_and_import(block).await
+self.check_block_availability_and_import(block, data_publish_fn)
+.await
}
}
};
@@ -3522,9 +3550,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
async fn check_block_availability_and_import(
self: &Arc<Self>,
block: AvailabilityPendingExecutedBlock<T::EthSpec>,
+data_publish_fn: impl FnOnce(MergedData<T::EthSpec>),
) -> Result<AvailabilityProcessingStatus, BlockError> {
let slot = block.block.slot();
-let availability = self.data_availability_checker.put_executed_block(block)?;
+let availability = self
+.data_availability_checker
+.put_executed_block(block, data_publish_fn)?;
self.process_availability(slot, availability, || Ok(()))
.await
}
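
`put_executed_block` now receives the publish hook as well, so data completed by a late-arriving block can be pushed out immediately. A sketch of how such a hook might be built; the `NetworkMessage` enum and the channel are stand-ins invented for illustration, and `MergedData`'s exact contents are not shown in this diff:

```rust
use std::sync::mpsc::Sender;

use crate::data_availability_checker::MergedData;
use types::EthSpec;

// Stand-in for the real network-layer message type (hypothetical).
enum NetworkMessage<E: EthSpec> {
    PublishMergedData(MergedData<E>),
}

// Builds a `data_publish_fn` that forwards merged columns for publication.
fn make_data_publish_fn<E: EthSpec>(
    network_tx: Sender<NetworkMessage<E>>,
) -> impl FnOnce(MergedData<E>) {
    move |merged| {
        // Dropping the send error is acceptable for a sketch.
        let _ = network_tx.send(NetworkMessage::PublishMergedData(merged));
    }
}
```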
@@ -3549,22 +3580,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Checks if the provided data column can make any cached blocks available, and imports immediately
/// if so, otherwise caches the data column in the data availability checker.
-async fn check_gossip_data_columns_availability_and_import(
+async fn check_gossip_data_columns_availability_and_import<C: DasColumn<T::EthSpec>>(
self: &Arc<Self>,
slot: Slot,
block_root: Hash256,
-data_columns: Vec<GossipVerifiedDataColumn<T>>,
+data_columns: Vec<GossipVerifiedDataColumn<T, C>>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
+data_publish_fn: impl FnOnce(MergedData<T::EthSpec>),
) -> Result<AvailabilityProcessingStatus, BlockError> {
if let Some(slasher) = self.slasher.as_ref() {
-for data_colum in &data_columns {
-slasher.accept_block_header(data_colum.signed_block_header());
+for header in data_columns.iter().filter_map(|c| c.signed_block_header()) {
+slasher.accept_block_header(header.clone());
}
}

let availability = self
.data_availability_checker
-.put_gossip_verified_data_columns(block_root, slot, data_columns)?;
+.put_gossip_verified_data_columns(block_root, slot, data_columns, data_publish_fn)?;

self.process_availability(slot, availability, publish_fn)
.await
@@ -3641,7 +3673,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
data_columns.iter().map(|c| c.as_data_column()),
)?;
self.data_availability_checker
-.put_kzg_verified_custody_data_columns(block_root, data_columns)?
+.put_kzg_verified_custody_data_columns(block_root, data_columns, |_| ())?
}
};

@@ -3674,24 +3706,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.await
}

-fn check_data_column_sidecar_header_signature_and_slashability<'a>(
+fn check_data_column_sidecar_header_signature_and_slashability<
+'a,
+C: DasColumn<T::EthSpec> + 'a,
+>(
self: &Arc<Self>,
block_root: Hash256,
-custody_columns: impl IntoIterator<Item = &'a DataColumnSidecar<T::EthSpec>>,
+custody_columns: impl IntoIterator<Item = &'a C>,
) -> Result<(), BlockError> {
let mut slashable_cache = self.observed_slashable.write();
// Process all unique block headers - previous logic assumed all headers were identical and
// only processed the first one. However, we should not make assumptions about data received
// from RPC.
for header in custody_columns
.into_iter()
-.map(|c| c.signed_block_header.clone())
+.filter_map(|c| c.signed_block_header())
.unique()
{
// Return an error if *any* header signature is invalid; we do not want to import this
// list of columns into the DA checker. However, we will process any valid headers prior
// to the first invalid header in the slashable cache & slasher.
-verify_header_signature::<T, BlockError>(self, &header)?;
+verify_header_signature::<T, BlockError>(self, header)?;

slashable_cache
.observe_slashable(
@@ -3701,7 +3736,7 @@
)
.map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))?;
if let Some(slasher) = self.slasher.as_ref() {
-slasher.accept_block_header(header);
+slasher.accept_block_header(header.clone());
}
}
Ok(())
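
The loop above now tolerates mixed batches: partial columns without a header are skipped rather than failing the whole batch. The same de-duplication pattern, extracted as a standalone sketch (the helper name is illustrative):

```rust
use itertools::Itertools;
use types::das_column::DasColumn;
use types::{EthSpec, SignedBeaconBlockHeader};

// Sketch: yield each distinct header exactly once so signature checks are
// not repeated; columns without a header (partials) are skipped.
fn distinct_headers<'a, E: EthSpec, C: DasColumn<E> + 'a>(
    columns: impl IntoIterator<Item = &'a C>,
) -> impl Iterator<Item = &'a SignedBeaconBlockHeader> {
    columns
        .into_iter()
        .filter_map(|c| c.signed_block_header())
        .unique()
}
```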