From 1a92e49fd331d9b000a9bd057de5ad94b381f9e9 Mon Sep 17 00:00:00 2001 From: Gleb Smirnov Date: Thu, 15 Jan 2026 15:15:37 +0300 Subject: [PATCH 1/5] refactor: unify `ChargeResult` usage Add `ChargeResult::is_enough` and `ChargeResult::is_not_enough` methods and use them instead of (in)equality check with a random one of `ChargeResult::Enought` and `ChargeResult::NotEnought`. --- core-processor/src/ext.rs | 44 ++++++++++++++++++++++---------- core/src/gas.rs | 26 ++++++++++++++----- utils/builtins-common/src/lib.rs | 10 +++++--- 3 files changed, 56 insertions(+), 24 deletions(-) diff --git a/core-processor/src/ext.rs b/core-processor/src/ext.rs index cd0fcb15d19..67a896d516d 100644 --- a/core-processor/src/ext.rs +++ b/core-processor/src/ext.rs @@ -399,11 +399,15 @@ impl<'a, LP: LazyPagesInterface> ExtMutator<'a, LP> { .alloc::>(ctx, mem, pages, |pages| { let cost = gas_for_call.saturating_add(gas_for_pages.cost_for(pages)); // Inline charge_gas_if_enough because otherwise we have borrow error due to access to `allocations_context` mutable - if self.gas_counter.charge_if_enough(cost) == ChargeResult::NotEnough { + if self.gas_counter.charge_if_enough(cost).is_not_enough() { return Err(ChargeError::GasLimitExceeded); } - if self.gas_allowance_counter.charge_if_enough(cost) == ChargeResult::NotEnough { + if self + .gas_allowance_counter + .charge_if_enough(cost) + .is_not_enough() + { return Err(ChargeError::GasAllowanceExceeded); } Ok(()) @@ -411,7 +415,7 @@ impl<'a, LP: LazyPagesInterface> ExtMutator<'a, LP> { } fn reduce_gas(&mut self, limit: GasLimit) -> Result<(), FallibleExtError> { - if self.gas_counter.reduce(limit) == ChargeResult::NotEnough { + if self.gas_counter.reduce(limit).is_not_enough() { return Err(FallibleExecutionError::NotEnoughGas.into()); } @@ -419,7 +423,7 @@ impl<'a, LP: LazyPagesInterface> ExtMutator<'a, LP> { } fn charge_message_value(&mut self, value: u128) -> Result<(), FallibleExtError> { - if self.value_counter.reduce(value) == ChargeResult::NotEnough { + if self.value_counter.reduce(value).is_not_enough() { return Err(FallibleExecutionError::NotEnoughValue.into()); } @@ -440,11 +444,15 @@ impl<'a, LP: LazyPagesInterface> ExtMutator<'a, LP> { } fn charge_gas_if_enough(&mut self, gas: u64) -> Result<(), ChargeError> { - if self.gas_counter.charge_if_enough(gas) == ChargeResult::NotEnough { + if self.gas_counter.charge_if_enough(gas).is_not_enough() { return Err(ChargeError::GasLimitExceeded); } - if self.gas_allowance_counter.charge_if_enough(gas) == ChargeResult::NotEnough { + if self + .gas_allowance_counter + .charge_if_enough(gas) + .is_not_enough() + { return Err(ChargeError::GasAllowanceExceeded); } Ok(()) @@ -737,10 +745,13 @@ impl Ext { gas_allowance_counter: &mut GasAllowanceCounter, amount: u64, ) -> Result<(), ChargeError> { - if gas_counter.charge_if_enough(amount) != ChargeResult::Enough { + if gas_counter.charge_if_enough(amount).is_not_enough() { return Err(ChargeError::GasLimitExceeded); } - if gas_allowance_counter.charge_if_enough(amount) != ChargeResult::Enough { + if gas_allowance_counter + .charge_if_enough(amount) + .is_not_enough() + { // Here might be refunds for gas counter, but it's meaningless since // on gas allowance exceed we totally roll up the message and give // it another try in next block with the same initial resources. @@ -825,7 +836,7 @@ impl CountersOwner for Ext { unreachable!("{err_msg}") }); - if self.context.gas_counter.charge(diff) == ChargeResult::NotEnough { + if self.context.gas_counter.charge(diff).is_not_enough() { let err_msg = format!( "CounterOwner::decrease_current_counter_to: Tried to set gas limit left bigger than before. \ Message id - {message_id}, program id - {program_id}, gas counter - {gas_counter:?}, diff - {diff}", @@ -838,7 +849,12 @@ impl CountersOwner for Ext { unreachable!("{err_msg}") } - if self.context.gas_allowance_counter.charge(diff) == ChargeResult::NotEnough { + if self + .context + .gas_allowance_counter + .charge(diff) + .is_not_enough() + { let err_msg = format!( "CounterOwner::decrease_current_counter_to: Tried to set gas allowance left bigger than before. \ Message id - {message_id}, program id - {program_id}, gas allowance counter - {gas_allowance_counter:?}, diff - {diff}", @@ -1237,7 +1253,7 @@ impl Externalities for Ext { return Err(ReservationError::ZeroReservationAmount.into()); } - if self.context.gas_counter.reduce(amount) == ChargeResult::NotEnough { + if self.context.gas_counter.reduce(amount).is_not_enough() { return Err(FallibleExecutionError::NotEnoughGas.into()); } @@ -1303,7 +1319,7 @@ impl Externalities for Ext { .waitlist .cost_for(mutator.context.reserve_for.saturating_add(duration).into()); - if mutator.gas_counter.reduce(reserve) != ChargeResult::Enough { + if mutator.gas_counter.reduce(reserve).is_not_enough() { return Err(UnrecoverableExecutionError::NotEnoughGas.into()); } @@ -1330,7 +1346,7 @@ impl Externalities for Ext { .waitlist .cost_for(mutator.context.reserve_for.saturating_add(1).into()); - if mutator.gas_counter.reduce(reserve) != ChargeResult::Enough { + if mutator.gas_counter.reduce(reserve).is_not_enough() { return Err(UnrecoverableExecutionError::NotEnoughGas.into()); } @@ -1343,7 +1359,7 @@ impl Externalities for Ext { let reserve_diff = reserve_full - reserve; - Ok(mutator.gas_counter.reduce(reserve_diff) == ChargeResult::Enough) + Ok(mutator.gas_counter.reduce(reserve_diff).is_enough()) }) } diff --git a/core/src/gas.rs b/core/src/gas.rs index 6a630310055..c113b52b32f 100644 --- a/core/src/gas.rs +++ b/core/src/gas.rs @@ -43,10 +43,22 @@ pub enum LockId { pub enum ChargeResult { /// There was enough gas and it has been charged. Enough, - /// There was not enough gas and it hasn't been charged. + /// There was not enough gas. NotEnough, } +impl ChargeResult { + /// Checks whether there was enough gas to charge. + pub const fn is_enough(&self) -> bool { + matches!(self, Self::Enough) + } + + /// Checks whether there was not enough gas to charge. + pub const fn is_not_enough(self) -> bool { + matches!(self, Self::NotEnough) + } +} + /// Gas counter with some predefined maximum gas. /// /// `Copy` and `Clone` traits aren't implemented for the type (however could be) @@ -343,7 +355,7 @@ impl From<(i64, i64)> for GasLeft { #[cfg(test)] mod tests { - use super::{ChargeResult, GasCounter}; + use super::GasCounter; use crate::gas::GasAllowanceCounter; #[test] @@ -355,30 +367,30 @@ mod tests { let result = counter.charge_if_enough(100u64); - assert_eq!(result, ChargeResult::Enough); + assert!(result.is_enough()); assert_eq!(counter.left(), 100); let result = counter.charge_if_enough(101u64); - assert_eq!(result, ChargeResult::NotEnough); + assert!(result.is_not_enough()); assert_eq!(counter.left(), 100); } #[test] fn charge_fails() { let mut counter = GasCounter::new(100); - assert_eq!(counter.charge_if_enough(200u64), ChargeResult::NotEnough); + assert!(counter.charge_if_enough(200u64).is_not_enough()); } #[test] fn charge_token_fails() { let mut counter = GasCounter::new(10); - assert_eq!(counter.charge(1000u64), ChargeResult::NotEnough); + assert!(counter.charge(1000u64).is_not_enough()); } #[test] fn charge_allowance_token_fails() { let mut counter = GasAllowanceCounter::new(10); - assert_eq!(counter.charge(1000u64), ChargeResult::NotEnough); + assert!(counter.charge(1000u64).is_not_enough()); } } diff --git a/utils/builtins-common/src/lib.rs b/utils/builtins-common/src/lib.rs index 0f004f4a9fa..00d317c4839 100644 --- a/utils/builtins-common/src/lib.rs +++ b/utils/builtins-common/src/lib.rs @@ -31,7 +31,7 @@ pub mod bls12_381; pub mod eth_bridge; use gear_core::{ - gas::{ChargeResult, GasAllowanceCounter, GasAmount, GasCounter}, + gas::{GasAllowanceCounter, GasAmount, GasCounter}, limited::LimitedStr, }; use parity_scale_codec::{Decode, Encode}; @@ -54,11 +54,15 @@ impl BuiltinContext { /// Tries to charge the gas amount from the gas counters. pub fn try_charge_gas(&mut self, amount: u64) -> Result<(), BuiltinActorError> { - if self.gas_counter.charge_if_enough(amount) == ChargeResult::NotEnough { + if self.gas_counter.charge_if_enough(amount).is_not_enough() { return Err(BuiltinActorError::InsufficientGas); } - if self.gas_allowance_counter.charge_if_enough(amount) == ChargeResult::NotEnough { + if self + .gas_allowance_counter + .charge_if_enough(amount) + .is_not_enough() + { return Err(BuiltinActorError::GasAllowanceExceeded); } From f65b449075b30e1ba2a1b855958bbe4242e2c3d1 Mon Sep 17 00:00:00 2001 From: Gleb Smirnov Date: Thu, 15 Jan 2026 15:28:55 +0300 Subject: [PATCH 2/5] feat(ethexe-processor): charge fee for the first message in announce This is the initial implementation of a fee for uploading program state transition onto Ethereum. --- core-processor/src/precharge.rs | 48 ++++++++++++++- ethexe/processor/src/handling/run.rs | 27 ++++++++- ethexe/processor/src/host/mod.rs | 2 + ethexe/runtime/common/src/lib.rs | 90 +++++++++++++++++----------- ethexe/runtime/src/wasm/api/mod.rs | 2 + ethexe/runtime/src/wasm/api/run.rs | 2 + 6 files changed, 132 insertions(+), 39 deletions(-) diff --git a/core-processor/src/precharge.rs b/core-processor/src/precharge.rs index 167419e54f6..6e2620ddbad 100644 --- a/core-processor/src/precharge.rs +++ b/core-processor/src/precharge.rs @@ -29,7 +29,7 @@ use core::marker::PhantomData; use gear_core::{ code::{CodeMetadata, InstantiatedSectionSizes, SectionName}, costs::{BytesAmount, ProcessCosts}, - gas::{ChargeResult, GasAllowanceCounter, GasCounter}, + gas::{GasAllowanceCounter, GasCounter}, ids::ActorId, message::IncomingDispatch, }; @@ -61,6 +61,27 @@ pub enum PreChargeGasOperation { /// Obtain program allocations. #[display("obtain program allocations")] Allocations, + /// Other operations. + #[display("{description}")] + Other { + /// Operation description. + description: &'static str, + + /// Whether the operation affects block gas allowance. + affects_allowance: bool, + }, +} + +impl PreChargeGasOperation { + /// Checks whether the operation affects block gas allowance. + pub const fn affects_allowance(&self) -> bool { + match self { + &Self::Other { + affects_allowance, .. + } => affects_allowance, + _ => true, + } + } } /// Defines result variants of the precharge functions. @@ -142,13 +163,34 @@ impl ContextCharged { self.gas_counter.left() } + /// Charges gas for a non-standard operation. + pub fn charge_extra_fee( + self, + description: &'static str, + affects_allowance: bool, + amount: u64, + ) -> PrechargeResult { + self.charge_gas( + PreChargeGasOperation::Other { + description, + affects_allowance, + }, + amount, + ) + } + /// Charges gas for the operation. fn charge_gas( mut self, operation: PreChargeGasOperation, amount: u64, ) -> PrechargeResult> { - if self.gas_allowance_counter.charge_if_enough(amount) != ChargeResult::Enough { + if operation.affects_allowance() + && self + .gas_allowance_counter + .charge_if_enough(amount) + .is_not_enough() + { let gas_burned = self.gas_counter.burned(); return Err(process_allowance_exceed( @@ -158,7 +200,7 @@ impl ContextCharged { )); } - if self.gas_counter.charge_if_enough(amount) != ChargeResult::Enough { + if self.gas_counter.charge_if_enough(amount).is_not_enough() { let gas_burned = self.gas_counter.burned(); let system_reservation_ctx = SystemReservationContext::from_dispatch(&self.dispatch); diff --git a/ethexe/processor/src/handling/run.rs b/ethexe/processor/src/handling/run.rs index b93fac40add..ef3de1d25dc 100644 --- a/ethexe/processor/src/handling/run.rs +++ b/ethexe/processor/src/handling/run.rs @@ -122,6 +122,7 @@ use ethexe_runtime_common::{ use gear_core::gas::GasAllowanceCounter; use gprimitives::{ActorId, H256}; use itertools::Itertools; +use std::collections::HashSet; #[derive(Debug, Clone)] pub struct RunnerConfig { @@ -163,11 +164,16 @@ pub async fn run( let mut allowance_counter = GasAllowanceCounter::new(config.block_gas_limit); let chunk_size = config.chunk_processing_threads; + // Set of programs which has already processed + // their queue. Used to charge first message fee. + let mut processed_first_queue = HashSet::new(); + // Start with injected queues processing. let (is_out_of_gas_for_block, run_ctx) = run_inner( run_ctx, db.clone(), instance_creator.clone(), + &mut processed_first_queue, &mut allowance_counter, chunk_size, MessageType::Injected, @@ -180,6 +186,7 @@ pub async fn run( run_ctx, db, instance_creator, + &mut processed_first_queue, &mut allowance_counter, chunk_size, MessageType::Canonical, @@ -203,6 +210,7 @@ pub async fn run_overlaid( run_ctx, db, instance_creator, + &mut HashSet::new(), &mut allowance_counter, chunk_size, MessageType::Canonical, @@ -215,6 +223,7 @@ async fn run_inner( mut run_ctx: C, db: Database, instance_creator: InstanceCreator, + processed_first_queue: &mut HashSet, allowance_counter: &mut GasAllowanceCounter, chunk_size: usize, processing_queue_type: MessageType, @@ -244,6 +253,7 @@ async fn run_inner( chunk, db.clone(), instance_creator.clone(), + processed_first_queue, allowance_counter.left().min(CHUNK_PROCESSING_GAS_LIMIT), processing_queue_type, ) @@ -590,13 +600,25 @@ mod chunk_execution_spawn { chunk: Vec<(ActorId, H256)>, db: Database, instance_creator: InstanceCreator, + processed_first_queue: &mut HashSet, gas_allowance_for_chunk: u64, processing_queue_type: MessageType, ) -> Vec { + let chunk = chunk + .into_iter() + .map(|(program_id, state_hash)| { + ( + program_id, + state_hash, + processed_first_queue.insert(program_id), + ) + }) + .collect::>(); + tokio::task::spawn_blocking(move || { chunk .into_par_iter() - .map(|(program_id, state_hash)| { + .map(|(program_id, state_hash, is_first_queue)| { let db = db.clone(); let mut executor = instance_creator .instantiate() @@ -608,6 +630,7 @@ mod chunk_execution_spawn { program_id, state_hash, processing_queue_type, + is_first_queue, gas_allowance_for_chunk, ); (program_id, new_state_hash, jn, gas_spent) @@ -624,6 +647,7 @@ mod chunk_execution_spawn { program_id: ActorId, state_hash: H256, queue_type: MessageType, + is_first_queue: bool, gas_allowance: u64, ) -> (ProgramJournals, H256, u64) { let code_id = db.program_code_id(program_id).expect("Code ID must be set"); @@ -639,6 +663,7 @@ mod chunk_execution_spawn { queue_type, instrumented_code, code_metadata, + is_first_queue, gas_allowance, ) .expect("Some error occurs while running program in instance") diff --git a/ethexe/processor/src/host/mod.rs b/ethexe/processor/src/host/mod.rs index 2bfa36914d8..315e2191937 100644 --- a/ethexe/processor/src/host/mod.rs +++ b/ethexe/processor/src/host/mod.rs @@ -160,6 +160,7 @@ impl InstanceWrapper { queue_type: MessageType, maybe_instrumented_code: Option, maybe_code_metadata: Option, + is_first_queue: bool, gas_allowance: u64, ) -> Result<(ProgramJournals, H256, u64)> { let chain_head = self.chain_head.expect("chain head must be set before run"); @@ -171,6 +172,7 @@ impl InstanceWrapper { queue_type, maybe_instrumented_code, maybe_code_metadata, + is_first_queue, gas_allowance, ); diff --git a/ethexe/runtime/common/src/lib.rs b/ethexe/runtime/common/src/lib.rs index a5863fdf78f..1869a8467b8 100644 --- a/ethexe/runtime/common/src/lib.rs +++ b/ethexe/runtime/common/src/lib.rs @@ -116,6 +116,19 @@ impl TransitionController<'_, S> { } } +/// Configuration for Ethereum-related constants. +pub struct EthereumConfig { + /// The amount of gas charged for the first message in announce. + first_message_fee: u64, +} + +/// Processes the program message queue of given type. +/// +/// Panics if the queue is empty. It's needed to guarantee +/// that the function always charges for the first message. +/// +/// Returns journals and the amount of gas burned. +#[allow(clippy::too_many_arguments)] pub fn process_queue( program_id: ActorId, mut program_state: ProgramState, @@ -123,6 +136,7 @@ pub fn process_queue( instrumented_code: Option, code_metadata: Option, ri: &RI, + is_first_queue: bool, gas_allowance: u64, ) -> (ProgramJournals, u64) where @@ -139,10 +153,10 @@ where MessageType::Injected => program_state.injected_queue.hash.is_empty(), }; - if is_queue_empty { - // Queue is empty, nothing to process. - return (Vec::new(), 0); - } + assert!( + !is_queue_empty, + "the function must not be run with empty queues" + ); let queue = program_state .queue_from_msg_type(queue_type) @@ -196,21 +210,31 @@ where reserve_for: 0, }; + // TODO: must be set somewhere by some runtime configuration + let ethereum_config = EthereumConfig { + // TODO: the value of the fee must be something sensible, + // not just some arbitrary number. + first_message_fee: 1000, + }; + let mut mega_journal = Vec::new(); let mut queue_gas_allowance_counter = GasAllowanceCounter::new(gas_allowance); ri.init_lazy_pages(); - for dispatch in queue { + for (i, dispatch) in queue.into_iter().enumerate() { let origin = dispatch.message_type; let call_reply = dispatch.call; + let is_first_message = i == 0 && is_first_queue; let is_first_execution = dispatch.context.is_none(); - let journal = process_dispatch( + let (Ok(journal) | Err(journal)) = process_dispatch( dispatch, &block_config, + ðereum_config, program_id, &program_state, + is_first_message, &instrumented_code, &code_metadata, ri, @@ -250,13 +274,15 @@ where fn process_dispatch( dispatch: Dispatch, block_config: &BlockConfig, + ethereum_config: &EthereumConfig, program_id: ActorId, program_state: &ProgramState, + is_first_message: bool, instrumented_code: &Option, code_metadata: &Option, ri: &RI, gas_allowance: u64, -) -> Vec +) -> Result, Vec> where S: Storage, RI: RuntimeInterface, @@ -285,22 +311,27 @@ where let dispatch = IncomingDispatch::new(kind, incoming_message, context); - let context = ContextCharged::new(program_id, dispatch, gas_allowance); + let mut context = ContextCharged::new(program_id, dispatch, gas_allowance); - let context = match context.charge_for_program(block_config) { - Ok(context) => context, - Err(journal) => return journal, - }; + if is_first_message { + context = context.charge_extra_fee( + "process the first message", + false, + ethereum_config.first_message_fee, + )?; + } + + let context = context.charge_for_program(block_config)?; let active_state = match &program_state.program { state::Program::Active(state) => state, state::Program::Terminated(program_id) => { log::trace!("Program {program_id} has failed init"); - return core_processor::process_failed_init(context); + return Err(core_processor::process_failed_init(context)); } state::Program::Exited(program_id) => { log::trace!("Program {program_id} has exited"); - return core_processor::process_program_exited(context, *program_id); + return Err(core_processor::process_program_exited(context, *program_id)); } }; @@ -319,13 +350,10 @@ where log::trace!( "Program {program_id} is not yet finished initialization, so cannot process handle message" ); - return core_processor::process_uninitialized(context); + return Err(core_processor::process_uninitialized(context)); } - let context = match context.charge_for_code_metadata(block_config) { - Ok(context) => context, - Err(journal) => return journal, - }; + let context = context.charge_for_code_metadata(block_config)?; let code = instrumented_code .as_ref() @@ -334,11 +362,7 @@ where .as_ref() .expect("Code metadata must be provided if program is active"); - let context = - match context.charge_for_instrumented_code(block_config, code.bytes().len() as u32) { - Ok(context) => context, - Err(journal) => return journal, - }; + let context = context.charge_for_instrumented_code(block_config, code.bytes().len() as u32)?; let allocations = active_state.allocations_hash.map_or_default(|hash| { ri.storage() @@ -346,10 +370,7 @@ where .expect("Cannot get allocations") }); - let context = match context.charge_for_allocations(block_config, allocations.tree_len()) { - Ok(context) => context, - Err(journal) => return journal, - }; + let context = context.charge_for_allocations(block_config, allocations.tree_len())?; let actor_data = ExecutableActorData { allocations: allocations.into(), @@ -357,15 +378,12 @@ where memory_infix: active_state.memory_infix, }; - let context = match context.charge_for_module_instantiation( + let context = context.charge_for_module_instantiation( block_config, actor_data, code.instantiated_section_sizes(), code_metadata, - ) { - Ok(context) => context, - Err(journal) => return journal, - }; + )?; let execution_context = ProcessExecutionContext::new( context, @@ -378,8 +396,10 @@ where let random_data = ri.random_data(); - core_processor::process::>(block_config, execution_context, random_data) - .unwrap_or_else(|err| unreachable!("{err}")) + Ok( + core_processor::process::>(block_config, execution_context, random_data) + .unwrap_or_else(|err| unreachable!("{err}")), + ) } pub const fn pack_u32_to_i64(low: u32, high: u32) -> i64 { diff --git a/ethexe/runtime/src/wasm/api/mod.rs b/ethexe/runtime/src/wasm/api/mod.rs index 5b32242b3aa..75a28a24464 100644 --- a/ethexe/runtime/src/wasm/api/mod.rs +++ b/ethexe/runtime/src/wasm/api/mod.rs @@ -50,6 +50,7 @@ fn _run(arg_ptr: i32, arg_len: i32) -> i64 { queue_kind, maybe_instrumented_code, maybe_code_metadata, + is_first_queue, gas_allowance, ) = Decode::decode(&mut get_slice(arg_ptr, arg_len)).unwrap(); @@ -59,6 +60,7 @@ fn _run(arg_ptr: i32, arg_len: i32) -> i64 { queue_kind, maybe_instrumented_code, maybe_code_metadata, + is_first_queue, gas_allowance, ); diff --git a/ethexe/runtime/src/wasm/api/run.rs b/ethexe/runtime/src/wasm/api/run.rs index 0dd9260dc25..b845e5dbd60 100644 --- a/ethexe/runtime/src/wasm/api/run.rs +++ b/ethexe/runtime/src/wasm/api/run.rs @@ -32,6 +32,7 @@ pub fn run( queue_type: MessageType, maybe_instrumented_code: Option, code_metadata: Option, + is_first_queue: bool, gas_allowance: u64, ) -> (ProgramJournals, u64) { log::debug!("You're calling 'run(..)'"); @@ -55,6 +56,7 @@ pub fn run( maybe_instrumented_code, code_metadata, &ri, + is_first_queue, gas_allowance, ); From 18146f7da3b844157a04b9e479bb13ad4dde38ac Mon Sep 17 00:00:00 2001 From: Gleb Smirnov Date: Wed, 21 Jan 2026 12:36:19 +0300 Subject: [PATCH 3/5] Add TODO for `process_queue` --- ethexe/runtime/common/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ethexe/runtime/common/src/lib.rs b/ethexe/runtime/common/src/lib.rs index 1869a8467b8..3dd3b9bf2a2 100644 --- a/ethexe/runtime/common/src/lib.rs +++ b/ethexe/runtime/common/src/lib.rs @@ -128,6 +128,8 @@ pub struct EthereumConfig { /// that the function always charges for the first message. /// /// Returns journals and the amount of gas burned. +// +// TODO: refactor the function to reduce the number of arguments (#5100) #[allow(clippy::too_many_arguments)] pub fn process_queue( program_id: ActorId, From dac5465060eb8e94ddfd58f805d61fddf5e71791 Mon Sep 17 00:00:00 2001 From: Gleb Smirnov Date: Wed, 21 Jan 2026 12:39:51 +0300 Subject: [PATCH 4/5] Add TODO for `process_dispatch` too --- ethexe/runtime/common/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/ethexe/runtime/common/src/lib.rs b/ethexe/runtime/common/src/lib.rs index 3dd3b9bf2a2..676bb68e6bb 100644 --- a/ethexe/runtime/common/src/lib.rs +++ b/ethexe/runtime/common/src/lib.rs @@ -272,6 +272,7 @@ where (mega_journal, gas_spent) } +// TODO: refactor the function to reduce the number of arguments (#5100) #[allow(clippy::too_many_arguments)] fn process_dispatch( dispatch: Dispatch, From cf86bb3d211e65adef788221fe6470ad3a85031f Mon Sep 17 00:00:00 2001 From: Gleb Smirnov Date: Sat, 24 Jan 2026 17:24:32 +0300 Subject: [PATCH 5/5] Split reading and writing `processed_first_queue` --- ethexe/processor/src/handling/run.rs | 134 +++++++++++++++++---------- 1 file changed, 84 insertions(+), 50 deletions(-) diff --git a/ethexe/processor/src/handling/run.rs b/ethexe/processor/src/handling/run.rs index efe475ccdef..cd0ce12cb23 100644 --- a/ethexe/processor/src/handling/run.rs +++ b/ethexe/processor/src/handling/run.rs @@ -241,6 +241,7 @@ async fn run_inner( chunk_size, states, run_ctx, + processed_first_queue, processing_queue_type, ); @@ -314,6 +315,7 @@ pub(crate) trait RunContext { &self, execution_chunks: &mut chunks_splitting::ExecutionChunks, actor_state: chunks_splitting::ActorStateHashWithQueueSize, + is_first_queue: bool, queue_type: MessageType, ) { let chunks_splitting::ActorStateHashWithQueueSize { @@ -329,7 +331,14 @@ pub(crate) trait RunContext { }; let chunk_idx = execution_chunks.chunk_idx(queue_size); - execution_chunks.insert_into(chunk_idx, actor_id, hash); + execution_chunks.insert_into( + chunk_idx, + chunks_splitting::ChunkItem { + actor_id, + hash, + is_first_queue, + }, + ); } /// Checks whether queues for specified program must not be executed in the current run. @@ -411,6 +420,7 @@ impl<'a> RunContext for OverlaidRunContext<'a> { &self, execution_chunks: &mut chunks_splitting::ExecutionChunks, actor_state: chunks_splitting::ActorStateHashWithQueueSize, + is_first_queue: bool, queue_type: MessageType, ) { let chunks_splitting::ActorStateHashWithQueueSize { @@ -425,14 +435,20 @@ impl<'a> RunContext for OverlaidRunContext<'a> { MessageType::Injected => injected_queue_size, }; + let chunk_item = chunks_splitting::ChunkItem { + actor_id, + hash, + is_first_queue, + }; + if self.overlaid_ctx.base_program() == actor_id { // Insert base program into heaviest chunk, which is going to be executed first. // This is done to get faster reply from the target dispatch for which overlaid // executor was created. - execution_chunks.insert_into_heaviest(actor_id, hash); + execution_chunks.insert_into_heaviest(chunk_item); } else { let chunk_idx = execution_chunks.chunk_idx(queue_size); - execution_chunks.insert_into(chunk_idx, actor_id, hash); + execution_chunks.insert_into(chunk_idx, chunk_item); } } @@ -473,8 +489,15 @@ fn states( mod chunks_splitting { use super::*; + #[derive(Debug, Clone, Copy)] + pub(super) struct ChunkItem { + pub actor_id: ActorId, + pub hash: H256, + pub is_first_queue: bool, + } + // An alias introduced for better readability of the chunks splitting steps. - type Chunks = Vec>; + pub(super) type Chunk = Vec; // `prepare_execution_chunks` is not exactly sorting (sorting usually `n*log(n)` this one is `O(n)`), // but rather partitioning into subsets (chunks) of programs with approximately similar queue sizes. @@ -482,12 +505,20 @@ mod chunks_splitting { chunk_size: usize, states: Vec, run_ctx: &mut R, + processed_first_queue: &HashSet, processing_queue_type: MessageType, - ) -> Chunks { + ) -> Vec { let mut execution_chunks = ExecutionChunks::new(chunk_size, states.len()); for state in states { - run_ctx.handle_chunk_data(&mut execution_chunks, state, processing_queue_type); + let is_first_queue = !processed_first_queue.contains(&state.actor_id); + + run_ctx.handle_chunk_data( + &mut execution_chunks, + state, + is_first_queue, + processing_queue_type, + ); } execution_chunks.arrange_execution_chunks(run_ctx) @@ -516,7 +547,7 @@ mod chunks_splitting { /// A helper struct to manage execution chunks during their preparation. pub(crate) struct ExecutionChunks { chunk_size: usize, - chunks: Chunks, + chunks: Vec, } impl ExecutionChunks { @@ -537,29 +568,29 @@ mod chunks_splitting { } /// Inserts chunk execution data into the specified chunk index. - pub(super) fn insert_into(&mut self, idx: usize, actor_id: ActorId, hash: H256) { - if let Some(chunk) = self.chunks.get_mut(idx) { - chunk.push((actor_id, hash)); - } else { + pub(super) fn insert_into(&mut self, idx: usize, item: ChunkItem) { + let Some(chunk) = self.chunks.get_mut(idx) else { panic!( "Chunk index {idx} out of bounds: chunks number - {}", self.chunks.len() - ); - } + ) + }; + + chunk.push(item); } /// Insert chunk execution data into the heaviest chunk (most prior, the last one). - pub(super) fn insert_into_heaviest(&mut self, actor_id: ActorId, hash: H256) { - if let Some(chunk) = self.chunks.last_mut() { - chunk.push((actor_id, hash)); - } else { + pub(super) fn insert_into_heaviest(&mut self, item: ChunkItem) { + let Some(chunk) = self.chunks.last_mut() else { panic!("Chunks are empty, cannot insert into heaviest chunk"); - } + }; + + chunk.push(item); } /// Arranges execution chunks by merging uneven chunks and reversing their order, /// so the heaviest chunks are processed first. - fn arrange_execution_chunks(self, run_ctx: &mut R) -> Chunks { + fn arrange_execution_chunks(self, run_ctx: &mut R) -> Vec { self.chunks .into_iter() // Merge uneven chunks @@ -576,7 +607,7 @@ mod chunks_splitting { // earlier the function will nullify it and skip spawning the job for the program queue as it's empty. If the queue // was already nullified, the function will return `false` and the job will be spawned as usual. // For more info, see impl of the [`OverlaidContext`]. - .filter(|&(program_id, _)| !run_ctx.check_task_no_run(program_id)) + .filter(|&chunk_item| !run_ctx.check_task_no_run(chunk_item.actor_id)) .collect() }) .collect() @@ -586,6 +617,7 @@ mod chunks_splitting { mod chunk_execution_spawn { use super::*; + use chunks_splitting::ChunkItem; use rayon::iter::{IntoParallelIterator, ParallelIterator}; /// An alias introduced for better readability of the chunks execution steps. @@ -598,44 +630,43 @@ mod chunk_execution_spawn { /// executed concurrently, then each of the program should have received a reference to the global gas allowance counter /// and charge gas from it concurrently. pub(super) async fn spawn_chunk_execution( - chunk: Vec<(ActorId, H256)>, + chunk: Vec, db: Database, instance_creator: InstanceCreator, processed_first_queue: &mut HashSet, gas_allowance_for_chunk: u64, processing_queue_type: MessageType, ) -> Vec { - let chunk = chunk - .into_iter() - .map(|(program_id, state_hash)| { - ( - program_id, - state_hash, - processed_first_queue.insert(program_id), - ) - }) - .collect::>(); + for chunk_item in &chunk { + processed_first_queue.insert(chunk_item.actor_id); + } tokio::task::spawn_blocking(move || { chunk .into_par_iter() - .map(|(program_id, state_hash, is_first_queue)| { - let db = db.clone(); - let mut executor = instance_creator - .instantiate() - .expect("Failed to instantiate executor"); - - let (jn, new_state_hash, gas_spent) = run_runtime( - db, - &mut executor, - program_id, - state_hash, - processing_queue_type, - is_first_queue, - gas_allowance_for_chunk, - ); - (program_id, new_state_hash, jn, gas_spent) - }) + .map( + |ChunkItem { + actor_id: program_id, + hash: state_hash, + is_first_queue, + }| { + let db = db.clone(); + let mut executor = instance_creator + .instantiate() + .expect("Failed to instantiate executor"); + + let (jn, new_state_hash, gas_spent) = run_runtime( + db, + &mut executor, + program_id, + state_hash, + processing_queue_type, + is_first_queue, + gas_allowance_for_chunk, + ); + (program_id, new_state_hash, jn, gas_spent) + }, + ) .collect() }) .await @@ -813,10 +844,13 @@ mod tests { let mut common_run_context = CommonRunContext { in_block_transitions: &mut InBlockTransitions::default(), }; + + let processed_first_queue = HashSet::from([2, 3, 5].map(ActorId::from)); let chunks = chunks_splitting::prepare_execution_chunks( CHUNK_PROCESSING_THREADS, states, &mut common_run_context, + &processed_first_queue, MessageType::Canonical, ); @@ -826,9 +860,9 @@ mod tests { .map(|chunk| { chunk .into_iter() - .map(|(_, hash)| { + .map(|chunk_item| { states_to_queue_size - .get(&hash) + .get(&chunk_item.hash) .expect("State hash must be in the map") }) .sum::()