Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 102 additions & 1 deletion ethexe/compute/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

use crate::{ComputeError, ComputeEvent, ProcessorExt, Result, service::SubService};
use ethexe_common::{
Announce, HashOf, PromisePolicy, SimpleBlockData,
Announce, HashOf, ProgramStates, PromisePolicy, SimpleBlockData,
db::{
AnnounceStorageRO, AnnounceStorageRW, BlockMetaStorageRO, CodesStorageRW, ConfigStorageRO,
GlobalsStorageRW, OnChainStorageRO,
Expand Down Expand Up @@ -87,6 +87,11 @@ pub struct ComputeSubService<P: ProcessorExt> {
computation: Option<ComputationFuture>,
promises_stream: Option<utils::AnnouncePromisesStream>,
pending_event: Option<Result<ComputeEvent>>,

/// Input for canonical-only computation (block_hash, parent_announce, gas_allowance).
canonical_input: Option<(H256, HashOf<Announce>, u64)>,
/// Active canonical-only computation future.
canonical_computation: Option<BoxFuture<'static, Result<(H256, ProgramStates)>>>,
}

impl<P: ProcessorExt> ComputeSubService<P> {
Expand All @@ -100,6 +105,8 @@ impl<P: ProcessorExt> ComputeSubService<P> {
computation: None,
promises_stream: None,
pending_event: None,
canonical_input: None,
canonical_computation: None,
}
}

Expand All @@ -111,6 +118,17 @@ impl<P: ProcessorExt> ComputeSubService<P> {
self.input.push_back((announce, promise_policy));
}

/// Request canonical-only computation. Cancels any stale canonical computation.
pub fn receive_canonical_to_compute(
&mut self,
block_hash: H256,
parent_announce: HashOf<Announce>,
gas_allowance: u64,
) {
self.canonical_computation = None;
self.canonical_input = Some((block_hash, parent_announce, gas_allowance));
}

async fn compute(
db: Database,
config: ComputeConfig,
Expand Down Expand Up @@ -191,12 +209,95 @@ impl<P: ProcessorExt> ComputeSubService<P> {

Ok(announce_hash)
}

/// Compute canonical events only, returning ProgramStates without announce metadata writes.
/// If the parent announce is not yet computed, computes predecessors first (those ARE
/// real announces that get full DB writes). Only the synthetic announce is ephemeral.
async fn compute_canonical_only(
db: Database,
config: ComputeConfig,
mut processor: P,
block_hash: H256,
parent_announce: HashOf<Announce>,
gas_allowance: u64,
) -> Result<(H256, ProgramStates)> {
if !db.block_meta(block_hash).prepared {
return Err(ComputeError::BlockNotPrepared(block_hash));
}

// Build synthetic announce with empty TXs — never stored in DB
let synthetic = Announce {
block_hash,
parent: parent_announce,
gas_allowance: Some(gas_allowance),
injected_transactions: vec![],
};

// Compute any uncomputed predecessors. These are real announces that need
// full DB writes (announce_outcome, announce_program_states, etc.).
// On a fast chain, the parent announce may still be computing when we start.
let predecessors = utils::collect_not_computed_predecessors(&synthetic, &db)?;
if !predecessors.is_empty() {
log::trace!(
"compute-canonical: {} uncomputed predecessor(s) for parent {parent_announce}",
predecessors.len(),
);
for (hash, announce) in predecessors {
Self::compute_one(&db, &mut processor, config, hash, announce, None).await?;
}
}

// Run canonical events through processor. CAS/state blobs are written (idempotent),
// but we skip announce-level metadata writes for the synthetic announce.
let executable =
utils::prepare_executable_for_announce(&db, synthetic, config.canonical_quarantine())?;
let result = processor.process_programs(executable, None).await?;

Ok((block_hash, result.states))
}
}

impl<P: ProcessorExt> SubService for ComputeSubService<P> {
type Output = ComputeEvent;

fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::Output>> {
// NOTE: Canonical computation and announce computation use separate future slots
// and can run concurrently. This is by design — they are sequential in the producer
// state machine (canonical finishes before announce starts), but the compute layer
// doesn't enforce ordering. Two processor clones may be alive simultaneously,
// doubling WASM runtime memory briefly.

// Poll canonical computation: start if idle, then poll until pending.
loop {
if let Some(ref mut computation) = self.canonical_computation {
if let Poll::Ready(result) = computation.poll_unpin(cx) {
self.canonical_computation = None;
return Poll::Ready(result.map(|(block_hash, program_states)| {
ComputeEvent::CanonicalEventsComputed(block_hash, program_states)
}));
}
break;
}

if let Some((block_hash, parent_announce, gas_allowance)) = self.canonical_input.take()
{
self.canonical_computation = Some(
Self::compute_canonical_only(
self.db.clone(),
self.config,
self.processor.clone(),
block_hash,
parent_announce,
gas_allowance,
)
.boxed(),
);
continue;
}

break;
}

if self.computation.is_none()
&& self.promises_stream.is_none()
&& let Some((announce, promise_policy)) = self.input.pop_front()
Expand Down
5 changes: 4 additions & 1 deletion ethexe/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub use compute::{
ComputeConfig, ComputeSubService,
utils::{find_canonical_events_post_quarantine, prepare_executable_for_announce},
};
use ethexe_common::{Announce, CodeAndIdUnchecked, HashOf, injected::Promise};
use ethexe_common::{Announce, CodeAndIdUnchecked, HashOf, ProgramStates, injected::Promise};
use ethexe_processor::{ExecutableData, ProcessedCodeInfo, Processor, ProcessorError};
use ethexe_runtime_common::FinalizedBlockTransitions;
use gprimitives::{CodeId, H256};
Expand All @@ -47,6 +47,9 @@ pub enum ComputeEvent {
CodeProcessed(CodeId),
BlockPrepared(H256),
AnnounceComputed(HashOf<Announce>),
/// Canonical events computed for a block. Contains (block_hash, program_states).
/// The program_states are ephemeral — not stored in DB as an announce.
CanonicalEventsComputed(H256, ProgramStates),
Promise(Promise, HashOf<Announce>),
}

Expand Down
68 changes: 66 additions & 2 deletions ethexe/compute/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
compute::{ComputeConfig, ComputeSubService},
prepare::PrepareSubService,
};
use ethexe_common::{Announce, CodeAndIdUnchecked, PromisePolicy};
use ethexe_common::{Announce, CodeAndIdUnchecked, HashOf, PromisePolicy};
use ethexe_db::Database;
use ethexe_processor::Processor;
use futures::{Stream, stream::FusedStream};
Expand Down Expand Up @@ -87,6 +87,21 @@ impl<P: ProcessorExt> ComputeService<P> {
self.compute_sub_service
.receive_announce_to_compute(announce, promise_policy);
}

/// Request canonical-only computation for a block.
/// Returns ProgramStates without writing announce metadata to DB.
pub fn compute_canonical_events(
&mut self,
block_hash: H256,
parent_announce: HashOf<Announce>,
gas_allowance: u64,
) {
self.compute_sub_service.receive_canonical_to_compute(
block_hash,
parent_announce,
gas_allowance,
);
}
}

impl<P: ProcessorExt> Stream for ComputeService<P> {
Expand Down Expand Up @@ -138,12 +153,61 @@ pub(crate) trait SubService: Unpin + Send + 'static {
mod tests {

use super::*;
use ethexe_common::{CodeAndIdUnchecked, db::*, mock::*};
use ethexe_common::{Announce, CodeAndIdUnchecked, db::*, mock::*};
use ethexe_db::Database as DB;
use futures::StreamExt;
use gear_core::ids::prelude::CodeIdExt;
use gprimitives::CodeId;

/// Test canonical-only computation returns ProgramStates without announce metadata writes.
#[tokio::test]
#[ntest::timeout(10000)]
async fn compute_canonical_events() {
gear_utils::init_default_logger();

let db = DB::memory();
let mut service = ComputeService::new_mock_processor(db.clone());

// Setup: chain of 2 blocks. Block 1 has a computed announce.
// We'll prepare block 2 and run canonical compute on it.
let chain = BlockChain::mock(2).setup(&db);
let block = chain.blocks[2].to_simple();

// Block 2 is already prepared by BlockChain::mock().setup()
assert!(db.block_meta(block.hash).prepared, "block must be prepared");

let parent_announce_hash = chain.block_top_announce_hash(1);
assert!(
db.announce_meta(parent_announce_hash).computed,
"parent announce must be computed"
);

// Request canonical-only computation
service.compute_canonical_events(block.hash, parent_announce_hash, 42);

// Poll service — should get CanonicalEventsComputed
let event = service.next().await.unwrap().unwrap();
match event {
ComputeEvent::CanonicalEventsComputed(hash, _states) => {
assert_eq!(hash, block.hash);
}
other => panic!("Expected CanonicalEventsComputed, got {other:?}"),
}

// Verify NO announce metadata was written for the synthetic announce.
let synthetic = Announce {
block_hash: block.hash,
parent: parent_announce_hash,
gas_allowance: Some(42),
injected_transactions: vec![],
};
let synthetic_hash = synthetic.to_hash();
assert!(
!db.announce_meta(synthetic_hash).computed,
"Synthetic announce must NOT be marked as computed in DB"
);
}

/// Test ComputeService block preparation functionality
#[tokio::test]
async fn prepare_block() {
Expand Down
97 changes: 92 additions & 5 deletions ethexe/consensus/src/announces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,18 +719,28 @@ pub fn accept_announce(db: &impl DBAnnouncesExt, announce: Announce) -> Result<A
// Verify for parent announce, because of the current is not processed.
let tx_checker = TxValidityChecker::new_for_announce(db, block, announce.parent)?;

// Validate TXs but defer DB persistence until after full acceptance.
// This prevents a malicious producer from forcing peers to store junk TXs
// in announces that will be rejected by later checks (touched-programs limit, etc.).
for tx in announce.injected_transactions.iter() {
let validity_status = tx_checker.check_tx_validity(tx)?;

match validity_status {
TxValidity::Valid => {
db.set_injected_transaction(tx.clone());
}

// Valid TX or state-dependent conditions that may resolve after canonical
// events are computed. The producer validates against post-canonical states
// (two-phase compute), but accept_announce validates against parent states.
// Programs created in the current block appear as UnknownDestination here
// but become valid after canonical execution.
TxValidity::Valid
| TxValidity::UnknownDestination
| TxValidity::UninitializedDestination
| TxValidity::InsufficientBalanceForInjectedMessages => {}

// Structural violations that cannot resolve after canonical execution.
validity => {
tracing::trace!(
announce = ?announce.to_hash(),
"announce contains invalid transition with status {validity_status:?}, rejecting announce."
"announce contains structurally invalid tx: {validity_status:?}, rejecting announce."
);

return Ok(AnnounceStatus::Rejected {
Expand Down Expand Up @@ -768,6 +778,11 @@ pub fn accept_announce(db: &impl DBAnnouncesExt, announce: Announce) -> Result<A
});
}

// All checks passed — now persist TXs to DB.
for tx in announce.injected_transactions.iter() {
db.set_injected_transaction(tx.clone());
}

Ok(AnnounceStatus::Accepted(announce_hash))
}

Expand Down Expand Up @@ -1127,4 +1142,76 @@ mod tests {
AnnounceRejectionReason::TooManyTouchedPrograms(MAX_TOUCHED_PROGRAMS_PER_ANNOUNCE + 1)
);
}

#[test]
fn accept_announce_lenient_for_unknown_destination() {
let db = Database::memory();
let chain = BlockChain::mock(10).setup(&db);

// Create a TX targeting a program NOT in the parent announce's ProgramStates.
// This simulates a TX for a program created in the current block's canonical events.
let unknown_program = ActorId::from([42u8; 32]);
let tx = SignedMessage::create(
PrivateKey::random(),
InjectedTransaction {
destination: unknown_program,
reference_block: chain.blocks[9].hash,
value: 0,
..InjectedTransaction::mock(())
},
)
.unwrap();

let announce = Announce {
block_hash: chain.blocks[10].hash,
parent: chain.block_top_announce_hash(9),
gas_allowance: Some(42),
injected_transactions: vec![tx],
};

// accept_announce must accept this — the producer validated against
// post-canonical states where the program exists.
let status = accept_announce(&db, announce).unwrap();
assert!(
matches!(status, AnnounceStatus::Accepted(_)),
"Announce with UnknownDestination TX should be accepted (state-dependent leniency), got {status:?}"
);
}

#[test]
fn accept_announce_rejects_structural_invalidity() {
let db = Database::memory();
let chain = BlockChain::mock(10).setup(&db);

// Create a TX with non-zero value — structural violation.
let tx = SignedMessage::create(
PrivateKey::random(),
InjectedTransaction {
destination: ActorId::zero(),
reference_block: chain.blocks[9].hash,
value: 100,
..InjectedTransaction::mock(())
},
)
.unwrap();

let announce = Announce {
block_hash: chain.blocks[10].hash,
parent: chain.block_top_announce_hash(9),
gas_allowance: Some(42),
injected_transactions: vec![tx],
};

let status = accept_announce(&db, announce).unwrap();
assert!(
matches!(
status,
AnnounceStatus::Rejected {
reason: AnnounceRejectionReason::TxValidity(TxValidity::NonZeroValue),
..
}
),
"Announce with NonZeroValue TX must be rejected, got {status:?}"
);
}
}
Loading
Loading