diff --git a/ethexe/consensus/src/validator/batch/filler.rs b/ethexe/consensus/src/validator/batch/filler.rs index 9fcc74f8a1b..d1e4a70100c 100644 --- a/ethexe/consensus/src/validator/batch/filler.rs +++ b/ethexe/consensus/src/validator/batch/filler.rs @@ -16,16 +16,23 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use super::types::{BatchLimits, BatchParts, BatchSizeCounter, ValidationRejectReason}; - +use super::types::{BatchParts, BatchSizeCounter}; use ethexe_common::gear::{ ChainCommitment, CodeCommitment, RewardsCommitment, ValidatorsCommitment, }; +#[derive(Debug, derive_more::Display, Clone, Copy, PartialEq, Eq)] +pub enum BatchIncludeError { + #[display("batch size limit exceeded")] + SizeLimitExceeded, +} + +type FillerResult = Result<(), BatchIncludeError>; + // TODO #5356: squash transitions before charging size so repeated actors are // counted against the actual committed payload rather than the pre-squash input. /// Stateful helper used by [`BatchCommitmentManager`](super::manager::BatchCommitmentManager) -/// to assemble a candidate batch commitment under protocol size and deepness limits. +/// to assemble a candidate batch commitment under protocol size limits. /// /// The manager decides which commitments are eligible, while `BatchFiller` /// tracks the accumulated parts and rejects additions that would exceed the @@ -34,51 +41,22 @@ use ethexe_common::gear::{ pub struct BatchFiller { /// Parts accumulated for the candidate batch being assembled. parts: BatchParts, - /// Protocol limits that decide whether candidate parts may be included. - limits: BatchLimits, /// Running payload budget for the ABI-encoded batch commitment. size_counter: BatchSizeCounter, } -#[derive(Debug, derive_more::Display, Clone, Copy, PartialEq, Eq)] -pub enum BatchIncludeError { - #[display("batch size limit exceeded")] - SizeLimitExceeded, -} - -impl From for ValidationRejectReason { - fn from(value: BatchIncludeError) -> Self { - match value { - BatchIncludeError::SizeLimitExceeded => Self::BatchSizeLimitExceeded, - } - } -} - -type FillerResult = Result<(), BatchIncludeError>; - impl BatchFiller { - pub fn new(limits: BatchLimits) -> Self { + pub fn new(batch_size_limit: u64) -> Self { Self { parts: BatchParts::default(), - size_counter: BatchSizeCounter::new(limits.batch_size_limit), - limits, - } - } - - pub fn into_parts(mut self) -> BatchParts { - if let Some(chain) = &mut self.parts.chain_commitment { - chain.transitions = - super::utils::squash_transitions_by_actor(std::mem::take(&mut chain.transitions)); - super::utils::sort_transitions_by_value_to_receive(&mut chain.transitions); + size_counter: BatchSizeCounter::new(batch_size_limit), } - self.parts } pub fn include_validators_commitment( &mut self, commitment: ValidatorsCommitment, ) -> FillerResult { - let commitment = Some(commitment); if !self .size_counter .charge_for_validators_commitment(&commitment) @@ -86,17 +64,16 @@ impl BatchFiller { return Err(BatchIncludeError::SizeLimitExceeded); } - self.parts.validators_commitment = commitment; + self.parts.validators_commitment = Some(commitment); Ok(()) } pub fn include_rewards_commitment(&mut self, commitment: RewardsCommitment) -> FillerResult { - let commitment = Some(commitment); if !self.size_counter.charge_for_rewards_commitment(&commitment) { return Err(BatchIncludeError::SizeLimitExceeded); } - self.parts.rewards_commitment = commitment; + self.parts.rewards_commitment = Some(commitment); Ok(()) } @@ -109,11 +86,7 @@ impl BatchFiller { Ok(()) } - pub fn include_chain_commitment( - &mut self, - commitment: ChainCommitment, - deepness: u32, - ) -> FillerResult { + pub fn include_chain_commitment(&mut self, commitment: ChainCommitment) -> FillerResult { match self.parts.chain_commitment.as_mut() { Some(chain_commitment) => { // Once the chain header is present, only appended transitions consume extra space. @@ -123,27 +96,82 @@ impl BatchFiller { { return Err(BatchIncludeError::SizeLimitExceeded); } + chain_commitment.head_announce = commitment.head_announce; chain_commitment.transitions.extend(commitment.transitions); } None => { - // NOTE: Empty transition chains are skipped until they become old enough to force inclusion. - if !self.should_include_chain_commitment(&commitment, deepness) { - return Ok(()); - } - - let commitment = Some(commitment); if !self.size_counter.charge_for_chain_commitment(&commitment) { return Err(BatchIncludeError::SizeLimitExceeded); } - self.parts.chain_commitment = commitment; + + self.parts.chain_commitment = Some(commitment); } } + Ok(()) } - fn should_include_chain_commitment(&self, commitment: &ChainCommitment, deepness: u32) -> bool { - // A deep enough chain must eventually be committed even if it carries no transitions. - !commitment.transitions.is_empty() || deepness + 1 > self.limits.chain_deepness_threshold + pub fn into_parts(self) -> BatchParts { + self.parts + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + mock::{test_chain_commitment, test_code_commitment, test_state_transition}, + validator::batch::BatchManagerConfig, + }; + use alloy::sol_types::SolValue; + use ethexe_common::{Announce, HashOf}; + use ethexe_ethereum::abi::Gear; + + #[test] + fn size_limit_rejects_once_budget_exhausted() { + let first = test_code_commitment(1); + let one_encoded: Gear::CodeCommitment = first.clone().into(); + // Budget fits exactly one commitment, so the second include must be rejected. + let mut filler = BatchFiller::new(one_encoded.abi_encoded_size() as u64); + + filler.include_code_commitment(first.clone()).unwrap(); + assert_eq!( + filler.include_code_commitment(test_code_commitment(2)), + Err(BatchIncludeError::SizeLimitExceeded), + ); + + let parts = filler.into_parts(); + assert_eq!( + parts.code_commitments, + vec![first], + "rejected commitment must not leak into parts", + ); + } + + #[test] + fn include_chain_commitment_merges_transitions() { + let mut filler = BatchFiller::new(BatchManagerConfig::default().batch_size_limit); + + let head_1 = HashOf::::random(); + let head_2 = HashOf::::random(); + + let first = test_chain_commitment(head_1, 1); + filler.include_chain_commitment(first.clone()).unwrap(); + + let second = ChainCommitment { + head_announce: head_2, + transitions: vec![test_state_transition(999)], + }; + filler.include_chain_commitment(second.clone()).unwrap(); + + let chain = filler.into_parts().chain_commitment.unwrap(); + assert_eq!( + chain.head_announce, head_2, + "second include must advance head_announce to the new tip", + ); + let mut expected_transitions = first.transitions; + expected_transitions.extend(second.transitions); + assert_eq!(chain.transitions, expected_transitions); } } diff --git a/ethexe/consensus/src/validator/batch/manager.rs b/ethexe/consensus/src/validator/batch/manager.rs index eb8ed2ce11d..1d4062956ca 100644 --- a/ethexe/consensus/src/validator/batch/manager.rs +++ b/ethexe/consensus/src/validator/batch/manager.rs @@ -16,62 +16,60 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use super::types::{BatchLimits, CodeNotValidatedError, ValidationRejectReason, ValidationStatus}; +use super::{ + BatchManagerConfig, + filler::BatchFiller, + types::{BatchParts, ValidationRejectReason, ValidationStatus}, + utils, +}; use crate::{ announces, - validator::{ - batch::{filler::BatchFiller, types::BatchParts, utils}, - core::{ElectionRequest, MiddlewareWrapper}, - }, + validator::core::{ElectionRequest, MiddlewareWrapper}, }; - use alloy::sol_types::SolValue; -use anyhow::{Result, anyhow, bail}; +use anyhow::{Context, Result, anyhow, bail}; use ethexe_common::{ Announce, HashOf, SimpleBlockData, ToDigest, consensus::BatchCommitmentValidationRequest, db::{AnnounceStorageRO, BlockMetaStorageRO, ConfigStorageRO, OnChainStorageRO}, - gear::{BatchCommitment, ChainCommitment, RewardsCommitment, ValidatorsCommitment}, + gear::{ + BatchCommitment, ChainCommitment, CodeCommitment, RewardsCommitment, ValidatorsCommitment, + }, }; use ethexe_db::Database; use ethexe_ethereum::abi::Gear; +use gprimitives::{CodeId, H256}; use hashbrown::HashSet; #[derive(derive_more::Debug, Clone)] pub struct BatchCommitmentManager { - /// Limits for batch building and verifying - limits: BatchLimits, /// The ethexe database instance. #[debug(skip)] db: Database, /// The ethexe middleware for validators election. #[debug(skip)] middleware: MiddlewareWrapper, + /// Batch manager configuration. + config: BatchManagerConfig, } impl BatchCommitmentManager { /// Creates a new instance of batch commitment manager. - pub fn new(limits: BatchLimits, db: Database, middleware: MiddlewareWrapper) -> Self { + pub fn new(db: Database, middleware: MiddlewareWrapper, config: BatchManagerConfig) -> Self { Self { - limits, db, middleware, + config, } } - /// Replaces current limits with `new_limits` and returns the previous limits. - #[cfg(test)] - pub fn replace_limits(&mut self, new_limits: BatchLimits) -> BatchLimits { - std::mem::replace(&mut self.limits, new_limits) - } - /// Creates a new [`BatchCommitment`] for producer. pub async fn create_batch_commitment( self, block: SimpleBlockData, announce_hash: HashOf, ) -> Result> { - let mut batch_filler = BatchFiller::new(self.limits.clone()); + let mut batch_filler = BatchFiller::new(self.config.batch_size_limit); if let Some(validators_commitment) = self.aggregate_validators_commitment(&block).await? && let Err(err) = batch_filler.include_validators_commitment(validators_commitment) @@ -86,37 +84,38 @@ impl BatchCommitmentManager { } // NOTE: we prioritize state transitions over code commitments. So include them firstly. - super::utils::try_include_chain_commitment( + let waiting_for_commitment_amount = utils::aggregate_chain_commitment( &self.db, block.hash, announce_hash, &mut batch_filler, )?; - let queue = self.db.block_meta(block.hash).codes_queue.ok_or_else(|| { - anyhow!( - "Computed block {} codes queue is not in storage", - block.hash - ) - })?; - let code_commitments = super::utils::aggregate_code_commitments(&self.db, queue, false) - .expect("not errors because, fail_if_not_found is set to false"); + utils::aggregate_code_commitments_for_block(&self.db, block.hash, &mut batch_filler)?; - for commitment in code_commitments { - if let Err(err) = batch_filler.include_code_commitment(commitment) { - tracing::trace!( - "failed to include all code commitments into batch, because of error={err}" - ); - break; - } + let mut parts = batch_filler.into_parts(); + if let Some(chain_commitment) = &parts.chain_commitment + && chain_commitment.transitions.is_empty() + && waiting_for_commitment_amount <= self.config.chain_deepness_threshold as usize + && parts.code_commitments.is_empty() + && parts.validators_commitment.is_none() + && parts.rewards_commitment.is_none() + { + // if transitions are empty and not enough waiting announces + // and all other parts are empty as well, + // then we remove chain commitment in order to skip batch committing for this block. + tracing::trace!( + block = %block.hash, + announce = %announce_hash, + waiting_for_commitment_amount, + commitment = ?chain_commitment, + "Remove chain commitment" + ); + + parts.chain_commitment = None; } - super::utils::create_batch_commitment( - &self.db, - &block, - batch_filler.into_parts(), - self.limits.commitment_delay_limit, - ) + utils::create_batch_commitment(&self.db, &block, parts, self.config.commitment_delay_limit) } pub async fn validate_batch_commitment( @@ -133,13 +132,6 @@ impl BatchCommitmentManager { } = &request; let mut batch_parts = BatchParts::default(); - if crate::utils::has_duplicates(codes.as_slice()) { - return Ok(ValidationStatus::Rejected { - request, - reason: ValidationRejectReason::CodesHasDuplicates, - }); - } - if validators { match self.aggregate_validators_commitment(&block).await? { Some(commitment) => batch_parts.validators_commitment = Some(commitment), @@ -164,124 +156,30 @@ impl BatchCommitmentManager { } } - let waiting_codes = self - .db - .block_meta(block.hash) - .codes_queue - .ok_or_else(|| anyhow!("codes queue not found for block={}", block.hash))? - .into_iter() - .collect::>(); - - if let Some(&code_id) = codes.iter().find(|&id| !waiting_codes.contains(id)) { - return Ok(ValidationStatus::Rejected { - request, - reason: ValidationRejectReason::CodeNotWaitingForCommitment(code_id), - }); + if let Some(head_announce_hash) = head { + batch_parts.chain_commitment = + match self.validate_chain_commitment(block.hash, head_announce_hash)? { + CommitmentValidationStatus::Valid(chain_commitment) => Some(chain_commitment), + CommitmentValidationStatus::Invalid(reason) => { + return Ok(ValidationStatus::Rejected { request, reason }); + } + }; } - match super::utils::aggregate_code_commitments(&self.db, codes.iter().copied(), true) { - Ok(commitments) => batch_parts.code_commitments = commitments, - Err(CodeNotValidatedError(code_id)) => { - return Ok(ValidationStatus::Rejected { - request, - reason: ValidationRejectReason::CodeIsNotProcessedYet(code_id), - }); + batch_parts.code_commitments = match self.validate_code_commitments(block.hash, codes)? { + CommitmentValidationStatus::Valid(code_commitments) => code_commitments, + CommitmentValidationStatus::Invalid(reason) => { + return Ok(ValidationStatus::Rejected { request, reason }); } }; - if let Some(announce) = head { - // Head announce in validation request is best for `block`. - // This guarantees that announce is successor of last committed announce at `block`, - // but does not guarantee that announce is computed by this node. - if !self.db.announce_meta(announce).computed { - return Ok(ValidationStatus::Rejected { - request, - reason: ValidationRejectReason::HeadAnnounceNotComputed(announce), - }); - } - - let candidates = self - .db - .block_meta(block.hash) - .announces - .into_iter() - .flatten(); - - let best_announce_hash = - announces::best_announce(&self.db, candidates, self.limits.commitment_delay_limit)?; - - let Some(last_committed_announce) = - self.db.block_meta(block.hash).last_committed_announce - else { - anyhow::bail!( - "Last committed announce not found in db for prepared block: {}", - block.hash - ); - }; - - let not_committed_announces = match utils::collect_not_committed_predecessors( - &self.db, - last_committed_announce, - best_announce_hash, - ) { - Ok(announces) => announces, - Err(err) => { - tracing::debug!( - block = %block.hash, - best_announce = %best_announce_hash, - error = %err, - "failed to collect not committed predecessors for best announce during batch validation" - ); - return Ok(ValidationStatus::Rejected { - request, - reason: ValidationRejectReason::BestHeadAnnounceChainInvalid( - best_announce_hash, - ), - }); - } - }; - - if !not_committed_announces.contains(&announce) { - return Ok(ValidationStatus::Rejected { - request, - reason: ValidationRejectReason::HeadAnnounceIsNotFromBestChain { - requested: announce, - best: best_announce_hash, - }, - }); - } - // Set firstly for current announce. - let mut chain_commitment = ChainCommitment { - transitions: Vec::new(), - head_announce: announce, - }; - for announce_hash in not_committed_announces.into_iter() { - let Some(transitions) = self.db.announce_outcome(announce_hash) else { - anyhow::bail!("Computed announce {announce_hash:?} outcome not found in db"); - }; - chain_commitment.transitions.extend(transitions); - if announce_hash == announce { - break; - } - } - chain_commitment.transitions = super::utils::squash_transitions_by_actor( - std::mem::take(&mut chain_commitment.transitions), - ); - super::utils::sort_transitions_by_value_to_receive(&mut chain_commitment.transitions); - batch_parts.chain_commitment = Some(chain_commitment); - } - - let Some(batch) = super::utils::create_batch_commitment( + let Some(batch) = utils::create_batch_commitment( &self.db, &block, batch_parts, - self.limits.commitment_delay_limit, + self.config.commitment_delay_limit, )? else { - tracing::warn!( - "Batch commitment is empty for block({:?}), rejecting batch", - block.hash - ); return Ok(ValidationStatus::Rejected { request, reason: ValidationRejectReason::EmptyBatch, @@ -300,7 +198,7 @@ impl BatchCommitmentManager { } let batch_encoded_size = Gear::BatchCommitment::from(batch).abi_encoded_size() as u64; - if batch_encoded_size > self.limits.batch_size_limit { + if batch_encoded_size > self.config.batch_size_limit { return Ok(ValidationStatus::Rejected { request, reason: ValidationRejectReason::BatchSizeLimitExceeded, @@ -310,7 +208,106 @@ impl BatchCommitmentManager { Ok(ValidationStatus::Accepted(digest)) } - pub async fn aggregate_validators_commitment( + #[cfg(test)] + pub(super) fn replace_config(&mut self, mut config: BatchManagerConfig) -> BatchManagerConfig { + std::mem::swap(&mut config, &mut self.config); + config + } + + fn validate_chain_commitment( + &self, + block_hash: H256, + head_announce_hash: HashOf, + ) -> Result> { + if !self.db.announce_meta(head_announce_hash).computed { + return Ok(ValidationRejectReason::HeadAnnounceNotComputed(head_announce_hash).into()); + } + + let candidates = self + .db + .block_meta(block_hash) + .announces + .into_iter() + .flatten(); + + let best_announce_hash = + announces::best_announce(&self.db, candidates, self.config.commitment_delay_limit)?; + + let last_committed_announce_hash = self + .db + .block_meta(block_hash) + .last_committed_announce + .with_context(|| { + format!("Last committed announce not found for prepared block: {block_hash}") + })?; + + let best_chain = utils::collect_pending_announces( + &self.db, + best_announce_hash, + last_committed_announce_hash, + )?; + + if !best_chain.contains(&head_announce_hash) { + return Ok(ValidationRejectReason::HeadAnnounceIsNotFromBestChain { + requested: head_announce_hash, + best: best_announce_hash, + } + .into()); + } + + // Checks done, collect transitions from the oldest pending announce up to + // and including the requested head announce — the producer includes head's + // own outcome, so the validator must mirror that to reach the same digest. + let mut transitions = Vec::new(); + for announce_hash in best_chain { + transitions.extend( + self.db + .announce_outcome(announce_hash) + .with_context(|| format!("Outcome not found for {announce_hash:?}"))?, + ); + if announce_hash == head_announce_hash { + break; + } + } + + Ok(CommitmentValidationStatus::Valid(ChainCommitment { + head_announce: head_announce_hash, + transitions, + })) + } + + fn validate_code_commitments( + &self, + block_hash: H256, + code_ids: &[CodeId], + ) -> Result>> { + if crate::utils::has_duplicates(code_ids) { + return Ok(ValidationRejectReason::CodesHaveDuplicates.into()); + } + + let waiting_codes = self + .db + .block_meta(block_hash) + .codes_queue + .ok_or_else(|| anyhow!("codes queue not found for block={block_hash}"))? + .into_iter() + .collect::>(); + + if let Some(&code_id) = code_ids.iter().find(|&id| !waiting_codes.contains(id)) { + return Ok(ValidationRejectReason::CodeNotWaitingForCommitment(code_id).into()); + } + + let code_commitments = + utils::aggregate_code_commitments(&self.db, code_ids.iter().copied()); + + if code_commitments.len() != code_ids.len() { + return Ok(ValidationRejectReason::SomeCodesNotProcessed.into()); + } + + Ok(CommitmentValidationStatus::Valid(code_commitments)) + } + + pub(super) async fn aggregate_validators_commitment( &self, block: &SimpleBlockData, ) -> Result> { @@ -351,7 +348,7 @@ impl BatchCommitmentManager { } else if latest_era_validators_committed > block_era + 1 { // This case considered as restricted, // because validators cannot be committed for eras later than the next one - anyhow::bail!("validators was committed for an era later than the next one"); + bail!("validators was committed for an era later than the next one"); } else if latest_era_validators_committed < block_era { tracing::warn!( current_era = %block_era, @@ -440,10 +437,17 @@ impl BatchCommitmentManager { } // TODO #4742 - pub async fn aggregate_rewards_commitment( + pub(super) async fn aggregate_rewards_commitment( &self, _block: &SimpleBlockData, ) -> Result> { Ok(None) } } + +#[derive(derive_more::From)] +enum CommitmentValidationStatus { + #[from(skip)] + Valid(C), + Invalid(ValidationRejectReason), +} diff --git a/ethexe/consensus/src/validator/batch/mod.rs b/ethexe/consensus/src/validator/batch/mod.rs index 21068f1e36c..d93cae6cbf8 100644 --- a/ethexe/consensus/src/validator/batch/mod.rs +++ b/ethexe/consensus/src/validator/batch/mod.rs @@ -20,7 +20,7 @@ mod manager; pub use manager::BatchCommitmentManager; mod types; -pub use types::{BatchLimits, ValidationStatus}; +pub use types::{BatchManagerConfig, ValidationStatus}; mod filler; diff --git a/ethexe/consensus/src/validator/batch/tests.rs b/ethexe/consensus/src/validator/batch/tests.rs index eee68fd8f77..c4c45e84730 100644 --- a/ethexe/consensus/src/validator/batch/tests.rs +++ b/ethexe/consensus/src/validator/batch/tests.rs @@ -23,7 +23,7 @@ use super::types::{ValidationRejectReason, ValidationStatus}; use crate::{ mock::*, validator::{ - batch::{BatchLimits, types::BatchParts}, + batch::{BatchManagerConfig, types::BatchParts}, mock::*, }, }; @@ -113,7 +113,7 @@ async fn rejects_duplicate_code_ids() { assert_eq!( unwrap_rejected_reason(status), - ValidationRejectReason::CodesHasDuplicates + ValidationRejectReason::CodesHaveDuplicates ); } @@ -181,20 +181,21 @@ async fn rejects_non_best_chain_head() { #[tokio::test] #[ntest::timeout(3000)] -async fn rejects_when_best_head_chain_is_invalid() { +async fn rejects_head_announce_not_computed() { gear_utils::init_default_logger(); let (ctx, _, _) = mock_validator_context(Database::memory()); let batch = prepare_chain_for_batch_commitment(&ctx.core.db); let block = ctx.core.db.simple_block_data(batch.block_hash); - let request = BatchCommitmentValidationRequest::new(&batch); - let head = request.head.expect("expect head"); - - ctx.core.db.mutate_block_meta(block.hash, |meta| { - meta.last_committed_announce = Some(HashOf::random()); - }); + let head = batch.chain_commitment.as_ref().unwrap().head_announce; + // Flip the announce meta back to "not computed" to simulate a peer that + // advertises an announce this node hasn't finished executing yet. + ctx.core + .db + .mutate_announce_meta(head, |meta| meta.computed = false); + let request = BatchCommitmentValidationRequest::new(&batch); let status = ctx .core .batch_manager @@ -204,7 +205,7 @@ async fn rejects_when_best_head_chain_is_invalid() { assert_eq!( unwrap_rejected_reason(status), - ValidationRejectReason::BestHeadAnnounceChainInvalid(head) + ValidationRejectReason::HeadAnnounceNotComputed(head), ); } @@ -295,7 +296,7 @@ async fn rejects_code_not_processed_yet() { assert_eq!( unwrap_rejected_reason(status), - ValidationRejectReason::CodeIsNotProcessedYet(code_id) + ValidationRejectReason::SomeCodesNotProcessed, ); } @@ -352,11 +353,11 @@ async fn rejects_batch_commitment_size_limit_exceeded() { { // Rebuilding batch with higher size_limits. - let new_limits = BatchLimits { + let new_config = BatchManagerConfig { batch_size_limit: DEFAULT_BATCH_SIZE_LIMIT + 10_000_000, ..Default::default() }; - let previous_limits = ctx.core.batch_manager.replace_limits(new_limits); + let previous_config = ctx.core.batch_manager.replace_config(new_config); let batch = ctx .core @@ -367,8 +368,8 @@ async fn rejects_batch_commitment_size_limit_exceeded() { .unwrap() .unwrap(); - // Set previous limits for validation. - ctx.core.batch_manager.replace_limits(previous_limits); + // Set previous config for validation. + ctx.core.batch_manager.replace_config(previous_config); let request = BatchCommitmentValidationRequest::new(&batch); let status = ctx diff --git a/ethexe/consensus/src/validator/batch/types.rs b/ethexe/consensus/src/validator/batch/types.rs index bab6d419de9..cc22c9f2960 100644 --- a/ethexe/consensus/src/validator/batch/types.rs +++ b/ethexe/consensus/src/validator/batch/types.rs @@ -18,11 +18,8 @@ use alloy::sol_types::SolValue; use ethexe_common::{ - Announce, COMMITMENT_DELAY_LIMIT, Digest, HashOf, - consensus::{ - BatchCommitmentValidationRequest, DEFAULT_BATCH_SIZE_LIMIT, - DEFAULT_CHAIN_DEEPNESS_THRESHOLD, - }, + Announce, Digest, HashOf, + consensus::BatchCommitmentValidationRequest, gear::{ ChainCommitment, CodeCommitment, RewardsCommitment, StateTransition, ValidatorsCommitment, }, @@ -30,23 +27,24 @@ use ethexe_common::{ use ethexe_ethereum::abi::Gear; use gprimitives::CodeId; -/// Batch building limits. +/// Batch building configuration parameters. #[derive(Debug, Clone)] -pub struct BatchLimits { +pub struct BatchManagerConfig { /// Minimum deepness threshold to create chain commitment even if there are no transitions. pub chain_deepness_threshold: u32, - /// Time limit in blocks for announce to be committed after its creation. + /// Limit in blocks for announce to be committed after its creation. pub commitment_delay_limit: u32, /// The maximum size of abi encoded [`ethexe_common::gear::BatchCommitment`]. pub batch_size_limit: u64, } -impl Default for BatchLimits { +#[cfg(test)] +impl Default for BatchManagerConfig { fn default() -> Self { - BatchLimits { - chain_deepness_threshold: DEFAULT_CHAIN_DEEPNESS_THRESHOLD, - commitment_delay_limit: COMMITMENT_DELAY_LIMIT, - batch_size_limit: DEFAULT_BATCH_SIZE_LIMIT, + BatchManagerConfig { + chain_deepness_threshold: ethexe_common::consensus::DEFAULT_CHAIN_DEEPNESS_THRESHOLD, + commitment_delay_limit: ethexe_common::COMMITMENT_DELAY_LIMIT, + batch_size_limit: ethexe_common::consensus::DEFAULT_BATCH_SIZE_LIMIT, } } } @@ -72,22 +70,18 @@ impl BatchSizeCounter { Self(max_size) } - pub fn charge_for_validators_commitment( - &mut self, - commitment: &Option, - ) -> bool { - self.charge_optional::(commitment.clone()) + pub fn charge_for_validators_commitment(&mut self, commitment: &ValidatorsCommitment) -> bool { + self.charge_optional::(Some( + commitment.clone(), + )) } - pub fn charge_for_rewards_commitment( - &mut self, - commitment: &Option, - ) -> bool { - self.charge_optional::<_, Gear::RewardsCommitment>(commitment.clone()) + pub fn charge_for_rewards_commitment(&mut self, commitment: &RewardsCommitment) -> bool { + self.charge_optional::<_, Gear::RewardsCommitment>(Some(commitment.clone())) } - pub fn charge_for_chain_commitment(&mut self, commitment: &Option) -> bool { - self.charge_optional::<_, Gear::ChainCommitment>(commitment.clone()) + pub fn charge_for_chain_commitment(&mut self, commitment: &ChainCommitment) -> bool { + self.charge_optional::<_, Gear::ChainCommitment>(Some(commitment.clone())) } /// Charges only for appended transitions after the chain commitment header @@ -162,11 +156,11 @@ pub enum ValidationRejectReason { #[display("batch commitment is empty")] EmptyBatch, #[display("batch commitment request contains duplicate code ids")] - CodesHasDuplicates, + CodesHaveDuplicates, #[display("code id {_0} is not waiting for commitment")] CodeNotWaitingForCommitment(CodeId), - #[display("code id {_0} is not processed yet")] - CodeIsNotProcessedYet(CodeId), + #[display("some codes are not processed yet")] + SomeCodesNotProcessed, #[display("requested head announce {requested} is not the best announce {best}")] HeadAnnounceIsNotFromBestChain { requested: HashOf, @@ -174,8 +168,6 @@ pub enum ValidationRejectReason { }, #[display("requested head announce {_0} is not computed by this node")] HeadAnnounceNotComputed(HashOf), - #[display("cannot collect not committed predecessors for best announce {_0}")] - BestHeadAnnounceChainInvalid(HashOf), #[display( "received batch contains validators commitment, but it's not time for validators election yet" )] @@ -189,7 +181,3 @@ pub enum ValidationRejectReason { #[display("batch size exceeded the maximum size limit")] BatchSizeLimitExceeded, } - -#[derive(Debug, derive_more::Display, Clone, Copy, PartialEq, Eq)] -#[display("Code not found: {_0}")] -pub struct CodeNotValidatedError(pub CodeId); diff --git a/ethexe/consensus/src/validator/batch/utils.rs b/ethexe/consensus/src/validator/batch/utils.rs index 6bde5fb36d7..e6cd71b66bf 100644 --- a/ethexe/consensus/src/validator/batch/utils.rs +++ b/ethexe/consensus/src/validator/batch/utils.rs @@ -16,11 +16,8 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::validator::batch::{filler::BatchFiller, types::BatchParts}; - -use super::types::CodeNotValidatedError; - -use anyhow::{Result, anyhow, bail}; +use super::{filler::BatchFiller, types::BatchParts}; +use anyhow::{Context, Result, anyhow}; use ethexe_common::{ Announce, HashOf, SimpleBlockData, db::{AnnounceStorageRO, BlockMetaStorageRO, CodesStorageRO, OnChainStorageRO}, @@ -29,32 +26,26 @@ use ethexe_common::{ }, }; use gprimitives::{ActorId, CodeId, H256}; -use std::collections::{HashMap, hash_map::Entry}; +use std::collections::{HashMap, VecDeque, hash_map::Entry}; -pub fn collect_not_committed_predecessors( +/// Walk parent pointers from `head_announce_hash` down to (but excluding) +/// `last_committed_announce_hash`, returning the traversed announces in +/// chronological order — oldest first, head last. +pub fn collect_pending_announces( db: &DB, + head_announce_hash: HashOf, last_committed_announce_hash: HashOf, - announce_hash: HashOf, -) -> Result>> { - let mut announces = Vec::new(); - let mut current_announce = announce_hash; - - // Maybe remove this loop to prevent infinite searching - while current_announce != last_committed_announce_hash { - if !db.announce_meta(current_announce).computed { - // All announces till last committed must be computed. - // Even fast-sync guarantees that. - bail!("Not computed announce in chain {current_announce:?}") - } - - announces.push(current_announce); - current_announce = db - .announce(current_announce) - .ok_or_else(|| anyhow!("Computed announce {current_announce:?} body not found in db"))? +) -> Result>> { + let mut announces = VecDeque::new(); + let mut current = head_announce_hash; + while current != last_committed_announce_hash { + announces.push_front(current); + current = db + .announce(current) + .with_context(|| format!("Announce {current:?} not found in db"))? .parent; } - - Ok(announces.into_iter().rev().collect()) + Ok(announces) } pub fn create_batch_commitment( @@ -71,21 +62,25 @@ pub fn create_batch_commitment( db: &DB, codes: impl IntoIterator, - fail_if_not_found: bool, -) -> Result, CodeNotValidatedError> { - let mut commitments = Vec::new(); - - for id in codes { - match db.code_valid(id) { - Some(valid) => commitments.push(CodeCommitment { id, valid }), - None if fail_if_not_found => return Err(CodeNotValidatedError(id)), - None => {} +) -> Vec { + codes + .into_iter() + .filter_map(|id| db.code_valid(id).map(|valid| CodeCommitment { id, valid })) + .collect() +} + +/// Aggregate code commitments for the codes queue of the given block and push +/// them into the batch filler. Codes that are still not validated are skipped; +/// the remaining ones are included in the order they appear in the queue, until +/// the filler rejects further additions (e.g. due to size limits). +pub fn aggregate_code_commitments_for_block( + db: &DB, + block_hash: H256, + batch_filler: &mut BatchFiller, +) -> Result<()> { + let queue = db + .block_meta(block_hash) + .codes_queue + .with_context(|| format!("Computed block {block_hash} codes queue is not in storage"))?; + + for commitment in aggregate_code_commitments(db, queue) { + if let Err(err) = batch_filler.include_code_commitment(commitment) { + tracing::trace!( + "filler rejects code commitment: {err}, stop including more code commitments" + ); + break; } } - Ok(commitments) + Ok(()) } -pub fn try_include_chain_commitment( +/// Aggregate chain commitment for the given head announce and its ancestors until last committed announce. +/// Returns the amount of announces which are waiting for commitment in the chain (including head announce). +pub fn aggregate_chain_commitment( db: &DB, at_block: H256, head_announce_hash: HashOf, batch_filler: &mut BatchFiller, -) -> Result<(HashOf, u32)> { +) -> Result { + debug_assert_eq!( + db.announce(head_announce_hash) + .expect("Announce not found") + .block_hash, + at_block, + "Head announce is not from `at_block`" + ); + if !db.announce_meta(head_announce_hash).computed { anyhow::bail!( "Head announce {head_announce_hash:?} is not computed, cannot aggregate chain commitment" @@ -139,35 +162,33 @@ pub fn try_include_chain_commitment( } let Some(last_committed_announce) = db.block_meta(at_block).last_committed_announce else { - anyhow::bail!("Last committed announce not found in db for prepared block: {at_block}",); + anyhow::bail!("Last committed announce not found in db for prepared block: {at_block}"); }; - let pending = super::utils::collect_not_committed_predecessors( - &db, - last_committed_announce, - head_announce_hash, - )?; + let pending_announces = + collect_pending_announces(db, head_announce_hash, last_committed_announce)?; - let final_announce = pending.last().copied().unwrap_or(head_announce_hash); - let max_depth = pending.len() as u32; + let pending_announces_amount = pending_announces.len(); - for (depth, announce_hash) in pending.into_iter().enumerate() { + for announce_hash in pending_announces { let Some(transitions) = db.announce_outcome(announce_hash) else { anyhow::bail!("Computed announce {announce_hash:?} outcome not found in db"); }; + let commitment = ChainCommitment { head_announce: announce_hash, transitions, }; - if let Err(err) = batch_filler.include_chain_commitment(commitment, depth as u32) { + if let Err(err) = batch_filler.include_chain_commitment(commitment) { tracing::trace!( - "failed to include chain commitment for announce({announce_hash}) because of error={err}" + "filler rejects announce {announce_hash}: {err}, stop including more announces" ); - return Ok((announce_hash, depth as u32)); + break; } } - Ok((final_announce, max_depth)) + + Ok(pending_announces_amount) } pub fn calculate_batch_expiry( @@ -396,22 +417,36 @@ mod tests { use super::*; use crate::{ mock::*, - validator::batch::{BatchLimits, filler::BatchFiller}, - }; - use ethexe_common::{ - COMMITMENT_DELAY_LIMIT, DEFAULT_BLOCK_GAS_LIMIT, - consensus::DEFAULT_CHAIN_DEEPNESS_THRESHOLD, db::*, mock::*, + validator::batch::{BatchManagerConfig, filler::BatchFiller}, }; + use ethexe_common::{db::*, mock::*}; use ethexe_db::Database; - const BATCH_LIMITS: BatchLimits = BatchLimits { - chain_deepness_threshold: DEFAULT_CHAIN_DEEPNESS_THRESHOLD, - commitment_delay_limit: COMMITMENT_DELAY_LIMIT, - batch_size_limit: DEFAULT_BLOCK_GAS_LIMIT, - }; + #[test] + fn test_collect_pending_announces() { + let db = Database::memory(); + let chain = test_block_chain(5).setup(&db); + + let head = chain.block_top_announce_hash(5); + let last_committed = chain.block_top_announce_hash(2); + + let pending = collect_pending_announces(&db, head, last_committed).unwrap(); + + // Chronological order: oldest first, head last — last_committed itself is excluded. + assert_eq!( + pending.into_iter().collect::>(), + vec![ + chain.block_top_announce_hash(3), + chain.block_top_announce_hash(4), + chain.block_top_announce_hash(5), + ], + ); + } #[test] fn test_aggregate_chain_commitment() { + let batch_size_limit = BatchManagerConfig::default().batch_size_limit; + { // Valid case, two transitions in the chain, but only one must be included let db = Database::memory(); @@ -432,21 +467,16 @@ mod tests { }) .setup(&db); let block = chain.blocks[10].to_simple(); - let head_announce_hash = chain.block_top_announce_hash(9); - - let mut batch_filler = BatchFiller::new(BATCH_LIMITS); - let (_, deepness) = try_include_chain_commitment( - &db, - block.hash, - head_announce_hash, - &mut batch_filler, - ) - .unwrap(); + let head_announce_hash = chain.block_top_announce_hash(10); + let mut batch_filler = BatchFiller::new(batch_size_limit); + let deepness = + aggregate_chain_commitment(&db, block.hash, head_announce_hash, &mut batch_filler) + .unwrap(); let commitment = batch_filler.into_parts().chain_commitment.unwrap(); assert_eq!(commitment.head_announce, head_announce_hash); assert_eq!(commitment.transitions.len(), 1); - assert_eq!(deepness, 6); + assert_eq!(deepness, 7); } { @@ -457,9 +487,9 @@ mod tests { .setup(&db); let block = chain.blocks[3].to_simple(); let head_announce_hash = chain.block_top_announce_hash(3); - let mut batch_filler = BatchFiller::new(BATCH_LIMITS); + let mut batch_filler = BatchFiller::new(batch_size_limit); - try_include_chain_commitment(&db, block.hash, head_announce_hash, &mut batch_filler) + aggregate_chain_commitment(&db, block.hash, head_announce_hash, &mut batch_filler) .unwrap_err(); } @@ -472,22 +502,23 @@ mod tests { let block = chain.blocks[3].to_simple(); let head_announce_hash = chain.block_top_announce_hash(3); - let mut batch_filler = BatchFiller::new(BATCH_LIMITS); - try_include_chain_commitment(&db, block.hash, head_announce_hash, &mut batch_filler) + let mut batch_filler = BatchFiller::new(batch_size_limit); + aggregate_chain_commitment(&db, block.hash, head_announce_hash, &mut batch_filler) .unwrap_err(); } { // last committed announce missing in block meta let db = Database::memory(); - let chain = test_block_chain(3) - .tap_mut(|chain| chain.blocks[3].prepared = None) - .setup(&db); + let chain = test_block_chain(3).setup(&db); + db.mutate_block_meta(chain.blocks[3].hash, |meta| { + meta.last_committed_announce = None + }); let block = chain.blocks[3].to_simple(); - let head_announce_hash = chain.block_top_announce_hash(2); + let head_announce_hash = chain.block_top_announce_hash(3); - let mut batch_filler = BatchFiller::new(BATCH_LIMITS); - try_include_chain_commitment(&db, block.hash, head_announce_hash, &mut batch_filler) + let mut batch_filler = BatchFiller::new(batch_size_limit); + aggregate_chain_commitment(&db, block.hash, head_announce_hash, &mut batch_filler) .unwrap_err(); } } @@ -501,39 +532,23 @@ mod tests { db.set_code_valid(codes[0], true); db.set_code_valid(codes[1], false); - let commitments = aggregate_code_commitments(&db, codes.clone(), false).unwrap(); - assert_eq!( - commitments, - vec![ - CodeCommitment { - id: codes[0], - valid: true, - }, - CodeCommitment { - id: codes[1], - valid: false, - } - ] - ); + let expected = vec![ + CodeCommitment { + id: codes[0], + valid: true, + }, + CodeCommitment { + id: codes[1], + valid: false, + }, + ]; - let commitments = - aggregate_code_commitments(&db, vec![codes[0], CodeId::from([3; 32]), codes[1]], false) - .unwrap(); - assert_eq!( - commitments, - vec![ - CodeCommitment { - id: codes[0], - valid: true, - }, - CodeCommitment { - id: codes[1], - valid: false, - } - ] - ); + let commitments = aggregate_code_commitments(&db, codes.clone()); + assert_eq!(commitments, expected); - aggregate_code_commitments(&db, vec![CodeId::from([3; 32])], true).unwrap_err(); + let invalid_code = CodeId::from([3; 32]); + let commitments = aggregate_code_commitments(&db, vec![codes[0], invalid_code, codes[1]]); + assert_eq!(commitments, expected); } #[test] @@ -1035,6 +1050,60 @@ mod tests { assert!(!squashed[0].value_to_receive_negative_sign); } + #[test] + fn test_aggregate_code_commitments_for_block() { + let db = Database::memory(); + let batch_size_limit = BatchManagerConfig::default().batch_size_limit; + + let valid_code = CodeId::from([1; 32]); + let invalid_code = CodeId::from([2; 32]); + let pending_code = CodeId::from([3; 32]); + + db.set_code_valid(valid_code, true); + db.set_code_valid(invalid_code, false); + // pending_code intentionally has no `code_valid` entry. + + let chain = test_block_chain(1) + .tap_mut(|chain| { + let queue = &mut chain.blocks[1].as_prepared_mut().codes_queue; + queue.push_back(valid_code); + queue.push_back(pending_code); + queue.push_back(invalid_code); + }) + .setup(&db); + let block = chain.blocks[1].to_simple(); + + let mut batch_filler = BatchFiller::new(batch_size_limit); + aggregate_code_commitments_for_block(&db, block.hash, &mut batch_filler).unwrap(); + + let parts = batch_filler.into_parts(); + assert_eq!( + parts.code_commitments, + vec![ + CodeCommitment { + id: valid_code, + valid: true, + }, + CodeCommitment { + id: invalid_code, + valid: false, + }, + ], + "pending code with no `code_valid` entry must be skipped", + ); + } + + #[test] + fn test_aggregate_code_commitments_for_block_missing_queue() { + let db = Database::memory(); + let chain = test_block_chain(1).setup(&db); + db.mutate_block_meta(chain.blocks[1].hash, |meta| meta.codes_queue = None); + + let mut batch_filler = BatchFiller::new(BatchManagerConfig::default().batch_size_limit); + aggregate_code_commitments_for_block(&db, chain.blocks[1].hash, &mut batch_filler) + .unwrap_err(); + } + #[test] fn test_squash_exact_value_cancellation() { let actor = ActorId::from([0xAC; 32]); diff --git a/ethexe/consensus/src/validator/mock.rs b/ethexe/consensus/src/validator/mock.rs index b12b7114bf7..0e4a4d0226b 100644 --- a/ethexe/consensus/src/validator/mock.rs +++ b/ethexe/consensus/src/validator/mock.rs @@ -148,9 +148,9 @@ pub fn mock_validator_context(db: Database) -> (ValidatorContext, Vec let ethereum = MockEthereum::default(); let timelines = crate::mock::test_protocol_timelines_with_slot(1); - let limits = BatchLimits::default(); + let config = BatchManagerConfig::default(); let middleware = MiddlewareWrapper::from_inner(ethereum.clone()); - let batch_manager = BatchCommitmentManager::new(limits, db.clone(), middleware); + let batch_manager = BatchCommitmentManager::new(db.clone(), middleware, config); let ctx = ValidatorContext { core: ValidatorCore { diff --git a/ethexe/consensus/src/validator/mod.rs b/ethexe/consensus/src/validator/mod.rs index 5385040906d..c2e4f3bc173 100644 --- a/ethexe/consensus/src/validator/mod.rs +++ b/ethexe/consensus/src/validator/mod.rs @@ -42,7 +42,7 @@ use crate::{ BatchCommitmentValidationReply, ConsensusEvent, ConsensusService, validator::{ - batch::{BatchCommitmentManager, BatchLimits}, + batch::{BatchCommitmentManager, BatchManagerConfig}, coordinator::Coordinator, core::{MiddlewareWrapper, ValidatorCore}, participant::Participant, @@ -136,14 +136,15 @@ impl ValidatorService { config: ValidatorConfig, ) -> Result { let timelines = db.config().timelines; - let limits = BatchLimits { + let batch_manager_config = BatchManagerConfig { chain_deepness_threshold: config.chain_deepness_threshold, commitment_delay_limit: config.commitment_delay_limit, batch_size_limit: config.batch_size_limit, }; let middleware = MiddlewareWrapper::from_inner(election_provider); - let batch_manager = BatchCommitmentManager::new(limits, db.clone(), middleware); + let batch_manager = + BatchCommitmentManager::new(db.clone(), middleware, batch_manager_config); let ctx = ValidatorContext { core: ValidatorCore { diff --git a/ethexe/consensus/src/validator/participant.rs b/ethexe/consensus/src/validator/participant.rs index e31fc5b1c1a..f93495da135 100644 --- a/ethexe/consensus/src/validator/participant.rs +++ b/ethexe/consensus/src/validator/participant.rs @@ -328,7 +328,7 @@ mod tests { } #[tokio::test] - async fn process_validation_request_failure() { + async fn process_validation_request_reject_not_computed() { let (ctx, pub_keys, _) = mock_validator_context(ethexe_db::Database::memory()); let producer = pub_keys[0]; let block = test_simple_block_data(2); @@ -341,12 +341,13 @@ mod tests { let state = Participant::create(ctx, block, producer.to_address()).unwrap(); assert!(state.is_participant()); - state + let (_, event) = state .process_validation_request(verified_request) .unwrap() .wait_for_event() .await - .expect_err("database is empty - must fail"); + .unwrap(); + assert!(matches!(event, ConsensusEvent::Warning(_))); } #[tokio::test] diff --git a/ethexe/consensus/src/validator/producer.rs b/ethexe/consensus/src/validator/producer.rs index 872df3207ca..598a85d7bc4 100644 --- a/ethexe/consensus/src/validator/producer.rs +++ b/ethexe/consensus/src/validator/producer.rs @@ -144,7 +144,7 @@ impl StateHandler for Producer { .map(|s| (Poll::Ready(()), s)); } Poll::Ready(Ok(None)) => { - tracing::info!("No commitments - skip batch commitment"); + tracing::debug!(block_hash = %self.block.hash, "No commitments, skip batch commitment"); return Initial::create(self.ctx).map(|s| (Poll::Ready(()), s)); } Poll::Ready(Err(err)) => { @@ -259,7 +259,9 @@ mod tests { validator::{PendingEvent, mock::*}, }; use async_trait::async_trait; - use ethexe_common::{HashOf, consensus::BatchCommitmentValidationRequest, db::*, mock::*}; + use ethexe_common::{ + HashOf, consensus::BatchCommitmentValidationRequest, db::*, gear::ChainCommitment, mock::*, + }; use futures::StreamExt; use nonempty::nonempty; @@ -461,7 +463,13 @@ mod tests { .clone() .expect("Expected that batch is committed"); assert_eq!(signatures.len(), 1); - assert_eq!(batch.chain_commitment, None); + assert_eq!( + batch.chain_commitment, + Some(ChainCommitment { + head_announce: announce_hash, + transitions: vec![], + }) + ); assert_eq!(batch.code_commitments.len(), 2); } diff --git a/ethexe/service/src/tests/mod.rs b/ethexe/service/src/tests/mod.rs index 7263ad03e99..aa744dee4b4 100644 --- a/ethexe/service/src/tests/mod.rs +++ b/ethexe/service/src/tests/mod.rs @@ -2072,21 +2072,17 @@ async fn validators_election() { }; let mut env = TestEnv::new(env_config).await.unwrap(); - let genesis_block_hash = env + // Start connect node, to be able to request missed announces (for next validators). + let mut connect_node = env.new_node(NodeConfig::named("connect")).await; + connect_node.start_service().await; + + let genesis_ts = env .ethereum .router() .query() - .genesis_block_hash() + .genesis_timestamp() .await .unwrap(); - let genesis_ts = env - .provider - .get_block_by_hash(genesis_block_hash.0.into()) - .await - .unwrap() - .unwrap() - .header - .timestamp; // Start initial validators let mut validators = vec![]; @@ -2165,6 +2161,7 @@ async fn validators_election() { env.validators = next_validators_configs; let mut new_validators = vec![]; for (i, v) in env.validators.clone().into_iter().enumerate() { + let i = i + 5; log::info!("📗 Starting validator-{i}"); let mut validator = env .new_node(NodeConfig::named(format!("validator-{i}")).validator(v)) @@ -4087,3 +4084,102 @@ async fn re_genesis_delayed_message() { }, ); } + +/// Regression test for the skip-commit branch in +/// `BatchCommitmentManager::create_batch_commitment` +/// (`ethexe/consensus/src/validator/batch/manager.rs`, lines ~117-136). +/// +/// When every batch part is empty and the chain of uncommitted announces has +/// not reached `chain_deepness_threshold`, the producer must drop the +/// chain commitment instead of submitting an empty batch on-chain. +#[tokio::test] +#[ntest::timeout(60_000)] +async fn no_batch_commit_for_short_empty_chain() { + init_logger(); + + // Threshold is large enough that 10 empty blocks cannot cross it. + let mut env = TestEnv::new(TestEnvConfig { + chain_deepness_threshold: 50, + ..Default::default() + }) + .await + .unwrap(); + + let mut node = env + .new_node(NodeConfig::default().validator(env.validators[0])) + .await; + node.start_service().await; + + // Subscribe for observer events after start_service to ignore any + // bootstrap noise. + let receiver = env.new_observer_events(); + + // Mine a batch of empty blocks. No user activity, no code uploads, no + // validator/rewards commitments — every producer turn's batch is empty. + env.skip_blocks(10).await; + + // Give the validator enough real time to process every mined block, then + // assert it never submitted a batch commitment during the window. + let result = tokio::time::timeout( + std::time::Duration::from_secs(10), + receiver + .filter_map_block_synced() + .find(|e| matches!(e, BlockEvent::Router(RouterEvent::BatchCommitted { .. }))), + ) + .await; + + assert!( + result.is_err(), + "Expected no BatchCommitted event for a short empty chain, got {result:?}" + ); +} + +/// Regression test for the inverse of the skip-commit branch in +/// `BatchCommitmentManager::create_batch_commitment` +/// (`ethexe/consensus/src/validator/batch/manager.rs`, lines ~117-136). +/// +/// Once the chain of uncommitted announces exceeds `chain_deepness_threshold`, +/// the producer must submit a chain commitment even with no transitions, so +/// the chain head does not lag arbitrarily behind. +#[tokio::test] +#[ntest::timeout(60_000)] +async fn batch_commit_for_deep_empty_chain() { + init_logger(); + + // Threshold small enough that a handful of empty blocks exceeds it. + let mut env = TestEnv::new(TestEnvConfig { + chain_deepness_threshold: 3, + ..Default::default() + }) + .await + .unwrap(); + + let mut node = env + .new_node(NodeConfig::default().validator(env.validators[0])) + .await; + node.start_service().await; + + let receiver = env.new_observer_events(); + + // Mine enough empty blocks to cross the threshold. With no user activity + // every announce is a base one, so the batch contains only a chain + // commitment with empty transitions. + env.skip_blocks(10).await; + + let committed = tokio::time::timeout( + std::time::Duration::from_secs(30), + receiver + .filter_map_block_synced() + .find(|e| matches!(e, BlockEvent::Router(RouterEvent::BatchCommitted { .. }))), + ) + .await + .expect("timed out waiting for BatchCommitted event"); + + assert!( + matches!( + committed, + BlockEvent::Router(RouterEvent::BatchCommitted { .. }) + ), + "Expected a BatchCommitted event for a deep empty chain, got {committed:?}" + ); +} diff --git a/ethexe/service/src/tests/utils/env.rs b/ethexe/service/src/tests/utils/env.rs index 020d8a50b36..021ea335994 100644 --- a/ethexe/service/src/tests/utils/env.rs +++ b/ethexe/service/src/tests/utils/env.rs @@ -101,6 +101,7 @@ pub struct TestEnv { pub block_time: Duration, pub continuous_block_generation: bool, pub commitment_delay_limit: u32, + pub chain_deepness_threshold: u32, pub compute_config: ComputeConfig, pub db: Database, @@ -127,6 +128,7 @@ impl TestEnv { network, deploy_params, commitment_delay_limit, + chain_deepness_threshold, compute_config, } = config; @@ -369,6 +371,7 @@ impl TestEnv { block_time, continuous_block_generation, commitment_delay_limit, + chain_deepness_threshold, compute_config, router_query, observer_events, @@ -432,6 +435,7 @@ impl TestEnv { fast_sync, compute_config: self.compute_config, commitment_delay_limit: self.commitment_delay_limit, + chain_deepness_threshold: self.chain_deepness_threshold, running_service_handle: None, } } @@ -789,6 +793,8 @@ pub struct TestEnvConfig { pub deploy_params: ContractsDeploymentParams, /// Commitment delay limit in blocks. pub commitment_delay_limit: u32, + /// Minimum deepness threshold to commit chain even if there are no transitions. + pub chain_deepness_threshold: u32, /// Compute service configuration pub compute_config: ComputeConfig, } @@ -812,6 +818,7 @@ impl Default for TestEnvConfig { network: EnvNetworkConfig::Disabled, deploy_params: Default::default(), commitment_delay_limit: COMMITMENT_DELAY_LIMIT, + chain_deepness_threshold: DEFAULT_CHAIN_DEEPNESS_THRESHOLD, compute_config: ComputeConfig::without_quarantine(), } } @@ -940,6 +947,7 @@ pub struct Node { fast_sync: bool, compute_config: ComputeConfig, commitment_delay_limit: u32, + chain_deepness_threshold: u32, running_service_handle: Option>, } @@ -1008,7 +1016,7 @@ impl Node { commitment_delay_limit: self.commitment_delay_limit, producer_delay: self.block_time / 6, router_address: self.eth_cfg.router_address, - chain_deepness_threshold: DEFAULT_CHAIN_DEEPNESS_THRESHOLD, + chain_deepness_threshold: self.chain_deepness_threshold, batch_size_limit: DEFAULT_BATCH_SIZE_LIMIT, }, )