diff --git a/Cargo.lock b/Cargo.lock index 3993e965ef3..0f8d93bf481 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5206,6 +5206,7 @@ dependencies = [ "metrics", "metrics-derive", "ntest", + "proptest", "thiserror 2.0.17", "tokio", "wasmparser 0.230.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/ethexe/compute/Cargo.toml b/ethexe/compute/Cargo.toml index a964902cc3d..02e0264f48c 100644 --- a/ethexe/compute/Cargo.toml +++ b/ethexe/compute/Cargo.toml @@ -35,5 +35,6 @@ wasmparser.workspace = true ethexe-common = { workspace = true, features = ["mock"] } ethexe-db = { workspace = true, features = ["mock"] } ntest.workspace = true +proptest.workspace = true # test examples demo-ping = { workspace = true, features = ["ethexe"] } diff --git a/ethexe/compute/src/compute.rs b/ethexe/compute/src/compute.rs index e2f67248057..48a2f1257a2 100644 --- a/ethexe/compute/src/compute.rs +++ b/ethexe/compute/src/compute.rs @@ -403,7 +403,13 @@ pub(crate) mod utils { #[cfg(test)] mod tests { use super::*; - use crate::{ComputeService, tests::MockProcessor}; + use crate::{ + ComputeService, + tests::{ + MockProcessor, block_chain_strategy, next_compute_event, next_subservice_event, + proptest_config, run_async_test, + }, + }; use ethexe_common::{ DEFAULT_BLOCK_GAS_LIMIT, db::{GlobalsStorageRO, OnChainStorageRW}, @@ -411,7 +417,7 @@ mod tests { RouterEvent, mirror::ExecutableBalanceTopUpRequestedEvent, router::ProgramCreatedEvent, }, gear::StateTransition, - mock::*, + mock::BlockChain, }; use ethexe_processor::Processor; use gear_core::{ @@ -419,6 +425,8 @@ mod tests { rpc::ReplyInfo, }; use gprimitives::{ActorId, H256}; + use proptest::{collection, prelude::*}; + use std::collections::BTreeMap; mod test_utils { use crate::CodeAndIdUnchecked; @@ -516,273 +524,281 @@ mod tests { } } - #[tokio::test] - #[ntest::timeout(3000)] - async fn test_compute() { - gear_utils::init_default_logger(); - - // Create non-empty processor result with transitions - let non_empty_result = FinalizedBlockTransitions { - transitions: vec![StateTransition { - actor_id: ActorId::from([1; 32]), - new_state_hash: H256::from([2; 32]), - value_to_receive: 100, - ..Default::default() - }], - ..Default::default() - }; + fn promise_test_inputs_strategy() -> BoxedStrategy<(BlockChain, Vec)> { + (4usize..=16) + .prop_flat_map(|blockchain_len| { + let requestable_indexes = (2..blockchain_len).collect::>(); + let max_selected = requestable_indexes.len(); - let db = Database::memory(); - let block_hash = BlockChain::mock(1).setup(&db).blocks[1].hash; - let config = ComputeConfig::without_quarantine(); - let mut service = ComputeSubService::new( - config, - db.clone(), - MockProcessor { - process_programs_result: Some(non_empty_result), - ..Default::default() - }, - ); + block_chain_strategy(blockchain_len as u32).prop_flat_map(move |chain| { + prop::sample::subsequence(requestable_indexes.clone(), 1..=max_selected) + .prop_map(move |request_indexes| (chain.clone(), request_indexes)) + }) + }) + .boxed() + } - let announce = Announce { - block_hash, - parent: db.config().genesis_announce_hash, - gas_allowance: Some(100), - injected_transactions: vec![], - }; - let announce_hash = announce.to_hash(); + fn predecessor_test_inputs_strategy() -> BoxedStrategy { + (2u32..=16).prop_flat_map(block_chain_strategy).boxed() + } - service.receive_announce_to_compute(announce, PromisePolicy::Disabled); + async fn collect_compute_events( + service: &mut ComputeService

, + expected_events: usize, + ) -> Vec { + let mut observed_events = Vec::with_capacity(expected_events); + + while observed_events.len() < expected_events { + observed_events.push(next_compute_event(service).await); + } - assert_eq!( - service.next().await.unwrap().unwrap_announce_computed(), - announce_hash - ); + observed_events + } - // Verify block was marked as computed - assert!(db.announce_meta(announce_hash).computed); + proptest! { + #![proptest_config(proptest_config(32))] + + #[test] + fn test_compute( + chain in block_chain_strategy(1), + transitions in collection::vec(any::(), 1..=4) + ) { + gear_utils::init_default_logger(); + + run_async_test(async move { + let db = Database::memory(); + let block_hash = chain.setup(&db).blocks[1].hash; + let config = ComputeConfig::without_quarantine(); + let mut service = ComputeSubService::new( + config, + db.clone(), + MockProcessor { + process_programs_result: Some(FinalizedBlockTransitions { + transitions: transitions.clone(), + ..Default::default() + }), + ..Default::default() + }, + ); - // Verify transitions were stored in DB - let stored_transitions = db.announce_outcome(announce_hash).unwrap(); - assert_eq!(stored_transitions.len(), 1); - assert_eq!(stored_transitions[0].actor_id, ActorId::from([1; 32])); - assert_eq!(stored_transitions[0].new_state_hash, H256::from([2; 32])); + let announce = Announce { + block_hash, + parent: db.config().genesis_announce_hash, + gas_allowance: Some(100), + injected_transactions: vec![], + }; + let announce_hash = announce.to_hash(); - // Verify latest announce - assert_eq!(db.globals().latest_computed_announce_hash, announce_hash); - } + service.receive_announce_to_compute(announce, PromisePolicy::Disabled); - #[tokio::test] - #[ntest::timeout(60000)] - async fn test_compute_with_promises() { - gear_utils::init_default_logger(); - const BLOCKCHAIN_LEN: usize = 10; - - let db = Database::memory(); - let mut processor = Processor::new(db.clone()).unwrap(); - let ping_code_id = - test_utils::upload_code(&mut processor, demo_ping::WASM_BINARY, &db).await; - let ping_id = ActorId::from(0x10000); - - let blockchain = BlockChain::mock(BLOCKCHAIN_LEN as u32).setup(&db); - - // Setup first announce. - let start_announce_hash = { - let mut announce = blockchain.block_top_announce(0).announce.clone(); - announce.gas_allowance = Some(DEFAULT_BLOCK_GAS_LIMIT); - - let announce_hash = db.set_announce(announce); - db.mutate_announce_meta(announce_hash, |meta| meta.computed = true); - db.globals_mutate(|globals| { - globals.start_announce_hash = announce_hash; + assert_eq!( + next_subservice_event(&mut service).await, + ComputeEvent::AnnounceComputed(announce_hash) + ); + assert!(db.announce_meta(announce_hash).computed); + assert_eq!(db.announce_outcome(announce_hash).unwrap(), transitions); + assert_eq!(db.globals().latest_computed_announce_hash, announce_hash); }); - db.set_announce_program_states(announce_hash, Default::default()); - db.set_announce_schedule(announce_hash, Default::default()); + } + } - announce_hash - }; + proptest! { + #![proptest_config(proptest_config(64))] + + #[test] + fn test_compute_with_promises( + (chain, request_indexes) in promise_test_inputs_strategy() + ) { + gear_utils::init_default_logger(); + + run_async_test(async move { + let db = Database::memory(); + let mut processor = Processor::new(db.clone()).unwrap(); + let ping_code_id = + test_utils::upload_code(&mut processor, demo_ping::WASM_BINARY, &db).await; + let ping_id = ActorId::from(0x10000); + let blockchain = chain.setup(&db); + let blockchain_len = blockchain.blocks.len() - 1; + + let start_announce_hash = { + let mut announce = blockchain.block_top_announce(0).announce.clone(); + announce.gas_allowance = Some(DEFAULT_BLOCK_GAS_LIMIT); + + let announce_hash = db.set_announce(announce); + db.mutate_announce_meta(announce_hash, |meta| meta.computed = true); + db.globals_mutate(|globals| { + globals.start_announce_hash = announce_hash; + }); + db.set_announce_program_states(announce_hash, Default::default()); + db.set_announce_schedule(announce_hash, Default::default()); - // Setup announces and events. - let mut parent_announce = start_announce_hash; - let announces_chain = (1..BLOCKCHAIN_LEN) - .map(|i| { - let announce = { + announce_hash + }; + + let mut parent_announce = start_announce_hash; + let mut announces_by_block = BTreeMap::new(); + + for i in 1..blockchain_len { let mut announce = blockchain.block_top_announce(i).announce.clone(); announce.gas_allowance = Some(DEFAULT_BLOCK_GAS_LIMIT); announce.parent = parent_announce; - let block = announce.block_hash; - let txs = if i != 1 { - vec![test_utils::injected_tx(ping_id, b"PING".into(), block)] + if i != 1 { + announce.injected_transactions = + vec![test_utils::injected_tx(ping_id, b"PING".into(), announce.block_hash)]; + } + + let announce_hash = db.set_announce(announce.clone()); + db.mutate_announce_meta(announce_hash, |meta| meta.computed = false); + + let mut block_events = if i == 1 { + test_utils::create_program_events(ping_id, ping_code_id) } else { Default::default() }; + block_events.extend(test_utils::block_events(5, ping_id, b"PING".into())); + db.set_block_events(announce.block_hash, &block_events); - announce.injected_transactions = txs; - announce - }; - - let announce_hash = db.set_announce(announce.clone()); - db.mutate_announce_meta(announce_hash, |meta| meta.computed = false); - - let mut block_events = if i == 1 { - test_utils::create_program_events(ping_id, ping_code_id) - } else { - Default::default() - }; - block_events.extend(test_utils::block_events(5, ping_id, b"PING".into())); - db.set_block_events(announce.block_hash, &block_events); - - parent_announce = announce_hash; - announce - }) - .collect::>(); - - let mut compute_service = - ComputeService::new(ComputeConfig::without_quarantine(), db.clone(), processor); - - // Send announces for computation. - compute_service.compute_announce( - announces_chain.get(2).unwrap().clone(), - PromisePolicy::Enabled, - ); - compute_service.compute_announce( - announces_chain.get(5).unwrap().clone(), - PromisePolicy::Enabled, - ); - compute_service.compute_announce( - announces_chain.get(8).unwrap().clone(), - PromisePolicy::Enabled, - ); - - let mut expected_announces = vec![ - announces_chain.get(2).unwrap().to_hash(), - announces_chain.get(5).unwrap().to_hash(), - announces_chain.get(8).unwrap().to_hash(), - ]; - - let mut expected_promises = expected_announces - .iter() - .map(|hash| { - let announce = db.announce(*hash).unwrap(); - let tx = announce.injected_transactions[0].clone().into_data(); - Promise { - tx_hash: tx.to_hash(), - reply: ReplyInfo { - payload: b"PONG".into(), - value: 0, - code: ReplyCode::Success(SuccessReplyReason::Manual), - }, + parent_announce = announce_hash; + announces_by_block.insert(i, announce); } - }) - .collect::>(); - while !expected_announces.is_empty() || !expected_promises.is_empty() { - match compute_service.next().await.unwrap().unwrap() { - ComputeEvent::AnnounceComputed(hash) => { - if *expected_announces.first().unwrap() == hash { - expected_announces.remove(0); - } - } - ComputeEvent::Promise(promise, announce) => { - if *expected_announces.first().unwrap() == announce - && expected_promises.first().unwrap().clone() == promise - { - expected_promises.remove(0); - } + let mut compute_service = + ComputeService::new(ComputeConfig::without_quarantine(), db.clone(), processor); + let mut expected_events = Vec::with_capacity(request_indexes.len() * 2); + + // `subsequence` preserves order, predecessors are computed silently, and only the + // requested announces emit the Promise + AnnounceComputed pairs asserted below. + for index in &request_indexes { + let announce = announces_by_block[index].clone(); + let announce_hash = announce.to_hash(); + let tx = announce + .injected_transactions + .first() + .cloned() + .expect( + "request indexes start at 2, so each requested announce carries one injected transaction", + ) + .into_data(); + + expected_events.push(ComputeEvent::Promise( + Promise { + tx_hash: tx.to_hash(), + reply: ReplyInfo { + payload: b"PONG".into(), + value: 0, + code: ReplyCode::Success(SuccessReplyReason::Manual), + }, + }, + announce_hash, + )); + expected_events.push(ComputeEvent::AnnounceComputed(announce_hash)); + compute_service.compute_announce(announce, PromisePolicy::Enabled); } - _ => unreachable!("unexpected event for current test"), - } - } - } - #[tokio::test] - #[ntest::timeout(60000)] - async fn test_compute_with_early_break() { - gear_utils::init_default_logger(); + let observed_events = + collect_compute_events(&mut compute_service, expected_events.len()).await; + assert_eq!(observed_events, expected_events); + }); + } - let db = Database::memory(); - let mut processor = Processor::new(db.clone()).unwrap(); + #[test] + fn test_compute_with_early_break( + chain in block_chain_strategy(3), + tx_count in 30usize..=100 + ) { + gear_utils::init_default_logger(); - let ping_code_id = - test_utils::upload_code(&mut processor, demo_ping::WASM_BINARY, &db).await; - let ping_id = ActorId::from(0x10000); + run_async_test(async move { + let db = Database::memory(); + let mut processor = Processor::new(db.clone()).unwrap(); - let blockchain = BlockChain::mock(3).setup(&db); + let ping_code_id = + test_utils::upload_code(&mut processor, demo_ping::WASM_BINARY, &db).await; + let ping_id = ActorId::from(0x10000); + let blockchain = chain.setup(&db); - let first_announce_hash = { - let mut announce = blockchain.block_top_announce(1).announce.clone(); - announce.gas_allowance = Some(DEFAULT_BLOCK_GAS_LIMIT); + let first_announce_hash = { + let mut announce = blockchain.block_top_announce(1).announce.clone(); + announce.gas_allowance = Some(DEFAULT_BLOCK_GAS_LIMIT); - let mut canonical_events = test_utils::create_program_events(ping_id, ping_code_id); - canonical_events.push(test_utils::canonical_event(ping_id, b"PING".into())); + let mut canonical_events = + test_utils::create_program_events(ping_id, ping_code_id); + canonical_events.push(test_utils::canonical_event(ping_id, b"PING".into())); - db.set_block_events(announce.block_hash, &canonical_events); - db.set_announce(announce) - }; + db.set_block_events(announce.block_hash, &canonical_events); + db.set_announce(announce) + }; - let (announce, announce_hash) = { - let mut announce = blockchain.block_top_announce(2).announce.clone(); - announce.gas_allowance = Some(400_000); - announce.parent = first_announce_hash; + let (announce, announce_hash) = { + let mut announce = blockchain.block_top_announce(2).announce.clone(); + announce.gas_allowance = Some(400_000); + announce.parent = first_announce_hash; + + let ref_block = announce.block_hash; + announce.injected_transactions = (0..tx_count) + .map(|_| test_utils::injected_tx(ping_id, b"PING".into(), ref_block)) + .collect::>(); + let hash = db.set_announce(announce.clone()); + (announce, hash) + }; - let ref_block = announce.block_hash; - let txs = (0..300) - .map(|_| test_utils::injected_tx(ping_id, b"PING".into(), ref_block)) - .collect::>(); - announce.injected_transactions = txs; - let hash = db.set_announce(announce.clone()); - (announce, hash) - }; + let mut compute_service = + ComputeService::new(ComputeConfig::without_quarantine(), db.clone(), processor); + compute_service.compute_announce(announce, PromisePolicy::Enabled); - let mut compute_service = - ComputeService::new(ComputeConfig::without_quarantine(), db.clone(), processor); - compute_service.compute_announce(announce, PromisePolicy::Enabled); + let mut announce_computed = false; + for _ in 0..=tx_count + 1 { + if next_compute_event(&mut compute_service).await + == ComputeEvent::AnnounceComputed(announce_hash) + { + announce_computed = true; + break; + } + } - loop { - let event = compute_service.next().await.unwrap().unwrap(); - if event == ComputeEvent::AnnounceComputed(announce_hash) { - break; - } + assert!(announce_computed); + }); } } - #[test] - fn collect_not_computed_predecessors_work_correctly() { - const BLOCKCHAIN_LEN: usize = 10; + proptest! { + #![proptest_config(proptest_config(128))] - let db = Database::memory(); - let blockchain = BlockChain::mock(BLOCKCHAIN_LEN as u32).setup(&db); + #[test] + fn collect_not_computed_predecessors_work_correctly( + chain in predecessor_test_inputs_strategy() + ) { + let db = Database::memory(); + let blockchain = chain.setup(&db); + let blockchain_len = blockchain.blocks.len() - 1; - // Setup announces except the start-announce to not-computed state. - (0..BLOCKCHAIN_LEN - 1).for_each(|idx| { - let announce_hash = blockchain.block_top_announce(idx).announce.to_hash(); + (0..blockchain_len - 1).for_each(|idx| { + let announce_hash = blockchain.block_top_announce(idx).announce.to_hash(); - if idx == 0 { - db.mutate_announce_meta(announce_hash, |meta| meta.computed = true); - } else { - db.mutate_announce_meta(announce_hash, |meta| meta.computed = false); - } - }); + if idx == 0 { + db.mutate_announce_meta(announce_hash, |meta| meta.computed = true); + } else { + db.mutate_announce_meta(announce_hash, |meta| meta.computed = false); + } + }); - let expected_not_computed_announces = (1..BLOCKCHAIN_LEN - 1) - .map(|idx| blockchain.block_top_announce(idx).announce.to_hash()) - .collect::>(); + let expected_not_computed_announces = (1..blockchain_len - 1) + .map(|idx| blockchain.block_top_announce(idx).announce.to_hash()) + .collect::>(); - let head_announce = blockchain - .block_top_announce(BLOCKCHAIN_LEN - 1) - .announce - .clone(); - let not_computed_announces = utils::collect_not_computed_predecessors(&head_announce, &db) - .unwrap() - .into_iter() - .map(|v| v.0) - .collect::>(); - - assert_eq!( - expected_not_computed_announces.len(), - not_computed_announces.len() - ); - assert_eq!(expected_not_computed_announces, not_computed_announces); + let head_announce = blockchain + .block_top_announce(blockchain_len - 1) + .announce + .clone(); + let not_computed_announces = + utils::collect_not_computed_predecessors(&head_announce, &db) + .unwrap() + .into_iter() + .map(|entry| entry.0) + .collect::>(); + + prop_assert_eq!(not_computed_announces, expected_not_computed_announces); + } } } diff --git a/ethexe/compute/src/prepare.rs b/ethexe/compute/src/prepare.rs index 24ae9634fee..a20a11c1249 100644 --- a/ethexe/compute/src/prepare.rs +++ b/ethexe/compute/src/prepare.rs @@ -369,198 +369,238 @@ fn prepare_one_block::random(); - - let block = chain.blocks[1].to_simple().next_block(); - let block = BlockData { - hash: block.hash, - header: block.header, - events: vec![ - BlockEvent::Router(RouterEvent::BatchCommitted(BatchCommittedEvent { - digest: batch_committed, - })), - BlockEvent::Router(RouterEvent::AnnouncesCommitted(AnnouncesCommittedEvent( - block1_announce_hash, - ))), - BlockEvent::Router(RouterEvent::CodeGotValidated(CodeGotValidatedEvent { - code_id: code1_id, - valid: true, - })), - BlockEvent::Router(RouterEvent::CodeValidationRequested( - CodeValidationRequestedEvent { - code_id: code2_id, - timestamp: 1000, - tx_hash: H256::random(), - }, - )), - ], - } - .setup(&db); - - prepare_one_block(&db, block.clone()).unwrap(); - - let meta = db.block_meta(block.hash); - assert!(meta.prepared); - assert_eq!(meta.codes_queue, Some(vec![code2_id].into()),); - assert_eq!(meta.last_committed_batch, Some(batch_committed),); - assert_eq!(meta.last_committed_announce, Some(block1_announce_hash)); + fn announce_hash_strategy() -> BoxedStrategy> { + any::<[u8; 32]>() + .prop_map(H256::from) + .prop_map(|hash| unsafe { HashOf::new(hash) }) + .boxed() } - #[tokio::test] - #[ntest::timeout(3000)] - async fn test_prepare_no_codes() { - gear_utils::init_default_logger(); + fn start_with_codes_strategy() -> BoxedStrategy<(BlockChain, Vec, Vec)> { + block_chain_strategy(1) + .prop_flat_map(|chain| { + collection::vec(any::(), 1..=16).prop_flat_map(move |code| { + let loaded_code_id = CodeId::generate(&code); + let chain = chain.clone(); + distinct_code_ids_sorted(3) + .prop_filter( + "extra code ids must differ from the preloaded parent code id", + move |ids| !ids.contains(&loaded_code_id), + ) + .prop_map(move |ids| (chain.clone(), ids, code.clone())) + }) + }) + .boxed() + } - let db = Database::memory(); - let mut service = PrepareSubService::new(db.clone()); - let chain = BlockChain::mock(1).setup(&db); - let block = chain.blocks[1].to_simple().next_block().setup(&db); + proptest! { + #![proptest_config(proptest_config(128))] + + #[test] + fn test_prepare_one_block( + chain in block_chain_strategy(1), + code_ids in distinct_code_ids_sorted(2), + batch_committed in any::<[u8; 32]>().prop_map(Digest), + block1_announce_hash in announce_hash_strategy(), + ) { + gear_utils::init_default_logger(); + + let db = Database::memory(); + let chain = chain.setup(&db); + let [code1_id, code2_id] = <[CodeId; 2]>::try_from(code_ids).unwrap(); + + let block = chain.blocks[1].to_simple().next_block(); + let block = BlockData { + hash: block.hash, + header: block.header, + events: vec![ + BlockEvent::Router(RouterEvent::BatchCommitted(BatchCommittedEvent { + digest: batch_committed, + })), + BlockEvent::Router(RouterEvent::AnnouncesCommitted(AnnouncesCommittedEvent( + block1_announce_hash, + ))), + BlockEvent::Router(RouterEvent::CodeGotValidated(CodeGotValidatedEvent { + code_id: code1_id, + valid: true, + })), + BlockEvent::Router(RouterEvent::CodeValidationRequested( + CodeValidationRequestedEvent { + code_id: code2_id, + timestamp: 1000, + tx_hash: H256::random(), + }, + )), + ], + } + .setup(&db); - service.receive_block_to_prepare(block.hash); + prepare_one_block(&db, block.clone()).unwrap(); - assert_eq!( - service.next().await.unwrap(), - Event::BlockPrepared(block.hash), - ); + let meta = db.block_meta(block.hash); + prop_assert!(meta.prepared); + prop_assert_eq!(meta.codes_queue, Some(vec![code2_id].into())); + prop_assert_eq!(meta.last_committed_batch, Some(batch_committed)); + prop_assert_eq!(meta.last_committed_announce, Some(block1_announce_hash)); + } } - #[tokio::test] - #[ntest::timeout(3000)] - async fn test_prepare_with_codes() { - gear_utils::init_default_logger(); + proptest! { + #![proptest_config(proptest_config(64))] - let db = Database::memory(); - let mut service = PrepareSubService::new(db.clone()); - let chain = BlockChain::mock(1).setup(&db); + #[test] + fn test_prepare_no_codes(chain in block_chain_strategy(1)) { + gear_utils::init_default_logger(); - let code1_id = CodeId::from([1u8; 32]); - let code2_id = CodeId::from([2u8; 32]); + run_async_test(async move { + let db = Database::memory(); + let mut service = PrepareSubService::new(db.clone()); + let chain = chain.setup(&db); + let block = chain.blocks[1].to_simple().next_block().setup(&db); - let block = chain.blocks[1].to_simple().next_block(); - let block = BlockData { - hash: block.hash, - header: block.header, - events: vec![ - BlockEvent::Router(RouterEvent::CodeGotValidated(CodeGotValidatedEvent { - code_id: code1_id, - valid: true, - })), - BlockEvent::Router(RouterEvent::CodeValidationRequested( - CodeValidationRequestedEvent { - code_id: code2_id, - timestamp: 1000, - tx_hash: H256::random(), - }, - )), - ], - } - .setup(&db); + service.receive_block_to_prepare(block.hash); - service.receive_block_to_prepare(block.hash); - assert_eq!( - service.next().await.unwrap(), - Event::RequestCodes([code1_id, code2_id].into()) - ); + assert_eq!( + next_subservice_event(&mut service).await, + Event::BlockPrepared(block.hash), + ); + }); + } - service.receive_processed_code(code1_id); - assert_eq!( - service.next().await.unwrap(), - Event::BlockPrepared(block.hash), - ); - } + #[test] + fn test_prepare_with_codes(chain in block_chain_strategy(1), code_ids in distinct_code_ids_sorted(2)) { + gear_utils::init_default_logger(); + + run_async_test(async move { + let db = Database::memory(); + let mut service = PrepareSubService::new(db.clone()); + let chain = chain.setup(&db); + let [code1_id, code2_id] = <[CodeId; 2]>::try_from(code_ids).unwrap(); + + let block = chain.blocks[1].to_simple().next_block(); + let block = BlockData { + hash: block.hash, + header: block.header, + events: vec![ + BlockEvent::Router(RouterEvent::CodeGotValidated(CodeGotValidatedEvent { + code_id: code1_id, + valid: true, + })), + BlockEvent::Router(RouterEvent::CodeValidationRequested( + CodeValidationRequestedEvent { + code_id: code2_id, + timestamp: 1000, + tx_hash: H256::random(), + }, + )), + ], + } + .setup(&db); - #[tokio::test] - #[ntest::timeout(3000)] - async fn test_sub_service_start_with_codes() { - gear_utils::init_default_logger(); - - let db = Database::memory(); - let mut service = PrepareSubService::new(db.clone()); - - let validated_code_id = CodeId::from([1u8; 32]); - let requested_code_id = CodeId::from([2u8; 32]); - let parent_block_code_id = CodeId::from([3u8; 32]); - - let code = b"1234"; - let parent_block_loaded_code_id = CodeId::generate(code); - - let chain = BlockChain::mock(1) - .tap_mut(|chain| { - chain.blocks[1].as_prepared_mut().codes_queue = - [parent_block_code_id, parent_block_loaded_code_id].into(); - chain.codes.insert( - parent_block_loaded_code_id, - CodeData { - original_bytes: code.to_vec(), - blob_info: Default::default(), - instrumented: None, - }, + service.receive_block_to_prepare(block.hash); + assert_eq!( + next_subservice_event(&mut service).await, + Event::RequestCodes([code1_id, code2_id].into()) ); - }) - .setup(&db); - let block2 = chain.blocks[1].to_simple().next_block(); - let block3 = block2.next_block(); - - BlockData { - hash: block2.hash, - header: block2.header, - events: vec![BlockEvent::Router(RouterEvent::CodeGotValidated( - CodeGotValidatedEvent { - code_id: validated_code_id, - valid: true, - }, - ))], - } - .setup(&db); - - BlockData { - hash: block3.hash, - header: block3.header, - events: vec![BlockEvent::Router(RouterEvent::CodeValidationRequested( - CodeValidationRequestedEvent { - code_id: requested_code_id, - timestamp: 1000, - tx_hash: H256::random(), - }, - ))], + service.receive_processed_code(code1_id); + assert_eq!( + next_subservice_event(&mut service).await, + Event::BlockPrepared(block.hash), + ); + }); } - .setup(&db); - - service.receive_block_to_prepare(block3.hash); - assert_eq!( - service.next().await.unwrap(), - Event::RequestCodes( - [ - parent_block_code_id, - parent_block_loaded_code_id, - validated_code_id, - requested_code_id - ] - .into() - ) - ); - service.receive_processed_code(validated_code_id); - assert_eq!( - service.next().await.unwrap(), - Event::BlockPrepared(block3.hash), - ); + #[test] + fn test_sub_service_start_with_codes( + (chain, code_ids, code) in start_with_codes_strategy() + ) { + gear_utils::init_default_logger(); + + run_async_test(async move { + let db = Database::memory(); + let mut service = PrepareSubService::new(db.clone()); + let [validated_code_id, requested_code_id, parent_block_code_id] = + <[CodeId; 3]>::try_from(code_ids).unwrap(); + let parent_block_loaded_code_id = CodeId::generate(&code); + + let chain = chain + .tap_mut(|chain| { + chain.blocks[1].as_prepared_mut().codes_queue = + [parent_block_code_id, parent_block_loaded_code_id].into(); + chain.codes.insert( + parent_block_loaded_code_id, + CodeData { + original_bytes: code.clone(), + blob_info: Default::default(), + instrumented: None, + }, + ); + }) + .setup(&db); + + let block2 = chain.blocks[1].to_simple().next_block(); + let block3 = block2.next_block(); + + BlockData { + hash: block2.hash, + header: block2.header, + events: vec![BlockEvent::Router(RouterEvent::CodeGotValidated( + CodeGotValidatedEvent { + code_id: validated_code_id, + valid: true, + }, + ))], + } + .setup(&db); + + BlockData { + hash: block3.hash, + header: block3.header, + events: vec![BlockEvent::Router(RouterEvent::CodeValidationRequested( + CodeValidationRequestedEvent { + code_id: requested_code_id, + timestamp: 1000, + tx_hash: H256::random(), + }, + ))], + } + .setup(&db); + + service.receive_block_to_prepare(block3.hash); + assert_eq!( + next_subservice_event(&mut service).await, + Event::RequestCodes( + [ + parent_block_code_id, + parent_block_loaded_code_id, + validated_code_id, + requested_code_id + ] + .into() + ) + ); + + service.receive_processed_code(validated_code_id); + assert_eq!( + next_subservice_event(&mut service).await, + Event::BlockPrepared(block3.hash), + ); + }); + } } } diff --git a/ethexe/compute/src/service.rs b/ethexe/compute/src/service.rs index 5b96f0256a0..b58bc7be7f8 100644 --- a/ethexe/compute/src/service.rs +++ b/ethexe/compute/src/service.rs @@ -136,116 +136,102 @@ pub(crate) trait SubService: Unpin + Send + 'static { #[cfg(test)] mod tests { - use super::*; - use ethexe_common::{CodeAndIdUnchecked, db::*, mock::*}; + use crate::tests::{ + MockProcessor, block_chain_strategy, next_compute_event, proptest_config, run_async_test, + }; + use ethexe_common::{ + CodeAndIdUnchecked, + db::{AnnounceStorageRO, BlockMetaStorageRO, CodesStorageRO}, + mock::Tap, + }; use ethexe_db::Database as DB; - use futures::StreamExt; use gear_core::ids::prelude::CodeIdExt; use gprimitives::CodeId; + use proptest::{collection, prelude::*}; - /// Test ComputeService block preparation functionality - #[tokio::test] - async fn prepare_block() { - gear_utils::init_default_logger(); - - let db = DB::memory(); - let mut service = ComputeService::new_mock_processor(db.clone()); - - let chain = BlockChain::mock(1).setup(&db); - let block = chain.blocks[1].to_simple().next_block().setup(&db); - - // Request block preparation - service.prepare_block(block.hash); - - // Poll service to process the preparation request - let event = service.next().await.unwrap().unwrap(); - assert_eq!(event, ComputeEvent::BlockPrepared(block.hash)); - - // Verify block is marked as prepared in DB - assert!(db.block_meta(block.hash).prepared); - } + proptest! { + #![proptest_config(proptest_config(64))] - /// Test ComputeService block processing functionality - #[tokio::test] - async fn compute_announce() { - gear_utils::init_default_logger(); + #[test] + fn prepare_block(chain in block_chain_strategy(1)) { + gear_utils::init_default_logger(); - let db = DB::memory(); - let mut service = ComputeService::new_mock_processor(db.clone()); + run_async_test(async move { + let db = DB::memory(); + let mut service = ComputeService::new_mock_processor(db.clone()); - let chain = BlockChain::mock(1).setup(&db); + let chain = chain.setup(&db); + let block = chain.blocks[1].to_simple().next_block().setup(&db); - let block = chain.blocks[1].to_simple().next_block().setup(&db); - - service.prepare_block(block.hash); - let event = service.next().await.unwrap().unwrap(); - assert_eq!(event, ComputeEvent::BlockPrepared(block.hash)); - - // Request computation - let announce = Announce { - block_hash: block.hash, - parent: chain.block_top_announce_hash(1), - gas_allowance: Some(42), - injected_transactions: vec![], - }; - let announce_hash = announce.to_hash(); - service.compute_announce(announce, PromisePolicy::Disabled); - - // Poll service to process the block - let event = service.next().await.unwrap().unwrap(); - assert_eq!(event, ComputeEvent::AnnounceComputed(announce_hash)); - - // Verify block is marked as computed in DB - assert!(db.announce_meta(announce_hash).computed); - } + service.prepare_block(block.hash); - /// Test ComputeService code processing functionality - #[tokio::test] - async fn process_code() { - gear_utils::init_default_logger(); - - let code = vec![0x00, 0x61, 0x73, 0x6d, 0x01, 0x00, 0x00, 0x00]; // Simple WASM header - let code_id = CodeId::generate(&code); - - let db = DB::memory(); - let processor = MockProcessor::with_default_valid_code() - .tap_mut(|p| p.process_codes_result.as_mut().unwrap().code_id = code_id); - let mut service = ComputeService::new( - ComputeConfig::without_quarantine(), - db.clone(), - processor.clone(), - ); - - // Create test code - - let code_and_id = CodeAndIdUnchecked { code, code_id }; - - // Verify code is not yet in DB - assert!(db.code_valid(code_id).is_none()); - - // Request code processing - service.process_code(code_and_id); - - // Poll service to process the code - let event = service.next().await.unwrap().unwrap(); - - // Should receive CodeProcessed event with correct code_id - match event { - ComputeEvent::CodeProcessed(processed_code_id) => { - assert_eq!(processed_code_id, code_id); - } - _ => panic!("Expected CodeProcessed event"), + let event = next_compute_event(&mut service).await; + assert_eq!(event, ComputeEvent::BlockPrepared(block.hash)); + assert!(db.block_meta(block.hash).prepared); + }); } - // Verify that the processor was called for non-validated code - assert_eq!( - processor.process_code_call_count(), - 1, - "Processor should be called for non-validated code" - ); + #[test] + fn compute_announce(chain in block_chain_strategy(1), gas_allowance in 1u64..=1_000_000) { + gear_utils::init_default_logger(); + + run_async_test(async move { + let db = DB::memory(); + let mut service = ComputeService::new_mock_processor(db.clone()); + + let chain = chain.setup(&db); + let block = chain.blocks[1].to_simple().next_block().setup(&db); + + service.prepare_block(block.hash); + assert_eq!( + next_compute_event(&mut service).await, + ComputeEvent::BlockPrepared(block.hash) + ); + + let announce = Announce { + block_hash: block.hash, + parent: chain.block_top_announce_hash(1), + gas_allowance: Some(gas_allowance), + injected_transactions: vec![], + }; + let announce_hash = announce.to_hash(); + service.compute_announce(announce, PromisePolicy::Disabled); + + assert_eq!( + next_compute_event(&mut service).await, + ComputeEvent::AnnounceComputed(announce_hash) + ); + assert!(db.announce_meta(announce_hash).computed); + }); + } - // Verify code is now marked as valid in DB - assert_eq!(db.code_valid(code_id), Some(true)); + #[test] + fn process_code(code in collection::vec(any::(), 1..=64)) { + gear_utils::init_default_logger(); + + run_async_test(async move { + let code_id = CodeId::generate(&code); + let db = DB::memory(); + let processor = MockProcessor::with_default_valid_code() + .tap_mut(|p| p.process_codes_result.as_mut().unwrap().code_id = code_id); + let mut service = ComputeService::new( + ComputeConfig::without_quarantine(), + db.clone(), + processor.clone(), + ); + + assert!(db.code_valid(code_id).is_none()); + + service.process_code(CodeAndIdUnchecked { code, code_id }); + + assert_eq!( + next_compute_event(&mut service).await, + ComputeEvent::CodeProcessed(code_id) + ); + assert_eq!(processor.process_code_call_count(), 1); + assert_eq!(db.code_valid(code_id), Some(true)); + }); + } } } diff --git a/ethexe/compute/src/tests.rs b/ethexe/compute/src/tests.rs index 28a6a10e310..584e5999253 100644 --- a/ethexe/compute/src/tests.rs +++ b/ethexe/compute/src/tests.rs @@ -17,6 +17,7 @@ // along with this program. If not, see . use super::*; +use crate::service::SubService; use ethexe_common::{ CodeBlobInfo, PromisePolicy, db::*, @@ -24,17 +25,77 @@ use ethexe_common::{ BlockEvent, RouterEvent, router::{CodeGotValidatedEvent, CodeValidationRequestedEvent}, }, - mock::*, + mock::{BlockChain, BlockChainParams, CodeData, DBMockExt}, }; use ethexe_db::Database; use ethexe_processor::ValidCodeInfo; -use futures::StreamExt; +use futures::{Future, StreamExt}; use gear_core::{ code::{CodeMetadata, InstantiatedSectionSizes, InstrumentedCode}, ids::prelude::CodeIdExt, }; +use gprimitives::{CodeId, H256}; +use proptest::{collection, prelude::*}; use std::time::Duration; -use tokio::{sync::mpsc, time::timeout}; +use tokio::{runtime::Builder, sync::mpsc, time::timeout}; + +thread_local! { + // Reuse one current-thread runtime per test thread to avoid rebuilding it for every proptest case. + static TEST_RUNTIME: tokio::runtime::Runtime = Builder::new_current_thread() + .enable_all() + .build() + .expect("failed to build tokio runtime"); +} + +pub(crate) const ASYNC_EVENT_TIMEOUT: Duration = Duration::from_secs(3); +const NO_EVENT_TIMEOUT: Duration = Duration::from_millis(500); +const PROPTEST_TIMEOUT_MS: u32 = 60_000; + +pub(crate) fn block_chain_strategy(len: u32) -> BoxedStrategy { + any_with::(BlockChainParams::from(len)).boxed() +} + +pub(crate) fn distinct_code_ids_sorted(count: usize) -> BoxedStrategy> { + collection::btree_set(any::<[u8; 32]>().prop_map(CodeId::from), count) + .prop_map(|ids| ids.into_iter().collect()) + .boxed() +} + +pub(crate) fn run_async_test(future: F) -> F::Output { + TEST_RUNTIME.with(|runtime| runtime.block_on(future)) +} + +pub(crate) async fn next_compute_event( + compute: &mut ComputeService

, +) -> ComputeEvent { + timeout(ASYNC_EVENT_TIMEOUT, compute.next()) + .await + .expect("timed out waiting for compute event") + .expect("compute stream ended") + .expect("compute service returned error") +} + +pub(crate) async fn next_subservice_event(service: &mut S) -> S::Output { + timeout(ASYNC_EVENT_TIMEOUT, service.next()) + .await + .expect("timed out waiting for sub-service event") + .expect("sub-service returned error") +} + +pub(crate) async fn assert_no_compute_event(compute: &mut ComputeService

) { + assert!( + timeout(NO_EVENT_TIMEOUT, compute.next()).await.is_err(), + "unexpected follow-up compute event" + ); +} + +pub(crate) fn proptest_config(cases: u32) -> ProptestConfig { + ProptestConfig { + cases, + timeout: PROPTEST_TIMEOUT_MS, + ..ProptestConfig::default() + } +} // MockProcessor that implements ProcessorExt and always returns Ok with empty results #[derive(Clone, Default)] @@ -144,7 +205,7 @@ fn insert_code_events(chain: &mut BlockChain, events_in_block: u32) { } } -fn mark_as_not_prepared(chain: &mut BlockChain) { +fn reset_to_unprepared(chain: &mut BlockChain) { // skip genesis for block in chain.blocks.iter_mut().skip(1) { block.prepared = None; @@ -164,13 +225,11 @@ struct TestEnv { } impl TestEnv { - // Setup the chain and compute service. - fn new(chain_len: u32, events_in_block: u32) -> TestEnv { + fn new(mut chain: BlockChain, events_in_block: u32) -> TestEnv { let db = Database::memory(); - let mut chain = BlockChain::mock(chain_len); insert_code_events(&mut chain, events_in_block); - mark_as_not_prepared(&mut chain); + reset_to_unprepared(&mut chain); chain = chain.setup(&db); let compute = ComputeService::new_with_defaults(db.clone()); @@ -181,45 +240,36 @@ impl TestEnv { async fn prepare_and_assert_block(&mut self, block: H256) { self.compute.prepare_block(block); - let event = self - .compute - .next() - .await - .unwrap() - .expect("expect compute service request codes to load"); - let codes_to_load = event.unwrap_request_load_codes(); - - for code_id in codes_to_load { - let Some(CodeData { - original_bytes: code, - .. - }) = self.chain.codes.remove(&code_id) - else { - continue; - }; - - self.compute - .process_code(CodeAndIdUnchecked { code, code_id }); - - let event = self - .compute - .next() - .await - .unwrap() - .expect("expect code will be processing"); - let processed_code_id = event.unwrap_code_processed(); - - assert_eq!(processed_code_id, code_id); + match next_compute_event(&mut self.compute).await { + ComputeEvent::RequestLoadCodes(codes_to_load) => { + for code_id in codes_to_load { + let Some(CodeData { + original_bytes: code, + .. + }) = self.chain.codes.remove(&code_id) + else { + continue; + }; + + self.compute + .process_code(CodeAndIdUnchecked { code, code_id }); + + let processed_code_id = next_compute_event(&mut self.compute) + .await + .unwrap_code_processed(); + assert_eq!(processed_code_id, code_id); + } + + let prepared_block = next_compute_event(&mut self.compute) + .await + .unwrap_block_prepared(); + assert_eq!(prepared_block, block); + } + ComputeEvent::BlockPrepared(prepared_block) => { + assert_eq!(prepared_block, block); + } + event => panic!("unexpected compute event while preparing block: {event:?}"), } - - let event = self - .compute - .next() - .await - .unwrap() - .expect("expect block prepared after processing all codes"); - let prepared_block = event.unwrap_block_prepared(); - assert_eq!(prepared_block, block); } async fn compute_and_assert_announce(&mut self, announce: Announce) { @@ -227,14 +277,9 @@ impl TestEnv { self.compute .compute_announce(announce.clone(), PromisePolicy::Disabled); - let event = self - .compute - .next() + let computed_announce = next_compute_event(&mut self.compute) .await - .unwrap() - .expect("expect block will be processing"); - - let computed_announce = event.unwrap_announce_computed(); + .unwrap_announce_computed(); assert_eq!(computed_announce, announce_hash); self.db.mutate_block_meta(announce.block_hash, |meta| { @@ -255,250 +300,253 @@ fn new_announce(db: &Database, block_hash: H256, gas_allowance: Option) -> } } -#[tokio::test] -async fn block_computation_basic() -> Result<()> { - gear_utils::init_default_logger(); - - let mut env = TestEnv::new(1, 3); - - for block in env.chain.blocks.clone().iter().skip(1) { - env.prepare_and_assert_block(block.hash).await; - - let announce = new_announce(&env.db, block.hash, Some(100)); - env.compute_and_assert_announce(announce).await; - } - - Ok(()) +fn chain_with_event_count_strategy() -> BoxedStrategy<(BlockChain, u32)> { + (1u32..=6, 0u32..=4) + .prop_flat_map(|(chain_len, events_in_block)| { + block_chain_strategy(chain_len).prop_map(move |chain| (chain, events_in_block)) + }) + .boxed() } -#[tokio::test] -async fn multiple_preparation_and_one_processing() -> Result<()> { - gear_utils::init_default_logger(); - - let mut env = TestEnv::new(3, 3); +fn single_block_chain_with_event_count_strategy() -> BoxedStrategy<(BlockChain, u32)> { + (0u32..=4) + .prop_flat_map(|events_in_block| { + block_chain_strategy(1).prop_map(move |chain| (chain, events_in_block)) + }) + .boxed() +} - for block in env.chain.blocks.clone().iter().skip(1) { - env.prepare_and_assert_block(block.hash).await; +proptest! { + #![proptest_config(proptest_config(64))] + + #[test] + fn block_computation_basic((chain, events_in_block) in chain_with_event_count_strategy()) { + gear_utils::init_default_logger(); + + run_async_test(async move { + let mut env = TestEnv::new(chain, events_in_block); + let block_hashes = env + .chain + .blocks + .iter() + .skip(1) + .map(|block| block.hash) + .collect::>(); + + for block_hash in block_hashes { + env.prepare_and_assert_block(block_hash).await; + + let announce = new_announce(&env.db, block_hash, Some(100)); + env.compute_and_assert_announce(announce).await; + } + }); } - // append announces to prepared blocks, except the last one, so that it can be computed - for i in 1..3 { - let announce = new_announce(&env.db, env.chain.blocks[i].hash, Some(100)); - env.db.mutate_block_meta(announce.block_hash, |meta| { - meta.announces - .get_or_insert_default() - .insert(announce.to_hash()); + #[test] + fn multiple_preparation_and_one_processing( + (chain, events_in_block) in chain_with_event_count_strategy() + ) { + gear_utils::init_default_logger(); + + run_async_test(async move { + let mut env = TestEnv::new(chain, events_in_block); + let block_hashes = env + .chain + .blocks + .iter() + .skip(1) + .map(|block| block.hash) + .collect::>(); + + for block_hash in block_hashes { + env.prepare_and_assert_block(block_hash).await; + } + + let last_index = env.chain.blocks.len() - 1; + for i in 1..last_index { + let announce = new_announce(&env.db, env.chain.blocks[i].hash, Some(100)); + env.db.mutate_block_meta(announce.block_hash, |meta| { + meta.announces + .get_or_insert_default() + .insert(announce.to_hash()); + }); + env.db.set_announce(announce); + } + + let announce = new_announce(&env.db, env.chain.blocks[last_index].hash, Some(100)); + env.compute_and_assert_announce(announce).await; }); - env.db.set_announce(announce); } - let announce = new_announce(&env.db, env.chain.blocks[3].hash, Some(100)); - env.compute_and_assert_announce(announce).await; - - Ok(()) -} - -#[tokio::test] -async fn one_preparation_and_multiple_processing() -> Result<()> { - gear_utils::init_default_logger(); - - let mut env = TestEnv::new(3, 3); - - env.prepare_and_assert_block(env.chain.blocks[3].hash).await; - - for block in env.chain.blocks.clone().iter().skip(1) { - let announce = new_announce(&env.db, block.hash, Some(100)); - env.compute_and_assert_announce(announce).await; + #[test] + fn one_preparation_and_multiple_processing( + (chain, events_in_block) in chain_with_event_count_strategy() + ) { + gear_utils::init_default_logger(); + + run_async_test(async move { + let mut env = TestEnv::new(chain, events_in_block); + let last_block_hash = env.chain.blocks.back().unwrap().hash; + env.prepare_and_assert_block(last_block_hash).await; + + let block_hashes = env + .chain + .blocks + .iter() + .skip(1) + .map(|block| block.hash) + .collect::>(); + + for block_hash in block_hashes { + let announce = new_announce(&env.db, block_hash, Some(100)); + env.compute_and_assert_announce(announce).await; + } + }); } - Ok(()) -} - -#[tokio::test] -async fn code_validation_request_does_not_block_preparation() -> Result<()> { - gear_utils::init_default_logger(); - - let mut env = TestEnv::new(1, 3); - - let mut block_events = env.chain.blocks[1].as_synced().events.clone(); - - // add invalid event which shouldn't stop block prepare - block_events.push(BlockEvent::Router(RouterEvent::CodeValidationRequested( - CodeValidationRequestedEvent { - code_id: CodeId::zero(), - timestamp: 0u64, - tx_hash: H256::random(), - }, - ))); - env.db - .set_block_events(env.chain.blocks[1].hash, &block_events); - env.prepare_and_assert_block(env.chain.blocks[1].hash).await; - - let announce = new_announce(&env.db, env.chain.blocks[1].hash, Some(100)); - env.compute_and_assert_announce(announce.clone()).await; - env.compute_and_assert_announce(announce.clone()).await; - - Ok(()) -} - -#[tokio::test] -async fn code_validation_request_for_already_processed_code_does_not_request_loading() -> Result<()> -{ - gear_utils::init_default_logger(); - - let db = Database::memory(); - let processor = MockProcessor::default(); - let mut compute = ComputeService::new( - ComputeConfig::without_quarantine(), - db.clone(), - processor.clone(), - ); - - let code = create_new_code(1); - let code_id = db.set_original_code(&code); - db.set_code_valid(code_id, true); - - // Setup chain and mark blocks as not prepared - let mut chain = BlockChain::mock(1); - mark_as_not_prepared(&mut chain); - let chain = chain.setup(&db); - let block_hash = chain.blocks[1].hash; - - // Add CodeValidationRequested event for the already-validated code - let events = db.block_events(block_hash).unwrap_or_default(); - let mut new_events = events.clone(); - new_events.push(BlockEvent::Router(RouterEvent::CodeValidationRequested( - CodeValidationRequestedEvent { - code_id, - timestamp: 0u64, - tx_hash: H256::random(), - }, - ))); - db.set_block_events(block_hash, &new_events); - - compute.prepare_block(block_hash); - - // The first event should be BlockPrepared, NOT RequestCodes - // because the code is already validated - let event = compute - .next() - .await - .unwrap() - .expect("expect compute service to produce an event"); - - // Verify block was prepared without requesting code loading - let prepared_block = event.unwrap_block_prepared(); - assert_eq!(prepared_block, block_hash); - - // Verify that no follow-up events are produced (no RequestCodes) - let no_follow_up_event = timeout(Duration::from_millis(100), compute.next()).await; - assert!( - no_follow_up_event.is_err(), - "unexpected follow-up compute event after block preparation: {no_follow_up_event:?}" - ); - - // Verify that the processor was NOT called - assert_eq!( - processor.process_code_call_count(), - 0, - "Processor should not be called for already-validated code" - ); - - Ok(()) -} - -#[tokio::test] -async fn code_validation_request_for_non_validated_code_requests_loading() -> Result<()> { - gear_utils::init_default_logger(); - - let db = Database::memory(); - let processor = MockProcessor::default(); - let mut compute = ComputeService::new( - ComputeConfig::without_quarantine(), - db.clone(), - processor.clone(), - ); + #[test] + fn code_validation_request_does_not_block_preparation( + (chain, events_in_block) in single_block_chain_with_event_count_strategy() + ) { + gear_utils::init_default_logger(); + + run_async_test(async move { + let mut env = TestEnv::new(chain, events_in_block); + let block_hash = env.chain.blocks[1].hash; + let mut block_events = env.chain.blocks[1].as_synced().events.clone(); + + block_events.push(BlockEvent::Router(RouterEvent::CodeValidationRequested( + CodeValidationRequestedEvent { + code_id: CodeId::zero(), + timestamp: 0u64, + tx_hash: H256::random(), + }, + ))); + + env.db.set_block_events(block_hash, &block_events); + env.prepare_and_assert_block(block_hash).await; + + let announce = new_announce(&env.db, block_hash, Some(100)); + env.compute_and_assert_announce(announce.clone()).await; + env.compute_and_assert_announce(announce).await; + }); + } - let code = create_new_code(1); - let code_id = db.set_original_code(&code); - // Note: code is NOT marked as valid (db.code_valid(code_id) is None) - - // Setup chain and mark blocks as not prepared - let mut chain = BlockChain::mock(1); - mark_as_not_prepared(&mut chain); - let chain = chain.setup(&db); - let block_hash = chain.blocks[1].hash; - - // Add CodeValidationRequested event for the non-validated code - let events = db.block_events(block_hash).unwrap_or_default(); - let mut new_events = events.clone(); - new_events.push(BlockEvent::Router(RouterEvent::CodeValidationRequested( - CodeValidationRequestedEvent { - code_id, - timestamp: 0u64, - tx_hash: H256::random(), - }, - ))); - db.set_block_events(block_hash, &new_events); - - compute.prepare_block(block_hash); - - // The first event should be RequestCodes because the code is NOT validated - let event = compute - .next() - .await - .unwrap() - .expect("expect compute service to produce an event"); + #[test] + fn code_validation_request_for_already_processed_code_does_not_request_loading( + chain in block_chain_strategy(1) + ) { + gear_utils::init_default_logger(); + + run_async_test(async move { + let db = Database::memory(); + let processor = MockProcessor::default(); + let mut compute = ComputeService::new( + ComputeConfig::without_quarantine(), + db.clone(), + processor.clone(), + ); + + let code = create_new_code(1); + let code_id = db.set_original_code(&code); + db.set_code_valid(code_id, true); + + let mut chain = chain; + reset_to_unprepared(&mut chain); + let chain = chain.setup(&db); + let block_hash = chain.blocks[1].hash; + + let mut new_events = db.block_events(block_hash).unwrap_or_default(); + new_events.push(BlockEvent::Router(RouterEvent::CodeValidationRequested( + CodeValidationRequestedEvent { + code_id, + timestamp: 0u64, + tx_hash: H256::random(), + }, + ))); + db.set_block_events(block_hash, &new_events); + + compute.prepare_block(block_hash); + + let prepared_block = next_compute_event(&mut compute).await.unwrap_block_prepared(); + assert_eq!(prepared_block, block_hash); + assert_no_compute_event(&mut compute).await; + assert_eq!(processor.process_code_call_count(), 0); + }); + } - // Verify that RequestCodes is emitted for non-validated code - let codes_to_load = event.unwrap_request_load_codes(); - assert!( - codes_to_load.contains(&code_id), - "CodeId should be requested for loading when not validated" - ); + #[test] + fn code_validation_request_for_non_validated_code_requests_loading( + chain in block_chain_strategy(1) + ) { + gear_utils::init_default_logger(); + + run_async_test(async move { + let db = Database::memory(); + let processor = MockProcessor::default(); + let mut compute = ComputeService::new( + ComputeConfig::without_quarantine(), + db.clone(), + processor.clone(), + ); + + let code = create_new_code(1); + let code_id = db.set_original_code(&code); + + let mut chain = chain; + reset_to_unprepared(&mut chain); + let chain = chain.setup(&db); + let block_hash = chain.blocks[1].hash; + + let mut new_events = db.block_events(block_hash).unwrap_or_default(); + new_events.push(BlockEvent::Router(RouterEvent::CodeValidationRequested( + CodeValidationRequestedEvent { + code_id, + timestamp: 0u64, + tx_hash: H256::random(), + }, + ))); + db.set_block_events(block_hash, &new_events); - Ok(()) -} + compute.prepare_block(block_hash); -#[tokio::test] -async fn process_code_for_already_processed_valid_code_emits_code_processed() -> Result<()> { - gear_utils::init_default_logger(); + let codes_to_load = next_compute_event(&mut compute) + .await + .unwrap_request_load_codes(); + assert!(codes_to_load.contains(&code_id)); + }); + } - let db = Database::memory(); - let processor = MockProcessor::default(); - let mut compute = ComputeService::new( - ComputeConfig::without_quarantine(), - db.clone(), - processor.clone(), - ); + #[test] + fn process_code_for_already_processed_valid_code_emits_code_processed(nonce in any::()) { + gear_utils::init_default_logger(); - let code = create_new_code(2); - let code_id = db.set_original_code(&code); + run_async_test(async move { + let db = Database::memory(); + let processor = MockProcessor::default(); + let mut compute = ComputeService::new( + ComputeConfig::without_quarantine(), + db.clone(), + processor.clone(), + ); - db.set_instrumented_code( - ethexe_runtime_common::VERSION, - code_id, - InstrumentedCode::new(vec![0], InstantiatedSectionSizes::new(0, 0, 0, 0, 0, 0)), - ); - db.set_code_valid(code_id, true); + let code = create_new_code(nonce); + let code_id = db.set_original_code(&code); - compute.process_code(CodeAndIdUnchecked { code_id, code }); + db.set_instrumented_code( + ethexe_runtime_common::VERSION, + code_id, + InstrumentedCode::new(vec![0], InstantiatedSectionSizes::new(0, 0, 0, 0, 0, 0)), + ); + db.set_code_valid(code_id, true); - let event = compute - .next() - .await - .unwrap() - .expect("expect already processed code to produce CodeProcessed event"); - let processed_code_id = event.unwrap_code_processed(); - assert_eq!(processed_code_id, code_id); - - // Verify that the processor was NOT called for already-validated code - // The CodesSubService should short-circuit and emit CodeProcessed without calling the processor - assert_eq!( - processor.process_code_call_count(), - 0, - "Processor should not be called for already-validated code" - ); + compute.process_code(CodeAndIdUnchecked { code_id, code }); - Ok(()) + let processed_code_id = next_compute_event(&mut compute) + .await + .unwrap_code_processed(); + assert_eq!(processed_code_id, code_id); + assert_eq!(processor.process_code_call_count(), 0); + }); + } }