diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs
index 8fe62d6e95..fba1307e4c 100644
--- a/crates/networking/p2p/sync.rs
+++ b/crates/networking/p2p/sync.rs
@@ -292,7 +292,7 @@ impl Syncer {
             pivot_header.number
         );
         let store_bodies_handle = tokio::spawn(store_block_bodies(
-            all_block_hashes[pivot_idx + 1..].to_vec(),
+            all_block_hashes[pivot_idx..].to_vec(),
             self.peers.clone(),
             store.clone(),
         ));
diff --git a/crates/networking/p2p/sync/state_healing.rs b/crates/networking/p2p/sync/state_healing.rs
index 54fb2ec039..6312f61cad 100644
--- a/crates/networking/p2p/sync/state_healing.rs
+++ b/crates/networking/p2p/sync/state_healing.rs
@@ -35,6 +35,12 @@ pub(crate) async fn heal_state_trie(
     peers: PeerHandler,
 ) -> Result<bool, SyncError> {
     let mut paths = store.get_state_heal_paths().await?.unwrap_or_default();
+    debug!(
+        "Starting state healing, pre-existing paths: {}",
+        paths.len()
+    );
+    let healing_start = Instant::now();
+    let mut total_healed: usize = 0;
     // Spawn a bytecode fetcher for this block
     let (bytecode_sender, bytecode_receiver) = channel::<Vec<H256>>(MAX_CHANNEL_MESSAGES);
     let bytecode_fetcher_handle = tokio::spawn(bytecode_fetcher(
@@ -48,7 +54,17 @@ pub(crate) async fn heal_state_trie(
     while !paths.is_empty() {
         if last_update.elapsed() >= SHOW_PROGRESS_INTERVAL_DURATION {
             last_update = Instant::now();
-            info!("State Healing in Progress, pending paths: {}", paths.len());
+            let speed = healing_start
+                .elapsed()
+                .as_millis()
+                .checked_div((total_healed / 100) as u128);
+            if let Some(speed) = speed {
+                info!(
+                    "State Healing in Progress, pending paths: {}, healing speed: {}ms/100nodes",
+                    paths.len(),
+                    speed
+                );
+            }
         }
         // Spawn multiple parallel requests
         let mut state_tasks = tokio::task::JoinSet::new();
@@ -70,7 +86,8 @@ pub(crate) async fn heal_state_trie(
         // Process the results of each batch
         let mut stale = false;
         for res in state_tasks.join_all().await {
-            let (return_paths, is_stale) = res?;
+            let (return_paths, is_stale, nodes_healed) = res?;
+            total_healed += nodes_healed;
             stale |= is_stale;
             paths.extend(return_paths);
         }
@@ -99,11 +116,12 @@ async fn heal_state_batch(
     peers: PeerHandler,
     store: Store,
     bytecode_sender: Sender<Vec<H256>>,
-) -> Result<(Vec<Nibbles>, bool), SyncError> {
+) -> Result<(Vec<Nibbles>, bool, usize), SyncError> {
     if let Some(nodes) = peers
         .request_state_trienodes(state_root, batch.clone())
         .await
     {
+        let nodes_received = nodes.len();
         debug!("Received {} state nodes", nodes.len());
         let mut hashed_addresses = vec![];
         let mut code_hashes = vec![];
@@ -139,15 +157,17 @@ async fn heal_state_batch(
                 }
             }
             // Write nodes to trie
-            trie.db().put_batch(
-                nodes
-                    .into_iter()
-                    .filter_map(|node| match node.compute_hash() {
-                        hash @ NodeHash::Hashed(_) => Some((hash, node.encode_to_vec())),
-                        NodeHash::Inline(_) => None,
-                    })
-                    .collect(),
-            )?;
+            trie.db()
+                .put_batch_async(
+                    nodes
+                        .into_iter()
+                        .filter_map(|node| match node.compute_hash() {
+                            hash @ NodeHash::Hashed(_) => Some((hash, node.encode_to_vec())),
+                            NodeHash::Inline(_) => None,
+                        })
+                        .collect(),
+                )
+                .await?;
         }
         // Send storage & bytecode requests
         if !hashed_addresses.is_empty() {
@@ -163,8 +183,8 @@ async fn heal_state_batch(
         if !code_hashes.is_empty() {
            bytecode_sender.send(code_hashes).await?;
         }
-        Ok((batch, false))
+        Ok((batch, false, nodes_received))
     } else {
-        Ok((batch, true))
+        Ok((batch, true, 0))
     }
 }
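The state-healing change above replaces the raw pending-paths log with a derived throughput figure. A minimal standalone sketch of that computation (the helper name is illustrative, not part of ethrex): `checked_div` yields `None` until at least 100 nodes have been healed, so the progress line is simply skipped instead of dividing by zero.

```rust
use std::time::Instant;

/// Illustrative helper mirroring the diff's "ms per 100 nodes" metric.
fn healing_speed_ms_per_100(start: Instant, total_healed: usize) -> Option<u128> {
    // total_healed / 100 is 0 for the first 99 nodes, and checked_div
    // returns None on a zero divisor instead of panicking.
    start
        .elapsed()
        .as_millis()
        .checked_div((total_healed / 100) as u128)
}

fn main() {
    let start = Instant::now();
    let mut total_healed: usize = 0;
    for _ in 0..3 {
        total_healed += 250; // pretend a batch of nodes was healed
        if let Some(speed) = healing_speed_ms_per_100(start, total_healed) {
            println!("healing speed: {speed}ms/100nodes");
        }
    }
}
```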
diff --git a/crates/networking/p2p/sync/storage_healing.rs b/crates/networking/p2p/sync/storage_healing.rs
index d0d2122347..57d90944aa 100644
--- a/crates/networking/p2p/sync/storage_healing.rs
+++ b/crates/networking/p2p/sync/storage_healing.rs
@@ -6,7 +6,7 @@
 //! Even if the pivot becomes stale, the healer will remain active and listening until a termination signal (an empty batch) is received
 use std::{
-    collections::BTreeMap,
+    collections::{BTreeMap, HashMap},
     sync::{
         Arc,
         atomic::{AtomicBool, Ordering},
     },
@@ -44,13 +44,22 @@ pub(crate) async fn storage_healer(
     let mut pending_paths = BTreeMap::<H256, Vec<Nibbles>>::new();
     let mut stale = false;
     let mut last_update = Instant::now();
+    let healing_start = Instant::now();
+    let mut total_healed = 0;
     while !(stale || cancel_token.is_cancelled()) {
         if last_update.elapsed() >= SHOW_PROGRESS_INTERVAL_DURATION {
             last_update = Instant::now();
-            info!(
-                "Storage Healing in Progress, storages queued: {}",
-                pending_paths.len()
-            );
+            let speed = healing_start
+                .elapsed()
+                .as_millis()
+                .checked_div((total_healed / 100) as u128);
+            if let Some(speed) = speed {
+                info!(
+                    "Storage Healing in Progress, pending paths: {}, healing speed: {}ms/100nodes",
+                    pending_paths.len(),
+                    speed
+                );
+            }
         }
         // If we have few storages in queue, fetch more from the store
         // We won't be retrieving all of them as the read can become quite long and we may not end up using all of the paths in this cycle
@@ -90,7 +99,8 @@ pub(crate) async fn storage_healer(
         }
         // Add unfetched paths to queue and handle stale signal
         for res in storage_tasks.join_all().await {
-            let (remaining, is_stale) = res?;
+            let (remaining, is_stale, nodes_healed) = res?;
+            total_healed += nodes_healed;
             pending_paths.extend(remaining);
             stale |= is_stale;
         }
@@ -111,44 +121,52 @@ async fn heal_storage_batch(
     mut batch: BTreeMap<H256, Vec<Nibbles>>,
     peers: PeerHandler,
     store: Store,
-) -> Result<(BTreeMap<H256, Vec<Nibbles>>, bool), SyncError> {
+) -> Result<(BTreeMap<H256, Vec<Nibbles>>, bool, usize), SyncError> {
     if let Some(mut nodes) = peers
         .request_storage_trienodes(state_root, batch.clone())
         .await
     {
+        let nodes_received = nodes.len();
         debug!("Received {} storage nodes", nodes.len());
-        // Process the nodes for each account path
+        // Sort nodes by trie & update current batch
+        let mut nodes_to_commit = HashMap::new();
         for (acc_path, paths) in batch.iter_mut() {
             let trie = store.open_storage_trie(*acc_path, *EMPTY_TRIE_HASH)?;
-            // Get the corresponding nodes
-            let trie_nodes: Vec<Node> =
-                nodes.drain(..paths.len().min(nodes.len())).collect();
-            // Update batch: remove fetched paths & add children
-            let children = trie_nodes
-                .iter()
-                .zip(paths.drain(..trie_nodes.len()))
-                .map(|(node, path)| node_missing_children(node, &path, trie.db()))
-                .collect::<Result<Vec<_>, _>>()?;
+            // Collect fetched nodes for that particular trie
+            let trie_nodes = nodes
+                .drain(..paths.len().min(nodes.len()))
+                .collect::<Vec<_>>();
+            // Collect missing children paths for the fetched nodes (And also remove the fetched paths from the batch)
+            let missing_children = trie_nodes
+                .iter()
+                .zip(paths.drain(..trie_nodes.len()))
+                .map(|(node, path)| node_missing_children(node, &path, trie.db()))
+                .collect::<Result<Vec<_>, _>>()?;
-            paths.extend(children.into_iter().flatten());
-            // Write nodes to trie
-            trie.db().put_batch(
-                nodes
-                    .iter()
-                    .filter_map(|node| match node.compute_hash() {
+            // Add the missing children paths of the nodes we fetched to the batch
+            paths.extend(missing_children.into_iter().flatten());
+            // Push nodes to commit list
+            let trie_nodes = trie_nodes
+                .into_iter()
+                .filter_map(|node| {
+                    match node.compute_hash() {
                         hash @ NodeHash::Hashed(_) => Some((hash, node.encode_to_vec())),
+                        // Filter out inline nodes
                         NodeHash::Inline(_) => None,
-                    })
-                    .collect(),
-            )?;
+                    }
+                })
+                .collect();
+            nodes_to_commit.insert(*acc_path, trie_nodes);
             if nodes.is_empty() {
                 break;
             }
         }
+        store.commit_storage_nodes(nodes_to_commit).await?;
         // Return remaining and added paths to be added to the queue
         // Filter out the storages we completely fetched
         batch.retain(|_, v| !v.is_empty());
-        return Ok((batch, false));
+        return Ok((batch, false, nodes_received));
     }
     // Pivot became stale, lets inform the fetcher
-    Ok((batch, true))
+    Ok((batch, true, 0))
 }
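The storage-healing rewrite stages each account's decoded nodes in a `HashMap` and persists them with a single `commit_storage_nodes` call after the loop, instead of issuing one blocking `put_batch` per trie inside it. A rough standalone sketch of that group-then-commit shape; `AccountPath`, `NodeKv`, and `commit_all` are stand-ins for the real ethrex types and Store API:

```rust
use std::collections::HashMap;

type AccountPath = [u8; 32]; // stand-in for H256
type NodeKv = (u64, Vec<u8>); // stand-in for (NodeHash, encoded node)

/// Stand-in for a single batched write such as Store::commit_storage_nodes.
fn commit_all(staged: HashMap<AccountPath, Vec<NodeKv>>) {
    for (account, nodes) in staged {
        println!("committing {} nodes for account {:x?}", nodes.len(), &account[..4]);
    }
}

fn main() {
    let mut staged: HashMap<AccountPath, Vec<NodeKv>> = HashMap::new();
    // While processing peer responses, stage nodes per trie...
    staged.entry([0xaa; 32]).or_default().push((1, vec![0x80]));
    staged.entry([0xbb; 32]).or_default().push((2, vec![0x81]));
    // ...then persist everything in one write instead of one per account.
    commit_all(staged);
}
```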
diff --git a/crates/storage/trie_db/libmdbx_dupsort.rs b/crates/storage/trie_db/libmdbx_dupsort.rs
index 16994e2ff5..f3b0a4417a 100644
--- a/crates/storage/trie_db/libmdbx_dupsort.rs
+++ b/crates/storage/trie_db/libmdbx_dupsort.rs
@@ -32,10 +32,11 @@ where
     }
 }
 
+#[async_trait::async_trait]
 impl<T, SK> TrieDB for LibmdbxDupsortTrieDB<T, SK>
 where
     T: DupSort<Key = (SK, [u8; 33]), SeekKey = SK, Value = Vec<u8>>,
-    SK: Clone + Encodable,
+    SK: Clone + Encodable + 'static,
 {
     fn get(&self, key: NodeHash) -> Result<Option<Vec<u8>>, TrieError> {
         let txn = self.db.begin_read().map_err(TrieError::DbError)?;
@@ -54,6 +55,21 @@ where
         }
         txn.commit().map_err(TrieError::DbError)
     }
+
+    async fn put_batch_async(&self, key_values: Vec<(NodeHash, Vec<u8>)>) -> Result<(), TrieError> {
+        let db = self.db.clone();
+        let fixed_key = self.fixed_key.clone();
+        tokio::task::spawn_blocking(move || {
+            let txn = db.begin_readwrite().map_err(TrieError::DbError)?;
+            for (key, value) in key_values {
+                txn.upsert::<T>((fixed_key.clone(), node_hash_to_fixed_size(key)), value)
+                    .map_err(TrieError::DbError)?;
+            }
+            txn.commit().map_err(TrieError::DbError)
+        })
+        .await
+        .map_err(|e| TrieError::DbError(e.into()))?
+    }
 }
 
 #[cfg(test)]
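`put_batch_async` moves the synchronous libmdbx write transaction onto tokio's blocking thread pool, so a large node batch no longer stalls the async executor; it is also why `SK` gains a `'static` bound, since `fixed_key` is moved into the spawned closure. A self-contained sketch of the same `spawn_blocking` shape, assuming a tokio dependency and with `write_all` as a placeholder for the real transaction:

```rust
use tokio::task;

/// Placeholder for a synchronous, possibly slow write transaction.
fn write_all(key_values: Vec<(u64, Vec<u8>)>) -> Result<(), String> {
    for (key, value) in key_values {
        println!("writing key {key} ({} bytes)", value.len());
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let batch = vec![(1, vec![0u8; 32]), (2, vec![1u8; 32])];
    // Run the blocking write off the async worker threads, mirroring
    // the shape of put_batch_async in the diff above.
    task::spawn_blocking(move || write_all(batch))
        .await
        .map_err(|e| e.to_string())? // JoinError if the closure panicked
}
```

The trade-off is the usual one for `spawn_blocking`: the write no longer blocks the reactor, but the data must be `'static` and owned by the closure, which is why the diff clones `db` and `fixed_key` before spawning.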