From 3b0b237ebb96311fd3ce98439ba38a81cb06715b Mon Sep 17 00:00:00 2001 From: Arsenii Lyashenko Date: Thu, 26 Mar 2026 14:52:59 +0300 Subject: [PATCH 1/5] Set environment variables for single-threaded test execution --- .config/nextest.toml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/.config/nextest.toml b/.config/nextest.toml index 6616db7bb8c..f29ebe7a75a 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -6,6 +6,12 @@ rsync -av gear-cli-and-runtime-release/ target/release/ chmod +x target/release/gear '''] +[scripts.setup.set-envs] +command = ["bash", "-c", ''' +echo "CARGO_BUILD_JOBS=1" >> $NEXTEST_ENV +echo "RAYON_NUM_THREADS=1" >> $NEXTEST_ENV +'''] + [profile.default] leak-timeout = { period = "5s", result = "fail" } slow-timeout = { period = "1m", terminate-after = 5 } @@ -15,6 +21,10 @@ path = "junit.xml" store-success-output = false store-failure-output = true +[[profile.default.scripts]] +filter = 'all()' +setup = "set-envs" + [profile.ci] fail-fast = false archive.include = [ From 0cecd0ea0f4f1a72b2b5601c59c5133fad9f8c96 Mon Sep 17 00:00:00 2001 From: Arsenii Lyashenko Date: Thu, 26 Mar 2026 14:53:23 +0300 Subject: [PATCH 2/5] Remove unused CI profile overrides from nextest configuration --- .config/nextest.toml | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/.config/nextest.toml b/.config/nextest.toml index f29ebe7a75a..e942e6b0580 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -36,25 +36,7 @@ path = "junit.xml" store-success-output = false store-failure-output = true -# sdk -[[profile.ci.overrides]] -filter = 'package(gsdk) or package(gcli)' -retries = 5 - [[profile.ci.scripts]] filter = 'package(gsdk) or package(gcli)' platform = "cfg(unix)" setup = "replace-node-and-runtime" - -# sometimes fails on CI machine in debug profile -# due to an inconsistent machine load and unoptimized code -[[profile.ci.overrides]] -filter = 'package(gear-authorship)' -retries = 5 -threads-required = 4 - -# ethexe -[[profile.ci.overrides]] -filter = 'package(ethexe-service) or package(ethexe-observer)' -retries = 5 -threads-required = 4 From 795dd87bfb195f82444cef4684042520244fd2ff Mon Sep 17 00:00:00 2001 From: Arsenii Lyashenko Date: Thu, 26 Mar 2026 17:11:50 +0300 Subject: [PATCH 3/5] Run code instrumentation in custom threadpool --- ethexe/compute/src/codes.rs | 80 +++++++------ ethexe/compute/src/compute.rs | 9 +- ethexe/compute/src/lib.rs | 11 +- ethexe/compute/src/tests.rs | 2 +- ethexe/processor/src/handling/mod.rs | 1 - ethexe/processor/src/handling/run.rs | 113 +++++++++--------- ethexe/processor/src/lib.rs | 50 ++++++-- ethexe/processor/src/tests.rs | 45 ++++--- .../src/{handling => }/thread_pool.rs | 8 +- 9 files changed, 187 insertions(+), 132 deletions(-) rename ethexe/processor/src/{handling => }/thread_pool.rs (93%) diff --git a/ethexe/compute/src/codes.rs b/ethexe/compute/src/codes.rs index 74db54cb262..168644c2276 100644 --- a/ethexe/compute/src/codes.rs +++ b/ethexe/compute/src/codes.rs @@ -16,17 +16,20 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::{ComputeError, ProcessorExt, Result, service::SubService}; +use crate::{ProcessorExt, Result, service::SubService}; use ethexe_common::{ CodeAndIdUnchecked, db::{CodesStorageRO, CodesStorageRW}, }; use ethexe_db::Database; use ethexe_processor::{ProcessedCodeInfo, ValidCodeInfo}; +use futures::{FutureExt, StreamExt, future::BoxFuture, stream::FuturesUnordered}; use gprimitives::CodeId; use metrics::Gauge; -use std::task::{Context, Poll}; -use tokio::task::JoinSet; +use std::{ + future, + task::{Context, Poll}, +}; /// Metrics for the [`CodesSubService`]. #[derive(Clone, metrics_derive::Metrics)] @@ -41,7 +44,7 @@ pub struct CodesSubService { processor: P, metrics: Metrics, - processions: JoinSet>, + processions: FuturesUnordered>>, } impl CodesSubService

{ @@ -50,7 +53,7 @@ impl CodesSubService

{ db, processor, metrics: Metrics::default(), - processions: JoinSet::new(), + processions: FuturesUnordered::new(), } } @@ -71,36 +74,37 @@ impl CodesSubService

{ "Instrumented code {code_id:?} must exist in database" ); } - self.processions.spawn(async move { Ok(code_id) }); + self.processions.push(future::ready(Ok(code_id)).boxed()); } else { let db = self.db.clone(); let mut processor = self.processor.clone(); - self.processions.spawn_blocking(move || { - processor - .process_code(code_and_id) - .map(|ProcessedCodeInfo { code_id, valid }| { - if let Some(ValidCodeInfo { - code, + self.processions.push( + async move { + let ProcessedCodeInfo { code_id, valid } = + processor.process_code(code_and_id).await?; + if let Some(ValidCodeInfo { + code, + instrumented_code, + code_metadata, + }) = valid + { + db.set_original_code(&code); + db.set_instrumented_code( + ethexe_runtime_common::VERSION, + code_id, instrumented_code, - code_metadata, - }) = valid - { - db.set_original_code(&code); - db.set_instrumented_code( - ethexe_runtime_common::VERSION, - code_id, - instrumented_code, - ); - db.set_code_metadata(code_id, code_metadata); - db.set_code_valid(code_id, true); - } else { - db.set_code_valid(code_id, false); - } - - code_id - }) - }); + ); + db.set_code_metadata(code_id, code_metadata); + db.set_code_valid(code_id, true); + } else { + db.set_code_valid(code_id, false); + } + + Ok(code_id) + } + .boxed(), + ); } self.metrics @@ -113,14 +117,14 @@ impl SubService for CodesSubService

{ type Output = CodeId; fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll> { - futures::ready!(self.processions.poll_join_next(cx)) - .map(|res| { - self.metrics - .processing_codes - .set(self.processions.len() as f64); - res.map_err(ComputeError::CodeProcessJoin)? - }) - .map_or(Poll::Pending, Poll::Ready) + if let Poll::Ready(Some(res)) = self.processions.poll_next_unpin(cx) { + self.metrics + .processing_codes + .set(self.processions.len() as f64); + return Poll::Ready(res); + } + + Poll::Pending } } diff --git a/ethexe/compute/src/compute.rs b/ethexe/compute/src/compute.rs index cb665ec9e99..e2f67248057 100644 --- a/ethexe/compute/src/compute.rs +++ b/ethexe/compute/src/compute.rs @@ -436,7 +436,7 @@ mod tests { const USER_ID: ActorId = ActorId::new([1u8; 32]); - pub fn upload_code(processor: &mut Processor, code: &[u8], db: &Database) -> CodeId { + pub async fn upload_code(processor: &mut Processor, code: &[u8], db: &Database) -> CodeId { let code_id = CodeId::generate(code); let ValidCodeInfo { @@ -448,6 +448,7 @@ mod tests { code: code.to_vec(), code_id, }) + .await .expect("failed to process code") .valid .expect("code is invalid"); @@ -579,7 +580,8 @@ mod tests { let db = Database::memory(); let mut processor = Processor::new(db.clone()).unwrap(); - let ping_code_id = test_utils::upload_code(&mut processor, demo_ping::WASM_BINARY, &db); + let ping_code_id = + test_utils::upload_code(&mut processor, demo_ping::WASM_BINARY, &db).await; let ping_id = ActorId::from(0x10000); let blockchain = BlockChain::mock(BLOCKCHAIN_LEN as u32).setup(&db); @@ -702,7 +704,8 @@ mod tests { let db = Database::memory(); let mut processor = Processor::new(db.clone()).unwrap(); - let ping_code_id = test_utils::upload_code(&mut processor, demo_ping::WASM_BINARY, &db); + let ping_code_id = + test_utils::upload_code(&mut processor, demo_ping::WASM_BINARY, &db).await; let ping_id = ActorId::from(0x10000); let blockchain = BlockChain::mock(3).setup(&db); diff --git a/ethexe/compute/src/lib.rs b/ethexe/compute/src/lib.rs index a5c3b8618db..1f6af911bd3 100644 --- a/ethexe/compute/src/lib.rs +++ b/ethexe/compute/src/lib.rs @@ -62,8 +62,6 @@ pub enum ComputeError { BlockHeaderNotFound(H256), #[error("block validators committed for era not found for block({0})")] CommittedEraNotFound(H256), - #[error("process code join error")] - CodeProcessJoin(#[from] tokio::task::JoinError), #[error("codes queue not found for computed block({0})")] CodesQueueNotFound(H256), #[error("last committed batch not found for computed block({0})")] @@ -101,7 +99,10 @@ pub trait ProcessorExt: Sized + Unpin + Send + Clone + 'static { executable: ExecutableData, promise_out_tx: Option>, ) -> impl Future> + Send; - fn process_code(&mut self, code_and_id: CodeAndIdUnchecked) -> Result; + fn process_code( + &mut self, + code_and_id: CodeAndIdUnchecked, + ) -> impl Future> + Send; } impl ProcessorExt for Processor { @@ -115,7 +116,7 @@ impl ProcessorExt for Processor { .map_err(Into::into) } - fn process_code(&mut self, code_and_id: CodeAndIdUnchecked) -> Result { - self.process_code(code_and_id).map_err(Into::into) + async fn process_code(&mut self, code_and_id: CodeAndIdUnchecked) -> Result { + self.process_code(code_and_id).await.map_err(Into::into) } } diff --git a/ethexe/compute/src/tests.rs b/ethexe/compute/src/tests.rs index 7aabd3ca808..3b2928a351b 100644 --- a/ethexe/compute/src/tests.rs +++ b/ethexe/compute/src/tests.rs @@ -79,7 +79,7 @@ impl ProcessorExt for MockProcessor { Ok(self.process_programs_result.take().unwrap_or_default()) } - fn process_code(&mut self, code_and_id: CodeAndIdUnchecked) -> Result { + async fn process_code(&mut self, code_and_id: CodeAndIdUnchecked) -> Result { Ok(self .process_codes_result .take() diff --git a/ethexe/processor/src/handling/mod.rs b/ethexe/processor/src/handling/mod.rs index bea415794c7..1c97c40cd91 100644 --- a/ethexe/processor/src/handling/mod.rs +++ b/ethexe/processor/src/handling/mod.rs @@ -23,7 +23,6 @@ use gprimitives::ActorId; pub(crate) mod events; pub(crate) mod overlaid; pub(crate) mod run; -mod thread_pool; /// A high-level interface for executing ops, /// which mutate states based on the current block request events. diff --git a/ethexe/processor/src/handling/run.rs b/ethexe/processor/src/handling/run.rs index 2f3c0dd9ff3..e7403353ce0 100644 --- a/ethexe/processor/src/handling/run.rs +++ b/ethexe/processor/src/handling/run.rs @@ -570,11 +570,57 @@ pub(super) mod chunks_splitting { } } -mod chunk_execution_spawn { +pub(crate) mod chunk_execution_spawn { use super::*; - use crate::{handling::thread_pool::ThreadPool, host::InstanceWrapper}; + use crate::{THREAD_POOL, host::InstanceWrapper}; use ethexe_runtime_common::ProcessQueueContext; - use std::sync::LazyLock; + use itertools::Either; + + pub struct Executable { + queue_type: MessageType, + block_info: BlockInfo, + promise_policy: PromisePolicy, + program_id: ActorId, + state_hash: H256, + instrumented_code: InstrumentedCode, + code_metadata: CodeMetadata, + executor: InstanceWrapper, + db: Box, + gas_allowance_for_chunk: u64, + promise_out_tx: Option>, + } + + pub fn execute_chunk_item(executable: Executable) -> Result { + let Executable { + queue_type, + block_info, + promise_policy, + program_id, + state_hash, + instrumented_code, + code_metadata, + mut executor, + db, + gas_allowance_for_chunk, + promise_out_tx, + } = executable; + + let (jn, new_state_hash, gas_spent) = executor.run( + db, + ProcessQueueContext { + program_id, + state_root: state_hash, + queue_type, + instrumented_code, + code_metadata, + gas_allowance: GasAllowanceCounter::new(gas_allowance_for_chunk), + block_info, + promise_policy, + }, + promise_out_tx, + )?; + Ok((program_id, new_state_hash, jn, gas_spent)) + } /// An alias introduced for better readability of the chunks execution steps. pub type ChunkItemOutput = (ActorId, H256, ProgramJournals, u64); @@ -585,60 +631,11 @@ mod chunk_execution_spawn { /// It means that in the same time unit (!) all programs simultaneously charge gas allowance. If programs were to be /// executed concurrently, then each of the program should have received a reference to the global gas allowance counter /// and charge gas from it concurrently. - pub async fn spawn_chunk_execution( + pub(super) async fn spawn_chunk_execution( ctx: &mut impl RunContext, chunk: Vec<(ActorId, H256)>, queue_type: MessageType, ) -> Result> { - struct Executable { - queue_type: MessageType, - block_info: BlockInfo, - promise_policy: PromisePolicy, - program_id: ActorId, - state_hash: H256, - instrumented_code: InstrumentedCode, - code_metadata: CodeMetadata, - executor: InstanceWrapper, - db: Box, - gas_allowance_for_chunk: u64, - promise_out_tx: Option>, - } - - fn execute_chunk_item(executable: Executable) -> Result { - let Executable { - queue_type, - block_info, - promise_policy, - program_id, - state_hash, - instrumented_code, - code_metadata, - mut executor, - db, - gas_allowance_for_chunk, - promise_out_tx, - } = executable; - - let (jn, new_state_hash, gas_spent) = executor.run( - db, - ProcessQueueContext { - program_id, - state_root: state_hash, - queue_type, - instrumented_code, - code_metadata, - gas_allowance: GasAllowanceCounter::new(gas_allowance_for_chunk), - block_info, - promise_policy, - }, - promise_out_tx, - )?; - Ok((program_id, new_state_hash, jn, gas_spent)) - } - - static THREAD_POOL: LazyLock>> = - LazyLock::new(|| ThreadPool::new(execute_chunk_item)); - let gas_allowance_for_chunk = ctx .inner() .gas_allowance_counter @@ -660,7 +657,7 @@ mod chunk_execution_spawn { let executor = ctx.inner().instance_creator.instantiate()?; - Ok(Executable { + Ok(Either::Left(Executable { queue_type, block_info, promise_policy, @@ -672,11 +669,15 @@ mod chunk_execution_spawn { db: ctx.inner().db.cas().clone_boxed(), gas_allowance_for_chunk, promise_out_tx: ctx.inner().promise_out_tx.clone(), - }) + })) }) .collect::>>()?; - THREAD_POOL.spawn_many(executables).try_collect().await + THREAD_POOL + .spawn_many(executables) + .map(|output| output.unwrap_left()) + .try_collect() + .await } } diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 073552e3f46..9c0c38bd699 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -18,6 +18,9 @@ //! Program's execution service for eGPU. +pub use host::InstanceError; + +use crate::{host::InstanceWrapper, thread_pool::ThreadPool}; use core::num::NonZero; use ethexe_common::{ CodeAndIdUnchecked, ProgramStates, Schedule, SimpleBlockData, @@ -38,15 +41,33 @@ use gear_core::{ use gprimitives::{ActorId, CodeId, H256, MessageId}; use handling::{ProcessingHandler, overlaid::OverlaidRunContext, run::CommonRunContext}; use host::InstanceCreator; +use itertools::Either; +use std::sync::LazyLock; use tokio::sync::mpsc; -pub use host::InstanceError; - mod handling; mod host; #[cfg(test)] mod tests; +mod thread_pool; + +type ExecuteChunkInput = handling::run::chunk_execution_spawn::Executable; +type ExecuteChunkOutput = Result; +type ProcessCodeInput = (InstanceWrapper, Vec); +type ProcessCodeOutput = host::Result>; +type ThreadPoolInput = Either; +type ThreadPoolOutput = Either; + +static THREAD_POOL: LazyLock> = LazyLock::new(|| { + ThreadPool::new(|input: ThreadPoolInput| { + input + .map_left(|executable| { + handling::run::chunk_execution_spawn::execute_chunk_item(executable) + }) + .map_right(|(instance, code)| Processor::instrument_code(instance, code)) + }) +}); // Default amount of programs in one chunk to be processed in parallel. pub const DEFAULT_CHUNK_SIZE: NonZero = NonZero::new(16).unwrap(); @@ -137,7 +158,17 @@ impl Processor { OverlaidProcessor(self) } - pub fn process_code(&mut self, code_and_id: CodeAndIdUnchecked) -> Result { + fn instrument_code( + mut instance: InstanceWrapper, + code: Vec, + ) -> host::Result> { + instance.instrument(code) + } + + pub async fn process_code( + &mut self, + code_and_id: CodeAndIdUnchecked, + ) -> Result { log::debug!("Processing upload code {code_and_id:?}"); let CodeAndIdUnchecked { code, code_id } = code_and_id; @@ -149,9 +180,12 @@ impl Processor { }); } - let Some((instrumented_code, code_metadata)) = - self.creator.instantiate()?.instrument(&code)? - else { + let instance = self.creator.instantiate()?; + let res = THREAD_POOL + .spawn(Either::Right((instance, code.clone()))) + .await + .unwrap_right()?; + let Some((instrumented_code, code_metadata)) = res else { return Ok(ProcessedCodeInfo { code_id, valid: None, @@ -278,13 +312,13 @@ impl Processor { } } -#[derive(Clone, Default)] +#[derive(Debug, Clone, Default)] pub struct ProcessedCodeInfo { pub code_id: CodeId, pub valid: Option, } -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct ValidCodeInfo { pub code: Vec, pub instrumented_code: InstrumentedCode, diff --git a/ethexe/processor/src/tests.rs b/ethexe/processor/src/tests.rs index 7abf052cc19..cc5ef6daa63 100644 --- a/ethexe/processor/src/tests.rs +++ b/ethexe/processor/src/tests.rs @@ -73,7 +73,7 @@ mod utils { (code_id, code) } - pub fn upload_code(processor: &mut Processor, code: &[u8]) -> CodeId { + pub async fn upload_code(processor: &mut Processor, code: &[u8]) -> CodeId { let code_id = CodeId::generate(code); let ValidCodeInfo { @@ -85,6 +85,7 @@ mod utils { code: code.to_vec(), code_id, }) + .await .expect("failed to process code") .valid .expect("code is invalid"); @@ -98,7 +99,7 @@ mod utils { code_id } - pub fn setup_test_env_and_load_codes( + pub async fn setup_test_env_and_load_codes( codes: [&[u8]; N], ) -> (Processor, BlockChain, [CodeId; N]) { let db = Database::memory(); @@ -107,7 +108,7 @@ mod utils { let mut code_ids = Vec::new(); for code in codes { - code_ids.push(upload_code(&mut processor, code)); + code_ids.push(upload_code(&mut processor, code).await); } (processor, chain, code_ids.try_into().unwrap()) @@ -135,7 +136,8 @@ mod utils { } pub async fn simple_init_test(code: impl AsRef<[u8]>) -> InBlockTransitions { - let (mut processor, chain, [code_id]) = setup_test_env_and_load_codes([code.as_ref()]); + let (mut processor, chain, [code_id]) = + setup_test_env_and_load_codes([code.as_ref()]).await; let block1 = chain.blocks[1].to_simple(); let mut handler = setup_handler(processor.db.clone(), block1); @@ -180,7 +182,8 @@ mod utils { async fn ping_init() { init_logger(); - let (mut processor, chain, [code_id]) = setup_test_env_and_load_codes([demo_ping::WASM_BINARY]); + let (mut processor, chain, [code_id]) = + setup_test_env_and_load_codes([demo_ping::WASM_BINARY]).await; // Empty processing for block1 let executable = ExecutableData { @@ -270,8 +273,8 @@ async fn ping_init() { .expect("failed to process send message"); } -#[test] -fn handle_new_code_valid() { +#[tokio::test] +async fn handle_new_code_valid() { init_logger(); let mut processor = Processor::new(Database::memory()).expect("failed to create processor"); @@ -283,6 +286,7 @@ fn handle_new_code_valid() { code: code.clone(), code_id, }) + .await .map(|res| (res.code_id, res.valid.expect("code must be valid"))) .unwrap(); @@ -296,8 +300,8 @@ fn handle_new_code_valid() { ); } -#[test] -fn handle_new_code_invalid() { +#[tokio::test] +async fn handle_new_code_invalid() { init_logger(); let mut processor = Processor::new(Database::memory()).expect("failed to create processor"); @@ -307,6 +311,7 @@ fn handle_new_code_invalid() { assert!( processor .process_code(CodeAndIdUnchecked { code, code_id }) + .await .expect("failed to call runtime api") .valid .is_none() @@ -318,7 +323,7 @@ async fn ping_pong() { init_logger(); let (mut processor, chain, [code_id, ..]) = - setup_test_env_and_load_codes([demo_ping::WASM_BINARY, demo_async::WASM_BINARY]); + setup_test_env_and_load_codes([demo_ping::WASM_BINARY, demo_async::WASM_BINARY]).await; let block1 = chain.blocks[1].to_simple(); let user_id = ActorId::from(10); @@ -398,7 +403,7 @@ async fn async_and_ping() { }; let (mut processor, chain, [ping_code_id, upload_code_id, ..]) = - setup_test_env_and_load_codes([demo_ping::WASM_BINARY, demo_async::WASM_BINARY]); + setup_test_env_and_load_codes([demo_ping::WASM_BINARY, demo_async::WASM_BINARY]).await; let block1 = chain.blocks[1].to_simple(); let mut handler = setup_handler(processor.db.clone(), block1); @@ -537,7 +542,7 @@ async fn many_waits() { let (_, code) = wat_to_wasm(wat.as_str()); - let (mut processor, chain, [code_id]) = setup_test_env_and_load_codes([code.as_slice()]); + let (mut processor, chain, [code_id]) = setup_test_env_and_load_codes([code.as_slice()]).await; let block1 = chain.blocks[1].to_simple(); let wake_block = chain.blocks[1 + blocks_to_wait].to_simple(); @@ -689,7 +694,7 @@ async fn overlay_execution() { }; let (mut processor, chain, [ping_code_id, async_code_id]) = - setup_test_env_and_load_codes([demo_ping::WASM_BINARY, demo_async::WASM_BINARY]); + setup_test_env_and_load_codes([demo_ping::WASM_BINARY, demo_async::WASM_BINARY]).await; let block1 = chain.blocks[1].to_simple(); // ----------------------------------------------------------------------------- @@ -919,7 +924,8 @@ async fn injected_ping_pong() { init_logger(); let (promise_out_tx, mut promise_receiver) = mpsc::unbounded_channel(); - let (mut processor, chain, [code_id]) = setup_test_env_and_load_codes([demo_ping::WASM_BINARY]); + let (mut processor, chain, [code_id]) = + setup_test_env_and_load_codes([demo_ping::WASM_BINARY]).await; let block1 = chain.blocks[1].to_simple(); let user_1 = ActorId::from(10); @@ -1029,7 +1035,8 @@ async fn injected_prioritized_over_canonical() { init_logger(); let (promise_out_tx, mut promise_receiver) = mpsc::unbounded_channel(); - let (mut processor, chain, [code_id]) = setup_test_env_and_load_codes([demo_ping::WASM_BINARY]); + let (mut processor, chain, [code_id]) = + setup_test_env_and_load_codes([demo_ping::WASM_BINARY]).await; let block1 = chain.blocks[1].to_simple(); let canonical_user = ActorId::from(10); @@ -1141,7 +1148,8 @@ async fn injected_prioritized_over_canonical() { async fn executable_balance_charged() { init_logger(); - let (mut processor, chain, [code_id]) = setup_test_env_and_load_codes([demo_ping::WASM_BINARY]); + let (mut processor, chain, [code_id]) = + setup_test_env_and_load_codes([demo_ping::WASM_BINARY]).await; let block1 = chain.blocks[1].to_simple(); let mut handler = setup_handler(processor.db.clone(), block1); @@ -1228,7 +1236,7 @@ async fn executable_balance_injected_panic_not_charged() { let (promise_out_tx, mut promise_receiver) = mpsc::unbounded_channel(); let (mut processor, chain, [code_id]) = - setup_test_env_and_load_codes([demo_panic_payload::WASM_BINARY]); + setup_test_env_and_load_codes([demo_panic_payload::WASM_BINARY]).await; let block1 = chain.blocks[1].to_simple(); let user_id = ActorId::from(10); @@ -1367,7 +1375,8 @@ async fn insufficient_executable_balance_still_charged() { init_logger(); - let (mut processor, chain, [code_id]) = setup_test_env_and_load_codes([demo_ping::WASM_BINARY]); + let (mut processor, chain, [code_id]) = + setup_test_env_and_load_codes([demo_ping::WASM_BINARY]).await; let block1 = chain.blocks[1].to_simple(); let mut handler = setup_handler(processor.db.clone(), block1); diff --git a/ethexe/processor/src/handling/thread_pool.rs b/ethexe/processor/src/thread_pool.rs similarity index 93% rename from ethexe/processor/src/handling/thread_pool.rs rename to ethexe/processor/src/thread_pool.rs index a8b9fe2043e..575be789f1c 100644 --- a/ethexe/processor/src/handling/thread_pool.rs +++ b/ethexe/processor/src/thread_pool.rs @@ -20,7 +20,7 @@ //! and `threadpool` is not smart enough. use futures::prelude::*; -use std::{num::NonZero, panic::AssertUnwindSafe, thread}; +use std::{env, num::NonZero, panic::AssertUnwindSafe, thread}; type Task = (I, tokio::sync::oneshot::Sender>); @@ -41,7 +41,11 @@ where where F: FnMut(I) -> O + Send + Clone + 'static, { - let n_cpus = thread::available_parallelism().map_or(1, NonZero::get); + let n_cpus = env::var("ETHEXE_PROCESSOR_NUM_THREADS") + .ok() + .and_then(|num| num.parse().ok()) + .or_else(|| thread::available_parallelism().ok()) + .map_or(1, NonZero::get); let (task_tx, task_rx) = crossbeam::channel::unbounded::>(); From 3597c5a3aad584122dc548460234819f507e76fd Mon Sep 17 00:00:00 2001 From: Arsenii Lyashenko Date: Thu, 26 Mar 2026 18:31:57 +0300 Subject: [PATCH 4/5] Type-agnostic `ThreadPool::spawn` --- ethexe/processor/src/handling/run.rs | 92 +++++++--------------------- ethexe/processor/src/lib.rs | 37 +++-------- ethexe/processor/src/thread_pool.rs | 82 ++++++++++++------------- 3 files changed, 70 insertions(+), 141 deletions(-) diff --git a/ethexe/processor/src/handling/run.rs b/ethexe/processor/src/handling/run.rs index e7403353ce0..98c844fec7d 100644 --- a/ethexe/processor/src/handling/run.rs +++ b/ethexe/processor/src/handling/run.rs @@ -572,55 +572,9 @@ pub(super) mod chunks_splitting { pub(crate) mod chunk_execution_spawn { use super::*; - use crate::{THREAD_POOL, host::InstanceWrapper}; + use crate::THREAD_POOL; use ethexe_runtime_common::ProcessQueueContext; - use itertools::Either; - - pub struct Executable { - queue_type: MessageType, - block_info: BlockInfo, - promise_policy: PromisePolicy, - program_id: ActorId, - state_hash: H256, - instrumented_code: InstrumentedCode, - code_metadata: CodeMetadata, - executor: InstanceWrapper, - db: Box, - gas_allowance_for_chunk: u64, - promise_out_tx: Option>, - } - - pub fn execute_chunk_item(executable: Executable) -> Result { - let Executable { - queue_type, - block_info, - promise_policy, - program_id, - state_hash, - instrumented_code, - code_metadata, - mut executor, - db, - gas_allowance_for_chunk, - promise_out_tx, - } = executable; - - let (jn, new_state_hash, gas_spent) = executor.run( - db, - ProcessQueueContext { - program_id, - state_root: state_hash, - queue_type, - instrumented_code, - code_metadata, - gas_allowance: GasAllowanceCounter::new(gas_allowance_for_chunk), - block_info, - promise_policy, - }, - promise_out_tx, - )?; - Ok((program_id, new_state_hash, jn, gas_spent)) - } + use futures::stream::FuturesOrdered; /// An alias introduced for better readability of the chunks execution steps. pub type ChunkItemOutput = (ActorId, H256, ProgramJournals, u64); @@ -650,32 +604,32 @@ pub(crate) mod chunk_execution_spawn { timestamp: block_header.timestamp, }; - let executables = chunk + chunk .into_iter() .map(|(program_id, state_hash)| { let (instrumented_code, code_metadata) = ctx.program_code(program_id)?; - - let executor = ctx.inner().instance_creator.instantiate()?; - - Ok(Either::Left(Executable { - queue_type, - block_info, - promise_policy, - program_id, - state_hash, - instrumented_code, - code_metadata, - executor, - db: ctx.inner().db.cas().clone_boxed(), - gas_allowance_for_chunk, - promise_out_tx: ctx.inner().promise_out_tx.clone(), + let mut executor = ctx.inner().instance_creator.instantiate()?; + let db = ctx.inner().db.cas().clone_boxed(); + let promise_out_tx = ctx.inner().promise_out_tx.clone(); + Ok(THREAD_POOL.spawn(move || { + let (jn, new_state_hash, gas_spent) = executor.run( + db, + ProcessQueueContext { + program_id, + state_root: state_hash, + queue_type, + instrumented_code, + code_metadata, + gas_allowance: GasAllowanceCounter::new(gas_allowance_for_chunk), + block_info, + promise_policy, + }, + promise_out_tx, + )?; + Ok((program_id, new_state_hash, jn, gas_spent)) })) }) - .collect::>>()?; - - THREAD_POOL - .spawn_many(executables) - .map(|output| output.unwrap_left()) + .collect::>>()? .try_collect() .await } diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 9c0c38bd699..5f4456b049d 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -20,7 +20,7 @@ pub use host::InstanceError; -use crate::{host::InstanceWrapper, thread_pool::ThreadPool}; +use crate::thread_pool::ThreadPool; use core::num::NonZero; use ethexe_common::{ CodeAndIdUnchecked, ProgramStates, Schedule, SimpleBlockData, @@ -41,7 +41,6 @@ use gear_core::{ use gprimitives::{ActorId, CodeId, H256, MessageId}; use handling::{ProcessingHandler, overlaid::OverlaidRunContext, run::CommonRunContext}; use host::InstanceCreator; -use itertools::Either; use std::sync::LazyLock; use tokio::sync::mpsc; @@ -52,22 +51,7 @@ mod host; mod tests; mod thread_pool; -type ExecuteChunkInput = handling::run::chunk_execution_spawn::Executable; -type ExecuteChunkOutput = Result; -type ProcessCodeInput = (InstanceWrapper, Vec); -type ProcessCodeOutput = host::Result>; -type ThreadPoolInput = Either; -type ThreadPoolOutput = Either; - -static THREAD_POOL: LazyLock> = LazyLock::new(|| { - ThreadPool::new(|input: ThreadPoolInput| { - input - .map_left(|executable| { - handling::run::chunk_execution_spawn::execute_chunk_item(executable) - }) - .map_right(|(instance, code)| Processor::instrument_code(instance, code)) - }) -}); +static THREAD_POOL: LazyLock = LazyLock::new(ThreadPool::new); // Default amount of programs in one chunk to be processed in parallel. pub const DEFAULT_CHUNK_SIZE: NonZero = NonZero::new(16).unwrap(); @@ -158,13 +142,6 @@ impl Processor { OverlaidProcessor(self) } - fn instrument_code( - mut instance: InstanceWrapper, - code: Vec, - ) -> host::Result> { - instance.instrument(code) - } - pub async fn process_code( &mut self, code_and_id: CodeAndIdUnchecked, @@ -180,11 +157,13 @@ impl Processor { }); } - let instance = self.creator.instantiate()?; let res = THREAD_POOL - .spawn(Either::Right((instance, code.clone()))) - .await - .unwrap_right()?; + .spawn({ + let mut instance = self.creator.instantiate()?; + let code = code.clone(); + move || instance.instrument(code) + }) + .await?; let Some((instrumented_code, code_metadata)) = res else { return Ok(ProcessedCodeInfo { code_id, diff --git a/ethexe/processor/src/thread_pool.rs b/ethexe/processor/src/thread_pool.rs index 575be789f1c..e3404f375fa 100644 --- a/ethexe/processor/src/thread_pool.rs +++ b/ethexe/processor/src/thread_pool.rs @@ -19,39 +19,31 @@ //! Small custom thread pool interface, because `rayon` is too smart //! and `threadpool` is not smart enough. -use futures::prelude::*; -use std::{env, num::NonZero, panic::AssertUnwindSafe, thread}; +use std::{any::Any, env, num::NonZero, panic::AssertUnwindSafe, thread}; -type Task = (I, tokio::sync::oneshot::Sender>); +type Task = ( + Box Box + Send + 'static>, + tokio::sync::oneshot::Sender>>, +); -/// Thread pool that handler tasks of type `I` -/// and produces outputs of type `O`. #[derive(Debug, Clone)] -pub struct ThreadPool { - task_tx: crossbeam::channel::Sender>, +pub struct ThreadPool { + task_tx: crossbeam::channel::Sender, } -impl ThreadPool -where - I: Send + 'static, - O: Send + 'static, -{ +impl ThreadPool { /// Creates a new thread pool. - pub fn new(handler: F) -> Self - where - F: FnMut(I) -> O + Send + Clone + 'static, - { + pub fn new() -> Self { let n_cpus = env::var("ETHEXE_PROCESSOR_NUM_THREADS") .ok() .and_then(|num| num.parse().ok()) .or_else(|| thread::available_parallelism().ok()) .map_or(1, NonZero::get); - let (task_tx, task_rx) = crossbeam::channel::unbounded::>(); + let (task_tx, task_rx) = crossbeam::channel::unbounded::(); for _ in 0..n_cpus { let task_rx = task_rx.clone(); - let handler = handler.clone(); thread::spawn(move || { loop { @@ -60,12 +52,8 @@ where break; }; - let mut handler = handler.clone(); - // Output receiver could be cancelled - let _ = sender.send(std::panic::catch_unwind(AssertUnwindSafe(move || { - handler(task) - }))); + let _ = sender.send(std::panic::catch_unwind(AssertUnwindSafe(task))); } }); } @@ -84,42 +72,50 @@ where /// /// Panics if worker thread dies despite using /// `std::panic::catch_unwind` around the handler. - pub async fn spawn(&self, input: I) -> O { + pub async fn spawn(&self, f: F) -> R + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { let (tx, rx) = tokio::sync::oneshot::channel(); + let f = Box::new(move || { + let res = f(); + Box::new(res) as Box<_> + }); + self.task_tx - .try_send((input, tx)) + .try_send((f, tx)) .expect("The channel is unbounded"); - rx.await + let res = rx + .await .expect("Worker thread has died") - .unwrap_or_else(|err| std::panic::resume_unwind(err)) - } - - /// Spawns tasks from an iterator of inputs, - /// producing a stream of outputs. - /// - /// The outputs are ordered the same as inputs. - pub fn spawn_many>(&self, input: II) -> impl Stream { - input - .into_iter() - .map(|input| self.spawn(input)) - .collect::>() + .unwrap_or_else(|err| std::panic::resume_unwind(err)); + *res.downcast::().expect("Failed to downcast result") } } #[cfg(test)] mod tests { use super::*; + use futures::{FutureExt, StreamExt, stream::FuturesOrdered}; + + fn task(n: usize) -> String { + "amogus".repeat(n) + } #[tokio::test] async fn test_thread_pool() { - let thread_pool = ThreadPool::new(|n| "amogus".repeat(n)); + let thread_pool = ThreadPool::new(); + + assert_eq!(thread_pool.spawn(|| task(2)).await, "amogusamogus"); - assert_eq!(thread_pool.spawn(2).await, "amogusamogus"); assert_eq!( - thread_pool - .spawn_many([0, 1, 2, 3]) + [0, 1, 2, 3] + .into_iter() + .map(|n| thread_pool.spawn(move || task(n))) + .collect::>() .collect::>() .await, vec![ @@ -135,7 +131,7 @@ mod tests { // Ensure that panics don't break things for _ in 0..n_cpus * 2 { assert!( - AssertUnwindSafe(thread_pool.spawn(usize::MAX)) + AssertUnwindSafe(thread_pool.spawn(|| task(usize::MAX))) .catch_unwind() .await .is_err() From a2365788c9080bc7aeb077b2efe21494ae834ecb Mon Sep 17 00:00:00 2001 From: Arsenii Lyashenko Date: Thu, 26 Mar 2026 18:33:25 +0300 Subject: [PATCH 5/5] Set `ETHEXE_PROCESSOR_NUM_THREADS` in nextest.toml --- .config/nextest.toml | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/.config/nextest.toml b/.config/nextest.toml index e942e6b0580..34ae7fda772 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -1,4 +1,4 @@ -experimental = ["setup-scripts"] +experimental = ["setup-scripts", "wrapper-scripts"] [scripts.setup.replace-node-and-runtime] command = ["bash", "-c", ''' @@ -8,10 +8,14 @@ chmod +x target/release/gear [scripts.setup.set-envs] command = ["bash", "-c", ''' -echo "CARGO_BUILD_JOBS=1" >> $NEXTEST_ENV -echo "RAYON_NUM_THREADS=1" >> $NEXTEST_ENV +echo "CARGO_BUILD_JOBS=1" >> "$NEXTEST_ENV" +echo "RAYON_NUM_THREADS=1" >> "$NEXTEST_ENV" +echo "ETHEXE_PROCESSOR_NUM_THREADS=1" >> "$NEXTEST_ENV" '''] +[scripts.wrapper.compute-4-cores] +command = "env RAYON_NUM_THREADS=4 ETHEXE_PROCESSOR_NUM_THREADS=4" + [profile.default] leak-timeout = { period = "5s", result = "fail" } slow-timeout = { period = "1m", terminate-after = 5 } @@ -40,3 +44,13 @@ store-failure-output = true filter = 'package(gsdk) or package(gcli)' platform = "cfg(unix)" setup = "replace-node-and-runtime" + +# tests of such crates have timeouts, so we need to compile runtime faster +[[profile.ci.scripts]] +filter = 'package(ethexe-service) or package(ethexe-rpc) or package(ethexe-compute)' +run-wrapper = "compute-4-cores" + +[[profile.ci.overrides]] +filter = 'package(ethexe-service) or package(ethexe-rpc) or package(ethexe-compute)' +retries = 5 +threads-required = 4