diff --git a/src/database.rs b/src/database.rs index a8347ad2..f5a11bf0 100644 --- a/src/database.rs +++ b/src/database.rs @@ -1,5 +1,6 @@ use crate::{ context::TransactionContext, + executor::threadpool, meta::{MetadataManager, OpenMetadataError}, metrics::DatabaseMetrics, page::{PageError, PageId, PageManager}, @@ -8,9 +9,11 @@ use crate::{ }; use alloy_primitives::B256; use parking_lot::Mutex; +use rayon::ThreadPoolBuildError; use std::{ fs::File, io, + num::NonZero, ops::Deref, path::{Path, PathBuf}, }; @@ -30,6 +33,7 @@ pub struct DatabaseOptions { wipe: bool, meta_path: Option, max_pages: u32, + num_threads: Option>, } #[derive(Debug)] @@ -42,6 +46,7 @@ pub enum Error { pub enum OpenError { PageError(PageError), MetadataError(OpenMetadataError), + ThreadPoolError(ThreadPoolBuildError), IO(io::Error), } @@ -86,6 +91,16 @@ impl DatabaseOptions { self } + /// Sets the maximum number of threads used to CPU-intensive computations (like hashing). + /// + /// By default, the number of threads is selected automatically based on the number of + /// available CPUs on the system. The algorithm for deciding the default number is not + /// specified and may change in the future. + pub fn num_threads(&mut self, num_threads: NonZero) -> &mut Self { + self.num_threads = Some(num_threads); + self + } + /// Opens the database file at the given path. pub fn open(&self, db_path: impl AsRef) -> Result { let db_path = db_path.as_ref(); @@ -144,7 +159,12 @@ impl Database { .open(db_path) .map_err(OpenError::PageError)?; - Ok(Self::new(StorageEngine::new(page_manager, meta_manager))) + let thread_pool = threadpool::builder() + .num_threads(opts.num_threads.map(NonZero::get).unwrap_or(0)) + .build() + .map_err(OpenError::ThreadPoolError)?; + + Ok(Self::new(StorageEngine::new(page_manager, meta_manager, thread_pool))) } pub fn new(storage_engine: StorageEngine) -> Self { diff --git a/src/executor/futures.rs b/src/executor/futures.rs index fd875182..39b36bc0 100644 --- a/src/executor/futures.rs +++ b/src/executor/futures.rs @@ -105,11 +105,11 @@ impl Clone for Future { impl fmt::Debug for Future { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - struct Pending; + struct Pending(*const ()); impl fmt::Debug for Pending { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("") + write!(f, "", self.0) } } @@ -121,9 +121,12 @@ impl fmt::Debug for Future { } } + let p = Arc::as_ptr(&self.cell) as *const (); + let p = Pending(p); + f.debug_tuple("Future") .field(match self.try_get() { - None => &Pending, + None => &p, Some(Ok(value)) => value, Some(Err(PoisonError)) => &Poisoned, }) diff --git a/src/executor/mod.rs b/src/executor/mod.rs index 1ebd661c..cfec5ae0 100644 --- a/src/executor/mod.rs +++ b/src/executor/mod.rs @@ -24,6 +24,7 @@ mod futures; mod inline; +mod never; mod traits; pub mod threadpool; @@ -31,3 +32,6 @@ pub mod threadpool; pub use futures::{Future, PoisonError}; pub use inline::Inline; pub use traits::{Executor, Wait}; + +#[cfg(test)] +pub(crate) use never::Never; diff --git a/src/executor/never.rs b/src/executor/never.rs new file mode 100644 index 00000000..80849c04 --- /dev/null +++ b/src/executor/never.rs @@ -0,0 +1,30 @@ +#![cfg(test)] + +use crate::executor::{Executor, Future}; + +/// A dummy executor that never executes any function. 
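The `num_threads` option above ends up as the size of the rayon pool used for deferred hashing. A minimal sketch of setting it explicitly when opening a database; the `DatabaseOptions::default()` constructor, the `NonZero<usize>` parameter type, and the path are assumptions for illustration, not shown in this diff:

use std::num::NonZero;

let mut opts = DatabaseOptions::default(); // assumed constructor name
// Limit CPU-bound work (node hashing) to four worker threads instead of letting
// the pool size itself from the number of available CPUs.
opts.num_threads(NonZero::new(4).expect("4 is non-zero"));
let db = opts.open("trie.db").expect("failed to open database"); // hypothetical path
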
+#[derive(Copy, Clone, Debug)] +pub(crate) struct Never; + +impl Executor for Never { + #[inline] + fn defer(&self, _: F) -> Future + where + F: FnOnce() -> T + Send + 'static, + T: Send + Sync + 'static, + { + Future::pending() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn defer() { + let never = Never; + let future = never.defer(|| 123); + assert_eq!(future.get(), None); + } +} diff --git a/src/lib.rs b/src/lib.rs index 022db229..9e31495d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,6 +20,7 @@ pub mod node; pub mod page; pub mod path; pub mod pointer; +pub mod rlp; pub mod snapshot; pub mod storage; pub mod transaction; diff --git a/src/node.rs b/src/node.rs index 86e7d52d..ad5d87c6 100644 --- a/src/node.rs +++ b/src/node.rs @@ -1,6 +1,8 @@ use crate::{ account::Account, + executor::{Executor, Wait}, pointer::Pointer, + rlp::DeferredRlpNode, storage::value::{self, Value}, }; use alloy_primitives::{hex, StorageValue, B256, U256}; @@ -282,7 +284,12 @@ impl Node { /// /// This will typically be a 33 byte prefixed keccak256 hash. pub fn to_rlp_node(&self) -> RlpNode { - RlpNode::from_rlp(&self.rlp_encode()) + RlpNode::from_rlp(&self.wait().rlp_encode()) + } + + pub fn to_deferred_rlp_node(&self, executor: E) -> DeferredRlpNode { + let this = self.clone(); + DeferredRlpNode::from_rlp_with(executor, move || this.wait().rlp_encode()) } /// Returns the RLP encoding of the [Node]. @@ -319,6 +326,29 @@ impl Node { } } +impl Wait for Node { + type Output = Self; + + fn wait(&self) -> &Self::Output { + match self.kind() { + NodeKind::AccountLeaf { ref storage_root, .. } => { + if let Some(storage_root) = storage_root { + storage_root.wait(); + } + } + NodeKind::StorageLeaf { .. } => {} + NodeKind::Branch { ref children } => { + children.iter().for_each(|child| { + if let Some(child) = child { + child.wait(); + } + }); + } + } + self + } +} + /// This is the maximum possible RLP-encoded length of a node. /// /// This value is derived from the maximum possible length of a branch node, which is the largest @@ -648,7 +678,7 @@ pub fn encode_branch(children: &[Option], out: &mut dyn BufMut) -> usiz // now encode the children for child in children.iter() { if let Some(child) = child { - out.put_slice(child.rlp()); + out.put_slice(child.rlp().as_slice()); } else { out.put_u8(EMPTY_STRING_CODE); } diff --git a/src/page/slotted_page/cell_pointer.rs b/src/page/slotted_page/cell_pointer.rs index bc4c5c26..dbe58144 100644 --- a/src/page/slotted_page/cell_pointer.rs +++ b/src/page/slotted_page/cell_pointer.rs @@ -1,6 +1,7 @@ use crate::page::{Page, PageError}; // A pointer to a page cell, which encodes the offset and length as 12-bit numbers in 3 bytes. +#[derive(Copy, Clone)] pub(crate) struct CellPointer<'p>(&'p [u8; 3]); #[derive(Debug)] diff --git a/src/pointer.rs b/src/pointer.rs index 96bae163..5a1c6e7a 100644 --- a/src/pointer.rs +++ b/src/pointer.rs @@ -1,37 +1,48 @@ -use alloy_trie::nodes::RlpNode; - use crate::{ + executor::{Inline, Wait}, location::Location, + rlp::DeferredRlpNode, storage::value::{self, Value}, }; use alloy_primitives::{B256, U256}; use alloy_rlp::encode; +use alloy_trie::nodes::RlpNode; use proptest::prelude::*; use proptest_derive::Arbitrary; const HASH_FLAG: u8 = 0x1; + /// A pointer to a node in the trie. /// This is a wrapper around a [Location] and an [RlpNode]. 
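A short sketch contrasting the two executors that appear above. `Inline` is assumed to run the closure immediately on the calling thread (its name and its use in the proptest strategies suggest so; only `Never`'s behaviour is spelled out in this diff), while the test-only `Never` parks the computation forever, which is what lets the `DeferredRlpNode` tests observe the placeholder hash:

use crate::executor::{Executor, Inline, Never};

// Assumed: `Inline` executes eagerly, so the future is ready right away.
let ready = Inline.defer(|| 2 + 2);
assert_eq!(ready.get(), Some(&4));

// `Never` (test-only) never runs the closure, so the future stays pending.
let pending = Never.defer(|| 2 + 2);
assert_eq!(pending.get(), None);
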
-#[derive(Debug, Clone, PartialEq, Eq, Arbitrary)] +#[derive(Debug, PartialEq, Eq, Clone, Arbitrary)] pub struct Pointer { location: Location, #[proptest(strategy = "u256_or_hash()")] - rlp: RlpNode, + rlp: DeferredRlpNode, } impl Pointer { /// Creates a new [Pointer] from a [Location] and an [RlpNode]. + #[inline] + #[must_use] pub fn new(location: Location, rlp: RlpNode) -> Self { + Self::new_deferred(location, rlp.into()) + } + + #[inline] + #[must_use] + pub fn new_deferred(location: Location, rlp: DeferredRlpNode) -> Self { Self { location, rlp } } /// Creates a new [Pointer] from a [Location] with an unhashed [RlpNode]. + #[must_use] pub fn new_unhashed(location: Location) -> Self { - Self { location, rlp: RlpNode::from_rlp(&[]) } + Self { location, rlp: RlpNode::from_rlp(&[]).into() } } /// Returns the [RlpNode] wrapped by the [Pointer]. - pub fn rlp(&self) -> &RlpNode { + pub fn rlp(&self) -> &DeferredRlpNode { &self.rlp } @@ -51,6 +62,15 @@ impl Pointer { } } +impl Wait for Pointer { + type Output = Self; + + fn wait(&self) -> &Self::Output { + self.rlp.wait(); + self + } +} + impl Value for Pointer { fn size(&self) -> usize { 37 // Fixed size: 4 bytes location + 33 bytes max RLP @@ -70,6 +90,7 @@ impl Value for Pointer { let arr: [u8; 37] = bytes.try_into().map_err(|_| value::Error::InvalidEncoding)?; let flags = arr[4]; let rlp = if flags & HASH_FLAG == HASH_FLAG { + debug_assert!(!(arr[5..37]).iter().all(|b| *b == 0), "read a hash of all zeros"); RlpNode::word_rlp(&B256::from_slice(&arr[5..37])) } else { // Because the RLP string must be 1-32 bytes, we can safely use the first byte to @@ -111,7 +132,7 @@ impl From<&Pointer> for [u8; 37] { // Determine flags and content let rlp = pointer.rlp(); let (flags, content) = - if rlp.is_hash() { (HASH_FLAG, &rlp[1..]) } else { (0, rlp.as_ref()) }; + if rlp.is_hash() { (HASH_FLAG, &rlp.as_slice()[1..]) } else { (0, rlp.as_slice()) }; data[4] = flags; let content_len = content.len().min(33); @@ -120,26 +141,26 @@ impl From<&Pointer> for [u8; 37] { } } -fn u256_or_hash() -> impl Strategy { +fn u256_or_hash() -> impl Strategy { prop_oneof![arb_u256_rlp(), arb_hash_rlp(),] } -fn arb_u256_rlp() -> impl Strategy { - any::().prop_map(|u| RlpNode::from_rlp(&encode(u))).boxed() +fn arb_u256_rlp() -> impl Strategy { + any::().prop_map(|u| DeferredRlpNode::from_rlp(Inline, encode(u))).boxed() } -fn arb_hash_rlp() -> impl Strategy { - any::().prop_map(|h: B256| RlpNode::word_rlp(&h)).boxed() +fn arb_hash_rlp() -> impl Strategy { + any::().prop_map(|h: B256| DeferredRlpNode::word_rlp(&h)).boxed() } #[cfg(test)] mod tests { + use super::*; + use crate::executor::Wait; use alloy_primitives::hex; use alloy_rlp::encode; use alloy_trie::EMPTY_ROOT_HASH; - use super::*; - #[test] fn test_pointer_to_bytes() { let rlp_hash = RlpNode::word_rlp(&EMPTY_ROOT_HASH); @@ -186,33 +207,33 @@ mod tests { rlp_hash_bytes.extend(&EMPTY_ROOT_HASH); let pointer = Pointer::from_bytes(&rlp_hash_bytes).unwrap(); assert_eq!(pointer.location(), Location::for_cell(1)); - assert_eq!(pointer.rlp(), &RlpNode::word_rlp(&EMPTY_ROOT_HASH)); + assert_eq!(pointer.rlp().wait(), &RlpNode::word_rlp(&EMPTY_ROOT_HASH)); let mut short_rlp_bytes = vec![0, 0, 0, 1, 0, 42]; short_rlp_bytes.extend([0; 31]); let pointer = Pointer::from_bytes(&short_rlp_bytes).unwrap(); assert_eq!(pointer.location(), Location::for_cell(1)); - assert_eq!(pointer.rlp(), &RlpNode::from_rlp(&encode(42u64))); + assert_eq!(pointer.rlp().wait(), &RlpNode::from_rlp(&encode(42u64))); let mut zero_rlp_bytes = vec![0, 0, 0, 
1, 0, 128]; zero_rlp_bytes.extend([0; 31]); let pointer = Pointer::from_bytes(&zero_rlp_bytes).unwrap(); assert_eq!(pointer.location(), Location::for_cell(1)); - assert_eq!(pointer.rlp(), &RlpNode::from_rlp(&encode(0u64))); + assert_eq!(pointer.rlp().wait(), &RlpNode::from_rlp(&encode(0u64))); let mut short_string_rlp_bytes = vec![0, 0, 0, 1, 0, 139]; short_string_rlp_bytes.extend(b"hello world"); short_string_rlp_bytes.extend([0; 20]); let pointer = Pointer::from_bytes(&short_string_rlp_bytes).unwrap(); assert_eq!(pointer.location(), Location::for_cell(1)); - assert_eq!(pointer.rlp(), &RlpNode::from_rlp(&encode("hello world"))); + assert_eq!(pointer.rlp().wait(), &RlpNode::from_rlp(&encode("hello world"))); let mut short_leaf_rlp_bytes = vec![0, 0, 0, 1, 0, 0xc7, 0x83, 0x61, 0x62, 0x63, 0x82, 0x30, 0x39]; short_leaf_rlp_bytes.extend([0; 24]); let pointer = Pointer::from_bytes(&short_leaf_rlp_bytes).unwrap(); assert_eq!(pointer.location(), Location::for_cell(1)); - assert_eq!(pointer.rlp(), &RlpNode::from_rlp(&hex!("c783616263823039"))); + assert_eq!(pointer.rlp().wait(), &RlpNode::from_rlp(&hex!("c783616263823039"))); } proptest! { diff --git a/src/rlp.rs b/src/rlp.rs new file mode 100644 index 00000000..fd817740 --- /dev/null +++ b/src/rlp.rs @@ -0,0 +1,198 @@ +use crate::executor::{Executor, Future, Wait}; +use alloy_primitives::B256; +use alloy_rlp::{Decodable, EMPTY_STRING_CODE}; +use alloy_trie::nodes::RlpNode; +use proptest::{ + arbitrary::{self, Arbitrary}, + strategy::{self, Strategy}, +}; +use std::{borrow::Borrow, mem}; + +const fn empty_hash() -> RlpNode { + #[repr(C)] + struct RlpNodeInternal { + len: u32, + data: [u8; 33], + } + + let mut n = RlpNodeInternal { len: 33, data: [0u8; 33] }; + n.data[0] = EMPTY_STRING_CODE + 32; + + // SAFETY: `RlpNode` is a newtype wrapper around `ArrayVec`, which itself is a + // `repr(C)` type with the same fields as `RlpNodeInternal` + unsafe { mem::transmute(n) } +} + +static EMPTY_HASH: RlpNode = empty_hash(); + +/// An [`RlpNode`] whose final value may be computed at a later time, in a separate thread. +/// +/// Accessing the `RlpNode` before it has been fully computed will result in a temporary value +/// being returned. +#[derive(Clone, Debug)] +pub struct DeferredRlpNode(Future); + +impl DeferredRlpNode { + #[inline] + #[must_use] + pub fn from_raw(data: &[u8]) -> Option { + RlpNode::from_raw(data).map(Self::from) + } + + #[must_use] + pub fn from_rlp(executor: E, data: T) -> Self + where + E: Executor, + T: Borrow<[u8]>, + { + let data = data.borrow(); + if data.len() < 32 { + // This does not require any hash computation, so no need to run this in a separate + // thread. + // + // SAFETY: `data` is less than the maximum capacity, and `RlpNode::from_raw` is + // guaranteed not to fail in this case + unsafe { Self::from_raw(data).unwrap_unchecked() } + } else { + // This requires a hash computation. 
+ let data = data.to_owned(); + Self(executor.defer(move || RlpNode::from_rlp(&data))) + } + } + + #[must_use] + pub fn from_rlp_with(executor: E, f: F) -> Self + where + E: Executor, + F: FnOnce() -> T + Send + 'static, + T: Borrow<[u8]>, + { + Self(executor.defer(move || { + let n = RlpNode::from_rlp(f().borrow()); + assert!( + n.is_hash(), + "closure passed to DeferredRlpNode::from_rlp_with must generate a hash" + ); + n + })) + } + + #[must_use] + pub fn word_rlp(word: &B256) -> Self { + Self::from(RlpNode::word_rlp(word)) + } + + #[inline] + #[must_use] + pub fn get(&self) -> Option<&RlpNode> { + self.0.get() + } + + #[inline] + #[must_use] + pub fn get_or_placeholder(&self) -> &RlpNode { + // If the `Future` value is not available (`.get()` returns `None`), then it can only mean + // that this node contains a hash. + self.get().unwrap_or(&EMPTY_HASH) + } + + #[inline] + #[must_use] + pub fn len(&self) -> usize { + self.get_or_placeholder().len() + } + + #[inline] + #[must_use] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + #[inline] + #[must_use] + pub fn as_slice(&self) -> &[u8] { + self.get_or_placeholder().as_slice() + } + + #[inline] + #[must_use] + pub fn is_hash(&self) -> bool { + self.get_or_placeholder().is_hash() + } + + #[inline] + #[must_use] + pub fn as_hash(&self) -> Option { + self.get_or_placeholder().as_hash() + } +} + +impl Default for DeferredRlpNode { + #[inline] + fn default() -> Self { + Self(Future::ready(RlpNode::default())) + } +} + +impl From for DeferredRlpNode { + #[inline] + fn from(rlp: RlpNode) -> Self { + Self(Future::ready(rlp)) + } +} + +impl Wait for DeferredRlpNode { + type Output = RlpNode; + + #[inline] + fn wait(&self) -> &Self::Output { + self.0.wait() + } +} + +impl PartialEq for DeferredRlpNode { + fn eq(&self, other: &Self) -> bool { + self.wait() == other.wait() + } +} + +impl Eq for DeferredRlpNode {} + +impl Decodable for DeferredRlpNode { + #[inline] + fn decode(buf: &mut &[u8]) -> alloy_rlp::Result { + RlpNode::decode(buf).map(Self::from) + } +} + +impl Arbitrary for DeferredRlpNode { + type Parameters = ::Parameters; + + type Strategy = strategy::Map<::Strategy, fn(RlpNode) -> Self>; + + fn arbitrary_with(params: Self::Parameters) -> Self::Strategy { + Strategy::prop_map(arbitrary::any_with::(params), Self::from) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::executor::Never; + + #[test] + fn empty_hash() { + let expected = RlpNode::word_rlp(&B256::ZERO); + assert_eq!(EMPTY_HASH, expected); + assert!(EMPTY_HASH.is_hash()); + assert!(EMPTY_HASH.as_hash().unwrap().is_zero()); + } + + #[test] + fn deferred_is_hash() { + let node = DeferredRlpNode::from_rlp(Never, [0; 128]); + assert_eq!(node.get(), None); + assert!(node.get_or_placeholder().is_hash()); + assert_eq!(node.get_or_placeholder().as_hash(), Some(B256::ZERO)); + } +} diff --git a/src/storage.rs b/src/storage.rs index b86be74f..3a3f8831 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1,5 +1,7 @@ pub mod debug; pub mod engine; pub mod proofs; -mod test_utils; pub mod value; + +mod deferred_changes; +mod test_utils; diff --git a/src/storage/deferred_changes.rs b/src/storage/deferred_changes.rs new file mode 100644 index 00000000..5afd1773 --- /dev/null +++ b/src/storage/deferred_changes.rs @@ -0,0 +1,93 @@ +//! Structs to enable the computation of values generated by [`StorageEngine`] in background +//! threads. 
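A sketch of the two `DeferredRlpNode` paths described above: payloads shorter than 32 bytes are wrapped immediately with no hashing, while longer payloads are hashed on the executor and report the zero-hash placeholder until the future resolves. `Inline` is assumed to execute eagerly; `Never` is the test-only executor that never does:

use crate::executor::{Inline, Never};

// Short RLP: no hash computation needed, the value is available immediately.
let short = DeferredRlpNode::from_rlp(Inline, alloy_rlp::encode(7u64));
assert!(short.get().is_some());

// Long RLP on the `Never` executor: the hash never materialises, so the
// keccak-width zero-hash placeholder is observed instead.
let long = DeferredRlpNode::from_rlp(Never, [0u8; 64]);
assert_eq!(long.get(), None);
assert_eq!(long.as_hash(), Some(alloy_primitives::B256::ZERO));
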
+ +use crate::{ + context::TransactionContext, + executor::Wait, + node::Node, + page::{PageId, SlottedPageMut}, + storage::engine::StorageEngine, +}; +use std::{ + cell::LazyCell, + collections::HashMap, + fmt::Debug, + ops::{Deref, DerefMut}, +}; + +#[derive(Default, Debug)] +pub(super) struct DeferredChanges { + set: HashMap<(PageId, u8), Node>, +} + +impl DeferredChanges { + pub(super) fn new() -> Self { + Self::default() + } + + pub(super) fn set_value(&mut self, page_id: PageId, index: u8, value: Node) { + self.set.insert((page_id, index), value); + } + + pub(super) fn delete_value(&mut self, page_id: PageId, index: u8) { + self.set.remove(&(page_id, index)); + } + + pub(super) fn swap_cell_pointers(&mut self, page_id: PageId, index1: u8, index2: u8) { + let value1 = self.set.remove(&(page_id, index1)); + let value2 = self.set.remove(&(page_id, index2)); + if let Some(value1) = value1 { + self.set.insert((page_id, index2), value1); + } + if let Some(value2) = value2 { + self.set.insert((page_id, index1), value2); + } + } + + pub(super) fn orphan_page(&mut self, remove_page_id: PageId) { + self.set.retain(|&(page_id, _index), _value| page_id != remove_page_id); + } + + pub(super) fn flush(&mut self, engine: &StorageEngine, context: &mut TransactionContext) { + for ((page_id, index), value) in self.set.drain() { + let mut slotted_page = SlottedPageMut::try_from( + engine.get_mut_page(context, page_id).expect("page disappeared"), + ) + .expect("page could not be parsed"); + slotted_page.set_value(index, value.wait()).expect("set failed"); + } + } + + pub(super) fn flush_page_id( + &mut self, + engine: &StorageEngine, + context: &mut TransactionContext, + flush_page_id: PageId, + ) { + // Don't retrieve the page unless it's present in our set (retrieving pages is expensive + // due to the use of atomic operations). 
+ let slotted_page = LazyCell::new(|| { + SlottedPageMut::try_from( + engine.get_mut_page(context, flush_page_id).expect("page disappeared"), + ) + .expect("page could not be parsed") + }); + self.flush_page_generic(flush_page_id, slotted_page); + } + + pub(super) fn flush_page(&mut self, slotted_page: &mut SlottedPageMut<'_>) { + let flush_page_id = slotted_page.id(); + self.flush_page_generic(flush_page_id, slotted_page); + } + + fn flush_page_generic<'a, P>(&mut self, flush_page_id: PageId, mut slotted_page: P) + where + P: Deref> + DerefMut, + { + for ((_page_id, index), value) in + self.set.extract_if(|&(page_id, _index), _value| page_id == flush_page_id) + { + slotted_page.set_value(index, value.wait()).expect("set failed"); + } + } +} diff --git a/src/storage/engine.rs b/src/storage/engine.rs index b899034d..51a568fb 100644 --- a/src/storage/engine.rs +++ b/src/storage/engine.rs @@ -1,6 +1,7 @@ use crate::{ account::Account, context::TransactionContext, + executor::Wait, location::Location, meta::{MetadataManager, OrphanPage}, node::{ @@ -14,12 +15,14 @@ use crate::{ }, path::{AddressPath, StoragePath, ADDRESS_PATH_LENGTH, STORAGE_PATH_LENGTH}, pointer::Pointer, + rlp::DeferredRlpNode, snapshot::SnapshotId, - storage::value::Value, + storage::{deferred_changes::DeferredChanges, value::Value}, }; use alloy_primitives::StorageValue; -use alloy_trie::{nodes::RlpNode, nybbles, Nibbles, EMPTY_ROOT_HASH}; +use alloy_trie::{nybbles, Nibbles, EMPTY_ROOT_HASH}; use parking_lot::Mutex; +use rayon::ThreadPool; use std::{ fmt::Debug, io, @@ -37,6 +40,7 @@ pub struct StorageEngine { pub(crate) page_manager: PageManager, pub(crate) meta_manager: Mutex, pub(crate) alive_snapshot: AtomicU64, + thread_pool: ThreadPool, } #[derive(Debug)] @@ -47,12 +51,17 @@ enum PointerChange { } impl StorageEngine { - pub fn new(page_manager: PageManager, meta_manager: MetadataManager) -> Self { + pub fn new( + page_manager: PageManager, + meta_manager: MetadataManager, + thread_pool: ThreadPool, + ) -> Self { let alive_snapshot = meta_manager.active_slot().snapshot_id(); Self { page_manager, meta_manager: Mutex::new(meta_manager), alive_snapshot: AtomicU64::new(alive_snapshot), + thread_pool, } } @@ -292,6 +301,8 @@ impl StorageEngine { context: &mut TransactionContext, mut changes: &mut [(Nibbles, Option)], ) -> Result<(), Error> { + let mut deferred = DeferredChanges::new(); + changes.sort_by(|a, b| a.0.cmp(&b.0)); if context.root_node_page_id.is_none() { // Handle empty trie case, inserting the first new value before traversing the trie. 
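Illustration only, not the crate's API: the shape of the staging pattern that `DeferredChanges` applies above. Writes whose bytes embed a child's RLP hash are parked in a map keyed by (page id, cell index) and only re-applied once the deferred hashes have been waited on. The names `Staged`, `stage`, and `write_back` are hypothetical:

use std::collections::HashMap;

struct Staged<V> {
    pending: HashMap<(u32, u8), V>, // keyed by (page id, cell index)
}

impl<V> Staged<V> {
    fn new() -> Self {
        Self { pending: HashMap::new() }
    }

    fn stage(&mut self, page: u32, cell: u8, value: V) {
        // A later write to the same slot simply replaces the earlier one.
        self.pending.insert((page, cell), value);
    }

    fn flush(&mut self, mut write_back: impl FnMut(u32, u8, V)) {
        // In `DeferredChanges::flush` this is the point where `value.wait()`
        // blocks on the thread pool before the node is re-serialised into the page.
        for ((page, cell), value) in self.pending.drain() {
            write_back(page, cell, value);
        }
    }
}
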
@@ -302,7 +313,7 @@ impl StorageEngine { let root_pointer = self.initialize_empty_trie(context, path, value, &mut slotted_page)?; context.root_node_page_id = root_pointer.location().page_id(); - context.root_node_hash = root_pointer.rlp().as_hash().unwrap(); + context.root_node_hash = root_pointer.wait().rlp().as_hash().unwrap(); if remaining_changes.is_empty() { return Ok(()); } @@ -316,12 +327,20 @@ impl StorageEngine { } }); - let pointer_change = - self.set_values_in_page(context, changes, 0, context.root_node_page_id.unwrap())?; + let pointer_change = self.set_values_in_page( + context, + changes, + 0, + context.root_node_page_id.unwrap(), + &mut deferred, + )?; + + deferred.flush(self, context); + match pointer_change { PointerChange::Update(pointer) => { context.root_node_page_id = pointer.location().page_id(); - context.root_node_hash = pointer.rlp().as_hash().unwrap(); + context.root_node_hash = pointer.wait().rlp().as_hash().unwrap(); } PointerChange::Delete => { context.root_node_page_id = None; @@ -329,6 +348,7 @@ impl StorageEngine { } PointerChange::None => (), } + Ok(()) } @@ -338,7 +358,9 @@ impl StorageEngine { mut changes: &[(Nibbles, Option)], path_offset: u8, page_id: PageId, + deferred: &mut DeferredChanges, ) -> Result { + deferred.flush_page_id(self, context, page_id); let page = self.get_mut_clone(context, page_id)?; let mut new_slotted_page = SlottedPageMut::try_from(page)?; let mut split_count = 0; @@ -350,12 +372,10 @@ impl StorageEngine { path_offset, &mut new_slotted_page, 0, + deferred, ); match result { - Ok(PointerChange::Delete) => return Ok(PointerChange::Delete), - Ok(PointerChange::None) => return Ok(PointerChange::None), - Ok(PointerChange::Update(pointer)) => return Ok(PointerChange::Update(pointer)), // In the case of a page split, re-attempt the operation from scratch. 
This ensures // that a page will be consistently evaluated, and not modified in // the middle of an operation, which could result in inconsistent @@ -369,10 +389,7 @@ impl StorageEngine { return Err(Error::PageError(PageError::PageSplitLimitReached)); } } - Err(Error::PageError(PageError::PageIsFull)) => { - return Err(Error::PageError(PageError::PageIsFull)); - } - Err(e) => return Err(e), + value => return value, } } } @@ -405,11 +422,15 @@ impl StorageEngine { path_offset: u8, slotted_page: &mut SlottedPageMut<'_>, page_index: u8, + deferred: &mut DeferredChanges, ) -> Result { if changes.is_empty() { return Ok(PointerChange::None); } + deferred.flush_page(slotted_page); // TODO: instead of flushing, + // change the call to `get_value` + // to check `deferred` first let mut node = slotted_page.get_value::(page_index)?; // Find the shortest common prefix between the node path and the changes @@ -435,6 +456,7 @@ impl StorageEngine { path_offset, slotted_page, page_index, + deferred, ); } else if changes_left.is_empty() { return self.set_values_in_cloned_page( @@ -443,6 +465,7 @@ impl StorageEngine { path_offset, slotted_page, page_index, + deferred, ); } else { unreachable!( @@ -459,6 +482,7 @@ impl StorageEngine { &mut node, common_prefix, common_prefix_length, + deferred, ); } @@ -475,6 +499,7 @@ impl StorageEngine { &mut node, path, value, + deferred, ); } @@ -488,6 +513,7 @@ impl StorageEngine { page_index, &mut node, common_prefix_length, + deferred, ); } @@ -501,6 +527,7 @@ impl StorageEngine { page_index, &mut node, common_prefix_length, + deferred, ) } @@ -532,6 +559,7 @@ impl StorageEngine { node: &mut Node, common_prefix: Nibbles, common_prefix_length: usize, + deferred: &mut DeferredChanges, ) -> Result { // Create a new branch node with the common prefix let mut new_parent_branch = Node::new_branch(common_prefix)?; @@ -541,14 +569,16 @@ impl StorageEngine { if slotted_page.num_free_bytes() < new_parent_branch.size() + CELL_POINTER_SIZE - common_prefix_length / 2 { + deferred.flush_page(slotted_page); self.split_page(context, slotted_page)?; return Err(Error::PageSplit(0)); } let node_branch_index = node.prefix()[common_prefix_length]; // Update the existing node with the new prefix node.set_prefix(node.prefix().slice(common_prefix_length + 1..))?; - let rlp_node = node.to_rlp_node(); + let rlp_node = node.to_deferred_rlp_node(&self.thread_pool); slotted_page.set_value(cell_index, node)?; + deferred.set_value(slotted_page.id(), cell_index, node.clone()); // Make sure there is no insertions until the new branch node is inserted let new_parent_branch_cell_index = slotted_page.next_free_cell_index()?; @@ -556,17 +586,26 @@ impl StorageEngine { // existing node later. 
new_parent_branch.set_child( node_branch_index, - Pointer::new(Location::for_cell(new_parent_branch_cell_index), rlp_node), + Pointer::new_deferred(Location::for_cell(new_parent_branch_cell_index), rlp_node), )?; let inserted_branch_cell_index = slotted_page.insert_value(&new_parent_branch)?; + deferred.set_value(slotted_page.id(), inserted_branch_cell_index, new_parent_branch); debug_assert_eq!( inserted_branch_cell_index, new_parent_branch_cell_index, "new parent branch cell index should be the same as the next free cell index, a different caused by interruption between the next_free_cell_index and insert_value" ); slotted_page.swap_cell_pointers(cell_index, new_parent_branch_cell_index)?; + deferred.swap_cell_pointers(slotted_page.id(), cell_index, new_parent_branch_cell_index); // Insert the changes into the new branch via recursion - self.set_values_in_cloned_page(context, changes, path_offset, slotted_page, cell_index) + self.set_values_in_cloned_page( + context, + changes, + path_offset, + slotted_page, + cell_index, + deferred, + ) } /// Handles the case when the path matches the node prefix exactly @@ -580,15 +619,17 @@ impl StorageEngine { node: &mut Node, path: Nibbles, value: Option<&TrieValue>, + deferred: &mut DeferredChanges, ) -> Result { if value.is_none() { // Delete the node if node.has_children() { // Delete the entire subtrie (e.g., for an AccountLeaf, delete all storage) - self.delete_subtrie(context, slotted_page, page_index)?; + self.delete_subtrie(context, slotted_page, page_index, deferred)?; } slotted_page.delete_value(page_index)?; + deferred.delete_value(slotted_page.id(), page_index); assert_eq!( changes.len(), @@ -614,6 +655,7 @@ impl StorageEngine { path_offset, slotted_page, page_index, + deferred, ); } @@ -630,16 +672,18 @@ impl StorageEngine { if new_node_size > old_node_size { let node_size_incr = new_node_size - old_node_size; if slotted_page.num_free_bytes() < node_size_incr { + deferred.flush_page(slotted_page); self.split_page(context, slotted_page)?; return Err(Error::PageSplit(0)); } } slotted_page.set_value(page_index, &new_node)?; + deferred.set_value(slotted_page.id(), page_index, new_node.clone()); if remaining_changes.is_empty() { - let rlp_node = new_node.to_rlp_node(); - Ok(PointerChange::Update(Pointer::new( + let rlp_node = new_node.to_deferred_rlp_node(&self.thread_pool); + Ok(PointerChange::Update(Pointer::new_deferred( node_location(slotted_page.id(), page_index), rlp_node, ))) @@ -652,14 +696,15 @@ impl StorageEngine { path_offset, slotted_page, page_index, + deferred, ); match account_pointer_change { Ok(PointerChange::Update(pointer)) => Ok(PointerChange::Update(pointer)), Ok(PointerChange::None) => { // even if the storage is unchanged, we still need to update the RLP encoding of // this account node as its contents have changed - let rlp_node = new_node.to_rlp_node(); - Ok(PointerChange::Update(Pointer::new( + let rlp_node = new_node.to_deferred_rlp_node(&self.thread_pool); + Ok(PointerChange::Update(Pointer::new_deferred( node_location(slotted_page.id(), page_index), rlp_node, ))) @@ -682,6 +727,7 @@ impl StorageEngine { page_index: u8, node: &mut Node, common_prefix_length: usize, + deferred: &mut DeferredChanges, ) -> Result { let child_pointer = node.direct_child()?; @@ -695,6 +741,7 @@ impl StorageEngine { path_offset + common_prefix_length as u8, slotted_page, child_cell_index, + deferred, )? 
} else { // Handle remote child node (on different page) @@ -704,6 +751,7 @@ impl StorageEngine { changes, path_offset + common_prefix_length as u8, child_page_id, + deferred, )? }; @@ -715,17 +763,18 @@ impl StorageEngine { page_index, Some(new_child_pointer), 0, + deferred, )?; - let rlp_node = node.to_rlp_node(); - Ok(PointerChange::Update(Pointer::new( + let rlp_node = node.to_deferred_rlp_node(&self.thread_pool); + Ok(PointerChange::Update(Pointer::new_deferred( node_location(slotted_page.id(), page_index), rlp_node, ))) } PointerChange::Delete => { - self.update_node_child(node, slotted_page, page_index, None, 0)?; - let rlp_node = node.to_rlp_node(); - Ok(PointerChange::Update(Pointer::new( + self.update_node_child(node, slotted_page, page_index, None, 0, deferred)?; + let rlp_node = node.to_deferred_rlp_node(&self.thread_pool); + Ok(PointerChange::Update(Pointer::new_deferred( node_location(slotted_page.id(), page_index), rlp_node, ))) @@ -742,6 +791,7 @@ impl StorageEngine { page_index, node, common_prefix_length, + deferred, ) } } @@ -756,6 +806,7 @@ impl StorageEngine { page_index: u8, node: &mut Node, common_prefix_length: usize, + deferred: &mut DeferredChanges, ) -> Result { if changes.is_empty() { return Ok(PointerChange::None); @@ -772,6 +823,7 @@ impl StorageEngine { path_offset, slotted_page, page_index, + deferred, ); } @@ -785,20 +837,23 @@ impl StorageEngine { // 3. and add new cell pointer for the new leaf node (3 bytes) // when adding the new child, split the page. if slotted_page.num_free_bytes() < node_size_incr + new_node.size() + CELL_POINTER_SIZE { + deferred.flush_page(slotted_page); self.split_page(context, slotted_page)?; return Err(Error::PageSplit(0)); } - let rlp_node = new_node.to_rlp_node(); + let rlp_node = new_node.to_deferred_rlp_node(&self.thread_pool); // Insert the new node and update the parent - let location = Location::for_cell(slotted_page.insert_value(&new_node)?); - node.set_child(0, Pointer::new(location, rlp_node))?; + let cell_index = slotted_page.insert_value(&new_node)?; + let location = Location::for_cell(cell_index); + node.set_child(0, Pointer::new_deferred(location, rlp_node))?; slotted_page.set_value(page_index, node)?; + deferred.set_value(slotted_page.id(), page_index, node.clone()); if remaining_changes.is_empty() { - let rlp_node = node.to_rlp_node(); - return Ok(PointerChange::Update(Pointer::new( + let rlp_node = node.to_deferred_rlp_node(&self.thread_pool); + return Ok(PointerChange::Update(Pointer::new_deferred( node_location(slotted_page.id(), page_index), rlp_node, ))); @@ -811,6 +866,7 @@ impl StorageEngine { path_offset, slotted_page, page_index, + deferred, ) } @@ -822,6 +878,7 @@ impl StorageEngine { page_index: u8, new_child_pointer: Option, child_index: u8, + deferred: &mut DeferredChanges, ) -> Result<(), Error> { if let Some(new_child_pointer) = new_child_pointer { node.set_child(child_index, new_child_pointer)?; @@ -830,6 +887,7 @@ impl StorageEngine { } slotted_page.set_value(page_index, node)?; + deferred.set_value(slotted_page.id(), page_index, node.clone()); Ok(()) } @@ -843,6 +901,7 @@ impl StorageEngine { page_index: u8, node: &mut Node, common_prefix_length: usize, + deferred: &mut DeferredChanges, ) -> Result { // Partition changes by child index let mut remaining_changes = changes; @@ -867,6 +926,7 @@ impl StorageEngine { node, common_prefix_length, child_index, + deferred, ); match result { Err(Error::PageSplit(processed)) => { @@ -882,7 +942,7 @@ impl StorageEngine { assert!(handled_in_children == 
changes.len(), "all changes should be handled"); // Check if the branch node should be deleted or merged - self.handle_branch_node_cleanup(context, slotted_page, page_index, node) + self.handle_branch_node_cleanup(context, slotted_page, page_index, node, deferred) } fn handle_child_node_traversal( @@ -895,6 +955,7 @@ impl StorageEngine { node: &mut Node, common_prefix_length: usize, child_index: u8, + deferred: &mut DeferredChanges, ) -> Result<(), Error> { // Get the child pointer for this index let child_pointer = node.child(child_index)?; @@ -912,6 +973,7 @@ impl StorageEngine { path_offset + common_prefix_length as u8 + 1, slotted_page, child_cell_index, + deferred, )? } else { // Remote child node @@ -921,6 +983,7 @@ impl StorageEngine { matching_changes, path_offset + common_prefix_length as u8 + 1, child_page_id, + deferred, )? }; @@ -932,10 +995,18 @@ impl StorageEngine { page_index, Some(new_child_pointer), child_index, + deferred, )?; } PointerChange::Delete => { - self.update_node_child(node, slotted_page, page_index, None, child_index)?; + self.update_node_child( + node, + slotted_page, + page_index, + None, + child_index, + deferred, + )?; } PointerChange::None => {} } @@ -979,14 +1050,18 @@ impl StorageEngine { if slotted_page.num_free_bytes() < node_size_incr + new_node.size() + CELL_POINTER_SIZE { + deferred.flush_page(slotted_page); self.split_page(context, slotted_page)?; return Err(Error::PageSplit(0)); } - let rlp_node = new_node.to_rlp_node(); - let location = Location::for_cell(slotted_page.insert_value(&new_node)?); - node.set_child(child_index, Pointer::new(location, rlp_node))?; + let rlp_node = new_node.to_deferred_rlp_node(&self.thread_pool); + let new_node_cell_index = slotted_page.insert_value(&new_node)?; + deferred.set_value(slotted_page.id(), new_node_cell_index, new_node); + let location = Location::for_cell(new_node_cell_index); + node.set_child(child_index, Pointer::new_deferred(location, rlp_node))?; slotted_page.set_value(page_index, node)?; + deferred.set_value(slotted_page.id(), page_index, node.clone()); // If there are more matching changes, recurse if !matching_changes.is_empty() { @@ -996,6 +1071,7 @@ impl StorageEngine { path_offset + common_prefix_length as u8 + 1, slotted_page, location.cell_index().unwrap(), + deferred, )?; match child_pointer_change { @@ -1006,6 +1082,7 @@ impl StorageEngine { page_index, Some(new_child_pointer), child_index, + deferred, )?; } PointerChange::Delete => { @@ -1015,6 +1092,7 @@ impl StorageEngine { page_index, None, child_index, + deferred, )?; } PointerChange::None => {} @@ -1032,24 +1110,35 @@ impl StorageEngine { slotted_page: &mut SlottedPageMut<'_>, page_index: u8, node: &Node, + deferred: &mut DeferredChanges, ) -> Result { let children = node.enumerate_children()?; if children.is_empty() { // Delete empty branch node slotted_page.delete_value(page_index)?; + deferred.delete_value(slotted_page.id(), page_index); // If we're the root node, orphan our page if page_index == 0 { self.orphan_page(context, slotted_page.id())?; + deferred.orphan_page(slotted_page.id()); } Ok(PointerChange::Delete) } else if children.len() == 1 { // Merge branch with its only child let (idx, ptr) = children[0]; - self.merge_branch_with_only_child(context, slotted_page, page_index, node, idx, ptr) + self.merge_branch_with_only_child( + context, + slotted_page, + page_index, + node, + idx, + ptr, + deferred, + ) } else { // Normal branch node with multiple children - let rlp_node = node.to_rlp_node(); - 
Ok(PointerChange::Update(Pointer::new( + let rlp_node = node.to_deferred_rlp_node(&self.thread_pool); + Ok(PointerChange::Update(Pointer::new_deferred( node_location(slotted_page.id(), page_index), rlp_node, ))) @@ -1065,7 +1154,12 @@ impl StorageEngine { node: &Node, only_child_index: u8, only_child_node_pointer: &Pointer, + deferred: &mut DeferredChanges, ) -> Result { + deferred.flush_page(slotted_page); // TODO: instead of flushing, + // change the call to `get_value` + // to check `deferred` first + // Get the child node let (mut only_child_node, child_slotted_page) = if let Some(cell_index) = only_child_node_pointer.location().cell_index() { @@ -1073,6 +1167,11 @@ impl StorageEngine { (slotted_page.get_value::(cell_index)?, None) } else { // Child is on another page + deferred.flush_page_id( + self, + context, + only_child_node_pointer.location().page_id().expect("page_id should exist"), + ); let child_page = self.get_mut_clone( context, only_child_node_pointer.location().page_id().expect("page_id should exist"), @@ -1088,7 +1187,7 @@ impl StorageEngine { only_child_node.set_prefix(new_nibbles)?; // Get the RLP node for the merged child - let rlp_node = only_child_node.to_rlp_node(); + let rlp_node = only_child_node.to_deferred_rlp_node(&self.thread_pool); let child_is_in_same_page = child_slotted_page.is_none(); @@ -1100,6 +1199,7 @@ impl StorageEngine { only_child_node, only_child_node_pointer, rlp_node, + deferred, ) } else { // Child is on another page @@ -1110,6 +1210,7 @@ impl StorageEngine { only_child_node, child_slotted_page.unwrap(), rlp_node, + deferred, ) } } @@ -1121,7 +1222,8 @@ impl StorageEngine { page_index: u8, only_child_node: Node, only_child_node_pointer: &Pointer, - rlp_node: RlpNode, + rlp_node: DeferredRlpNode, + deferred: &mut DeferredChanges, ) -> Result { let child_cell_index = only_child_node_pointer.location().cell_index().expect("cell index should exist"); @@ -1129,15 +1231,18 @@ impl StorageEngine { // Delete both nodes and insert the merged one slotted_page.delete_value(child_cell_index)?; slotted_page.delete_value(page_index)?; + deferred.delete_value(slotted_page.id(), child_cell_index); + deferred.delete_value(slotted_page.id(), page_index); let only_child_node_index = slotted_page.insert_value(&only_child_node)?; + deferred.set_value(slotted_page.id(), only_child_node_index, only_child_node); // If we are the root of the page, we must insert at index 0 if page_index == 0 { assert_eq!(only_child_node_index, page_index); } - Ok(PointerChange::Update(Pointer::new( + Ok(PointerChange::Update(Pointer::new_deferred( node_location(slotted_page.id(), only_child_node_index), rlp_node, ))) @@ -1151,26 +1256,34 @@ impl StorageEngine { page_index: u8, only_child_node: Node, mut child_slotted_page: SlottedPageMut<'_>, - rlp_node: RlpNode, + rlp_node: DeferredRlpNode, + deferred: &mut DeferredChanges, ) -> Result { let branch_page_id = slotted_page.id(); // Ensure the child page has enough space if child_slotted_page.num_free_bytes() < 200 { + deferred.flush_page(&mut child_slotted_page); self.split_page(context, &mut child_slotted_page)?; // Not returning Error::PageSplit because we're splitting the child page } // Delete ourselves and update the child slotted_page.delete_value(page_index)?; + deferred.delete_value(slotted_page.id(), page_index); child_slotted_page.set_value(0, &only_child_node)?; + deferred.set_value(child_slotted_page.id(), 0, only_child_node); // If we're the root node, orphan our page if page_index == 0 { self.orphan_page(context, 
branch_page_id)?; + deferred.orphan_page(slotted_page.id()); } - Ok(PointerChange::Update(Pointer::new(node_location(child_slotted_page.id(), 0), rlp_node))) + Ok(PointerChange::Update(Pointer::new_deferred( + node_location(child_slotted_page.id(), 0), + rlp_node, + ))) } // Split the page into two, moving the largest immediate subtrie of the root node to a new child @@ -1220,7 +1333,7 @@ impl StorageEngine { largest_child_index, Pointer::new( Location::for_page(child_slotted_page.id()), - largest_child_pointer.rlp().clone(), + largest_child_pointer.rlp().wait().clone(), ), )?; } @@ -1236,13 +1349,14 @@ impl StorageEngine { context: &mut TransactionContext, slotted_page: &mut SlottedPageMut<'_>, cell_index: u8, + deferred: &mut DeferredChanges, ) -> Result<(), Error> { if cell_index == 0 { // if we are a root node, deleting ourself will orphan our entire page and // all of our descendant pages. Instead of deleting each cell one-by-one // we can orphan our entire page, and recursively orphan all our descendant // pages as well. - return self.orphan_subtrie(context, slotted_page, cell_index); + return self.orphan_subtrie(context, slotted_page, cell_index, deferred); } let node: Node = slotted_page.get_value(cell_index)?; @@ -1252,7 +1366,7 @@ impl StorageEngine { for (_, child_ptr) in children { if let Some(cell_index) = child_ptr.location().cell_index() { - self.delete_subtrie(context, slotted_page, cell_index)? + self.delete_subtrie(context, slotted_page, cell_index, deferred)? } else { // the child is a root of another page, and that child will be // deleted, essentially orphaning that page and all descendants of @@ -1260,12 +1374,13 @@ impl StorageEngine { let page_id = child_ptr.location().page_id().expect("page_id must exist"); let page = self.get_page(context, page_id)?; let slotted_page = SlottedPage::try_from(page)?; - self.orphan_subtrie(context, &slotted_page, 0)? + self.orphan_subtrie(context, &slotted_page, 0, deferred)? } } } slotted_page.delete_value(cell_index)?; + deferred.delete_value(slotted_page.id(), cell_index); Ok(()) } @@ -1276,6 +1391,7 @@ impl StorageEngine { context: &mut TransactionContext, slotted_page: &SlottedPage<'_>, cell_index: u8, + deferred: &mut DeferredChanges, ) -> Result<(), Error> { let node: Node = slotted_page.get_value(cell_index)?; @@ -1284,7 +1400,7 @@ impl StorageEngine { for (_, child_ptr) in children { if let Some(cell_index) = child_ptr.location().cell_index() { - self.orphan_subtrie(context, slotted_page, cell_index)?; + self.orphan_subtrie(context, slotted_page, cell_index, deferred)?; } else { // the child is a root of another page, and that child will be // deleted, essentially orphaning that page and all descendants of @@ -1294,13 +1410,14 @@ impl StorageEngine { child_ptr.location().page_id().expect("page_id must exist"), )?; let child_slotted_page = SlottedPage::try_from(child_page)?; - self.orphan_subtrie(context, &child_slotted_page, 0)? + self.orphan_subtrie(context, &child_slotted_page, 0, deferred)? 
} } } if cell_index == 0 { self.orphan_page(context, slotted_page.id())?; + deferred.orphan_page(slotted_page.id()); } Ok(()) @@ -1418,8 +1535,10 @@ fn move_subtrie_nodes( // Recursively move its children let new_location = move_subtrie_nodes(source_page, child_index, target_page)?; // update the pointer in the parent node - updated_node - .set_child(branch_index, Pointer::new(new_location, child_ptr.rlp().clone()))?; + updated_node.set_child( + branch_index, + Pointer::new(new_location, child_ptr.rlp().wait().clone()), + )?; } } } @@ -1539,6 +1658,7 @@ mod tests { }; use alloy_primitives::{address, b256, hex, keccak256, Address, StorageKey, B256, U256}; use alloy_trie::{ + nodes::RlpNode, root::{storage_root_unhashed, storage_root_unsorted}, EMPTY_ROOT_HASH, KECCAK_EMPTY, }; @@ -1713,7 +1833,7 @@ mod tests { .as_mut(), ) .unwrap(); - assert_metrics(&context, 1, 1, 0, 0); + assert_metrics(&context, 3, 1, 0, 0); assert_eq!( context.root_node_hash, @@ -1749,7 +1869,7 @@ mod tests { .as_mut(), ) .unwrap(); - assert_metrics(&context, 1, 1, 0, 0); + assert_metrics(&context, 4, 1, 0, 0); assert_eq!( context.root_node_hash, @@ -1802,7 +1922,7 @@ mod tests { )); storage_engine.set_values(&mut context, changes.as_mut()).unwrap(); - assert_metrics(&context, 2, 1, 0, 0); + assert_metrics(&context, 9, 1, 0, 0); assert_eq!( context.root_node_hash, diff --git a/src/storage/test_utils.rs b/src/storage/test_utils.rs index 7469f45a..82e542ff 100644 --- a/src/storage/test_utils.rs +++ b/src/storage/test_utils.rs @@ -1,20 +1,23 @@ #![cfg(test)] -use alloy_primitives::U256; -use alloy_trie::{EMPTY_ROOT_HASH, KECCAK_EMPTY}; -use rand::{rngs::StdRng, RngCore}; - use crate::{ - account::Account, context::TransactionContext, meta::MetadataManager, + account::Account, context::TransactionContext, executor::threadpool, meta::MetadataManager, storage::engine::StorageEngine, PageManager, }; +use alloy_primitives::U256; +use alloy_trie::{EMPTY_ROOT_HASH, KECCAK_EMPTY}; +use rand::{rngs::StdRng, RngCore}; pub(crate) fn create_test_engine(max_pages: u32) -> (StorageEngine, TransactionContext) { let meta_manager = MetadataManager::from_file(tempfile::tempfile().expect("failed to create temporary file")) .expect("failed to open metadata file"); - let page_manager = PageManager::options().max_pages(max_pages).open_temp_file().unwrap(); - let storage_engine = StorageEngine::new(page_manager, meta_manager); + let page_manager = PageManager::options() + .max_pages(max_pages) + .open_temp_file() + .expect("failed to create page manager"); + let thread_pool = threadpool::builder().build().expect("failed to create thread pool"); + let storage_engine = StorageEngine::new(page_manager, meta_manager, thread_pool); let context = storage_engine.write_context(); (storage_engine, context) } @@ -34,24 +37,26 @@ pub(crate) fn assert_metrics( pages_reallocated: u32, pages_split: u32, ) { + /// Struct used to make error messages easier to read + #[derive(PartialEq, Eq, Debug)] + struct Metrics { + pages_read: u32, + pages_allocated: u32, + pages_reallocated: u32, + pages_split: u32, + } + + let expected = Metrics { pages_read, pages_allocated, pages_reallocated, pages_split }; + + let actual = Metrics { + pages_read: context.transaction_metrics.get_pages_read(), + pages_allocated: context.transaction_metrics.get_pages_allocated(), + pages_reallocated: context.transaction_metrics.get_pages_reallocated(), + pages_split: context.transaction_metrics.get_pages_split(), + }; + assert_eq!( - context.transaction_metrics.get_pages_read(), - 
pages_read, - "unexpected number of pages read" - ); - assert_eq!( - context.transaction_metrics.get_pages_allocated(), - pages_allocated, - "unexpected number of pages allocated" - ); - assert_eq!( - context.transaction_metrics.get_pages_reallocated(), - pages_reallocated, - "unexpected number of pages reallocated" - ); - assert_eq!( - context.transaction_metrics.get_pages_split(), - pages_split, - "unexpected number of pages split" + expected, actual, + "transaction metrics don't match:\n expected: {expected:?}\n actual: {actual:?}" ); }
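
For reference, a sketch of the same wiring that `Database::open` and `create_test_engine` now perform, with an explicitly sized pool (temporary files as in the test helper; `expect` used for brevity):

use crate::{
    executor::threadpool, meta::MetadataManager, storage::engine::StorageEngine, PageManager,
};

let meta_manager = MetadataManager::from_file(tempfile::tempfile().expect("temp file"))
    .expect("failed to open metadata file");
let page_manager = PageManager::options()
    .max_pages(1024)
    .open_temp_file()
    .expect("failed to create page manager");
// Two rayon workers dedicated to RLP hashing; passing 0 would let rayon pick a
// size from the available CPUs, matching the `Database::open` default.
let thread_pool = threadpool::builder()
    .num_threads(2)
    .build()
    .expect("failed to create thread pool");
let engine = StorageEngine::new(page_manager, meta_manager, thread_pool);
let mut context = engine.write_context();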