Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion src/database.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
context::TransactionContext,
executor::threadpool,
meta::{MetadataManager, OpenMetadataError},
metrics::DatabaseMetrics,
page::{PageError, PageId, PageManager},
Expand All @@ -8,9 +9,11 @@ use crate::{
};
use alloy_primitives::B256;
use parking_lot::Mutex;
use rayon::ThreadPoolBuildError;
use std::{
fs::File,
io,
num::NonZero,
ops::Deref,
path::{Path, PathBuf},
};
Expand All @@ -30,6 +33,7 @@ pub struct DatabaseOptions {
wipe: bool,
meta_path: Option<PathBuf>,
max_pages: u32,
num_threads: Option<NonZero<usize>>,
}

#[derive(Debug)]
Expand All @@ -42,6 +46,7 @@ pub enum Error {
pub enum OpenError {
PageError(PageError),
MetadataError(OpenMetadataError),
ThreadPoolError(ThreadPoolBuildError),
IO(io::Error),
}

Expand Down Expand Up @@ -86,6 +91,16 @@ impl DatabaseOptions {
self
}

/// Sets the maximum number of threads used for CPU-intensive computations (like hashing).
///
/// If this is never called, the number of threads is selected automatically based on the
/// number of available CPUs on the system. How that default is derived is not specified
/// and may change in the future.
///
/// Returns `&mut Self` so that further options can be chained.
pub fn num_threads(&mut self, num_threads: NonZero<usize>) -> &mut Self {
    let requested = Some(num_threads);
    self.num_threads = requested;
    self
}

/// Opens the database file at the given path.
pub fn open(&self, db_path: impl AsRef<Path>) -> Result<Database, OpenError> {
let db_path = db_path.as_ref();
Expand Down Expand Up @@ -144,7 +159,12 @@ impl Database {
.open(db_path)
.map_err(OpenError::PageError)?;

Ok(Self::new(StorageEngine::new(page_manager, meta_manager)))
let thread_pool = threadpool::builder()
.num_threads(opts.num_threads.map(NonZero::get).unwrap_or(0))
.build()
.map_err(OpenError::ThreadPoolError)?;

Ok(Self::new(StorageEngine::new(page_manager, meta_manager, thread_pool)))
}

pub fn new(storage_engine: StorageEngine) -> Self {
Expand Down
9 changes: 6 additions & 3 deletions src/executor/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ impl<T> Clone for Future<T> {

impl<T: fmt::Debug> fmt::Debug for Future<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
struct Pending;
struct Pending(*const ());

impl fmt::Debug for Pending {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("<pending>")
write!(f, "<pending @ {:?}>", self.0)
}
}

Expand All @@ -121,9 +121,12 @@ impl<T: fmt::Debug> fmt::Debug for Future<T> {
}
}

let p = Arc::as_ptr(&self.cell) as *const ();
let p = Pending(p);

f.debug_tuple("Future")
.field(match self.try_get() {
None => &Pending,
None => &p,
Some(Ok(value)) => value,
Some(Err(PoisonError)) => &Poisoned,
})
Expand Down
4 changes: 4 additions & 0 deletions src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@

mod futures;
mod inline;
mod never;
mod traits;

pub mod threadpool;

pub use futures::{Future, PoisonError};
pub use inline::Inline;
pub use traits::{Executor, Wait};

#[cfg(test)]
pub(crate) use never::Never;
30 changes: 30 additions & 0 deletions src/executor/never.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#![cfg(test)]

use crate::executor::{Executor, Future};

/// A dummy executor that never executes any function.
///
/// Closures handed to its [`Executor`] implementation are dropped without being called.
/// This file is gated behind `#![cfg(test)]`, so the type only exists in test builds.
#[derive(Copy, Clone, Debug)]
pub(crate) struct Never;

impl Executor for Never {
    /// Ignores `_task` (it is never called) and returns [`Future::pending`].
    #[inline]
    fn defer<F, T>(&self, _task: F) -> Future<T>
    where
        T: Send + Sync + 'static,
        F: FnOnce() -> T + Send + 'static,
    {
        Future::<T>::pending()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A closure deferred on [`Never`] produces a future whose `get()` is `None`.
    #[test]
    fn defer() {
        let pending = Never.defer(|| 123);
        assert_eq!(pending.get(), None);
    }
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod node;
pub mod page;
pub mod path;
pub mod pointer;
pub mod rlp;
pub mod snapshot;
pub mod storage;
pub mod transaction;
Expand Down
34 changes: 32 additions & 2 deletions src/node.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::{
account::Account,
executor::{Executor, Wait},
pointer::Pointer,
rlp::DeferredRlpNode,
storage::value::{self, Value},
};
use alloy_primitives::{hex, StorageValue, B256, U256};
Expand Down Expand Up @@ -282,7 +284,12 @@ impl Node {
///
/// This will typically be a 33 byte prefixed keccak256 hash.
pub fn to_rlp_node(&self) -> RlpNode {
RlpNode::from_rlp(&self.rlp_encode())
RlpNode::from_rlp(&self.wait().rlp_encode())
}

/// Builds a [DeferredRlpNode] for this [Node] using the given executor.
///
/// The node is cloned so that the encoding closure owns its data. That closure first waits
/// on the node (see the [Wait] implementation for [Node]) and then RLP-encodes it. When the
/// closure actually runs is decided by `executor` and [DeferredRlpNode::from_rlp_with].
pub fn to_deferred_rlp_node<E: Executor>(&self, executor: E) -> DeferredRlpNode {
    let owned = self.clone();
    let encode = move || owned.wait().rlp_encode();
    DeferredRlpNode::from_rlp_with(executor, encode)
}

/// Returns the RLP encoding of the [Node].
Expand Down Expand Up @@ -319,6 +326,29 @@ impl Node {
}
}

impl Wait for Node {
    type Output = Self;

    /// Waits on everything this node refers to, then returns the node itself.
    ///
    /// An account leaf waits on its storage root when one is present, a branch waits on
    /// each child that is present, and a storage leaf has nothing to wait on.
    fn wait(&self) -> &Self::Output {
        match self.kind() {
            NodeKind::StorageLeaf { .. } => {}
            NodeKind::AccountLeaf { ref storage_root, .. } => {
                if let Some(root) = storage_root {
                    root.wait();
                }
            }
            NodeKind::Branch { ref children } => {
                // Empty child slots are skipped by `flatten`.
                for child in children.iter().flatten() {
                    child.wait();
                }
            }
        }
        self
    }
}

/// This is the maximum possible RLP-encoded length of a node.
///
/// This value is derived from the maximum possible length of a branch node, which is the largest
Expand Down Expand Up @@ -648,7 +678,7 @@ pub fn encode_branch(children: &[Option<Pointer>], out: &mut dyn BufMut) -> usiz
// now encode the children
for child in children.iter() {
if let Some(child) = child {
out.put_slice(child.rlp());
out.put_slice(child.rlp().as_slice());
} else {
out.put_u8(EMPTY_STRING_CODE);
}
Expand Down
1 change: 1 addition & 0 deletions src/page/slotted_page/cell_pointer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::page::{Page, PageError};

/// A pointer to a page cell, which encodes the offset and length as 12-bit numbers in 3 bytes.
///
/// The wrapper only borrows the 3 encoded bytes, so copying it copies the reference and not
/// the underlying data.
#[derive(Copy, Clone)]
pub(crate) struct CellPointer<'p>(&'p [u8; 3]);

#[derive(Debug)]
Expand Down
59 changes: 40 additions & 19 deletions src/pointer.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,48 @@
use alloy_trie::nodes::RlpNode;

use crate::{
executor::{Inline, Wait},
location::Location,
rlp::DeferredRlpNode,
storage::value::{self, Value},
};
use alloy_primitives::{B256, U256};
use alloy_rlp::encode;
use alloy_trie::nodes::RlpNode;
use proptest::prelude::*;
use proptest_derive::Arbitrary;

/// Bit in the flags byte (byte 4 of the 37-byte pointer encoding) marking the stored RLP as a
/// hash; when set, the bytes after the flags byte are read back as a 32-byte hash.
const HASH_FLAG: u8 = 0x1;

/// A pointer to a node in the trie.
/// This is a wrapper around a [Location] and a [DeferredRlpNode].
#[derive(Debug, Clone, PartialEq, Eq, Arbitrary)]
#[derive(Debug, PartialEq, Eq, Clone, Arbitrary)]
pub struct Pointer {
location: Location,
#[proptest(strategy = "u256_or_hash()")]
rlp: RlpNode,
rlp: DeferredRlpNode,
}

impl Pointer {
/// Creates a new [Pointer] from a [Location] and an [RlpNode].
#[inline]
#[must_use]
pub fn new(location: Location, rlp: RlpNode) -> Self {
Self::new_deferred(location, rlp.into())
}

/// Creates a new [Pointer] from a [Location] and a [DeferredRlpNode].
///
/// Both values are stored as given; nothing is waited on here.
#[inline]
#[must_use]
pub fn new_deferred(location: Location, rlp: DeferredRlpNode) -> Self {
Self { location, rlp }
}

/// Creates a new [Pointer] from a [Location] with an unhashed [RlpNode].
#[must_use]
pub fn new_unhashed(location: Location) -> Self {
Self { location, rlp: RlpNode::from_rlp(&[]) }
Self { location, rlp: RlpNode::from_rlp(&[]).into() }
}

/// Returns the [DeferredRlpNode] wrapped by the [Pointer].
pub fn rlp(&self) -> &RlpNode {
pub fn rlp(&self) -> &DeferredRlpNode {
&self.rlp
}

Expand All @@ -51,6 +62,15 @@ impl Pointer {
}
}

impl Wait for Pointer {
    type Output = Self;

    /// Waits on the pointer's RLP node, then returns the pointer itself.
    fn wait(&self) -> &Self::Output {
        let Self { rlp, .. } = self;
        rlp.wait();
        self
    }
}

impl Value for Pointer {
fn size(&self) -> usize {
37 // Fixed size: 4 bytes location + 33 bytes max RLP
Expand All @@ -70,6 +90,7 @@ impl Value for Pointer {
let arr: [u8; 37] = bytes.try_into().map_err(|_| value::Error::InvalidEncoding)?;
let flags = arr[4];
let rlp = if flags & HASH_FLAG == HASH_FLAG {
debug_assert!(!(arr[5..37]).iter().all(|b| *b == 0), "read a hash of all zeros");
RlpNode::word_rlp(&B256::from_slice(&arr[5..37]))
} else {
// Because the RLP string must be 1-32 bytes, we can safely use the first byte to
Expand Down Expand Up @@ -111,7 +132,7 @@ impl From<&Pointer> for [u8; 37] {
// Determine flags and content
let rlp = pointer.rlp();
let (flags, content) =
if rlp.is_hash() { (HASH_FLAG, &rlp[1..]) } else { (0, rlp.as_ref()) };
if rlp.is_hash() { (HASH_FLAG, &rlp.as_slice()[1..]) } else { (0, rlp.as_slice()) };

data[4] = flags;
let content_len = content.len().min(33);
Expand All @@ -120,26 +141,26 @@ impl From<&Pointer> for [u8; 37] {
}
}

fn u256_or_hash() -> impl Strategy<Value = RlpNode> {
fn u256_or_hash() -> impl Strategy<Value = DeferredRlpNode> {
prop_oneof![arb_u256_rlp(), arb_hash_rlp(),]
}

fn arb_u256_rlp() -> impl Strategy<Value = RlpNode> {
any::<U256>().prop_map(|u| RlpNode::from_rlp(&encode(u))).boxed()
fn arb_u256_rlp() -> impl Strategy<Value = DeferredRlpNode> {
any::<U256>().prop_map(|u| DeferredRlpNode::from_rlp(Inline, encode(u))).boxed()
}

fn arb_hash_rlp() -> impl Strategy<Value = RlpNode> {
any::<B256>().prop_map(|h: B256| RlpNode::word_rlp(&h)).boxed()
fn arb_hash_rlp() -> impl Strategy<Value = DeferredRlpNode> {
any::<B256>().prop_map(|h: B256| DeferredRlpNode::word_rlp(&h)).boxed()
}

#[cfg(test)]
mod tests {
use super::*;
use crate::executor::Wait;
use alloy_primitives::hex;
use alloy_rlp::encode;
use alloy_trie::EMPTY_ROOT_HASH;

use super::*;

#[test]
fn test_pointer_to_bytes() {
let rlp_hash = RlpNode::word_rlp(&EMPTY_ROOT_HASH);
Expand Down Expand Up @@ -186,33 +207,33 @@ mod tests {
rlp_hash_bytes.extend(&EMPTY_ROOT_HASH);
let pointer = Pointer::from_bytes(&rlp_hash_bytes).unwrap();
assert_eq!(pointer.location(), Location::for_cell(1));
assert_eq!(pointer.rlp(), &RlpNode::word_rlp(&EMPTY_ROOT_HASH));
assert_eq!(pointer.rlp().wait(), &RlpNode::word_rlp(&EMPTY_ROOT_HASH));

let mut short_rlp_bytes = vec![0, 0, 0, 1, 0, 42];
short_rlp_bytes.extend([0; 31]);
let pointer = Pointer::from_bytes(&short_rlp_bytes).unwrap();
assert_eq!(pointer.location(), Location::for_cell(1));
assert_eq!(pointer.rlp(), &RlpNode::from_rlp(&encode(42u64)));
assert_eq!(pointer.rlp().wait(), &RlpNode::from_rlp(&encode(42u64)));

let mut zero_rlp_bytes = vec![0, 0, 0, 1, 0, 128];
zero_rlp_bytes.extend([0; 31]);
let pointer = Pointer::from_bytes(&zero_rlp_bytes).unwrap();
assert_eq!(pointer.location(), Location::for_cell(1));
assert_eq!(pointer.rlp(), &RlpNode::from_rlp(&encode(0u64)));
assert_eq!(pointer.rlp().wait(), &RlpNode::from_rlp(&encode(0u64)));

let mut short_string_rlp_bytes = vec![0, 0, 0, 1, 0, 139];
short_string_rlp_bytes.extend(b"hello world");
short_string_rlp_bytes.extend([0; 20]);
let pointer = Pointer::from_bytes(&short_string_rlp_bytes).unwrap();
assert_eq!(pointer.location(), Location::for_cell(1));
assert_eq!(pointer.rlp(), &RlpNode::from_rlp(&encode("hello world")));
assert_eq!(pointer.rlp().wait(), &RlpNode::from_rlp(&encode("hello world")));

let mut short_leaf_rlp_bytes =
vec![0, 0, 0, 1, 0, 0xc7, 0x83, 0x61, 0x62, 0x63, 0x82, 0x30, 0x39];
short_leaf_rlp_bytes.extend([0; 24]);
let pointer = Pointer::from_bytes(&short_leaf_rlp_bytes).unwrap();
assert_eq!(pointer.location(), Location::for_cell(1));
assert_eq!(pointer.rlp(), &RlpNode::from_rlp(&hex!("c783616263823039")));
assert_eq!(pointer.rlp().wait(), &RlpNode::from_rlp(&hex!("c783616263823039")));
}

proptest! {
Expand Down
Loading