Skip to content

feat(l1): measure & speed up healing during snap sync #3623

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: snap-sync-improvements
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/networking/p2p/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ impl Syncer {
pivot_header.number
);
let store_bodies_handle = tokio::spawn(store_block_bodies(
all_block_hashes[pivot_idx + 1..].to_vec(),
all_block_hashes[pivot_idx..].to_vec(),
self.peers.clone(),
store.clone(),
));
Expand Down
48 changes: 34 additions & 14 deletions crates/networking/p2p/sync/state_healing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ pub(crate) async fn heal_state_trie(
peers: PeerHandler,
) -> Result<bool, SyncError> {
let mut paths = store.get_state_heal_paths().await?.unwrap_or_default();
debug!(
"Starting state healing, pre-existing paths: {}",
paths.len()
);
let healing_start = Instant::now();
let mut total_healed: usize = 0;
// Spawn a bytecode fetcher for this block
let (bytecode_sender, bytecode_receiver) = channel::<Vec<H256>>(MAX_CHANNEL_MESSAGES);
let bytecode_fetcher_handle = tokio::spawn(bytecode_fetcher(
Expand All @@ -48,7 +54,17 @@ pub(crate) async fn heal_state_trie(
while !paths.is_empty() {
if last_update.elapsed() >= SHOW_PROGRESS_INTERVAL_DURATION {
last_update = Instant::now();
info!("State Healing in Progress, pending paths: {}", paths.len());
let speed = healing_start
.elapsed()
.as_millis()
.checked_div((total_healed / 100) as u128);
if let Some(speed) = speed {
info!(
"State Healing in Progress, pending paths: {}, healing speed: {}ms/100nodes",
paths.len(),
speed
);
}
}
// Spawn multiple parallel requests
let mut state_tasks = tokio::task::JoinSet::new();
Expand All @@ -70,7 +86,8 @@ pub(crate) async fn heal_state_trie(
// Process the results of each batch
let mut stale = false;
for res in state_tasks.join_all().await {
let (return_paths, is_stale) = res?;
let (return_paths, is_stale, nodes_healed) = res?;
total_healed += nodes_healed;
stale |= is_stale;
paths.extend(return_paths);
}
Expand Down Expand Up @@ -99,11 +116,12 @@ async fn heal_state_batch(
peers: PeerHandler,
store: Store,
bytecode_sender: Sender<Vec<H256>>,
) -> Result<(Vec<Nibbles>, bool), SyncError> {
) -> Result<(Vec<Nibbles>, bool, usize), SyncError> {
if let Some(nodes) = peers
.request_state_trienodes(state_root, batch.clone())
.await
{
let nodes_received = nodes.len();
debug!("Received {} state nodes", nodes.len());
let mut hashed_addresses = vec![];
let mut code_hashes = vec![];
Expand Down Expand Up @@ -139,15 +157,17 @@ async fn heal_state_batch(
}
}
// Write nodes to trie
trie.db().put_batch(
nodes
.into_iter()
.filter_map(|node| match node.compute_hash() {
hash @ NodeHash::Hashed(_) => Some((hash, node.encode_to_vec())),
NodeHash::Inline(_) => None,
})
.collect(),
)?;
trie.db()
.put_batch_async(
nodes
.into_iter()
.filter_map(|node| match node.compute_hash() {
hash @ NodeHash::Hashed(_) => Some((hash, node.encode_to_vec())),
NodeHash::Inline(_) => None,
})
.collect(),
)
.await?;
}
// Send storage & bytecode requests
if !hashed_addresses.is_empty() {
Expand All @@ -163,8 +183,8 @@ async fn heal_state_batch(
if !code_hashes.is_empty() {
bytecode_sender.send(code_hashes).await?;
}
Ok((batch, false))
Ok((batch, false, nodes_received))
} else {
Ok((batch, true))
Ok((batch, true, 0))
}
}
66 changes: 42 additions & 24 deletions crates/networking/p2p/sync/storage_healing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//! Even if the pivot becomes stale, the healer will remain active and listening until a termination signal (an empty batch) is received

use std::{
collections::BTreeMap,
collections::{BTreeMap, HashMap},
sync::{
Arc,
atomic::{AtomicBool, Ordering},
Expand Down Expand Up @@ -44,13 +44,22 @@ pub(crate) async fn storage_healer(
let mut pending_paths = BTreeMap::<H256, Vec<Nibbles>>::new();
let mut stale = false;
let mut last_update = Instant::now();
let healing_start = Instant::now();
let mut total_healed = 0;
while !(stale || cancel_token.is_cancelled()) {
if last_update.elapsed() >= SHOW_PROGRESS_INTERVAL_DURATION {
last_update = Instant::now();
info!(
"Storage Healing in Progress, storages queued: {}",
pending_paths.len()
);
let speed = healing_start
.elapsed()
.as_millis()
.checked_div((total_healed / 100) as u128);
if let Some(speed) = speed {
info!(
"Storage Healing in Progress, pending paths: {}, healing speed: {}ms/100nodes",
pending_paths.len(),
speed
);
}
}
// If we have few storages in queue, fetch more from the store
// We won't be retrieving all of them as the read can become quite long and we may not end up using all of the paths in this cycle
Expand Down Expand Up @@ -90,7 +99,8 @@ pub(crate) async fn storage_healer(
}
// Add unfetched paths to queue and handle stale signal
for res in storage_tasks.join_all().await {
let (remaining, is_stale) = res?;
let (remaining, is_stale, nodes_healed) = res?;
total_healed += nodes_healed;
pending_paths.extend(remaining);
stale |= is_stale;
}
Expand All @@ -111,44 +121,52 @@ async fn heal_storage_batch(
mut batch: BTreeMap<H256, Vec<Nibbles>>,
peers: PeerHandler,
store: Store,
) -> Result<(BTreeMap<H256, Vec<Nibbles>>, bool), SyncError> {
) -> Result<(BTreeMap<H256, Vec<Nibbles>>, bool, usize), SyncError> {
if let Some(mut nodes) = peers
.request_storage_trienodes(state_root, batch.clone())
.await
{
let nodes_received = nodes.len();
debug!("Received {} storage nodes", nodes.len());
// Process the nodes for each account path
// Sort nodes by trie & update current batch
let mut nodes_to_commit = HashMap::new();
for (acc_path, paths) in batch.iter_mut() {
let trie = store.open_storage_trie(*acc_path, *EMPTY_TRIE_HASH)?;
// Get the corresponding nodes
let trie_nodes: Vec<ethrex_trie::Node> =
nodes.drain(..paths.len().min(nodes.len())).collect();
// Update batch: remove fetched paths & add children
let children = trie_nodes
// Collect fetched nodes for that particular trie
let trie_nodes = nodes
.drain(..paths.len().min(nodes.len()))
.collect::<Vec<_>>();
// Collect missing children paths for the fetched nodes (And also remove the fetched paths from the batch)
let missing_children = trie_nodes
.iter()
.zip(paths.drain(..trie_nodes.len()))
.map(|(node, path)| node_missing_children(node, &path, trie.db()))
.collect::<Result<Vec<_>, _>>()?;
paths.extend(children.into_iter().flatten());
// Write nodes to trie
trie.db().put_batch(
nodes
.iter()
.filter_map(|node| match node.compute_hash() {
// Add the missing children paths of the nodes we fetched to the batch
paths.extend(missing_children.into_iter().flatten());
// Push nodes to commit list
let trie_nodes = trie_nodes
.into_iter()
.filter_map(|node| {
match node.compute_hash() {
hash @ NodeHash::Hashed(_) => Some((hash, node.encode_to_vec())),
// Filter out inline nodes
NodeHash::Inline(_) => None,
})
.collect(),
)?;
}
})
.collect();
nodes_to_commit.insert(*acc_path, trie_nodes);

if nodes.is_empty() {
break;
}
}
store.commit_storage_nodes(nodes_to_commit).await?;
// Return remaining and added paths to be added to the queue
// Filter out the storages we completely fetched
batch.retain(|_, v| !v.is_empty());
return Ok((batch, false));
return Ok((batch, false, nodes_received));
}
// Pivot became stale, lets inform the fetcher
Ok((batch, true))
Ok((batch, true, 0))
}
18 changes: 17 additions & 1 deletion crates/storage/trie_db/libmdbx_dupsort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ where
}
}

#[async_trait::async_trait]
impl<T, SK> TrieDB for LibmdbxDupsortTrieDB<T, SK>
where
T: DupSort<Key = (SK, [u8; 33]), SeekKey = SK, Value = Vec<u8>>,
SK: Clone + Encodable,
SK: Clone + Encodable + 'static,
{
fn get(&self, key: NodeHash) -> Result<Option<Vec<u8>>, TrieError> {
let txn = self.db.begin_read().map_err(TrieError::DbError)?;
Expand All @@ -54,6 +55,21 @@ where
}
txn.commit().map_err(TrieError::DbError)
}

/// Writes a batch of `(node hash, encoded node)` pairs into this dupsort
/// table without blocking the async executor: the libmdbx read-write
/// transaction is synchronous, so it is offloaded to a blocking thread.
///
/// Returns `Ok(())` once the transaction has been committed, or a
/// `TrieError::DbError` if the transaction, any upsert, the commit, or the
/// blocking-task join fails.
async fn put_batch_async(&self, key_values: Vec<(NodeHash, Vec<u8>)>) -> Result<(), TrieError> {
    // Clone the handles so the closure can be moved onto a 'static
    // blocking task.
    let db = self.db.clone();
    let fixed_key = self.fixed_key.clone();
    let join_result = tokio::task::spawn_blocking(move || {
        let txn = db.begin_readwrite().map_err(TrieError::DbError)?;
        for (hash, encoded) in key_values {
            // The table key is (fixed prefix, fixed-size node hash); upsert
            // takes it by value, so the prefix is cloned per entry.
            let key = (fixed_key.clone(), node_hash_to_fixed_size(hash));
            txn.upsert::<T>(key, encoded).map_err(TrieError::DbError)?;
        }
        txn.commit().map_err(TrieError::DbError)
    })
    .await;
    // A join failure (panicked or cancelled blocking task) is surfaced as a
    // DB error; otherwise propagate the transaction's own result.
    join_result.map_err(|e| TrieError::DbError(e.into()))?
}
}

#[cfg(test)]
Expand Down
Loading