From deacd221befb1878ae704cb41af0f5d2905fdb9b Mon Sep 17 00:00:00 2001 From: Vadim Smirnov Date: Thu, 16 Apr 2026 12:38:54 +0400 Subject: [PATCH 01/11] feat(ethexe-consensus): two-phase compute for instant injected TX promises MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Split announce production into two phases so TXs targeting programs created in the current block are validated against post-canonical ProgramStates instead of stale parent-block states. This eliminates the ~12s wait for same-block program TXs. Phase 1: canonical-only compute (no announce metadata writes) returns ephemeral ProgramStates. Phase 2: TX selection against those states, then build and gossip a single announce. Key changes: - New ConsensusEvent::ComputeCanonicalEvents / ComputeEvent::CanonicalEventsComputed - ComputeSubService::compute_canonical_only (assert parent computed, skip DB metadata) - TxValidityChecker::new_with_states + InjectedTxPool::select_for_announce_with_states - Producer: Delay → WaitingCanonicalComputed → ReadyForTxCollection → WaitingAnnounceComputed - accept_announce lenient for state-dependent TX validations (UnknownDestination, UninitializedDestination, InsufficientBalance) to prevent consensus split - Subordinate gossip-reordering fix (defer announces with unknown parent) Co-Authored-By: Claude Opus 4.6 (1M context) --- ethexe/compute/src/compute.rs | 83 +++++++++++- ethexe/compute/src/lib.rs | 5 +- ethexe/compute/src/service.rs | 14 +- ethexe/consensus/src/announces.rs | 13 +- ethexe/consensus/src/connect/mod.rs | 11 +- ethexe/consensus/src/lib.rs | 13 +- ethexe/consensus/src/tx_validation.rs | 18 +++ ethexe/consensus/src/validator/mod.rs | 40 +++++- ethexe/consensus/src/validator/producer.rs | 123 ++++++++++++++++-- ethexe/consensus/src/validator/subordinate.rs | 34 +++-- ethexe/consensus/src/validator/tx_pool.rs | 27 +++- ethexe/service/src/lib.rs | 15 +++ 12 files changed, 365 insertions(+), 31 deletions(-) diff --git a/ethexe/compute/src/compute.rs b/ethexe/compute/src/compute.rs index cb665ec9e99..8fe692007bb 100644 --- a/ethexe/compute/src/compute.rs +++ b/ethexe/compute/src/compute.rs @@ -18,7 +18,7 @@ use crate::{ComputeError, ComputeEvent, ProcessorExt, Result, service::SubService}; use ethexe_common::{ - Announce, HashOf, PromisePolicy, SimpleBlockData, + Announce, HashOf, ProgramStates, PromisePolicy, SimpleBlockData, db::{ AnnounceStorageRO, AnnounceStorageRW, BlockMetaStorageRO, CodesStorageRW, ConfigStorageRO, GlobalsStorageRW, OnChainStorageRO, @@ -87,6 +87,11 @@ pub struct ComputeSubService { computation: Option, promises_stream: Option, pending_event: Option>, + + /// Input for canonical-only computation (block_hash, parent_announce, gas_allowance). + canonical_input: Option<(H256, HashOf, u64)>, + /// Active canonical-only computation future. + canonical_computation: Option>>, } impl ComputeSubService

{ @@ -100,6 +105,8 @@ impl ComputeSubService

{ computation: None, promises_stream: None, pending_event: None, + canonical_input: None, + canonical_computation: None, } } @@ -111,6 +118,17 @@ impl ComputeSubService

{ self.input.push_back((announce, promise_policy)); } + /// Request canonical-only computation. Cancels any stale canonical computation. + pub fn receive_canonical_to_compute( + &mut self, + block_hash: H256, + parent_announce: HashOf, + gas_allowance: u64, + ) { + self.canonical_computation = None; + self.canonical_input = Some((block_hash, parent_announce, gas_allowance)); + } + async fn compute( db: Database, config: ComputeConfig, @@ -191,12 +209,75 @@ impl ComputeSubService

{ Ok(announce_hash) } + + /// Compute canonical events only, returning ProgramStates without announce metadata writes. + /// PRECONDITION: parent_announce must already be computed. + async fn compute_canonical_only( + db: Database, + config: ComputeConfig, + mut processor: P, + block_hash: H256, + parent_announce: HashOf, + gas_allowance: u64, + ) -> Result<(H256, ProgramStates)> { + if !db.block_meta(block_hash).prepared { + return Err(ComputeError::BlockNotPrepared(block_hash)); + } + + if !db.announce_meta(parent_announce).computed { + return Err(ComputeError::AnnounceNotFound(parent_announce)); + } + + // Build synthetic announce with empty TXs — never stored in DB + let synthetic = Announce { + block_hash, + parent: parent_announce, + gas_allowance: Some(gas_allowance), + injected_transactions: vec![], + }; + + // Run through processor. CAS/state blobs are written (idempotent), + // but we skip announce-level metadata writes. + let executable = + utils::prepare_executable_for_announce(&db, synthetic, config.canonical_quarantine())?; + let result = processor.process_programs(executable, None).await?; + + Ok((block_hash, result.states)) + } } impl SubService for ComputeSubService

{ type Output = ComputeEvent; fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll> { + // Poll canonical-only computation (if active). + if let Some(ref mut computation) = self.canonical_computation + && let Poll::Ready(result) = computation.poll_unpin(cx) + { + self.canonical_computation = None; + return Poll::Ready(result.map(|(block_hash, program_states)| { + ComputeEvent::CanonicalEventsComputed(block_hash, program_states) + })); + } + + // Start new canonical computation if idle. + if self.canonical_computation.is_none() + && let Some((block_hash, parent_announce, gas_allowance)) = + self.canonical_input.take() + { + self.canonical_computation = Some( + Self::compute_canonical_only( + self.db.clone(), + self.config, + self.processor.clone(), + block_hash, + parent_announce, + gas_allowance, + ) + .boxed(), + ); + } + if self.computation.is_none() && self.promises_stream.is_none() && let Some((announce, promise_policy)) = self.input.pop_front() diff --git a/ethexe/compute/src/lib.rs b/ethexe/compute/src/lib.rs index a5c3b8618db..6f5a3f7b6d8 100644 --- a/ethexe/compute/src/lib.rs +++ b/ethexe/compute/src/lib.rs @@ -20,7 +20,7 @@ pub use compute::{ ComputeConfig, ComputeSubService, utils::{find_canonical_events_post_quarantine, prepare_executable_for_announce}, }; -use ethexe_common::{Announce, CodeAndIdUnchecked, HashOf, injected::Promise}; +use ethexe_common::{Announce, CodeAndIdUnchecked, HashOf, ProgramStates, injected::Promise}; use ethexe_processor::{ExecutableData, ProcessedCodeInfo, Processor, ProcessorError}; use ethexe_runtime_common::FinalizedBlockTransitions; use gprimitives::{CodeId, H256}; @@ -47,6 +47,9 @@ pub enum ComputeEvent { CodeProcessed(CodeId), BlockPrepared(H256), AnnounceComputed(HashOf), + /// Canonical events computed for a block. Contains (block_hash, program_states). + /// The program_states are ephemeral — not stored in DB as an announce. + CanonicalEventsComputed(H256, ProgramStates), Promise(Promise, HashOf), } diff --git a/ethexe/compute/src/service.rs b/ethexe/compute/src/service.rs index 5b96f0256a0..3a6143621c1 100644 --- a/ethexe/compute/src/service.rs +++ b/ethexe/compute/src/service.rs @@ -24,7 +24,7 @@ use crate::{ compute::{ComputeConfig, ComputeSubService}, prepare::PrepareSubService, }; -use ethexe_common::{Announce, CodeAndIdUnchecked, PromisePolicy}; +use ethexe_common::{Announce, CodeAndIdUnchecked, HashOf, PromisePolicy}; use ethexe_db::Database; use ethexe_processor::Processor; use futures::{Stream, stream::FusedStream}; @@ -87,6 +87,18 @@ impl ComputeService

{ self.compute_sub_service .receive_announce_to_compute(announce, promise_policy); } + + /// Request canonical-only computation for a block. + /// Returns ProgramStates without writing announce metadata to DB. + pub fn compute_canonical_events( + &mut self, + block_hash: H256, + parent_announce: HashOf, + gas_allowance: u64, + ) { + self.compute_sub_service + .receive_canonical_to_compute(block_hash, parent_announce, gas_allowance); + } } impl Stream for ComputeService

{ diff --git a/ethexe/consensus/src/announces.rs b/ethexe/consensus/src/announces.rs index 337a086c95d..2ad509c61b3 100644 --- a/ethexe/consensus/src/announces.rs +++ b/ethexe/consensus/src/announces.rs @@ -723,14 +723,23 @@ pub fn accept_announce(db: &impl DBAnnouncesExt, announce: Announce) -> Result { + // Valid TX or state-dependent conditions that may resolve after canonical + // events are computed. The producer validates against post-canonical states + // (two-phase compute), but accept_announce validates against parent states. + // Programs created in the current block appear as UnknownDestination here + // but become valid after canonical execution. + TxValidity::Valid + | TxValidity::UnknownDestination + | TxValidity::UninitializedDestination + | TxValidity::InsufficientBalanceForInjectedMessages => { db.set_injected_transaction(tx.clone()); } + // Structural violations that cannot resolve after canonical execution. validity => { tracing::trace!( announce = ?announce.to_hash(), - "announce contains invalid transition with status {validity_status:?}, rejecting announce." + "announce contains structurally invalid tx: {validity_status:?}, rejecting announce." ); return Ok(AnnounceStatus::Rejected { diff --git a/ethexe/consensus/src/connect/mod.rs b/ethexe/consensus/src/connect/mod.rs index bb007c0e279..36437e3ccb4 100644 --- a/ethexe/consensus/src/connect/mod.rs +++ b/ethexe/consensus/src/connect/mod.rs @@ -26,7 +26,7 @@ use crate::{ }; use anyhow::{Result, anyhow}; use ethexe_common::{ - Address, Announce, HashOf, PromisePolicy, ProtocolTimelines, SimpleBlockData, + Address, Announce, HashOf, ProgramStates, PromisePolicy, ProtocolTimelines, SimpleBlockData, consensus::{VerifiedAnnounce, VerifiedValidationRequest}, db::{ConfigStorageRO, OnChainStorageRO}, injected::{Promise, SignedInjectedTransaction}, @@ -310,6 +310,15 @@ impl ConsensusService for ConnectService { Ok(()) } + fn receive_canonical_events_computed( + &mut self, + _block_hash: H256, + _program_states: ProgramStates, + ) -> Result<()> { + // Connect node does not produce announces, so canonical events are irrelevant. + Ok(()) + } + fn receive_announces_response(&mut self, response: AnnouncesResponse) -> Result<()> { let State::WaitingForMissingAnnounces { block, diff --git a/ethexe/consensus/src/lib.rs b/ethexe/consensus/src/lib.rs index c1b8ae43850..25e478f027d 100644 --- a/ethexe/consensus/src/lib.rs +++ b/ethexe/consensus/src/lib.rs @@ -34,7 +34,7 @@ use anyhow::Result; use ethexe_common::{ - Announce, Digest, HashOf, PromisePolicy, SimpleBlockData, + Announce, Digest, HashOf, ProgramStates, PromisePolicy, SimpleBlockData, consensus::{BatchCommitmentValidationReply, VerifiedAnnounce, VerifiedValidationRequest}, injected::{Promise, SignedInjectedTransaction, SignedPromise}, network::{AnnouncesRequest, AnnouncesResponse, SignedValidatorMessage}, @@ -93,6 +93,13 @@ pub trait ConsensusService: /// Process a received injected transaction from network fn receive_injected_transaction(&mut self, tx: SignedInjectedTransaction) -> Result<()>; + + /// Process computed canonical events (ephemeral ProgramStates for TX validation) + fn receive_canonical_events_computed( + &mut self, + block_hash: H256, + program_states: ProgramStates, + ) -> Result<()>; } #[derive(Debug, Clone, PartialEq, Eq, derive_more::Display)] @@ -127,6 +134,10 @@ pub enum ConsensusEvent { /// Informational event: commitment was successfully submitted #[from] CommitmentSubmitted(CommitmentSubmitted), + /// Outer service must compute canonical events only (no announce metadata writes). + /// CAS/state blob writes still occur during process_programs execution. + /// Contains (block_hash, parent_announce, gas_allowance). + ComputeCanonicalEvents(H256, HashOf, u64), /// Informational event: during service processing, a warning situation was detected Warning(String), } diff --git a/ethexe/consensus/src/tx_validation.rs b/ethexe/consensus/src/tx_validation.rs index 77ec477c7aa..802205e683e 100644 --- a/ethexe/consensus/src/tx_validation.rs +++ b/ethexe/consensus/src/tx_validation.rs @@ -95,6 +95,24 @@ impl TxVa }) } + /// Create checker with externally-provided ProgramStates (from canonical compute). + /// `parent_announce` is used only for duplicate TX detection, not for state lookup. + pub fn new_with_states( + db: DB, + chain_head: SimpleBlockData, + parent_announce: HashOf, + program_states: ProgramStates, + ) -> Result { + let start_block_hash = db.globals().start_block_hash; + Ok(Self { + recent_included_txs: Self::collect_recent_included_txs(&db, parent_announce)?, + latest_states: program_states, + db, + chain_head, + start_block_hash, + }) + } + /// Determine [`TxValidity`] status for injected transaction, based on current: /// - `chain_head` - Ethereum chain header /// - `latest_included_transactions` - see [`Self::collect_recent_included_txs`]. diff --git a/ethexe/consensus/src/validator/mod.rs b/ethexe/consensus/src/validator/mod.rs index 5385040906d..33b8dfff5a6 100644 --- a/ethexe/consensus/src/validator/mod.rs +++ b/ethexe/consensus/src/validator/mod.rs @@ -55,7 +55,7 @@ use anyhow::Result; pub use core::BatchCommitter; use derive_more::{Debug, From}; use ethexe_common::{ - Address, Announce, HashOf, SimpleBlockData, + Address, Announce, HashOf, ProgramStates, SimpleBlockData, consensus::{VerifiedAnnounce, VerifiedValidationRequest}, db::ConfigStorageRO, ecdsa::PublicKey, @@ -249,6 +249,16 @@ impl ConsensusService for ValidatorService { fn receive_injected_transaction(&mut self, tx: SignedInjectedTransaction) -> Result<()> { self.update_inner(|inner| inner.process_injected_transaction(tx)) } + + fn receive_canonical_events_computed( + &mut self, + block_hash: H256, + program_states: ProgramStates, + ) -> Result<()> { + self.update_inner(|inner| { + inner.process_canonical_events_computed(block_hash, program_states) + }) + } } impl Stream for ValidatorService { @@ -368,6 +378,14 @@ where DefaultProcessing::injected_transaction(self, tx) } + fn process_canonical_events_computed( + self, + block_hash: H256, + program_states: ProgramStates, + ) -> Result { + DefaultProcessing::canonical_events_computed(self, block_hash, program_states) + } + fn poll_next_state(self, _cx: &mut Context<'_>) -> Result<(Poll<()>, ValidatorState)> { Ok((Poll::Pending, self.into())) } @@ -467,6 +485,14 @@ impl StateHandler for ValidatorState { fn process_injected_transaction(self, tx: SignedInjectedTransaction) -> Result { delegate_call!(self => process_injected_transaction(tx)) } + + fn process_canonical_events_computed( + self, + block_hash: H256, + program_states: ProgramStates, + ) -> Result { + delegate_call!(self => process_canonical_events_computed(block_hash, program_states)) + } } struct DefaultProcessing; @@ -497,6 +523,18 @@ impl DefaultProcessing { Ok(s) } + fn canonical_events_computed( + s: impl Into, + block_hash: H256, + _program_states: ProgramStates, + ) -> Result { + let mut s = s.into(); + s.warning(format!( + "unexpected canonical events computed for block: {block_hash}" + )); + Ok(s) + } + fn promise_for_signing( s: impl Into, promise: Promise, diff --git a/ethexe/consensus/src/validator/producer.rs b/ethexe/consensus/src/validator/producer.rs index 8639d80084b..78cc67b05f1 100644 --- a/ethexe/consensus/src/validator/producer.rs +++ b/ethexe/consensus/src/validator/producer.rs @@ -27,11 +27,13 @@ use crate::{ use anyhow::{Result, anyhow}; use derive_more::{Debug, Display}; use ethexe_common::{ - Announce, HashOf, PromisePolicy, SimpleBlockData, ValidatorsVec, db::BlockMetaStorageRO, - gear::BatchCommitment, injected::Promise, network::ValidatorMessage, + Announce, HashOf, ProgramStates, PromisePolicy, SimpleBlockData, ValidatorsVec, + db::BlockMetaStorageRO, gear::BatchCommitment, injected::Promise, + network::ValidatorMessage, }; use ethexe_service_utils::Timer; use futures::{FutureExt, future::BoxFuture}; +use gprimitives::H256; use gsigner::secp256k1::Secp256k1SignerExt; use std::task::{Context, Poll}; @@ -53,6 +55,19 @@ enum State { #[debug(skip)] timer: Option, }, + /// Waiting for canonical-only compute to return ProgramStates. + WaitingCanonicalComputed { + parent_announce: HashOf, + }, + /// Collecting TXs against post-canonical ProgramStates. + /// Poll timer gives TXs time to arrive before building the announce. + ReadyForTxCollection { + parent_announce: HashOf, + #[debug(skip)] + program_states: ProgramStates, + #[debug(skip)] + poll_timer: Timer, + }, WaitingAnnounceComputed(HashOf), AggregateBatchCommitment { #[debug(skip)] @@ -129,6 +144,35 @@ impl StateHandler for Producer { } } + fn process_canonical_events_computed( + mut self, + block_hash: H256, + program_states: ProgramStates, + ) -> Result { + match &self.state { + State::WaitingCanonicalComputed { parent_announce } + if block_hash == self.block.hash => + { + let parent = *parent_announce; + + // Enter TX collection window. The poll timer gives TXs + // time to arrive before building the announce. + let mut poll_timer = + Timer::new("tx-collection poll", self.ctx.core.producer_delay); + poll_timer.start(()); + + self.state = State::ReadyForTxCollection { + parent_announce: parent, + program_states, + poll_timer, + }; + + Ok(self.into()) + } + _ => DefaultProcessing::canonical_events_computed(self, block_hash, program_states), + } + } + fn poll_next_state(mut self, cx: &mut Context<'_>) -> Result<(Poll<()>, ValidatorState)> { match &mut self.state { State::Delay { timer: Some(timer) } => { @@ -137,6 +181,27 @@ impl StateHandler for Producer { return Ok((Poll::Ready(()), state)); } } + State::ReadyForTxCollection { poll_timer, .. } => { + if poll_timer.poll_unpin(cx).is_ready() { + // Timer fired — collect TXs and build announce. + let State::ReadyForTxCollection { + parent_announce, + program_states, + .. + } = std::mem::replace( + &mut self.state, + // Temporary placeholder, will be overwritten by build_announce_with_states + State::Delay { timer: None }, + ) + else { + unreachable!() + }; + + let state = + self.build_announce_with_states(parent_announce, &program_states)?; + return Ok((Poll::Ready(()), state)); + } + } State::AggregateBatchCommitment { future } => match future.poll_unpin(cx) { Poll::Ready(Ok(Some(batch))) => { tracing::debug!(batch.block_hash = %batch.block_hash, "Batch commitment aggregated, switch to Coordinator"); @@ -184,6 +249,7 @@ impl Producer { .into()) } + /// Phase 1: Request canonical-only compute to get fresh ProgramStates for TX validation. fn produce_announce(mut self) -> Result { if !self.ctx.core.db.block_meta(self.block.hash).prepared { return Err(anyhow!( @@ -197,11 +263,32 @@ impl Producer { self.ctx.core.commitment_delay_limit, )?; + // Phase 1: ask compute to run canonical events only (no TXs). + // The result (ProgramStates) arrives via process_canonical_events_computed. + self.ctx + .output(ConsensusEvent::ComputeCanonicalEvents( + self.block.hash, + parent, + self.ctx.core.block_gas_limit, + )); + self.state = State::WaitingCanonicalComputed { + parent_announce: parent, + }; + + Ok(self.into()) + } + + /// Phase 2: Select TXs using post-canonical ProgramStates, build and gossip announce. + fn build_announce_with_states( + mut self, + parent: HashOf, + program_states: &ProgramStates, + ) -> Result { let injected_transactions = self .ctx .core .injected_pool - .select_for_announce(self.block, parent)?; + .select_for_announce_with_states(self.block, parent, program_states)?; let announce = Announce { block_hash: self.block.hash, @@ -213,14 +300,10 @@ impl Producer { let (announce_hash, newly_included) = self.ctx.core.db.include_announce(announce.clone())?; if !newly_included { - // This can happen in case of abuse from rpc - the same eth block is announced multiple times, - // then the same announce is created multiple times, and include_announce would return already included. - // In this case we just go to initial state, without publishing anything and computing announce again. self.warning(format!( "Announce created {announce:?} is already included at {}", self.block.hash )); - return Initial::create(self.ctx); } @@ -466,6 +549,9 @@ mod tests { #[async_trait] trait ProducerExt: Sized { + /// Skip the delay timer and complete two-phase flow: + /// 1. Timer fires → ComputeCanonicalEvents + /// 2. process_canonical_events_computed → PublishMessage + ComputeAnnounce async fn skip_timer(self) -> Result<(Self, HashOf)>; } @@ -487,13 +573,32 @@ mod tests { let state = ValidatorState::from(producer); + // Phase 1: timer fires → ComputeCanonicalEvents + let (state, event) = state.wait_for_event().await?; + assert!(state.is_producer(), "Expected producer state, got {state}"); + assert!( + event.is_compute_canonical_events(), + "Expected ComputeCanonicalEvents, got {event:?}" + ); + + // Extract block_hash from the event before consuming state + let (block_hash, _, _) = event.unwrap_compute_canonical_events(); + + // Phase 2: deliver empty ProgramStates → enters ReadyForTxCollection + let state = state.process_canonical_events_computed( + block_hash, + ethexe_common::ProgramStates::new(), + )?; + assert!(state.is_producer(), "Expected producer state, got {state}"); + + // Phase 3: poll timer fires → builds announce → PublishMessage + ComputeAnnounce let (state, event) = state.wait_for_event().await?; assert!(state.is_producer(), "Expected producer state, got {state}"); - assert!(event.is_publish_message()); + assert!(event.is_publish_message(), "Expected PublishMessage, got {event:?}"); let (state, event) = state.wait_for_event().await?; assert!(state.is_producer(), "Expected producer state, got {state}"); - assert!(event.is_compute_announce()); + assert!(event.is_compute_announce(), "Expected ComputeAnnounce, got {event:?}"); Ok((state, event.unwrap_compute_announce().0.to_hash())) } diff --git a/ethexe/consensus/src/validator/subordinate.rs b/ethexe/consensus/src/validator/subordinate.rs index 2c2a550238c..1c59ac3c6e4 100644 --- a/ethexe/consensus/src/validator/subordinate.rs +++ b/ethexe/consensus/src/validator/subordinate.rs @@ -22,7 +22,7 @@ use super::{ }; use crate::{ ConsensusEvent, - announces::{self, AnnounceStatus}, + announces::{self, AnnounceStatus, DBAnnouncesExt}, validator::participant::Participant, }; use anyhow::Result; @@ -94,6 +94,17 @@ impl StateHandler for Subordinate { if verified_announce.address() == self.producer && verified_announce.data().block_hash == self.block.hash => { + // Guard against gossip reordering: if the announce's parent isn't + // in DB yet (e.g., squashed arrived before base), defer instead of + // rejecting — accept_announce would reject with UnknownParent and + // the announce would be permanently lost. + if !self.ctx.core.db.is_announce_included(verified_announce.data().parent) { + tracing::trace!( + "Announce parent not yet included, deferring to pending" + ); + self.ctx.pending(verified_announce); + return Ok(self.into()); + } let (announce, _pub_key) = verified_announce.into_parts(); self.send_announce_for_computation(announce) } @@ -443,7 +454,7 @@ mod tests { } #[test] - fn reject_announce_from_producer() { + fn defer_announce_with_unknown_parent() { let (ctx, pub_keys, _) = mock_validator_context(); let producer = pub_keys[0]; let chain = BlockChain::mock(1).setup(&ctx.core.db); @@ -455,18 +466,15 @@ mod tests { assert!(s.is_subordinate(), "got {s:?}"); assert_eq!(s.context().output, vec![]); - // After receiving invalid announce - subordinate rejects it and switches to initial state. - let s = s.process_announce(announce.clone()).unwrap(); - assert!(s.is_initial(), "got {s:?}"); - assert_eq!(s.context().output.len(), 2); + // Announce with unknown parent is deferred to pending (not rejected), + // supporting gossip reordering where a child arrives before its parent. + let s = s.process_announce(announce).unwrap(); + assert!(s.is_subordinate(), "got {s:?}"); + assert_eq!(s.context().output.len(), 0); assert_eq!( - s.context().output[0], - ConsensusEvent::AnnounceRejected(announce.data().to_hash()) - ); - assert!( - s.context().output[1].is_warning(), - "got {:?}", - s.context().output[1] + s.context().pending_events.len(), + 1, + "Announce should be saved to pending for later replay" ); } } diff --git a/ethexe/consensus/src/validator/tx_pool.rs b/ethexe/consensus/src/validator/tx_pool.rs index f15e240fb45..8100c7d5161 100644 --- a/ethexe/consensus/src/validator/tx_pool.rs +++ b/ethexe/consensus/src/validator/tx_pool.rs @@ -19,7 +19,7 @@ use crate::tx_validation::{TxValidity, TxValidityChecker}; use anyhow::Result; use ethexe_common::{ - Announce, HashOf, MAX_TOUCHED_PROGRAMS_PER_ANNOUNCE, SimpleBlockData, + Announce, HashOf, MAX_TOUCHED_PROGRAMS_PER_ANNOUNCE, ProgramStates, SimpleBlockData, db::{ AnnounceStorageRO, CodesStorageRO, GlobalsStorageRO, InjectedStorageRW, OnChainStorageRO, }, @@ -81,7 +81,32 @@ where let tx_checker = TxValidityChecker::new_for_announce(self.db.clone(), block, parent_announce)?; + self.select_with_checker(block, &tx_checker) + } + + /// Returns injected transactions validated against provided ProgramStates (from canonical compute). + pub fn select_for_announce_with_states( + &mut self, + block: SimpleBlockData, + parent_announce: HashOf, + program_states: &ProgramStates, + ) -> Result> { + tracing::trace!(block = ?block.hash, "start collecting injected transactions with post-canonical states"); + + let tx_checker = TxValidityChecker::new_with_states( + self.db.clone(), + block, + parent_announce, + program_states.clone(), + )?; + self.select_with_checker(block, &tx_checker) + } + fn select_with_checker( + &mut self, + block: SimpleBlockData, + tx_checker: &TxValidityChecker, + ) -> Result> { let mut touched_programs = crate::utils::block_touched_programs(&self.db, block.hash)?; if touched_programs.len() > MAX_TOUCHED_PROGRAMS_PER_ANNOUNCE as usize { tracing::error!( diff --git a/ethexe/service/src/lib.rs b/ethexe/service/src/lib.rs index 64f8330a8ef..bb0c61342ff 100644 --- a/ethexe/service/src/lib.rs +++ b/ethexe/service/src/lib.rs @@ -610,6 +610,10 @@ impl Service { ComputeEvent::Promise(promise, announce_hash) => { consensus.receive_promise_for_signing(promise, announce_hash)?; } + ComputeEvent::CanonicalEventsComputed(block_hash, program_states) => { + consensus + .receive_canonical_events_computed(block_hash, program_states)?; + } }, Event::Network(event) => { let Some(_) = network.as_mut() else { @@ -741,6 +745,17 @@ impl Service { ConsensusEvent::AnnounceAccepted(_) | ConsensusEvent::AnnounceRejected(_) => { // TODO #4940: consider to publish network message } + ConsensusEvent::ComputeCanonicalEvents( + block_hash, + parent_announce, + gas_allowance, + ) => { + compute.compute_canonical_events( + block_hash, + parent_announce, + gas_allowance, + ) + } }, Event::Prometheus(event) => match event { PrometheusEvent::CollectMetrics { libp2p_metrics } => { From e468d8921407f2edc0738ec6702215c545d8d8e1 Mon Sep 17 00:00:00 2001 From: Vadim Smirnov Date: Thu, 16 Apr 2026 13:00:22 +0400 Subject: [PATCH 02/11] test(ethexe-consensus): add tests and docs for two-phase compute review findings - Add accept_announce leniency tests (UnknownDestination accepted, NonZeroValue rejected) - Add compute_canonical_events test (verifies ProgramStates returned, no DB metadata writes) - Add new_head tests for WaitingCanonicalComputed and ReadyForTxCollection states - Fix poll_next to immediately poll newly created canonical computation future - Document concurrent compute slots and mem::replace placeholder in producer Co-Authored-By: Claude Opus 4.6 (1M context) --- ethexe/compute/src/compute.rs | 18 +++++- ethexe/compute/src/service.rs | 51 ++++++++++++++- ethexe/consensus/src/announces.rs | 72 ++++++++++++++++++++++ ethexe/consensus/src/validator/producer.rs | 61 +++++++++++++++++- 4 files changed, 199 insertions(+), 3 deletions(-) diff --git a/ethexe/compute/src/compute.rs b/ethexe/compute/src/compute.rs index 8fe692007bb..b09561af3ae 100644 --- a/ethexe/compute/src/compute.rs +++ b/ethexe/compute/src/compute.rs @@ -250,6 +250,12 @@ impl SubService for ComputeSubService

{ type Output = ComputeEvent; fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll> { + // NOTE: Canonical computation and announce computation use separate future slots + // and can run concurrently. This is by design — they are sequential in the producer + // state machine (canonical finishes before announce starts), but the compute layer + // doesn't enforce ordering. Two processor clones may be alive simultaneously, + // doubling WASM runtime memory briefly. + // Poll canonical-only computation (if active). if let Some(ref mut computation) = self.canonical_computation && let Poll::Ready(result) = computation.poll_unpin(cx) @@ -260,7 +266,7 @@ impl SubService for ComputeSubService

{ })); } - // Start new canonical computation if idle. + // Start new canonical computation if idle, then immediately poll it. if self.canonical_computation.is_none() && let Some((block_hash, parent_announce, gas_allowance)) = self.canonical_input.take() @@ -276,6 +282,16 @@ impl SubService for ComputeSubService

{ ) .boxed(), ); + + // Poll immediately — the future may already be ready (e.g., in tests with MockProcessor). + if let Some(ref mut computation) = self.canonical_computation + && let Poll::Ready(result) = computation.poll_unpin(cx) + { + self.canonical_computation = None; + return Poll::Ready(result.map(|(block_hash, program_states)| { + ComputeEvent::CanonicalEventsComputed(block_hash, program_states) + })); + } } if self.computation.is_none() diff --git a/ethexe/compute/src/service.rs b/ethexe/compute/src/service.rs index 3a6143621c1..c37eacef367 100644 --- a/ethexe/compute/src/service.rs +++ b/ethexe/compute/src/service.rs @@ -150,12 +150,61 @@ pub(crate) trait SubService: Unpin + Send + 'static { mod tests { use super::*; - use ethexe_common::{CodeAndIdUnchecked, db::*, mock::*}; + use ethexe_common::{Announce, CodeAndIdUnchecked, db::*, mock::*}; use ethexe_db::Database as DB; use futures::StreamExt; use gear_core::ids::prelude::CodeIdExt; use gprimitives::CodeId; + /// Test canonical-only computation returns ProgramStates without announce metadata writes. + #[tokio::test] + #[ntest::timeout(10000)] + async fn compute_canonical_events() { + gear_utils::init_default_logger(); + + let db = DB::memory(); + let mut service = ComputeService::new_mock_processor(db.clone()); + + // Setup: chain of 2 blocks. Block 1 has a computed announce. + // We'll prepare block 2 and run canonical compute on it. + let chain = BlockChain::mock(2).setup(&db); + let block = chain.blocks[2].to_simple(); + + // Block 2 is already prepared by BlockChain::mock().setup() + assert!(db.block_meta(block.hash).prepared, "block must be prepared"); + + let parent_announce_hash = chain.block_top_announce_hash(1); + assert!( + db.announce_meta(parent_announce_hash).computed, + "parent announce must be computed" + ); + + // Request canonical-only computation + service.compute_canonical_events(block.hash, parent_announce_hash, 42); + + // Poll service — should get CanonicalEventsComputed + let event = service.next().await.unwrap().unwrap(); + match event { + ComputeEvent::CanonicalEventsComputed(hash, _states) => { + assert_eq!(hash, block.hash); + } + other => panic!("Expected CanonicalEventsComputed, got {other:?}"), + } + + // Verify NO announce metadata was written for the synthetic announce. + let synthetic = Announce { + block_hash: block.hash, + parent: parent_announce_hash, + gas_allowance: Some(42), + injected_transactions: vec![], + }; + let synthetic_hash = synthetic.to_hash(); + assert!( + !db.announce_meta(synthetic_hash).computed, + "Synthetic announce must NOT be marked as computed in DB" + ); + } + /// Test ComputeService block preparation functionality #[tokio::test] async fn prepare_block() { diff --git a/ethexe/consensus/src/announces.rs b/ethexe/consensus/src/announces.rs index 2ad509c61b3..93e778ffd5f 100644 --- a/ethexe/consensus/src/announces.rs +++ b/ethexe/consensus/src/announces.rs @@ -1136,4 +1136,76 @@ mod tests { AnnounceRejectionReason::TooManyTouchedPrograms(MAX_TOUCHED_PROGRAMS_PER_ANNOUNCE + 1) ); } + + #[test] + fn accept_announce_lenient_for_unknown_destination() { + let db = Database::memory(); + let chain = BlockChain::mock(10).setup(&db); + + // Create a TX targeting a program NOT in the parent announce's ProgramStates. + // This simulates a TX for a program created in the current block's canonical events. + let unknown_program = ActorId::from([42u8; 32]); + let tx = SignedMessage::create( + PrivateKey::random(), + InjectedTransaction { + destination: unknown_program, + reference_block: chain.blocks[9].hash, + value: 0, + ..InjectedTransaction::mock(()) + }, + ) + .unwrap(); + + let announce = Announce { + block_hash: chain.blocks[10].hash, + parent: chain.block_top_announce_hash(9), + gas_allowance: Some(42), + injected_transactions: vec![tx], + }; + + // accept_announce must accept this — the producer validated against + // post-canonical states where the program exists. + let status = accept_announce(&db, announce).unwrap(); + assert!( + matches!(status, AnnounceStatus::Accepted(_)), + "Announce with UnknownDestination TX should be accepted (state-dependent leniency), got {status:?}" + ); + } + + #[test] + fn accept_announce_rejects_structural_invalidity() { + let db = Database::memory(); + let chain = BlockChain::mock(10).setup(&db); + + // Create a TX with non-zero value — structural violation. + let tx = SignedMessage::create( + PrivateKey::random(), + InjectedTransaction { + destination: ActorId::zero(), + reference_block: chain.blocks[9].hash, + value: 100, + ..InjectedTransaction::mock(()) + }, + ) + .unwrap(); + + let announce = Announce { + block_hash: chain.blocks[10].hash, + parent: chain.block_top_announce_hash(9), + gas_allowance: Some(42), + injected_transactions: vec![tx], + }; + + let status = accept_announce(&db, announce).unwrap(); + assert!( + matches!( + status, + AnnounceStatus::Rejected { + reason: AnnounceRejectionReason::TxValidity(TxValidity::NonZeroValue), + .. + } + ), + "Announce with NonZeroValue TX must be rejected, got {status:?}" + ); + } } diff --git a/ethexe/consensus/src/validator/producer.rs b/ethexe/consensus/src/validator/producer.rs index 78cc67b05f1..622bf0f4e5f 100644 --- a/ethexe/consensus/src/validator/producer.rs +++ b/ethexe/consensus/src/validator/producer.rs @@ -184,13 +184,16 @@ impl StateHandler for Producer { State::ReadyForTxCollection { poll_timer, .. } => { if poll_timer.poll_unpin(cx).is_ready() { // Timer fired — collect TXs and build announce. + // We use mem::replace to move ProgramStates out of self.state. + // The Delay { timer: None } placeholder is a dead state (never fires). + // If build_announce_with_states errors, the `?` propagates and the + // producer is dropped, so the placeholder is never observed. let State::ReadyForTxCollection { parent_announce, program_states, .. } = std::mem::replace( &mut self.state, - // Temporary placeholder, will be overwritten by build_announce_with_states State::Delay { timer: None }, ) else { @@ -547,6 +550,62 @@ mod tests { // TODO: test that zero timer works as expected + #[tokio::test] + #[ntest::timeout(3000)] + async fn new_head_during_canonical_compute() { + let (ctx, keys, _) = mock_validator_context(); + let validators = nonempty![ctx.core.pub_key.to_address(), keys[0].to_address()].into(); + let chain = BlockChain::mock(1).setup(&ctx.core.db); + let block = chain.blocks[1].to_simple(); + + let state = Producer::create(ctx, block, validators).unwrap(); + + // Wait for timer to fire → ComputeCanonicalEvents + let (state, event) = state.wait_for_event().await.unwrap(); + assert!(event.is_compute_canonical_events()); + + // Now in WaitingCanonicalComputed. Send a new head. + let new_block = SimpleBlockData::mock(()); + let state = state.process_new_head(new_block).unwrap(); + + // Should transition to Initial (canonical compute discarded) + assert!( + state.is_initial(), + "new_head during WaitingCanonicalComputed must go to Initial, got {state}" + ); + } + + #[tokio::test] + #[ntest::timeout(3000)] + async fn new_head_during_tx_collection() { + let (ctx, keys, _) = mock_validator_context(); + let validators = nonempty![ctx.core.pub_key.to_address(), keys[0].to_address()].into(); + let chain = BlockChain::mock(1).setup(&ctx.core.db); + let block = chain.blocks[1].to_simple(); + + let state = Producer::create(ctx, block, validators).unwrap(); + + // Wait for timer to fire → ComputeCanonicalEvents + let (state, event) = state.wait_for_event().await.unwrap(); + let (block_hash, _, _) = event.unwrap_compute_canonical_events(); + + // Deliver canonical events → enters ReadyForTxCollection + let state = state + .process_canonical_events_computed(block_hash, ethexe_common::ProgramStates::new()) + .unwrap(); + assert!(state.is_producer()); + + // Now in ReadyForTxCollection. Send a new head before the poll timer fires. + let new_block = SimpleBlockData::mock(()); + let state = state.process_new_head(new_block).unwrap(); + + // Should transition to Initial (TX collection discarded) + assert!( + state.is_initial(), + "new_head during ReadyForTxCollection must go to Initial, got {state}" + ); + } + #[async_trait] trait ProducerExt: Sized { /// Skip the delay timer and complete two-phase flow: From 35a66d6634db0368e0eb55477c71163ab737f57b Mon Sep 17 00:00:00 2001 From: Vadim Smirnov Date: Thu, 16 Apr 2026 13:12:00 +0400 Subject: [PATCH 03/11] fix(ethexe-processor): process canonical events before injected TXs The processor's handle_injected_and_events processed injected TXs before canonical block events. This meant TXs targeting programs created in the same block would panic at update_state ("failed to find program in known states") because ProgramCreated hadn't been handled yet. Swap the order: canonical events first (establishes program state), then injected TXs (can now reference newly created programs). Required for two-phase compute where the producer includes TXs for same-block programs. Co-Authored-By: Claude Opus 4.6 (1M context) --- ethexe/processor/src/lib.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 073552e3f46..eb56a32c324 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -217,12 +217,8 @@ impl Processor { ) -> Result { let mut handler = ProcessingHandler::new(self.db.clone(), transitions); - for tx in injected_transactions { - let source = tx.address().into(); - let tx = tx.into_parts().0; - handler.handle_injected_transaction(source, tx)?; - } - + // Process canonical events FIRST: ProgramCreated, ExecutableBalanceTopUp, etc. + // must establish program state before injected TXs can target those programs. for event in events { match event { BlockRequestEvent::Router(event) => { @@ -234,6 +230,13 @@ impl Processor { } } + // Then process injected TXs against the post-canonical state. + for tx in injected_transactions { + let source = tx.address().into(); + let tx = tx.into_parts().0; + handler.handle_injected_transaction(source, tx)?; + } + Ok(handler.into_transitions()) } From ac0fd21656071ff99445de1991a2f8837f027782 Mon Sep 17 00:00:00 2001 From: Vadim Smirnov Date: Thu, 16 Apr 2026 13:29:29 +0400 Subject: [PATCH 04/11] fix(ethexe): address edge cases from review - compute_canonical_only: replace hard assertion on parent computed with predecessor computation loop. On fast chains, the parent announce may still be computing when the producer starts. Errors would crash the service via the `?` propagation in the event loop. - accept_announce: defer set_injected_transaction until after all acceptance checks pass (touched-programs limit, duplicate inclusion). Prevents a malicious producer from forcing peers to persist junk TX blobs via announces that will be rejected by later checks. - subordinate: inline accept_announce call in process_announce instead of pre-checking is_announce_included. On UnknownParent rejection, defer to pending (gossip reordering). Other rejections handled normally. Cleaner separation: the defer guard now uses the same acceptance path as the happy path. Co-Authored-By: Claude Opus 4.6 (1M context) --- ethexe/compute/src/compute.rs | 25 +++++++--- ethexe/consensus/src/announces.rs | 12 +++-- ethexe/consensus/src/validator/subordinate.rs | 48 ++++++++++++++----- 3 files changed, 62 insertions(+), 23 deletions(-) diff --git a/ethexe/compute/src/compute.rs b/ethexe/compute/src/compute.rs index b09561af3ae..47eaec9ceaf 100644 --- a/ethexe/compute/src/compute.rs +++ b/ethexe/compute/src/compute.rs @@ -211,7 +211,8 @@ impl ComputeSubService

{ } /// Compute canonical events only, returning ProgramStates without announce metadata writes. - /// PRECONDITION: parent_announce must already be computed. + /// If the parent announce is not yet computed, computes predecessors first (those ARE + /// real announces that get full DB writes). Only the synthetic announce is ephemeral. async fn compute_canonical_only( db: Database, config: ComputeConfig, @@ -224,10 +225,6 @@ impl ComputeSubService

{ return Err(ComputeError::BlockNotPrepared(block_hash)); } - if !db.announce_meta(parent_announce).computed { - return Err(ComputeError::AnnounceNotFound(parent_announce)); - } - // Build synthetic announce with empty TXs — never stored in DB let synthetic = Announce { block_hash, @@ -236,8 +233,22 @@ impl ComputeSubService

{ injected_transactions: vec![], }; - // Run through processor. CAS/state blobs are written (idempotent), - // but we skip announce-level metadata writes. + // Compute any uncomputed predecessors. These are real announces that need + // full DB writes (announce_outcome, announce_program_states, etc.). + // On a fast chain, the parent announce may still be computing when we start. + let predecessors = utils::collect_not_computed_predecessors(&synthetic, &db)?; + if !predecessors.is_empty() { + log::trace!( + "compute-canonical: {} uncomputed predecessor(s) for parent {parent_announce}", + predecessors.len(), + ); + for (hash, announce) in predecessors { + Self::compute_one(&db, &mut processor, config, hash, announce, None).await?; + } + } + + // Run canonical events through processor. CAS/state blobs are written (idempotent), + // but we skip announce-level metadata writes for the synthetic announce. let executable = utils::prepare_executable_for_announce(&db, synthetic, config.canonical_quarantine())?; let result = processor.process_programs(executable, None).await?; diff --git a/ethexe/consensus/src/announces.rs b/ethexe/consensus/src/announces.rs index 93e778ffd5f..7d66c027b0b 100644 --- a/ethexe/consensus/src/announces.rs +++ b/ethexe/consensus/src/announces.rs @@ -719,6 +719,9 @@ pub fn accept_announce(db: &impl DBAnnouncesExt, announce: Announce) -> Result Result { - db.set_injected_transaction(tx.clone()); - } + | TxValidity::InsufficientBalanceForInjectedMessages => {} // Structural violations that cannot resolve after canonical execution. validity => { @@ -777,6 +778,11 @@ pub fn accept_announce(db: &impl DBAnnouncesExt, announce: Announce) -> Result { - // Guard against gossip reordering: if the announce's parent isn't - // in DB yet (e.g., squashed arrived before base), defer instead of - // rejecting — accept_announce would reject with UnknownParent and - // the announce would be permanently lost. - if !self.ctx.core.db.is_announce_included(verified_announce.data().parent) { - tracing::trace!( - "Announce parent not yet included, deferring to pending" - ); - self.ctx.pending(verified_announce); - return Ok(self.into()); + let (announce, _pub_key) = verified_announce.clone().into_parts(); + match announces::accept_announce(&self.ctx.core.db, announce.clone())? { + AnnounceStatus::Accepted(announce_hash) => { + self.ctx + .output(ConsensusEvent::AnnounceAccepted(announce_hash)); + self.ctx.output(ConsensusEvent::ComputeAnnounce( + announce, + PromisePolicy::Disabled, + )); + self.state = State::WaitingAnnounceComputed { announce_hash }; + Ok(self.into()) + } + AnnounceStatus::Rejected { + reason: AnnounceRejectionReason::UnknownParent { .. }, + .. + } => { + // Parent not yet included — defer to pending instead of rejecting. + // Gossip reordering can cause the child to arrive before the parent. + // The announce will be retried when Subordinate::create runs next block, + // or recovered via collect_not_committed_predecessors. + tracing::trace!( + "Announce parent not yet included, deferring to pending" + ); + self.ctx.pending(verified_announce); + Ok(self.into()) + } + AnnounceStatus::Rejected { announce, reason } => { + self.ctx + .output(ConsensusEvent::AnnounceRejected(announce.to_hash())); + self.warning(format!( + "Received announce {announce:?} is rejected: {reason:?}" + )); + Initial::create(self.ctx) + } } - let (announce, _pub_key) = verified_announce.into_parts(); - self.send_announce_for_computation(announce) } _ => DefaultProcessing::announce_from_producer(self, verified_announce), } From 49d58e122331b165eeec92886298517daad5eeed Mon Sep 17 00:00:00 2001 From: Vadim Smirnov Date: Thu, 16 Apr 2026 13:35:11 +0400 Subject: [PATCH 05/11] fix(ethexe-processor): preserve injected TX priority over canonical messages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous fix swapped the entire event ordering (canonical before injected), which changed execution priority. Injected TXs should be processed before canonical messages. New ordering: Router events first (ProgramCreated, CodeValidated — registers programs), then injected TXs (preserving priority), then Mirror events (canonical messages). This fixes UnknownDestination panics for same-block programs without deprioritizing injected TXs. Also refactors duplicated canonical poll logic in compute.rs per Gemini review feedback (loop instead of duplicated if-blocks). Co-Authored-By: Claude Opus 4.6 (1M context) --- ethexe/compute/src/compute.rs | 58 ++++++++++++++++------------------- ethexe/processor/src/lib.rs | 16 +++++++--- 2 files changed, 38 insertions(+), 36 deletions(-) diff --git a/ethexe/compute/src/compute.rs b/ethexe/compute/src/compute.rs index 47eaec9ceaf..cc11b537a9d 100644 --- a/ethexe/compute/src/compute.rs +++ b/ethexe/compute/src/compute.rs @@ -267,42 +267,36 @@ impl SubService for ComputeSubService

{ // doesn't enforce ordering. Two processor clones may be alive simultaneously, // doubling WASM runtime memory briefly. - // Poll canonical-only computation (if active). - if let Some(ref mut computation) = self.canonical_computation - && let Poll::Ready(result) = computation.poll_unpin(cx) - { - self.canonical_computation = None; - return Poll::Ready(result.map(|(block_hash, program_states)| { - ComputeEvent::CanonicalEventsComputed(block_hash, program_states) - })); - } + // Poll canonical computation: start if idle, then poll until pending. + loop { + if let Some(ref mut computation) = self.canonical_computation { + if let Poll::Ready(result) = computation.poll_unpin(cx) { + self.canonical_computation = None; + return Poll::Ready(result.map(|(block_hash, program_states)| { + ComputeEvent::CanonicalEventsComputed(block_hash, program_states) + })); + } + break; + } - // Start new canonical computation if idle, then immediately poll it. - if self.canonical_computation.is_none() - && let Some((block_hash, parent_announce, gas_allowance)) = + if let Some((block_hash, parent_announce, gas_allowance)) = self.canonical_input.take() - { - self.canonical_computation = Some( - Self::compute_canonical_only( - self.db.clone(), - self.config, - self.processor.clone(), - block_hash, - parent_announce, - gas_allowance, - ) - .boxed(), - ); - - // Poll immediately — the future may already be ready (e.g., in tests with MockProcessor). - if let Some(ref mut computation) = self.canonical_computation - && let Poll::Ready(result) = computation.poll_unpin(cx) { - self.canonical_computation = None; - return Poll::Ready(result.map(|(block_hash, program_states)| { - ComputeEvent::CanonicalEventsComputed(block_hash, program_states) - })); + self.canonical_computation = Some( + Self::compute_canonical_only( + self.db.clone(), + self.config, + self.processor.clone(), + block_hash, + parent_announce, + gas_allowance, + ) + .boxed(), + ); + continue; } + + break; } if self.computation.is_none() diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index eb56a32c324..084863e920c 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -217,26 +217,34 @@ impl Processor { ) -> Result { let mut handler = ProcessingHandler::new(self.db.clone(), transitions); - // Process canonical events FIRST: ProgramCreated, ExecutableBalanceTopUp, etc. - // must establish program state before injected TXs can target those programs. + // Split events: Router events (ProgramCreated, CodeValidated, etc.) register + // programs and must run before injected TXs can reference them. + // Mirror events (MessageQueueingRequested, etc.) enqueue canonical messages + // and run after injected TXs to preserve injected priority. + let mut mirror_events = Vec::new(); for event in events { match event { BlockRequestEvent::Router(event) => { handler.handle_router_event(event)?; } BlockRequestEvent::Mirror { actor_id, event } => { - handler.handle_mirror_event(actor_id, event)?; + mirror_events.push((actor_id, event)); } } } - // Then process injected TXs against the post-canonical state. + // Injected TXs second — they have execution priority over canonical messages. for tx in injected_transactions { let source = tx.address().into(); let tx = tx.into_parts().0; handler.handle_injected_transaction(source, tx)?; } + // Mirror events last — canonical messages enqueued after injected. + for (actor_id, event) in mirror_events { + handler.handle_mirror_event(actor_id, event)?; + } + Ok(handler.into_transitions()) } From 6fcaf60375fdba8d98c52b8b0190bd88f73b5f97 Mon Sep 17 00:00:00 2001 From: Vadim Smirnov Date: Thu, 16 Apr 2026 13:42:32 +0400 Subject: [PATCH 06/11] refactor(ethexe-consensus): borrow ProgramStates in TxValidityChecker Use Cow so new_for_announce owns the states (from DB) while new_with_states borrows them (from canonical compute). Avoids cloning the entire BTreeMap during TX selection. Co-Authored-By: Claude Opus 4.6 (1M context) --- ethexe/consensus/src/tx_validation.rs | 32 +++++++++++++---------- ethexe/consensus/src/validator/tx_pool.rs | 4 +-- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/ethexe/consensus/src/tx_validation.rs b/ethexe/consensus/src/tx_validation.rs index 802205e683e..c9f4a3fd3eb 100644 --- a/ethexe/consensus/src/tx_validation.rs +++ b/ethexe/consensus/src/tx_validation.rs @@ -26,6 +26,7 @@ use ethexe_common::{ use ethexe_runtime_common::state::Storage; use gprimitives::H256; use hashbrown::HashSet; +use std::borrow::Cow; /// Minimum executable balance for a program to receive injected transactions. /// 100 - is value per gas @@ -54,15 +55,17 @@ pub enum TxValidity { InsufficientBalanceForInjectedMessages, } -pub struct TxValidityChecker { +pub struct TxValidityChecker<'a, DB> { db: DB, chain_head: SimpleBlockData, start_block_hash: H256, recent_included_txs: HashSet>, - latest_states: ProgramStates, + latest_states: Cow<'a, ProgramStates>, } -impl TxValidityChecker { +impl<'a, DB: OnChainStorageRO + AnnounceStorageRO + GlobalsStorageRO + Storage> + TxValidityChecker<'a, DB> +{ pub fn new_for_announce( db: DB, chain_head: SimpleBlockData, @@ -82,31 +85,32 @@ impl TxVa let start_block_hash = db.globals().start_block_hash; Ok(Self { recent_included_txs: Self::collect_recent_included_txs(&db, announce)?, - latest_states: db - .announce_program_states(last_computed_predecessor) - .ok_or_else(|| { - anyhow!( - "Cannot find computed announce {last_computed_predecessor} programs states in db" - ) - })?, + latest_states: Cow::Owned( + db.announce_program_states(last_computed_predecessor) + .ok_or_else(|| { + anyhow!( + "Cannot find computed announce {last_computed_predecessor} programs states in db" + ) + })?, + ), db, chain_head, start_block_hash, }) } - /// Create checker with externally-provided ProgramStates (from canonical compute). - /// `parent_announce` is used only for duplicate TX detection, not for state lookup. + /// Create checker with borrowed ProgramStates (from canonical compute). + /// Avoids cloning the BTreeMap. `parent_announce` is used only for duplicate TX detection. pub fn new_with_states( db: DB, chain_head: SimpleBlockData, parent_announce: HashOf, - program_states: ProgramStates, + program_states: &'a ProgramStates, ) -> Result { let start_block_hash = db.globals().start_block_hash; Ok(Self { recent_included_txs: Self::collect_recent_included_txs(&db, parent_announce)?, - latest_states: program_states, + latest_states: Cow::Borrowed(program_states), db, chain_head, start_block_hash, diff --git a/ethexe/consensus/src/validator/tx_pool.rs b/ethexe/consensus/src/validator/tx_pool.rs index 8100c7d5161..1b5be26f53e 100644 --- a/ethexe/consensus/src/validator/tx_pool.rs +++ b/ethexe/consensus/src/validator/tx_pool.rs @@ -97,7 +97,7 @@ where self.db.clone(), block, parent_announce, - program_states.clone(), + program_states, )?; self.select_with_checker(block, &tx_checker) } @@ -105,7 +105,7 @@ where fn select_with_checker( &mut self, block: SimpleBlockData, - tx_checker: &TxValidityChecker, + tx_checker: &TxValidityChecker<'_, DB>, ) -> Result> { let mut touched_programs = crate::utils::block_touched_programs(&self.db, block.hash)?; if touched_programs.len() > MAX_TOUCHED_PROGRAMS_PER_ANNOUNCE as usize { From d4caa19c9b3f23edef791183acf7fddd36714241 Mon Sep 17 00:00:00 2001 From: Vadim Smirnov Date: Thu, 16 Apr 2026 13:49:53 +0400 Subject: [PATCH 07/11] chore: apply rustfmt Co-Authored-By: Claude Opus 4.6 (1M context) --- ethexe/compute/src/compute.rs | 3 +- ethexe/compute/src/service.rs | 7 +++- ethexe/consensus/src/validator/producer.rs | 40 ++++++++++--------- ethexe/consensus/src/validator/subordinate.rs | 4 +- 4 files changed, 28 insertions(+), 26 deletions(-) diff --git a/ethexe/compute/src/compute.rs b/ethexe/compute/src/compute.rs index cc11b537a9d..f092d288feb 100644 --- a/ethexe/compute/src/compute.rs +++ b/ethexe/compute/src/compute.rs @@ -279,8 +279,7 @@ impl SubService for ComputeSubService

{ break; } - if let Some((block_hash, parent_announce, gas_allowance)) = - self.canonical_input.take() + if let Some((block_hash, parent_announce, gas_allowance)) = self.canonical_input.take() { self.canonical_computation = Some( Self::compute_canonical_only( diff --git a/ethexe/compute/src/service.rs b/ethexe/compute/src/service.rs index c37eacef367..4fb3928c610 100644 --- a/ethexe/compute/src/service.rs +++ b/ethexe/compute/src/service.rs @@ -96,8 +96,11 @@ impl ComputeService

{ parent_announce: HashOf, gas_allowance: u64, ) { - self.compute_sub_service - .receive_canonical_to_compute(block_hash, parent_announce, gas_allowance); + self.compute_sub_service.receive_canonical_to_compute( + block_hash, + parent_announce, + gas_allowance, + ); } } diff --git a/ethexe/consensus/src/validator/producer.rs b/ethexe/consensus/src/validator/producer.rs index 622bf0f4e5f..57b7f8e7e1f 100644 --- a/ethexe/consensus/src/validator/producer.rs +++ b/ethexe/consensus/src/validator/producer.rs @@ -28,8 +28,7 @@ use anyhow::{Result, anyhow}; use derive_more::{Debug, Display}; use ethexe_common::{ Announce, HashOf, ProgramStates, PromisePolicy, SimpleBlockData, ValidatorsVec, - db::BlockMetaStorageRO, gear::BatchCommitment, injected::Promise, - network::ValidatorMessage, + db::BlockMetaStorageRO, gear::BatchCommitment, injected::Promise, network::ValidatorMessage, }; use ethexe_service_utils::Timer; use futures::{FutureExt, future::BoxFuture}; @@ -157,8 +156,7 @@ impl StateHandler for Producer { // Enter TX collection window. The poll timer gives TXs // time to arrive before building the announce. - let mut poll_timer = - Timer::new("tx-collection poll", self.ctx.core.producer_delay); + let mut poll_timer = Timer::new("tx-collection poll", self.ctx.core.producer_delay); poll_timer.start(()); self.state = State::ReadyForTxCollection { @@ -192,10 +190,7 @@ impl StateHandler for Producer { parent_announce, program_states, .. - } = std::mem::replace( - &mut self.state, - State::Delay { timer: None }, - ) + } = std::mem::replace(&mut self.state, State::Delay { timer: None }) else { unreachable!() }; @@ -268,12 +263,11 @@ impl Producer { // Phase 1: ask compute to run canonical events only (no TXs). // The result (ProgramStates) arrives via process_canonical_events_computed. - self.ctx - .output(ConsensusEvent::ComputeCanonicalEvents( - self.block.hash, - parent, - self.ctx.core.block_gas_limit, - )); + self.ctx.output(ConsensusEvent::ComputeCanonicalEvents( + self.block.hash, + parent, + self.ctx.core.block_gas_limit, + )); self.state = State::WaitingCanonicalComputed { parent_announce: parent, }; @@ -608,9 +602,11 @@ mod tests { #[async_trait] trait ProducerExt: Sized { - /// Skip the delay timer and complete two-phase flow: - /// 1. Timer fires → ComputeCanonicalEvents - /// 2. process_canonical_events_computed → PublishMessage + ComputeAnnounce + /// Skip the initial producer delay and complete the full two-phase announce production flow: + /// 1. produce_announce is triggered, emitting ComputeCanonicalEvents. + /// 2. process_canonical_events_computed is called, transitioning to ReadyForTxCollection. + /// 3. The poll timer fires, triggering build_announce_with_states, + /// which emits PublishMessage and ComputeAnnounce. async fn skip_timer(self) -> Result<(Self, HashOf)>; } @@ -653,11 +649,17 @@ mod tests { // Phase 3: poll timer fires → builds announce → PublishMessage + ComputeAnnounce let (state, event) = state.wait_for_event().await?; assert!(state.is_producer(), "Expected producer state, got {state}"); - assert!(event.is_publish_message(), "Expected PublishMessage, got {event:?}"); + assert!( + event.is_publish_message(), + "Expected PublishMessage, got {event:?}" + ); let (state, event) = state.wait_for_event().await?; assert!(state.is_producer(), "Expected producer state, got {state}"); - assert!(event.is_compute_announce(), "Expected ComputeAnnounce, got {event:?}"); + assert!( + event.is_compute_announce(), + "Expected ComputeAnnounce, got {event:?}" + ); Ok((state, event.unwrap_compute_announce().0.to_hash())) } diff --git a/ethexe/consensus/src/validator/subordinate.rs b/ethexe/consensus/src/validator/subordinate.rs index 7a5ff21dde5..749142b90df 100644 --- a/ethexe/consensus/src/validator/subordinate.rs +++ b/ethexe/consensus/src/validator/subordinate.rs @@ -114,9 +114,7 @@ impl StateHandler for Subordinate { // Gossip reordering can cause the child to arrive before the parent. // The announce will be retried when Subordinate::create runs next block, // or recovered via collect_not_committed_predecessors. - tracing::trace!( - "Announce parent not yet included, deferring to pending" - ); + tracing::trace!("Announce parent not yet included, deferring to pending"); self.ctx.pending(verified_announce); Ok(self.into()) } From eef074e17a4d30b0bef7bb0ba6409263f993eaa8 Mon Sep 17 00:00:00 2001 From: Vadim Smirnov Date: Thu, 16 Apr 2026 13:52:04 +0400 Subject: [PATCH 08/11] chore: fix ethexe-service formatting Co-Authored-By: Claude Opus 4.6 (1M context) --- ethexe/service/src/lib.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/ethexe/service/src/lib.rs b/ethexe/service/src/lib.rs index bb0c61342ff..9fe9ed8e2d6 100644 --- a/ethexe/service/src/lib.rs +++ b/ethexe/service/src/lib.rs @@ -611,8 +611,7 @@ impl Service { consensus.receive_promise_for_signing(promise, announce_hash)?; } ComputeEvent::CanonicalEventsComputed(block_hash, program_states) => { - consensus - .receive_canonical_events_computed(block_hash, program_states)?; + consensus.receive_canonical_events_computed(block_hash, program_states)?; } }, Event::Network(event) => { @@ -750,11 +749,7 @@ impl Service { parent_announce, gas_allowance, ) => { - compute.compute_canonical_events( - block_hash, - parent_announce, - gas_allowance, - ) + compute.compute_canonical_events(block_hash, parent_announce, gas_allowance) } }, Event::Prometheus(event) => match event { From b4f6c6486e425426b50695929cecdf0ee738bc91 Mon Sep 17 00:00:00 2001 From: Vadim Smirnov Date: Thu, 16 Apr 2026 14:07:57 +0400 Subject: [PATCH 09/11] chore: suppress dead_code warning for select_for_announce Method is used by tests and accept_announce but not by the producer directly (producer uses select_for_announce_with_states instead). Co-Authored-By: Claude Opus 4.6 (1M context) --- ethexe/consensus/src/validator/tx_pool.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ethexe/consensus/src/validator/tx_pool.rs b/ethexe/consensus/src/validator/tx_pool.rs index 1b5be26f53e..f2252902b9a 100644 --- a/ethexe/consensus/src/validator/tx_pool.rs +++ b/ethexe/consensus/src/validator/tx_pool.rs @@ -72,6 +72,8 @@ where } /// Returns the injected transactions that are valid and can be included to announce. + /// Used by accept_announce validation and tests. The producer uses select_for_announce_with_states. + #[allow(dead_code)] pub fn select_for_announce( &mut self, block: SimpleBlockData, From cf85e722015f9bb6a8f3e39c7082c782ce80627b Mon Sep 17 00:00:00 2001 From: Vadim Smirnov Date: Thu, 16 Apr 2026 14:21:00 +0400 Subject: [PATCH 10/11] fix(ci): pass model via claude_args, not as action input `anthropics/claude-code-action@v1` doesn't have a `model` input. Pass `--model claude-opus-4-6` through `claude_args` instead. Co-Authored-By: Claude Opus 4.6 (1M context) --- .github/workflows/claude-code-review.yml | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/.github/workflows/claude-code-review.yml b/.github/workflows/claude-code-review.yml index 79c85e77e70..45e9f64ca4b 100644 --- a/.github/workflows/claude-code-review.yml +++ b/.github/workflows/claude-code-review.yml @@ -71,9 +71,8 @@ jobs: - name: Run Claude Code Review uses: anthropics/claude-code-action@v1 with: - model: claude-opus-4-6 claude_code_oauth_token: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }} - claude_args: '--mcp-config /tmp/mcp-config.json --thinking-budget high --allowedTools "Bash,Read,Glob,Grep,mcp__code-review-graph__detect_changes_tool,mcp__code-review-graph__get_review_context_tool,mcp__code-review-graph__get_impact_radius_tool"' + claude_args: '--model claude-opus-4-6 --thinking-budget high --mcp-config /tmp/mcp-config.json --allowedTools "Bash,Read,Glob,Grep,mcp__code-review-graph__detect_changes_tool,mcp__code-review-graph__get_review_context_tool,mcp__code-review-graph__get_impact_radius_tool"' prompt: | Review PR #${{ github.event.pull_request.number }} in gear (114-crate Rust monorepo, Solidity contracts in ethexe/). @@ -213,9 +212,8 @@ jobs: - name: Run Claude Delta Review uses: anthropics/claude-code-action@v1 with: - model: claude-opus-4-6 claude_code_oauth_token: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }} - claude_args: '--mcp-config /tmp/mcp-config.json --thinking-budget high --allowedTools "Bash,Read,Glob,Grep,mcp__code-review-graph__detect_changes_tool,mcp__code-review-graph__get_review_context_tool,mcp__code-review-graph__get_impact_radius_tool"' + claude_args: '--model claude-opus-4-6 --thinking-budget high --mcp-config /tmp/mcp-config.json --allowedTools "Bash,Read,Glob,Grep,mcp__code-review-graph__detect_changes_tool,mcp__code-review-graph__get_review_context_tool,mcp__code-review-graph__get_impact_radius_tool"' prompt: | Delta review of new commits on PR #${{ steps.pr.outputs.number }} in gear (114-crate Rust monorepo). Diff: ${{ steps.last-review.outputs.sha }}..${{ steps.pr.outputs.head_sha }}. Do NOT re-review old code. From 51ee77eee44bfe810fe8af12447138113d1c2678 Mon Sep 17 00:00:00 2001 From: Vadim Smirnov Date: Thu, 16 Apr 2026 15:08:24 +0400 Subject: [PATCH 11/11] revert(ethexe-processor): restore original event ordering Reverts the processor event ordering changes (commits 35a66d66, 49d58e12). Programs only initialize via Ethereum canonical events. Injected TXs targeting same-block programs will correctly fail regardless of registration order because the program isn't initialized until process_queues executes the init message (after handle_injected_and_events). The two-phase compute is still useful for TXs targeting programs that existed before this block but whose state changed (balance top-up, etc.). The processor doesn't need to change for that case. Co-Authored-By: Claude Opus 4.6 (1M context) --- ethexe/processor/src/lib.rs | 25 +++++++------------------ 1 file changed, 7 insertions(+), 18 deletions(-) diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 084863e920c..073552e3f46 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -217,34 +217,23 @@ impl Processor { ) -> Result { let mut handler = ProcessingHandler::new(self.db.clone(), transitions); - // Split events: Router events (ProgramCreated, CodeValidated, etc.) register - // programs and must run before injected TXs can reference them. - // Mirror events (MessageQueueingRequested, etc.) enqueue canonical messages - // and run after injected TXs to preserve injected priority. - let mut mirror_events = Vec::new(); + for tx in injected_transactions { + let source = tx.address().into(); + let tx = tx.into_parts().0; + handler.handle_injected_transaction(source, tx)?; + } + for event in events { match event { BlockRequestEvent::Router(event) => { handler.handle_router_event(event)?; } BlockRequestEvent::Mirror { actor_id, event } => { - mirror_events.push((actor_id, event)); + handler.handle_mirror_event(actor_id, event)?; } } } - // Injected TXs second — they have execution priority over canonical messages. - for tx in injected_transactions { - let source = tx.address().into(); - let tx = tx.into_parts().0; - handler.handle_injected_transaction(source, tx)?; - } - - // Mirror events last — canonical messages enqueued after injected. - for (actor_id, event) in mirror_events { - handler.handle_mirror_event(actor_id, event)?; - } - Ok(handler.into_transitions()) }