Skip to content

Commit

Permalink
Offload older epoch start and checkpoint beacon states to storage
Browse files Browse the repository at this point in the history
  • Loading branch information
povi committed Jan 14, 2025
1 parent b69014a commit 4bf800e
Show file tree
Hide file tree
Showing 14 changed files with 276 additions and 116 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ binary_utils = { workspace = true }
bls = { workspace = true }
clock = { workspace = true }
criterion = { workspace = true }
database = { workspace = true }
easy-ext = { workspace = true }
eth2_cache_utils = { workspace = true }
eth2_libp2p = { workspace = true }
Expand Down
19 changes: 16 additions & 3 deletions benches/benches/fork_choice_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ use allocator as _;
use anyhow::Result;
use clock::Tick;
use criterion::{BatchSize, Criterion, Throughput};
use database::Database;
use easy_ext::ext;
use eth2_cache_utils::holesky::{self, CAPELLA_BEACON_STATE};
use execution_engine::NullExecutionEngine;
use fork_choice_control::{Storage, DEFAULT_ARCHIVAL_EPOCH_INTERVAL};
use fork_choice_store::{
ApplyBlockChanges, ApplyTickChanges, AttestationAction, AttestationItem, AttestationOrigin,
BlockAction, Store, StoreConfig, ValidAttestation,
Expand Down Expand Up @@ -63,11 +65,19 @@ impl Criterion {
.into_iter()
.exactly_one()?;

let storage = Arc::new(Storage::new(
config.clone_arc(),
Database::in_memory(),
DEFAULT_ARCHIVAL_EPOCH_INTERVAL,
false,
));

let mut store = Store::new(
config.clone_arc(),
StoreConfig::default(),
anchor_block,
anchor_state,
storage,
false,
);

Expand Down Expand Up @@ -137,7 +147,7 @@ impl Criterion {
}
}

fn process_slot(store: &mut Store<impl Preset>, slot: Slot) -> Result<()> {
fn process_slot<P: Preset>(store: &mut Store<P, Storage<P>>, slot: Slot) -> Result<()> {
let Some(changes) = store.apply_tick(Tick::start_of_slot(slot))? else {
panic!("tick at slot {slot} should be later than the current one")
};
Expand All @@ -149,7 +159,10 @@ fn process_slot(store: &mut Store<impl Preset>, slot: Slot) -> Result<()> {
Ok(())
}

fn process_block<P: Preset>(store: &mut Store<P>, block: &Arc<SignedBeaconBlock<P>>) -> Result<()> {
fn process_block<P: Preset>(
store: &mut Store<P, Storage<P>>,
block: &Arc<SignedBeaconBlock<P>>,
) -> Result<()> {
let slot = block.message().slot();

let block_action = store.validate_block(
Expand Down Expand Up @@ -191,7 +204,7 @@ fn process_block<P: Preset>(store: &mut Store<P>, block: &Arc<SignedBeaconBlock<
}

fn process_attestation<P: Preset>(
store: &mut Store<P>,
store: &mut Store<P, Storage<P>>,
attestation: Arc<Attestation<P>>,
) -> Result<()> {
let slot = attestation.data().slot;
Expand Down
6 changes: 4 additions & 2 deletions fork_choice_control/src/block_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use types::{
traits::{BeaconBlock as _, SignedBeaconBlock as _},
};

use crate::Storage;

#[derive(Constructor)]
pub struct BlockProcessor<P: Preset> {
chain_config: Arc<ChainConfig>,
Expand Down Expand Up @@ -158,7 +160,7 @@ impl<P: Preset> BlockProcessor<P> {

pub fn validate_block_for_gossip(
&self,
store: &Store<P>,
store: &Store<P, Storage<P>>,
block: &Arc<SignedBeaconBlock<P>>,
) -> Result<Option<BlockAction<P>>> {
store.validate_block_for_gossip(block, |parent| {
Expand All @@ -178,7 +180,7 @@ impl<P: Preset> BlockProcessor<P> {

pub fn validate_block<E: ExecutionEngine<P> + Send>(
&self,
store: &Store<P>,
store: &Store<P, Storage<P>>,
block: &Arc<SignedBeaconBlock<P>>,
state_root_policy: StateRootPolicy,
execution_engine: E,
Expand Down
7 changes: 4 additions & 3 deletions fork_choice_control/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use crate::{

pub struct Controller<P: Preset, E, A, W: Wait> {
// The latest consistent snapshot of the store.
store_snapshot: Arc<ArcSwap<Store<P>>>,
store_snapshot: Arc<ArcSwap<Store<P, Storage<P>>>>,
block_processor: Arc<BlockProcessor<P>>,
execution_engine: E,
state_cache: Arc<StateCacheProcessor<P>>,
Expand Down Expand Up @@ -114,6 +114,7 @@ where
store_config,
anchor_block,
anchor_state,
storage.clone_arc(),
finished_initial_forward_sync,
);

Expand Down Expand Up @@ -520,11 +521,11 @@ where
self.store_snapshot().store_config()
}

pub(crate) fn store_snapshot(&self) -> Guard<Arc<Store<P>>> {
pub(crate) fn store_snapshot(&self) -> Guard<Arc<Store<P, Storage<P>>>> {
self.store_snapshot.load()
}

pub(crate) fn owned_store_snapshot(&self) -> Arc<Store<P>> {
pub(crate) fn owned_store_snapshot(&self) -> Arc<Store<P, Storage<P>>> {
self.store_snapshot.load_full()
}

Expand Down
46 changes: 40 additions & 6 deletions fork_choice_control/src/mutator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ use crate::{

#[expect(clippy::struct_field_names)]
pub struct Mutator<P: Preset, E, W, TS, PS, LS, NS, SS, VS> {
store: Arc<Store<P>>,
store_snapshot: Arc<ArcSwap<Store<P>>>,
store: Arc<Store<P, Storage<P>>>,
store_snapshot: Arc<ArcSwap<Store<P, Storage<P>>>>,
state_cache: Arc<StateCacheProcessor<P>>,
block_processor: Arc<BlockProcessor<P>>,
event_channels: Arc<EventChannels>,
Expand Down Expand Up @@ -136,7 +136,7 @@ where
{
#[expect(clippy::too_many_arguments)]
pub fn new(
store_snapshot: Arc<ArcSwap<Store<P>>>,
store_snapshot: Arc<ArcSwap<Store<P, Storage<P>>>>,
state_cache: Arc<StateCacheProcessor<P>>,
block_processor: Arc<BlockProcessor<P>>,
event_channels: Arc<EventChannels>,
Expand Down Expand Up @@ -1428,8 +1428,42 @@ where
if misc::is_epoch_start::<P>(head_slot) {
info!("unloading old beacon states (head slot: {head_slot})");

self.store_mut()
let unloaded = self
.store_mut()
.unload_old_states(unfinalized_states_in_memory);

let unloaded_checkpoint_states = self
.store_mut()
.unload_checkpoint_states(unfinalized_states_in_memory);

let store = self.owned_store();
let storage = self.storage.clone_arc();
let wait_group = wait_group.clone();

Builder::new()
.name("store-unloader".to_owned())
.spawn(move || {
debug!("persisting unloaded old beacon states…");

let states_with_block_roots = unloaded
.iter()
.map(|chain_link| (chain_link.state(&store), chain_link.block_root))
.chain(unloaded_checkpoint_states);

match storage.append_states(states_with_block_roots) {
Ok(slots) => {
debug!(
"unloaded old beacon states persisted \
(state slots: {slots:?})",
)
}
Err(error) => {
error!("persisting unloaded old beacon states to storage failed: {error:?}")
}
}

drop(wait_group);
})?;
}

let processing_duration = insertion_time.duration_since(submission_time);
Expand Down Expand Up @@ -2350,7 +2384,7 @@ where
self.thread_pool.spawn(task);
}

fn store_mut(&mut self) -> &mut Store<P> {
fn store_mut(&mut self) -> &mut Store<P, Storage<P>> {
self.store.make_mut()
}

Expand All @@ -2359,7 +2393,7 @@ where
// faster to clone a `Store` with all the `Arc`s inside it and allocate another `Arc`.
//
// As a result, this method should only be called when `Mutator.store` is in a consistent state.
fn owned_store(&self) -> Arc<Store<P>> {
fn owned_store(&self) -> Arc<Store<P, Storage<P>>> {
self.store.clone_arc()
}

Expand Down
2 changes: 1 addition & 1 deletion fork_choice_control/src/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ pub struct BlockWithRoot<P: Preset> {
pub struct Snapshot<'storage, P: Preset> {
// Use a `Guard` instead of an owned snapshot unlike in tasks based on the intuition that
// `Snapshot`s will be less common than tasks.
store_snapshot: Guard<Arc<Store<P>>>,
store_snapshot: Guard<Arc<Store<P, Storage<P>>>>,
state_cache: Arc<StateCacheProcessor<P>>,
storage: &'storage Storage<P>,
}
Expand Down
46 changes: 35 additions & 11 deletions fork_choice_control/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,25 +53,26 @@ pub enum StateLoadStrategy<P: Preset> {
}

#[expect(clippy::struct_field_names)]
#[derive(Clone)]
pub struct Storage<P> {
config: Arc<Config>,
pub(crate) database: Database,
pub(crate) database: Arc<Database>,
pub(crate) archival_epoch_interval: NonZeroU64,
prune_storage: bool,
phantom: PhantomData<P>,
}

impl<P: Preset> Storage<P> {
#[must_use]
pub const fn new(
pub fn new(
config: Arc<Config>,
database: Database,
archival_epoch_interval: NonZeroU64,
prune_storage: bool,
) -> Self {
Self {
config,
database,
database: Arc::new(database),
archival_epoch_interval,
prune_storage,
phantom: PhantomData,
Expand Down Expand Up @@ -222,7 +223,7 @@ impl<P: Preset> Storage<P> {
&self,
unfinalized: impl Iterator<Item = &'cl ChainLink<P>>,
finalized: impl DoubleEndedIterator<Item = &'cl ChainLink<P>>,
store: &Store<P>,
store: &Store<P, Self>,
) -> Result<AppendedBlockSlots> {
let mut slots = AppendedBlockSlots::default();
let mut store_head_slot = 0;
Expand Down Expand Up @@ -353,6 +354,25 @@ impl<P: Preset> Storage<P> {
Ok(persisted_blob_ids)
}

pub(crate) fn append_states(
&self,
states_with_block_roots: impl Iterator<Item = (Arc<BeaconState<P>>, H256)>,
) -> Result<Vec<Slot>> {
let mut slots = vec![];
let mut batch = vec![];

for (state, block_root) in states_with_block_roots {
if !self.contains_key(StateByBlockRoot(block_root))? {
slots.push(state.slot());
batch.push(serialize(StateByBlockRoot(block_root), state)?);
}
}

self.database.put_batch(batch)?;

Ok(slots)
}

pub(crate) fn blob_sidecar_by_id(
&self,
blob_id: BlobIdentifier,
Expand Down Expand Up @@ -405,7 +425,7 @@ impl<P: Preset> Storage<P> {
Ok(None)
}

pub(crate) fn genesis_block_root(&self, store: &Store<P>) -> Result<H256> {
pub(crate) fn genesis_block_root(&self, store: &Store<P, Self>) -> Result<H256> {
self.block_root_by_slot_with_store(store, GENESIS_SLOT)?
.ok_or(Error::GenesisBlockRootNotFound)
.map_err(Into::into)
Expand Down Expand Up @@ -448,7 +468,7 @@ impl<P: Preset> Storage<P> {
// Like `block_root_by_slot`, but looks for the root in `store` first.
pub(crate) fn block_root_by_slot_with_store(
&self,
store: &Store<P>,
store: &Store<P, Self>,
slot: Slot,
) -> Result<Option<H256>> {
if let Some(chain_link) = store.chain_link_before_or_at(slot) {
Expand Down Expand Up @@ -551,7 +571,7 @@ impl<P: Preset> Storage<P> {

pub(crate) fn dependent_root(
&self,
store: &Store<P>,
store: &Store<P, Self>,
state: &BeaconState<P>,
epoch: Epoch,
) -> Result<H256> {
Expand Down Expand Up @@ -711,10 +731,8 @@ impl<P: Preset> Storage<P> {

itertools::process_results(results, |pairs| {
pairs
.take_while(|(key_bytes, _)| {
FinalizedBlockByRoot::has_prefix(key_bytes)
&& !UnfinalizedBlockByRoot::has_prefix(key_bytes)
})
.take_while(|(key_bytes, _)| FinalizedBlockByRoot::has_prefix(key_bytes))
.filter(|(key_bytes, _)| !UnfinalizedBlockByRoot::has_prefix(key_bytes))
.count()
})
}
Expand Down Expand Up @@ -744,6 +762,12 @@ impl<P: Preset> Storage<P> {
}
}

impl<P: Preset> fork_choice_store::Storage<P> for Storage<P> {
fn stored_state_by_block_root(&self, block_root: H256) -> Result<Option<Arc<BeaconState<P>>>> {
self.state_by_block_root(block_root)
}
}

#[derive(Default, Debug)]
pub struct AppendedBlockSlots {
pub finalized: Vec<Slot>,
Expand Down
Loading

0 comments on commit 4bf800e

Please sign in to comment.