Remaining fixes and cleanup from feature/in-memory-db
We were unable to finish these before open-sourcing.

Fixes:

- Update payload statuses of finalized blocks in Store::set_block_ancestor_payload_statuses.

- Restrict Store::latest_archivable_index to valid blocks to prevent blocks from getting lost when archiving.

  Rely on the fix in Store::set_block_ancestor_payload_statuses to make binary search possible.

  The contents of Context.payload_statuses no longer have to be submitted before Context.extra_blocks.
  Snapshot tests pass either way, though some behavior is different behind the scenes.
  Submitting payload statuses afterwards prevents archiving, causing all blocks to remain in memory.

- Revert unnecessary changes in grandine-snapshot-tests and http_api::context.

  Mainnet blocks do not have to be treated as valid.
  Treating them as valid made sense, but it was only a workaround to prevent archived blocks from being lost.
  The bug in Store that caused that is now fixed.

- Limit the Storage case of Controller::preprocessed_state_post_block to max_empty_slots.

- Make in-memory mode apply to all databases.
  Some databases were always persistent due to a mistake during the rewrite to the enum-based design.
  There were 2 in runtime::run_after_genesis and 1 in grandine::Context::run.

Cleanup:

- Remove justified block check from Store::unload_old_states.

  Document why it's neither necessary nor sufficient.

- Simplify and finish documenting test does_not_unload_states_that_may_become_anchors.

  Block 4 was unnecessary.
  The last block (previously block 6, now block 4) triggers unloading too.
  It must be at the start of an epoch because unloading only happens when the head is at one.

  Block 2 was also unnecessary.
  The next block (previously block 3, now block 2) must be in epoch 2 or later to make its attestations count.
  This is not an obstacle in a real network.

  If block 2 is removed, block 1 can be moved into epoch 2 as well.
  No slots have to be empty in that case.

- Move PayloadStatus from fork_choice_store::misc to types::nonstandard.

  The change was already made but reverted when feature/deneb3 was merged into integration.

- Rename StoreConfig::minimal to aggressive and document it.

  The new name is unambiguous and fits better.
  StoreConfig::aggressive sets only one field to a non-default value and the value is not truly minimal.

- Abandon idea to poison WaitGroup and revert partial implementation of it.

  Tests in fork_choice_control that make the mutator panic sometimes hang.
  The hanging makes it hard to tell if the tests failed and necessitates killing them manually.
  The poisoning was intended to make them reliably panic instead of hanging.

  Reimplementing WaitGroup to poison the mutex inside seemed to work at first, but it's still unreliable.

- Remove pointless calls to Store::is_slot_finalized.

- Remove Snapshot::state_before_or_at_slot.

  It was only called by the Controller method of the same name.

- Replace Bytes in database with Arc<[u8]> and document why.

- Use Connection::pragma_update for remaining pragmas.

Documentation improvements:

- Move comment about nomenclature to the root of fork_choice_store and define unloading in it.

  A glossary does not belong near Store::archive_finalized.
  That may have been the right place when the only distinction was between pruning and archiving.
  The doc comment for the crate is a better place for it.

- Document SQLite's treatment of invalid values more accurately.
weekday-grandine-io committed Mar 28, 2024
1 parent cc27cd2 commit 47e697d
Showing 28 changed files with 408 additions and 414 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock


28 changes: 17 additions & 11 deletions ad_hoc_bench/src/main.rs
@@ -16,7 +16,7 @@ use rand::seq::SliceRandom as _;
use std_ext::ArcExt as _;
use types::{
combined::{BeaconState, SignedBeaconBlock},
config::Config,
config::Config as ChainConfig,
phase0::{consts::GENESIS_SLOT, primitives::Slot},
preset::Preset,
traits::{BeaconState as _, SignedBeaconBlock as _},
@@ -251,37 +251,37 @@ fn main() -> Result<()> {

match options.blocks.into() {
Chain::Mainnet => run(
Config::mainnet(),
ChainConfig::mainnet(),
options,
mainnet::beacon_state,
mainnet::beacon_blocks,
),
Chain::Medalla => run(
Config::medalla(),
ChainConfig::medalla(),
options,
medalla::beacon_state,
medalla::beacon_blocks,
),
Chain::Goerli => run(
Config::goerli(),
ChainConfig::goerli(),
options,
goerli::beacon_state,
goerli::beacon_blocks,
),
Chain::Withdrawals => run(
Config::withdrawal_devnet_4(),
ChainConfig::withdrawal_devnet_4(),
options,
withdrawal_devnet_4::beacon_state,
withdrawal_devnet_4::beacon_blocks,
),
Chain::Holesky => run(
Config::holesky(),
ChainConfig::holesky(),
options,
holesky::beacon_state,
holesky::beacon_blocks,
),
Chain::HoleskyDevnet => run(
Config::holesky_devnet(),
ChainConfig::holesky_devnet(),
options,
holesky_devnet::beacon_state,
holesky_devnet::beacon_blocks,
@@ -297,7 +297,7 @@ fn main() -> Result<()> {
#[allow(clippy::float_arithmetic)]
#[allow(clippy::too_many_lines)]
fn run<P: Preset>(
config: Config,
chain_config: ChainConfig,
options: Options,
beacon_state: impl FnOnce(Slot, usize) -> Arc<BeaconState<P>>,
beacon_blocks: impl FnOnce(RangeInclusive<Slot>, usize) -> Vec<Arc<SignedBeaconBlock<P>>>,
@@ -327,7 +327,7 @@ fn run<P: Preset>(
.message()
.hash_tree_root();

let config = Arc::new(config);
let chain_config = Arc::new(chain_config);

let store_config = StoreConfig {
unfinalized_states_in_memory,
@@ -343,10 +343,16 @@
let (p2p_tx, p2p_rx) = futures::channel::mpsc::unbounded();

let (controller, _mutator_handle) = if use_block_verification_pool {
AdHocBenchController::with_p2p_tx(config, store_config, anchor_block, anchor_state, p2p_tx)
AdHocBenchController::with_p2p_tx(
chain_config,
store_config,
anchor_block,
anchor_state,
p2p_tx,
)
} else {
AdHocBenchController::with_p2p_tx(
config,
chain_config,
store_config,
anchor_block,
anchor_state,
2 changes: 1 addition & 1 deletion database/Cargo.toml
@@ -8,14 +8,14 @@ workspace = true

[dependencies]
anyhow = { workspace = true }
bytes = { workspace = true }
bytesize = { workspace = true }
fs-err = { workspace = true }
im = { workspace = true }
itertools = { workspace = true }
libmdbx = { workspace = true }
log = { workspace = true }
snap = { workspace = true }
std_ext = { workspace = true }
tap = { workspace = true }
thiserror = { workspace = true }
unwrap_none = { workspace = true }
63 changes: 38 additions & 25 deletions database/src/lib.rs
@@ -1,16 +1,18 @@
// TODO(feature/in-memory-db): Minimize changes from `develop`.

use core::ops::{Range, RangeFrom, RangeToInclusive};
use std::{borrow::Cow, path::Path, sync::Mutex};
use std::{
borrow::Cow,
path::Path,
sync::{Arc, Mutex},
};

use anyhow::Result;
use bytes::Bytes;
use bytesize::ByteSize;
use im::OrdMap;
use itertools::Either;
use libmdbx::{DatabaseFlags, Environment, Geometry, WriteFlags};
use log::info;
use snap::raw::{Decoder, Encoder};
use std_ext::ArcExt as _;
use tap::Pipe as _;
use thiserror::Error;
use unwrap_none::UnwrapNone as _;
@@ -21,7 +23,7 @@ const MAX_NAMED_DATABASES: usize = 10;
pub struct Database(DatabaseKind);

impl Database {
pub fn persistent(name: &str, directory: impl AsRef<Path>, size: ByteSize) -> Result<Self> {
pub fn persistent(name: &str, directory: impl AsRef<Path>, max_size: ByteSize) -> Result<Self> {
// If a database with the legacy name exists, keep using it.
// Otherwise, create a new database with the specified name.
// This check will not force existing users to resync.
@@ -34,7 +36,7 @@ impl Database {
let environment = Environment::builder()
.set_max_dbs(MAX_NAMED_DATABASES)
.set_geometry(Geometry {
size: Some(..usize::try_from(size.as_u64())?),
size: Some(..usize::try_from(max_size.as_u64())?),
growth_step: Some(isize::try_from(GROWTH_STEP.as_u64())?),
shrink_threshold: None,
page_size: None,
@@ -138,7 +140,7 @@ impl Database {

if let Some((key, value)) = end_pair {
new_map
.insert(key.clone(), value.clone())
.insert(key.clone_arc(), value.clone_arc())
.expect_none("end_pair should have been discarded by OrdMap::split");
}

@@ -208,14 +210,13 @@

let mut cursor = transaction.cursor(&database)?;

let mut iterator = cursor
cursor
.set_range(start)
.transpose()
.into_iter()
.chain(core::iter::from_fn(move || cursor.next().transpose()))
.map(|result| decompress_pair(result?));

Either::Left(core::iter::from_fn(move || iterator.next()))
.map(|result| decompress_pair(result?))
.pipe(Either::Left)
}
DatabaseKind::InMemory { map } => {
let map = map.lock().expect("in-memory database mutex is poisoned");
@@ -224,7 +225,7 @@

if let Some((key, value)) = start_pair {
above
.insert(key.clone(), value.clone())
.insert(key.clone_arc(), value.clone_arc())
.expect_none("start_pair should have been discarded by OrdMap::split");
}

@@ -253,14 +254,13 @@

let mut cursor = transaction.cursor(&database)?;

let mut iterator = cursor
cursor
.set_key(end)
.transpose()
.into_iter()
.chain(core::iter::from_fn(move || cursor.prev().transpose()))
.map(|result| decompress_pair(result?));

Either::Left(core::iter::from_fn(move || iterator.next()))
.map(|result| decompress_pair(result?))
.pipe(Either::Left)
}
DatabaseKind::InMemory { map } => {
let map = map.lock().expect("in-memory database mutex is poisoned");
@@ -269,7 +269,7 @@

if let Some((key, value)) = end_pair {
below
.insert(key.clone(), value.clone())
.insert(key.clone_arc(), value.clone_arc())
.expect_none("end_pair should have been discarded by OrdMap::split");
}

@@ -312,7 +312,7 @@ impl Database {
let mut new_map = map.clone();

for (key, value) in pairs {
let key = Bytes::copy_from_slice(key.as_ref());
let key = key.as_ref().into();
let compressed = compress(value.as_ref())?.into();
new_map.insert(key, compressed);
}
@@ -396,20 +396,33 @@ enum DatabaseKind {
environment: Environment,
},
InMemory {
// TODO(feature/in-memory-db): Consider other types for binary data:
// - `Vec<u8>`
// - `std::sync::Arc<[u8]>`
// - `triomphe::Arc<[u8]>`
// - `Box<[u8]>`
// Alternatively, return `Bytes` instead of `Cow` and `Vec`.
map: Mutex<OrdMap<Bytes, Bytes>>,
// Various methods of `OrdMap` and `Database` clone the elements of this map,
// so they should be cheaply cloneable. This disqualifies `Vec<u8>` and `Box<[u8]>`.
//
// Various methods of `Database` return keys in the form of `Vec<u8>` or `Cow<[u8]>`.
// Converting between them and `Arc<[u8]>` is costly due to the reference count before data.
// Returning `Arc<[u8]>` from the methods would require a conversion in the persistent case
// because `libmdbx` cannot decode directly into `std::sync::Arc` or `triomphe::Arc`.
//
// `Bytes` can be cheaply converted to and from `Vec<u8>` if its capacity equals its length,
// but `Database` cannot benefit from that with its current API.
// Returning a `Vec<u8>` or `Cow<u8>` requires copying due to shared ownership.
// Writing requires copying due to the signature of `Database::put`.
//
// Some versions of `libmdbx` (including the one from `reth-libmdbx`) can decode into
// `lifetimed_bytes::Bytes`, which functions like `Cow<[u8]>`, but with the internal
// representation of `Bytes`. `lifetimed_bytes::Bytes` is necessarily distinct from
// `bytes::Bytes`, which makes it harder to use.
map: Mutex<InMemoryMap>,
},
}

#[derive(Debug, Error)]
#[error("database directory path should be a valid Unicode string")]
struct Error;

type InMemoryMap = OrdMap<Arc<[u8]>, Arc<[u8]>>;

fn compress(data: &[u8]) -> Result<Vec<u8>> {
Encoder::new().compress_vec(data).map_err(Into::into)
}
2 changes: 1 addition & 1 deletion eth2_libp2p
31 changes: 11 additions & 20 deletions fork_choice_control/src/controller.rs
@@ -28,7 +28,6 @@ use futures::channel::{mpsc::Sender as MultiSender, oneshot::Sender as OneshotSe
use genesis::GenesisProvider;
use prometheus_metrics::Metrics;
use std_ext::ArcExt as _;
use tap::TapFallible as _;
use thiserror::Error;
use types::{
combined::{BeaconState, SignedBeaconBlock},
@@ -139,36 +138,28 @@ where

mutator.process_unfinalized_blocks(unfinalized_blocks)?;

let wait_group = W::Swappable::default();
let join_handle = Builder::new().name("store-mutator".to_owned()).spawn(|| {
// The closure should be unwind safe.
// The synchronization primitives used by the mutator are unlikely to panic.
// The instance of `Store` used by the mutator may become inconsistent but cannot be
// observed because the shared snapshot is only updated with values that are consistent.
std::panic::catch_unwind(AssertUnwindSafe(move || mutator.run()))
.map_err(panics::payload_into_error)
.context(Error::MutatorPanicked)?
.context(Error::MutatorFailed)
})?;

let controller = Arc::new(Self {
store_snapshot,
execution_engine,
state_cache,
storage,
thread_pool,
wait_group: wait_group.clone(),
wait_group: W::Swappable::default(),
metrics,
mutator_tx: mutator_tx.clone(),
});

let thread_name = "store-mutator".to_owned();

// TODO(feature/in-memory-db): Move and/or rephrase comment if still relevant.
// Call `Wait::poison` to prevent tests from hanging if the mutator thread fails or panics.
let join_handle = Builder::new().name(thread_name).spawn(move || {
// The closure should be unwind safe.
// The synchronization primitives used by the mutator are unlikely to panic.
// The instance of `Store` used by the mutator may become inconsistent but cannot be
// observed because the shared snapshot is only updated with values that are consistent.
std::panic::catch_unwind(AssertUnwindSafe(move || mutator.run()))
.map_err(panics::payload_into_error)
.tap_err(|_| W::poison(&wait_group))
.context(Error::MutatorPanicked)?
.tap_err(|_| W::poison(&wait_group))
.context(Error::MutatorFailed)
})?;

let mutator_handle = MutatorHandle {
join_handle: Some(join_handle),
mutator_tx,
