diff --git a/crates/networking/p2p/net.rs b/crates/networking/p2p/net.rs
index 62eacc96bb..fd3907f231 100644
--- a/crates/networking/p2p/net.rs
+++ b/crates/networking/p2p/net.rs
@@ -596,7 +596,7 @@ async fn peers_revalidation(
             table.update_peer_ping_with_revalidation(peer.node.node_id, ping_hash);
             previously_pinged_peers.insert(peer.node.node_id);
-            debug!("Pinging peer {:?} to re-validate!", peer.node.node_id);
+            debug!("Pinging peer {} to re-validate!", peer.node.node_id);
         }
         debug!("Peer revalidation finished");
@@ -1029,7 +1029,7 @@ pub fn node_id_from_signing_key(signer: &SigningKey) -> H512 {
 /// Shows the amount of connected peers, active peers, and peers suitable for snap sync on a set interval
 pub async fn periodically_show_peer_stats(peer_table: Arc<Mutex<KademliaTable>>) {
-    const INTERVAL_DURATION: tokio::time::Duration = tokio::time::Duration::from_secs(60);
+    const INTERVAL_DURATION: tokio::time::Duration = tokio::time::Duration::from_secs(120);
     let mut interval = tokio::time::interval(INTERVAL_DURATION);
     loop {
         peer_table.lock().await.show_peer_stats();
diff --git a/crates/networking/p2p/peer_channels.rs b/crates/networking/p2p/peer_channels.rs
index 327070cf7d..aca46ef7c6 100644
--- a/crates/networking/p2p/peer_channels.rs
+++ b/crates/networking/p2p/peer_channels.rs
@@ -260,7 +260,7 @@ impl PeerChannels {
     /// Requests storage ranges for accounts given their hashed address and storage roots, and the root of their state trie
     /// account_hashes & storage_roots must have the same length
     /// storage_roots must not contain empty trie hashes, we will treat empty ranges as invalid responses
-    /// Returns true if the last accoun't storage was not completely fetched by the request
+    /// Returns true if the last account's storage was not completely fetched by the request
     /// Returns the list of hashed storage keys and values for each account's storage or None if:
     /// - There are no available peers (the node just started up or was rejected by all other nodes)
     /// - The response timed out
@@ -447,4 +447,68 @@ impl PeerChannels {
             })
             .flatten()
     }
+
+    /// Requests a single storage range for an account given its hashed address and storage root, and the root of its state trie
+    /// This is a simplified version of `request_storage_ranges` meant to be used for large tries that require their own single requests
+    /// storage_root must not be an empty trie hash, we will treat empty ranges as invalid responses
+    /// Returns true if the account's storage was not completely fetched by the request
+    /// Returns the list of hashed storage keys and values for the account's storage or None if:
+    /// - There are no available peers (the node just started up or was rejected by all other nodes)
+    /// - The response timed out
+    /// - The response was empty or not valid
+    pub async fn request_storage_range(
+        &self,
+        state_root: H256,
+        storage_root: H256,
+        account_hash: H256,
+        start: H256,
+    ) -> Option<(Vec<H256>, Vec<U256>, bool)> {
+        let request_id = rand::random();
+        let request = RLPxMessage::GetStorageRanges(GetStorageRanges {
+            id: request_id,
+            root_hash: state_root,
+            account_hashes: vec![account_hash],
+            starting_hash: start,
+            limit_hash: HASH_MAX,
+            response_bytes: MAX_RESPONSE_BYTES,
+        });
+        let mut receiver = self.receiver.lock().await;
+        self.sender.send(request).await.ok()?;
+        let (mut slots, proof) = tokio::time::timeout(PEER_REPLY_TIMOUT, async move {
+            loop {
+                match receiver.recv().await {
+                    Some(RLPxMessage::StorageRanges(StorageRanges { id, slots, proof
})) + if id == request_id => + { + return Some((slots, proof)) + } + // Ignore replies that don't match the expected id (such as late responses) + Some(_) => continue, + None => return None, + } + } + }) + .await + .ok()??; + // Check we got a reasonable amount of storage ranges + if slots.len() != 1 { + return None; + } + // Unzip & validate response + let proof = encodable_to_proof(&proof); + let (storage_keys, storage_values): (Vec, Vec) = slots + .remove(0) + .into_iter() + .map(|slot| (slot.hash, slot.data)) + .unzip(); + let encoded_values = storage_values + .iter() + .map(|val| val.encode_to_vec()) + .collect::>(); + // Verify storage range + let should_continue = + verify_range(storage_root, &start, &storage_keys, &encoded_values, &proof).ok()?; + Some((storage_keys, storage_values, should_continue)) + } } diff --git a/crates/networking/p2p/rlpx/connection.rs b/crates/networking/p2p/rlpx/connection.rs index 7ad881242a..26b88dc16f 100644 --- a/crates/networking/p2p/rlpx/connection.rs +++ b/crates/networking/p2p/rlpx/connection.rs @@ -614,6 +614,10 @@ impl RLPxConnection { let msg_size = u16::from_be_bytes(ack_data) as usize; // Read the rest of the message + // Guard unwrap + if buf.len() < msg_size + 2 { + return Err(RLPxError::CryptographyError(String::from("bad buf size"))); + } self.framed .get_mut() .read_exact(&mut buf[2..msg_size + 2]) diff --git a/crates/networking/p2p/rlpx/handshake.rs b/crates/networking/p2p/rlpx/handshake.rs index 91ea09d414..33ba010de7 100644 --- a/crates/networking/p2p/rlpx/handshake.rs +++ b/crates/networking/p2p/rlpx/handshake.rs @@ -116,7 +116,9 @@ fn decrypt_message( // Verify the MAC. let expected_d = sha256_hmac(&mac_key, &[iv, c], size_data); - assert_eq!(d, expected_d); + if d != expected_d { + return Err(RLPxError::HandshakeError(String::from("Invalid MAC"))); + } // Decrypt the message with the AES key. 
    let mut stream_cipher = Aes128Ctr64BE::new_from_slices(aes_key, iv)?;
diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs
index f96372e854..3b0497c38c 100644
--- a/crates/networking/p2p/sync.rs
+++ b/crates/networking/p2p/sync.rs
@@ -1,7 +1,7 @@
 use ethrex_blockchain::error::ChainError;
 use ethrex_core::{
     types::{AccountState, Block, BlockHash, EMPTY_KECCACK_HASH},
-    H256,
+    BigEndianHash, H256, U256, U512,
 };
 use ethrex_rlp::{decode::RLPDecode, encode::RLPEncode, error::RLPDecodeError};
 use ethrex_storage::{error::StoreError, Store};
@@ -21,9 +21,13 @@ use crate::{peer_channels::PeerChannels, rlpx::p2p::Capability};
 /// Maximum amount of times we will ask a peer for an account/storage range
 /// If the max amount of retries is exceeded we will asume that the state we are requesting is old and no longer available
-const MAX_RETRIES: usize = 10;
+const MAX_RETRIES: usize = 5;
 /// The minimum amount of blocks from the head that we want to full sync during a snap sync
 const MIN_FULL_BLOCKS: usize = 64;
+/// Max size of a batch to start a fetch request in queues
+const BATCH_SIZE: usize = 300;
+/// Max size of a batch to start a fetch request in queues for nodes
+const NODE_BATCH_SIZE: usize = 900;
 
 #[derive(Debug)]
 pub enum SyncMode {
@@ -70,18 +74,19 @@ impl SyncManager {
     /// After the sync cycle is complete, the sync mode will be set to full
     /// If the sync fails, no error will be returned but a warning will be emitted
     /// [WARNING] Sync is done optimistically, so headers and bodies may be stored even if their data has not been fully synced if the sync is aborted halfway
+    /// [WARNING] Sync is currently simplified and will not download bodies + receipts previous to the pivot during snap sync
     pub async fn start_sync(&mut self, current_head: H256, sync_head: H256, store: Store) {
         info!("Syncing from current head {current_head} to sync_head {sync_head}");
         let start_time = Instant::now();
         match self.sync_cycle(current_head, sync_head, store).await {
             Ok(()) => {
                 info!(
-                    "Sync finished, time elapsed: {} secs",
+                    "Sync cycle finished, time elapsed: {} secs",
                     start_time.elapsed().as_secs()
                 );
             }
             Err(error) => warn!(
-                "Sync failed due to {error}, time elapsed: {} secs ",
+                "Sync cycle failed due to {error}, time elapsed: {} secs ",
                 start_time.elapsed().as_secs()
             ),
         }
@@ -98,7 +103,15 @@ impl SyncManager {
         // We will begin from the current head so that we download the earliest state first
         // This step is not parallelized
         let mut all_block_hashes = vec![];
-        loop {
+        // Check if we have some blocks downloaded from a previous sync attempt
+        if matches!(self.sync_mode, SyncMode::Snap) {
+            if let Some(last_header) = store.get_header_download_checkpoint()?
{ + // Set latest downloaded header as current head for header fetching + current_head = last_header; + } + } + let mut retry_count = 0; + while retry_count <= MAX_RETRIES { let peer = get_peer_channel_with_retry(self.peers.clone(), Capability::Eth).await; debug!("Requesting Block Headers from {current_head}"); // Request Block Headers from Peer @@ -106,20 +119,42 @@ impl SyncManager { .request_block_headers(current_head, BlockRequestOrder::OldToNew) .await { - debug!("Received {} block headers", block_headers.len()); + retry_count = 0; + debug!( + "Received {} block headers| Last Number: {}", + block_headers.len(), + block_headers.last().as_ref().unwrap().number + ); let mut block_hashes = block_headers .iter() .map(|header| header.compute_block_hash()) .collect::>(); - // Discard the first header as we already have it - block_hashes.remove(0); - block_headers.remove(0); // Check if we already found the sync head let sync_head_found = block_hashes.contains(&sync_head); // Update current fetch head if needed if !sync_head_found { current_head = *block_hashes.last().unwrap(); } + if matches!(self.sync_mode, SyncMode::Snap) { + if !sync_head_found { + // Update snap state + store.set_header_download_checkpoint(current_head)?; + } else { + // If the sync head is less than 64 blocks away from our current head switch to full-sync + let last_header_number = block_headers.last().unwrap().number; + let latest_block_number = store.get_latest_block_number()?; + if last_header_number.saturating_sub(latest_block_number) + < MIN_FULL_BLOCKS as u64 + { + // Too few blocks for a snap sync, switching to full sync + store.clear_snap_state()?; + self.sync_mode = SyncMode::Full + } + } + } + // Discard the first header as we already have it + block_hashes.remove(0); + block_headers.remove(0); // Store headers and save hashes for full block retrieval all_block_hashes.extend_from_slice(&block_hashes[..]); store.add_block_headers(block_hashes, block_headers)?; @@ -128,6 +163,12 @@ impl SyncManager { // No more headers to request break; } + } else { + retry_count += 1; + } + if retry_count > MAX_RETRIES { + warn!("Sync failed to find target block header, aborting"); + return Ok(()); } } // We finished fetching all headers, now we can process them @@ -136,63 +177,42 @@ impl SyncManager { // snap-sync: launch tasks to fetch blocks and state in parallel // - Fetch each block's body and its receipt via eth p2p requests // - Fetch the pivot block's state via snap p2p requests - // - Execute blocks after the pivote (like in full-sync) + // - Execute blocks after the pivot (like in full-sync) + let pivot_idx = all_block_hashes.len().saturating_sub(MIN_FULL_BLOCKS); + let pivot_header = store + .get_block_header_by_hash(all_block_hashes[pivot_idx])? + .ok_or(SyncError::CorruptDB)?; + debug!( + "Selected block {} as pivot for snap sync", + pivot_header.number + ); let store_bodies_handle = tokio::spawn(store_block_bodies( - all_block_hashes.clone(), + all_block_hashes[pivot_idx + 1..].to_vec(), self.peers.clone(), store.clone(), )); - let mut pivot_idx = if all_block_hashes.len() > MIN_FULL_BLOCKS { - all_block_hashes.len() - MIN_FULL_BLOCKS - } else { - all_block_hashes.len() - 1 - }; - let mut pivot_header = store - .get_block_header_by_hash(all_block_hashes[pivot_idx])? 
- .ok_or(SyncError::CorruptDB)?; - let mut stale_pivot = + let stale_pivot = !rebuild_state_trie(pivot_header.state_root, self.peers.clone(), store.clone()) .await?; - // If the pivot became stale, set a further pivot and try again - if stale_pivot && pivot_idx != all_block_hashes.len() - 1 { - warn!("Stale pivot, switching to newer head"); - pivot_idx = all_block_hashes.len() - 1; - pivot_header = store - .get_block_header_by_hash(all_block_hashes[pivot_idx])? - .ok_or(SyncError::CorruptDB)?; - stale_pivot = !rebuild_state_trie( - pivot_header.state_root, - self.peers.clone(), - store.clone(), - ) - .await?; - } if stale_pivot { warn!("Stale pivot, aborting sync"); return Ok(()); } // Wait for all bodies to be downloaded store_bodies_handle.await??; - // For all blocks before the pivot: Store the bodies and fetch the receipts + // For all blocks before the pivot: Store the bodies and fetch the receipts (TODO) // For all blocks after the pivot: Process them fully - let store_receipts_handle = tokio::spawn(store_receipts( - all_block_hashes[pivot_idx..].to_vec(), - self.peers.clone(), - store.clone(), - )); - for hash in all_block_hashes.into_iter() { - let block = store.get_block_by_hash(hash)?.ok_or(SyncError::CorruptDB)?; - if block.header.number <= pivot_header.number { - store.set_canonical_block(block.header.number, hash)?; - store.add_block(block)?; - } else { - store.set_canonical_block(block.header.number, hash)?; - store.update_latest_block_number(block.header.number)?; - ethrex_blockchain::add_block(&block, &store)?; - } + for hash in &all_block_hashes[pivot_idx + 1..] { + let block = store + .get_block_by_hash(*hash)? + .ok_or(SyncError::CorruptDB)?; + store.set_canonical_block(block.header.number, *hash)?; + store.update_latest_block_number(block.header.number)?; + ethrex_blockchain::add_block(&block, &store)?; } - store_receipts_handle.await??; self.last_snap_pivot = pivot_header.number; + // Finished a sync cycle without aborting halfway, clear current checkpoint + store.clear_snap_state()?; // Next sync will be full-sync self.sync_mode = SyncMode::Full; } @@ -273,6 +293,8 @@ async fn store_block_bodies( } /// Fetches all receipts for the given block hashes via p2p and stores them +// TODO: remove allow when used again +#[allow(unused)] async fn store_receipts( mut block_hashes: Vec, peers: Arc>, @@ -297,12 +319,15 @@ async fn store_receipts( } /// Rebuilds a Block's state trie by requesting snap state from peers, also performs state healing +/// Receives an optional checkpoint in case there was a previous snap sync process that became stale, in which +/// case it will continue from the checkpoint and then apply healing to fix inconsistencies with the older state /// Returns true if all state was fetched or false if the block is too old and the state is no longer available async fn rebuild_state_trie( state_root: H256, peers: Arc>, store: Store, ) -> Result { + debug!("Rebuilding State Trie"); // Spawn storage & bytecode fetchers let (bytecode_sender, bytecode_receiver) = mpsc::channel::>(500); let (storage_sender, storage_receiver) = mpsc::channel::>(500); @@ -317,21 +342,37 @@ async fn rebuild_state_trie( store.clone(), state_root, )); - let mut start_account_hash = H256::zero(); - // Start from an empty state trie + // Resume download from checkpoint if available or start from an empty trie // We cannot keep an open trie here so we will track the root between lookups - let mut current_state_root = *EMPTY_TRIE_HASH; + let mut current_state_root = store + 
.get_state_trie_root_checkpoint()? + .unwrap_or(*EMPTY_TRIE_HASH); + let mut start_account_hash = store.get_state_trie_key_checkpoint()?.unwrap_or_default(); + debug!("Starting/Resuming state trie download from key {start_account_hash}"); // Fetch Account Ranges // If we reached the maximum amount of retries then it means the state we are requesting is probably old and no longer available let mut retry_count = 0; + let mut progress_timer = Instant::now(); + let initial_timestamp = Instant::now(); + let initial_account_hash = start_account_hash.into_uint(); + const PROGRESS_OUTPUT_TIMER: std::time::Duration = std::time::Duration::from_secs(30); while retry_count <= MAX_RETRIES { + // Show Progress stats (this task is not vital so we can detach it) + if Instant::now().duration_since(progress_timer) >= PROGRESS_OUTPUT_TIMER { + progress_timer = Instant::now(); + tokio::spawn(show_progress( + start_account_hash, + initial_account_hash, + initial_timestamp, + )); + } let peer = get_peer_channel_with_retry(peers.clone(), Capability::Snap).await; - debug!("Requesting Account Range for state root {state_root}, starting hash: {start_account_hash}"); if let Some((account_hashes, accounts, should_continue)) = peer .request_account_range(state_root, start_account_hash) .await { + debug!("Received {} account ranges", accounts.len()); // Reset retry counter retry_count = 0; // Update starting hash for next batch @@ -382,21 +423,47 @@ async fn rebuild_state_trie( retry_count += 1; } } + if retry_count > MAX_RETRIES { + // Store current checkpoint + store.set_state_trie_root_checkpoint(current_state_root)?; + store.set_state_trie_key_checkpoint(start_account_hash)?; + } + debug!("Account Trie Fetching ended, signaling storage fetcher process"); // Send empty batch to signal that no more batches are incoming storage_sender.send(vec![]).await?; - storage_fetcher_handle.await??; - let sync_complete = if current_state_root == state_root { - debug!("Completed state sync for state root {state_root}"); - true - } else { - // Perform state healing to fix any potential inconsistency in the rebuilt tries - // As we are not fetching different chunks of the same trie this step is not necessary - heal_state_trie(bytecode_sender.clone(), state_root, store, peers).await? - }; + let pending_storage_accounts = storage_fetcher_handle.await??; + let pending_storages = !pending_storage_accounts.is_empty(); + // Next cycle may have different storage roots for these accounts so we will leave them to healing + if pending_storages { + let mut stored_pending_storages = store + .get_pending_storage_heal_accounts()? 
+ .unwrap_or_default(); + stored_pending_storages.extend(pending_storage_accounts); + debug!( + "Current pending storage accounts: {}", + stored_pending_storages.len() + ); + store.set_pending_storage_heal_accounts(stored_pending_storages)?; + } + if retry_count > MAX_RETRIES || pending_storages { + // Skip healing and return stale status + return Ok(false); + } + // Perform state healing to fix inconsistencies with older state + info!("Starting state healing"); + let res = heal_state_trie( + bytecode_sender.clone(), + state_root, + current_state_root, + store.clone(), + peers.clone(), + ) + .await?; // Send empty batch to signal that no more batches are incoming + debug!("Account Trie fully rebuilt, signaling bytecode fetcher process"); bytecode_sender.send(vec![]).await?; bytecode_fetcher_handle.await??; - Ok(sync_complete) + Ok(res) } /// Waits for incoming code hashes from the receiver channel endpoint, queues them, and fetches and stores their bytecodes in batches @@ -405,7 +472,6 @@ async fn bytecode_fetcher( peers: Arc>, store: Store, ) -> Result<(), SyncError> { - const BATCH_SIZE: usize = 200; let mut pending_bytecodes: Vec = vec![]; let mut incoming = true; while incoming { @@ -452,17 +518,19 @@ async fn fetch_bytecode_batch( } /// Waits for incoming account hashes & storage roots from the receiver channel endpoint, queues them, and fetches and stores their bytecodes in batches +/// This function will remain active until either an empty vec is sent to the receiver or the pivot becomes stale +/// In the last case, the fetcher will return the account hashes of the accounts in the queue async fn storage_fetcher( mut receiver: Receiver>, peers: Arc>, store: Store, state_root: H256, -) -> Result<(), StoreError> { - const BATCH_SIZE: usize = 100; +) -> Result, SyncError> { // Pending list of storages to fetch let mut pending_storage: Vec<(H256, H256)> = vec![]; - // TODO: Also add a queue for storages that were incompletely fecthed, - // but for the first iteration we will asume not fully fetched -> fetch again + // The pivot may become stale while the fetcher is active, we will still keep the process + // alive until the end signal so we don't lose queued messages + let mut stale = false; let mut incoming = true; while incoming { // Fetch incoming requests @@ -475,26 +543,56 @@ async fn storage_fetcher( } // If we have enough pending bytecodes to fill a batch // or if we have no more incoming batches, spawn a fetch process - while pending_storage.len() >= BATCH_SIZE || !incoming && !pending_storage.is_empty() { - let next_batch = pending_storage - .drain(..BATCH_SIZE.min(pending_storage.len())) - .collect::>(); - let remaining = - fetch_storage_batch(next_batch, state_root, peers.clone(), store.clone()).await?; - // Add unfeched bytecodes back to the queue - pending_storage.extend(remaining); + // If the pivot became stale don't process anything and just save incoming requests + while !stale + && (pending_storage.len() >= NODE_BATCH_SIZE + || !incoming && !pending_storage.is_empty()) + { + // We will be spawning multiple tasks and then collecting their results + // This uses a loop inside the main loop as the result from these tasks may lead to more values in queue + let mut storage_tasks = tokio::task::JoinSet::new(); + while !stale + && (pending_storage.len() >= NODE_BATCH_SIZE + || !incoming && !pending_storage.is_empty()) + { + let next_batch = pending_storage + .drain(..NODE_BATCH_SIZE.min(pending_storage.len())) + .collect::>(); + storage_tasks.spawn(fetch_storage_batch( + 
+                        next_batch.clone(),
+                        state_root,
+                        peers.clone(),
+                        store.clone(),
+                    ));
+                }
+                // Add unfetched accounts to queue and handle stale signal
+                for res in storage_tasks.join_all().await {
+                    let (remaining, is_stale) = res?;
+                    pending_storage.extend(remaining);
+                    stale |= is_stale;
+                }
             }
         }
     }
-    Ok(())
+    debug!(
+        "Concluding storage fetcher, {} storages left in queue to be healed later",
+        pending_storage.len()
+    );
+    Ok(pending_storage.into_iter().map(|(acc, _)| acc).collect())
 }
 
 /// Receives a batch of account hashes with their storage roots, fetches their respective storage ranges via p2p and returns a list of the code hashes that couldn't be fetched in the request (if applicable)
+/// Also returns a boolean indicating if the pivot became stale during the request
 async fn fetch_storage_batch(
     mut batch: Vec<(H256, H256)>,
     state_root: H256,
     peers: Arc<Mutex<KademliaTable>>,
     store: Store,
-) -> Result<Vec<(H256, H256)>, StoreError> {
+) -> Result<(Vec<(H256, H256)>, bool), SyncError> {
+    debug!(
+        "Requesting storage ranges for addresses {}..{}",
+        batch.first().unwrap().0,
+        batch.last().unwrap().0
+    );
     for _ in 0..MAX_RETRIES {
         let peer = get_peer_channel_with_retry(peers.clone(), Capability::Snap).await;
         let (batch_hahses, batch_roots) = batch.clone().into_iter().unzip();
@@ -502,12 +600,34 @@ async fn fetch_storage_batch(
             .request_storage_ranges(state_root, batch_roots, batch_hahses, H256::zero())
             .await
         {
-            debug!("Received {} storage ranges", keys.len());
-            let mut _last_range;
-            // Hold on to the last batch (if incomplete)
+            debug!("Received {} storage ranges", keys.len(),);
+            // Handle incomplete ranges
             if incomplete {
                 // An incomplete range cannot be empty
-                _last_range = (keys.pop().unwrap(), values.pop().unwrap());
+                let (last_keys, last_values) = (keys.pop().unwrap(), values.pop().unwrap());
+                // If only one incomplete range is returned then it must belong to a trie that is too big to fit into one request
+                // We will handle this large trie separately
+                if keys.is_empty() {
+                    debug!("Large storage trie encountered, handling separately");
+                    let (account_hash, storage_root) = batch.remove(0);
+                    if handle_large_storage_range(
+                        state_root,
+                        account_hash,
+                        storage_root,
+                        last_keys,
+                        last_values,
+                        peers.clone(),
+                        store.clone(),
+                    )
+                    .await?
+                    {
+                        // Pivot became stale
+                        // Add trie back to the queue and return stale pivot status
+                        batch.push((account_hash, storage_root));
+                        return Ok((batch, true));
+                    }
+                }
+                // If the incomplete range is not the first, we cannot assume it is a large trie, so let's add it back to the queue
             }
             // Store the storage ranges & rebuild the storage trie for each account
             for (keys, values) in keys.into_iter().zip(values.into_iter()) {
@@ -520,21 +640,76 @@ async fn fetch_storage_batch(
                 warn!("State sync failed for storage root {storage_root}");
             }
         }
-        // TODO: if the last range is incomplete add it to the incomplete batches queue
-        // For now we will fetch the full range again
         // Return remaining code hashes in the batch if we couldn't fetch all of them
-        return Ok(batch);
+        return Ok((batch, false));
         }
     }
-    // This is a corner case where we fetched an account range for a block but the chain has moved on and the block
-    // was dropped by the peer's snapshot.
-    // We will keep the fetcher alive to avoid errors and stop fetching as from the next account
-    Ok(vec![])
+    // Pivot became stale
+    Ok((batch, true))
+}
+
+/// Handles the returned incomplete storage range of a large storage trie and
+/// fetches the rest of the trie using single requests
+/// Returns a boolean indicating if the pivot became stale during fetching
+// TODO: Later on this method can be refactored to use a separate queue process
+// instead of blocking the current thread for the remainder of the retrieval
+async fn handle_large_storage_range(
+    state_root: H256,
+    account_hash: H256,
+    storage_root: H256,
+    keys: Vec<H256>,
+    values: Vec<U256>,
+    peers: Arc<Mutex<KademliaTable>>,
+    store: Store,
+) -> Result<bool, SyncError> {
+    // First process the initial range
+    // Keep hold of the last key as this will be the first key of the next range
+    let mut next_key = *keys.last().unwrap();
+    let mut current_root = {
+        let mut trie = store.open_storage_trie(account_hash, *EMPTY_TRIE_HASH);
+        for (key, value) in keys.into_iter().zip(values.into_iter()) {
+            trie.insert(key.0.to_vec(), value.encode_to_vec())?;
+        }
+        // Compute current root so we can extend this trie later
+        trie.hash()?
+    };
+    let mut should_continue = true;
+    // Fetch the remaining range
+    let mut retry_count = 0;
+    while should_continue {
+        while retry_count <= MAX_RETRIES {
+            debug!("Fetching large storage trie, current key: {}", next_key);
+            let peer = get_peer_channel_with_retry(peers.clone(), Capability::Snap).await;
+            if let Some((keys, values, incomplete)) = peer
+                .request_storage_range(state_root, storage_root, account_hash, next_key)
+                .await
+            {
+                next_key = *keys.last().unwrap();
+                should_continue = incomplete;
+                let mut trie = store.open_storage_trie(account_hash, current_root);
+                for (key, value) in keys.into_iter().zip(values.into_iter()) {
+                    trie.insert(key.0.to_vec(), value.encode_to_vec())?;
+                }
+                // Compute current root so we can extend this trie later
+                current_root = trie.hash()?;
+                break;
+            } else {
+                retry_count += 1;
+            }
+        }
+    }
+    if current_root != storage_root && retry_count <= MAX_RETRIES {
+        warn!("State sync failed for storage root {storage_root}");
+    }
+    Ok(retry_count > MAX_RETRIES)
 }
 
 /// Heals the trie given its state_root by fetching any missing nodes in it via p2p
+/// Doesn't store nodes, only leaf values to avoid inconsistent tries on restarts
 async fn heal_state_trie(
     bytecode_sender: Sender<Vec<H256>>,
     state_root: H256,
+    mut current_root: H256,
     store: Store,
     peers: Arc<Mutex<KademliaTable>>,
 ) -> Result<bool, SyncError> {
@@ -546,16 +721,29 @@ async fn heal_state_trie(
         peers.clone(),
         store.clone(),
     ));
+    // Check if we have pending storages to heal from a previous cycle
+    if let Some(pending) = store.get_pending_storage_heal_accounts()? {
+        debug!(
+            "Retrieved {} pending storage healing requests",
+            pending.len()
+        );
+        storage_sender.send(pending).await?;
+    }
     // Begin by requesting the root node
     let mut paths = vec![Nibbles::default()];
     // Count the number of request retries so we don't get stuck requesting old state
     let mut retry_count = 0;
     while !paths.is_empty() && retry_count < MAX_RETRIES {
+        // Fetch the latest paths first to prioritize reaching leaves as soon as possible
+        let batch: Vec<Nibbles> = paths
+            .drain(paths.len().saturating_sub(NODE_BATCH_SIZE)..)
+            .collect();
         let peer = get_peer_channel_with_retry(peers.clone(), Capability::Snap).await;
         if let Some(nodes) = peer
-            .request_state_trienodes(state_root, paths.clone())
+            .request_state_trienodes(state_root, batch.clone())
             .await
         {
+            debug!("Received {} state nodes", nodes.len());
             // Reset retry counter for next request
             retry_count = 0;
             let mut hahsed_addresses = vec![];
@@ -563,13 +751,13 @@ async fn heal_state_trie(
             // For each fetched node:
             // - Add its children to the queue (if we don't have them already)
             // - If it is a leaf, request its bytecode & storage
-            // - Add it to the trie's state
-            for node in nodes {
-                let path = paths.remove(0);
+            // - If it is a leaf, add its path & value to the trie
+            // Add unfetched nodes back to the queue (we do this first to ensure depth-focused fetching)
+            paths.extend_from_slice(&batch[nodes.len()..]);
+            for (node, path) in nodes.into_iter().zip(batch.into_iter()) {
                 // We cannot keep the trie state open
-                let mut trie = store.open_state_trie(*EMPTY_TRIE_HASH);
-                let trie_state = trie.state_mut();
-                paths.extend(node_missing_children(&node, &path, trie_state)?);
+                let mut trie = store.open_state_trie(current_root);
+                paths.extend(node_missing_children(&node, &path, trie.state())?);
                 if let Node::Leaf(node) = &node {
                     // Fetch bytecode & storage
                     let account = AccountState::decode(&node.value)?;
@@ -590,9 +778,11 @@ async fn heal_state_trie(
                 {
                     code_hashes.push(account.code_hash);
                 }
+                // Write values to trie
+                trie.insert(account_hash.0.to_vec(), account.encode_to_vec())?;
+                // Update current root
+                current_root = trie.hash()?;
             }
-            let hash = node.compute_hash();
-            trie_state.write_node(node, hash)?;
         }
         // Send storage & bytecode requests
         if !hahsed_addresses.is_empty() {
@@ -605,23 +795,35 @@ async fn heal_state_trie(
             retry_count += 1;
         }
     }
+    debug!("State Healing stopped, signaling storage healer");
     // Send empty batch to signal that no more batches are incoming
     storage_sender.send(vec![]).await?;
-    storage_healer_handler.await??;
-    Ok(retry_count < MAX_RETRIES)
+    let pending_storage_heal_accounts = storage_healer_handler.await??;
+    // Update pending list
+    // If a storage trie was left mid-healing we will heal it again
+    let storage_healing_succesful = pending_storage_heal_accounts.is_empty();
+    if !storage_healing_succesful {
+        store.set_pending_storage_heal_accounts(pending_storage_heal_accounts)?;
+    }
+    Ok(retry_count < MAX_RETRIES && storage_healing_succesful)
 }
 
 /// Waits for incoming hashed addresses from the receiver channel endpoint and queues the associated root nodes for state retrieval
 /// Also retrieves their children nodes until we have the full storage trie stored
+/// If the state becomes stale while fetching, returns its current queued account hashes
 async fn storage_healer(
     state_root: H256,
     mut receiver: Receiver<Vec<H256>>,
     peers: Arc<Mutex<KademliaTable>>,
     store: Store,
-) -> Result<(), SyncError> {
-    const BATCH_SIZE: usize = 200;
-    // Pending list of bytecodes to fetch
-    let mut pending_storages: Vec<(H256, Nibbles)> = vec![];
+) -> Result<Vec<H256>, SyncError> {
+    // Pending list of storages to fetch
+    // Each entry is made up of AccountHash -> (CurrentRoot, Paths)
+    let mut pending_storages: BTreeMap<H256, (H256, Vec<Nibbles>)> = BTreeMap::new();
+    // The pivot may become stale while the fetcher is active, we will still keep the process
+    // alive until the end signal so we don't lose queued messages
+    let mut stale = false;
     let mut incoming = true;
     while incoming {
         // Fetch incoming requests
@@ -631,7 +833,7 @@ async fn
storage_healer( pending_storages.extend( account_paths .into_iter() - .map(|acc_path| (acc_path, Nibbles::default())), + .map(|acc_path| (acc_path, (*EMPTY_TRIE_HASH, vec![Nibbles::default()]))), ); } // Disconnect / Empty message signaling no more bytecodes to sync @@ -639,67 +841,74 @@ async fn storage_healer( } // If we have enough pending storages to fill a batch // or if we have no more incoming batches, spawn a fetch process - while pending_storages.len() >= BATCH_SIZE || !incoming && !pending_storages.is_empty() { - let mut next_batch: BTreeMap> = BTreeMap::new(); - // Group pending storages by account path - // We do this here instead of keeping them sorted so we don't prioritize further nodes from the first tries - for (account, path) in pending_storages.drain(..BATCH_SIZE.min(pending_storages.len())) - { - next_batch.entry(account).or_default().push(path); - } - let return_batch = - heal_storage_batch(state_root, next_batch, peers.clone(), store.clone()).await?; - for (acc_path, paths) in return_batch { - for path in paths { - pending_storages.push((acc_path, path)); - } + // If the pivot became stale don't process anything and just save incoming requests + while !stale && !pending_storages.is_empty() { + let mut next_batch: BTreeMap)> = BTreeMap::new(); + // Fill batch + let mut batch_size = 0; + while batch_size < BATCH_SIZE { + let (key, val) = pending_storages.pop_first().unwrap(); + batch_size += val.1.len(); + next_batch.insert(key, val); } + let (return_batch, is_stale) = + heal_storage_batch(state_root, next_batch.clone(), peers.clone(), store.clone()) + .await?; + pending_storages.extend(return_batch.into_iter()); + stale |= is_stale; } } - Ok(()) + Ok(pending_storages.into_keys().collect()) } /// Receives a set of storage trie paths (grouped by their corresponding account's state trie path), -/// fetches their respective nodes, stores them, and returns their children paths and the paths that couldn't be fetched so they can be returned to the queue +/// fetches their respective nodes, stores their values, and returns their children paths and the paths that couldn't be fetched so they can be returned to the queue +/// Also returns a boolean indicating if the pivot became stale during the request async fn heal_storage_batch( state_root: H256, - mut batch: BTreeMap>, + mut batch: BTreeMap)>, peers: Arc>, store: Store, -) -> Result>, SyncError> { +) -> Result<(BTreeMap)>, bool), SyncError> { for _ in 0..MAX_RETRIES { let peer = get_peer_channel_with_retry(peers.clone(), Capability::Snap).await; - if let Some(mut nodes) = peer - .request_storage_trienodes(state_root, batch.clone()) - .await - { + let req_batch = batch.iter().map(|(k, v)| (*k, v.1.clone())).collect(); + if let Some(mut nodes) = peer.request_storage_trienodes(state_root, req_batch).await { debug!("Received {} nodes", nodes.len()); // Process the nodes for each account path - for (acc_path, paths) in batch.iter_mut() { - let mut trie = store.open_storage_trie(*acc_path, *EMPTY_TRIE_HASH); - let trie_state = trie.state_mut(); + for (acc_path, (root, paths)) in batch.iter_mut() { + let mut trie = store.open_storage_trie(*acc_path, *root); // Get the corresponding nodes for node in nodes.drain(..paths.len().min(nodes.len())) { let path = paths.remove(0); // Add children to batch - let children = node_missing_children(&node, &path, trie_state)?; + let children = node_missing_children(&node, &path, trie.state())?; paths.extend(children); - // Add node to the state - let hash = node.compute_hash(); - 
trie_state.write_node(node, hash)?;
+                    // If it is a leaf node, insert values into the trie
+                    if let Node::Leaf(leaf) = node {
+                        let path = &path.concat(leaf.partial.clone()).to_bytes();
+                        if path.len() != 32 {
+                            // Something went wrong
+                            return Err(SyncError::CorruptPath);
+                        }
+                        trie.insert(path.to_vec(), leaf.value.encode_to_vec())?;
+                    }
                 }
+                // Update current root
+                *root = trie.hash()?;
                 // Cut the loop if we ran out of nodes
                 if nodes.is_empty() {
                     break;
                 }
             }
             // Return remaining and added paths to be added to the queue
-            return Ok(batch);
+            // Filter out the storages we completely fetched
+            batch.retain(|_, v| !v.1.is_empty());
+            return Ok((batch, false));
         }
     }
-    // This is a corner case where we fetched an account range for a block but the chain has moved on and the block
-    // was dropped by the peer's snapshot. We will keep the fetcher alive to avoid errors and stop fetching as from the next account
-    Ok(BTreeMap::new())
+    // Pivot became stale, let's inform the fetcher
+    Ok((batch, true))
 }
 
 /// Returns the partial paths to the node's children if they are not already part of the trie state
@@ -727,6 +936,41 @@ fn node_missing_children(
     Ok(paths)
 }
 
+/// Shows the completion rate & estimated remaining time of the state sync phase of snap sync
+/// Does not take into account healing
+async fn show_progress(
+    current_account_hash: H256,
+    initial_account_hash: U256,
+    start_time: Instant,
+) {
+    // Calculate current progress percentage
+    // Add 1 here to avoid dividing by zero, the change should be imperceptible
+    let completion_rate: U512 =
+        U512::from(current_account_hash.into_uint() + 1) * 100 / U512::from(U256::MAX);
+    // Make a simple time to finish estimation based on current progress
+    // The estimation relies on account hashes being (close to) evenly distributed
+    let synced_account_hashes = current_account_hash.into_uint() - initial_account_hash;
+    let remaining_account_hashes = U256::MAX - current_account_hash.into_uint();
+    // Time to finish = Time since start / synced_account_hashes * remaining_account_hashes
+    let time_to_finish_secs = U512::from(Instant::now().duration_since(start_time).as_secs())
+        * U512::from(remaining_account_hashes)
+        / U512::from(synced_account_hashes);
+    info!(
+        "Downloading state trie, completion rate: {}%, estimated time to finish: {}",
+        completion_rate,
+        seconds_to_readable(time_to_finish_secs)
+    )
+}
+
+fn seconds_to_readable(seconds: U512) -> String {
+    let (days, rest) = seconds.div_mod(U512::from(60 * 60 * 24));
+    let (hours, rest) = rest.div_mod(U512::from(60 * 60));
+    let (minutes, seconds) = rest.div_mod(U512::from(60));
+    if days > U512::zero() {
+        return format!("Over {days} days");
+    }
+    format!("{hours}h{minutes}m{seconds}s")
+}
 
 /// Returns the channel ends to an active peer connection that supports the given capability
 /// The peer is selected randomly, and doesn't guarantee that the selected peer is not currently busy
 /// If no peer is found, this method will try again after 10 seconds
diff --git a/crates/networking/rpc/engine/fork_choice.rs b/crates/networking/rpc/engine/fork_choice.rs
index d4d7cbc5ad..ae46a3b9fa 100644
--- a/crates/networking/rpc/engine/fork_choice.rs
+++ b/crates/networking/rpc/engine/fork_choice.rs
@@ -6,7 +6,7 @@ use ethrex_blockchain::{
 };
 use ethrex_core::types::BlockHeader;
 use serde_json::Value;
-use tracing::{info, warn};
+use tracing::{debug, info, warn};
 
 use crate::{
     types::{
@@ -14,7 +14,7 @@
         payload::PayloadStatus,
     },
     utils::RpcRequest,
-    RpcApiContext,
RpcErr, RpcHandler, SyncStatus, }; #[derive(Debug)] @@ -163,20 +163,27 @@ fn handle_forkchoice( context: RpcApiContext, version: usize, ) -> Result<(Option, ForkChoiceResponse), RpcErr> { - info!( + debug!( "New fork choice request v{} with head: {:#x}, safe: {:#x}, finalized: {:#x}.", version, fork_choice_state.head_block_hash, fork_choice_state.safe_block_hash, fork_choice_state.finalized_block_hash ); + // Check if there is an ongoing sync before applying the forkchoice + let fork_choice_res = match context.sync_status()? { + // Apply current fork choice + SyncStatus::Inactive => apply_fork_choice( + &context.storage, + fork_choice_state.head_block_hash, + fork_choice_state.safe_block_hash, + fork_choice_state.finalized_block_hash, + ), + // Restart sync if needed + _ => Err(InvalidForkChoice::Syncing), + }; - match apply_fork_choice( - &context.storage, - fork_choice_state.head_block_hash, - fork_choice_state.safe_block_hash, - fork_choice_state.finalized_block_hash, - ) { + match fork_choice_res { Ok(head) => Ok(( Some(head), ForkChoiceResponse::from(PayloadStatus::valid_with_hash( diff --git a/crates/networking/rpc/engine/payload.rs b/crates/networking/rpc/engine/payload.rs index 07c803adf6..e6b382a118 100644 --- a/crates/networking/rpc/engine/payload.rs +++ b/crates/networking/rpc/engine/payload.rs @@ -4,13 +4,13 @@ use ethrex_blockchain::payload::build_payload; use ethrex_core::types::{BlobsBundle, Block, BlockBody, BlockHash, BlockNumber, Fork}; use ethrex_core::{H256, U256}; use serde_json::Value; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, warn}; use crate::types::payload::{ ExecutionPayload, ExecutionPayloadBody, ExecutionPayloadResponse, PayloadStatus, }; use crate::utils::{parse_json_hex, RpcRequest}; -use crate::{RpcApiContext, RpcErr, RpcHandler}; +use crate::{RpcApiContext, RpcErr, RpcHandler, SyncStatus}; // Must support rquest sizes of at least 32 blocks // Chosen an arbitrary x4 value @@ -102,20 +102,28 @@ impl RpcHandler for NewPayloadV3Request { validate_fork(&block, Fork::Cancun, &context)?; validate_execution_payload_v3(&self.payload)?; let payload_status = { - if let Err(RpcErr::Internal(error_msg)) = validate_block_hash(&self.payload, &block) { - PayloadStatus::invalid_with_err(&error_msg) - } else { - let blob_versioned_hashes: Vec = block - .body - .transactions - .iter() - .flat_map(|tx| tx.blob_versioned_hashes()) - .collect(); - - if self.expected_blob_versioned_hashes != blob_versioned_hashes { - PayloadStatus::invalid_with_err("Invalid blob_versioned_hashes") - } else { - execute_payload(&block, &context)? + // Ignore incoming + match context.sync_status()? { + SyncStatus::Active | SyncStatus::Pending => PayloadStatus::syncing(), + SyncStatus::Inactive => { + if let Err(RpcErr::Internal(error_msg)) = + validate_block_hash(&self.payload, &block) + { + PayloadStatus::invalid_with_err(&error_msg) + } else { + let blob_versioned_hashes: Vec = block + .body + .transactions + .iter() + .flat_map(|tx| tx.blob_versioned_hashes()) + .collect(); + + if self.expected_blob_versioned_hashes != blob_versioned_hashes { + PayloadStatus::invalid_with_err("Invalid blob_versioned_hashes") + } else { + execute_payload(&block, &context)? 
+ } + } } } }; @@ -343,11 +351,14 @@ fn handle_new_payload_v1_v2( context: RpcApiContext, ) -> Result { let block = get_block_from_payload(payload, None)?; - let payload_status = { - if let Err(RpcErr::Internal(error_msg)) = validate_block_hash(payload, &block) { - PayloadStatus::invalid_with_err(&error_msg) - } else { - execute_payload(&block, &context)? + let payload_status = match context.sync_status()? { + SyncStatus::Active | SyncStatus::Pending => PayloadStatus::syncing(), + SyncStatus::Inactive => { + if let Err(RpcErr::Internal(error_msg)) = validate_block_hash(payload, &block) { + PayloadStatus::invalid_with_err(&error_msg) + } else { + execute_payload(&block, &context)? + } } }; serde_json::to_value(payload_status).map_err(|error| RpcErr::Internal(error.to_string())) @@ -374,7 +385,7 @@ fn validate_block_hash(payload: &ExecutionPayload, block: &Block) -> Result<(), "Invalid block hash. Expected {actual_block_hash:#x}, got {block_hash:#x}" ))); } - info!("Block hash {block_hash} is valid"); + debug!("Block hash {block_hash} is valid"); Ok(()) } diff --git a/crates/networking/rpc/rpc.rs b/crates/networking/rpc/rpc.rs index 38efee747d..2273e52fd1 100644 --- a/crates/networking/rpc/rpc.rs +++ b/crates/networking/rpc/rpc.rs @@ -63,7 +63,7 @@ mod web3; use axum::extract::State; use ethrex_net::types::Node; -use ethrex_storage::Store; +use ethrex_storage::{error::StoreError, Store}; #[derive(Debug, Clone)] pub struct RpcApiContext { @@ -74,6 +74,32 @@ pub struct RpcApiContext { syncer: Arc>, } +/// Describes the client's current sync status: +/// Inactive: There is no active sync process +/// Active: The client is currently syncing +/// Pending: The previous sync process became stale, awaiting restart +pub enum SyncStatus { + Inactive, + Active, + Pending, +} + +impl RpcApiContext { + /// Returns the engine's current sync status, see [SyncStatus] + pub fn sync_status(&self) -> Result { + // Try to get hold of the sync manager, if we can't then it means it is currently involved in a sync process + Ok(if self.syncer.try_lock().is_err() { + SyncStatus::Active + // Check if there is a checkpoint left from a previous aborted sync + } else if self.storage.get_header_download_checkpoint()?.is_some() { + SyncStatus::Pending + // No trace of a sync being handled + } else { + SyncStatus::Inactive + }) + } +} + trait RpcHandler: Sized { fn parse(params: &Option>) -> Result; diff --git a/crates/storage/store/engines/api.rs b/crates/storage/store/engines/api.rs index aa020fd29b..aca3c05fbe 100644 --- a/crates/storage/store/engines/api.rs +++ b/crates/storage/store/engines/api.rs @@ -175,61 +175,61 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { /// Returns the stored chain configuration fn get_chain_config(&self) -> Result; - // Update earliest block number + /// Update earliest block number fn update_earliest_block_number(&self, block_number: BlockNumber) -> Result<(), StoreError>; - // Obtain earliest block number + /// Obtain earliest block number fn get_earliest_block_number(&self) -> Result, StoreError>; - // Update finalized block number + /// Update finalized block number fn update_finalized_block_number(&self, block_number: BlockNumber) -> Result<(), StoreError>; - // Obtain finalized block number + /// Obtain finalized block number fn get_finalized_block_number(&self) -> Result, StoreError>; - // Update safe block number + /// Update safe block number fn update_safe_block_number(&self, block_number: BlockNumber) -> Result<(), StoreError>; - // Obtain safe block number + /// 
Obtain safe block number fn get_safe_block_number(&self) -> Result, StoreError>; - // Update latest block number + /// Update latest block number fn update_latest_block_number(&self, block_number: BlockNumber) -> Result<(), StoreError>; - // Obtain latest block number + /// Obtain latest block number fn get_latest_block_number(&self) -> Result, StoreError>; // TODO (#307): Remove TotalDifficulty. - // Update latest total difficulty + /// Update latest total difficulty fn update_latest_total_difficulty( &self, latest_total_difficulty: U256, ) -> Result<(), StoreError>; // TODO (#307): Remove TotalDifficulty. - // Obtain latest total difficulty + /// Obtain latest total difficulty fn get_latest_total_difficulty(&self) -> Result, StoreError>; - // Update pending block number + /// Update pending block number fn update_pending_block_number(&self, block_number: BlockNumber) -> Result<(), StoreError>; - // Obtain pending block number + /// Obtain pending block number fn get_pending_block_number(&self) -> Result, StoreError>; - // Obtain a storage trie from the given address and storage_root - // Doesn't check if the account is stored - // Used for internal store operations + /// Obtain a storage trie from the given address and storage_root + /// Doesn't check if the account is stored + /// Used for internal store operations fn open_storage_trie(&self, hashed_address: H256, storage_root: H256) -> Trie; - // Obtain a state trie from the given state root - // Doesn't check if the state root is valid - // Used for internal store operations + /// Obtain a state trie from the given state root + /// Doesn't check if the state root is valid + /// Used for internal store operations fn open_state_trie(&self, state_root: H256) -> Trie; - // Set the canonical block hash for a given block number. + /// Set the canonical block hash for a given block number. fn set_canonical_block(&self, number: BlockNumber, hash: BlockHash) -> Result<(), StoreError>; - // Unsets canonical block for a block number. + /// Unsets canonical block for a block number. 
    fn unset_canonical_block(&self, number: BlockNumber) -> Result<(), StoreError>;
 
     fn add_payload(&self, payload_id: u64, block: Block) -> Result<(), StoreError>;
@@ -250,6 +250,44 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe {
     fn get_receipts_for_block(&self, block_hash: &BlockHash) -> Result<Vec<Receipt>, StoreError>;
 
+    // Snap State methods
+
+    /// Sets the hash of the last header downloaded during a snap sync
+    fn set_header_download_checkpoint(&self, block_hash: BlockHash) -> Result<(), StoreError>;
+
+    /// Gets the hash of the last header downloaded during a snap sync
+    fn get_header_download_checkpoint(&self) -> Result<Option<BlockHash>, StoreError>;
+
+    /// Clears the hash of the last header downloaded during a snap sync
+    fn clear_header_download_checkpoint(&self) -> Result<(), StoreError>;
+
+    /// Sets the current state root of the state trie being rebuilt during snap sync
+    fn set_state_trie_root_checkpoint(&self, current_root: H256) -> Result<(), StoreError>;
+
+    /// Gets the current state root of the state trie being rebuilt during snap sync
+    fn get_state_trie_root_checkpoint(&self) -> Result<Option<H256>, StoreError>;
+
+    /// Clears the current state root of the state trie being rebuilt during snap sync
+    fn clear_state_trie_root_checkpoint(&self) -> Result<(), StoreError>;
+
+    /// Sets the last key fetched from the state trie being fetched during snap sync
+    fn set_state_trie_key_checkpoint(&self, last_key: H256) -> Result<(), StoreError>;
+
+    /// Gets the last key fetched from the state trie being fetched during snap sync
+    fn get_state_trie_key_checkpoint(&self) -> Result<Option<H256>, StoreError>;
+
+    /// Clears the last key fetched from the state trie being fetched during snap sync
+    fn clear_state_trie_key_checkpoint(&self) -> Result<(), StoreError>;
+
+    /// Sets the list of account hashes whose storage needs healing
+    fn set_pending_storage_heal_accounts(&self, accounts: Vec<H256>) -> Result<(), StoreError>;
+
+    /// Gets the list of account hashes whose storage needs healing
+    fn get_pending_storage_heal_accounts(&self) -> Result<Option<Vec<H256>>, StoreError>;
+
+    /// Clears the list of account hashes whose storage needs healing
+    fn clear_pending_storage_heal_accounts(&self) -> Result<(), StoreError>;
+
     fn is_synced(&self) -> Result<bool, StoreError>;
 
     fn update_sync_status(&self, status: bool) -> Result<(), StoreError>;
diff --git a/crates/storage/store/engines/in_memory.rs b/crates/storage/store/engines/in_memory.rs
index 2f42c4859e..23e8be55c4 100644
--- a/crates/storage/store/engines/in_memory.rs
+++ b/crates/storage/store/engines/in_memory.rs
@@ -38,6 +38,8 @@ struct StoreInner {
     // Stores local blocks by payload id
     payloads: HashMap<u64, Block>,
     pending_blocks: HashMap<BlockHash, Block>,
+    // Stores current Snap State
+    snap_state: SnapState,
 }
 
 #[derive(Default, Debug)]
@@ -53,6 +55,19 @@ struct ChainData {
     is_synced: bool,
 }
 
+// Keeps track of the state left by the latest snap attempt
+#[derive(Default, Debug)]
+pub struct SnapState {
+    /// Latest downloaded block header's hash from a previously aborted sync
+    header_download_checkpoint: Option<BlockHash>,
+    /// Current root hash of the latest State Trie (Used for both fetching and healing)
+    state_trie_root_checkpoint: Option<H256>,
+    /// Last downloaded key of the latest State Trie
+    state_trie_key_checkpoint: Option<H256>,
+    /// Accounts whose storage needs healing
+    pending_storage_heal_accounts: Option<Vec<H256>>,
+}
+
 impl Store {
     pub fn new() -> Self {
         Self::default()
     }
@@ -425,6 +440,66 @@ impl StoreEngine for Store {
         Ok(())
     }
 
+    fn set_header_download_checkpoint(&self, block_hash: BlockHash) -> Result<(), StoreError> {
self.inner().snap_state.header_download_checkpoint = Some(block_hash); + Ok(()) + } + + fn get_header_download_checkpoint(&self) -> Result, StoreError> { + Ok(self.inner().snap_state.header_download_checkpoint) + } + + fn clear_header_download_checkpoint(&self) -> Result<(), StoreError> { + self.inner().snap_state.header_download_checkpoint = None; + Ok(()) + } + + fn set_state_trie_root_checkpoint(&self, current_root: H256) -> Result<(), StoreError> { + self.inner().snap_state.state_trie_root_checkpoint = Some(current_root); + Ok(()) + } + + fn get_state_trie_root_checkpoint(&self) -> Result, StoreError> { + Ok(self.inner().snap_state.state_trie_root_checkpoint) + } + + fn clear_state_trie_root_checkpoint(&self) -> Result<(), StoreError> { + self.inner().snap_state.state_trie_root_checkpoint = None; + Ok(()) + } + + fn set_state_trie_key_checkpoint(&self, last_key: H256) -> Result<(), StoreError> { + self.inner().snap_state.state_trie_key_checkpoint = Some(last_key); + Ok(()) + } + + fn get_state_trie_key_checkpoint(&self) -> Result, StoreError> { + Ok(self.inner().snap_state.state_trie_key_checkpoint) + } + + fn clear_state_trie_key_checkpoint(&self) -> Result<(), StoreError> { + self.inner().snap_state.state_trie_key_checkpoint = None; + Ok(()) + } + + fn set_pending_storage_heal_accounts(&self, accounts: Vec) -> Result<(), StoreError> { + self.inner().snap_state.pending_storage_heal_accounts = Some(accounts); + Ok(()) + } + + fn get_pending_storage_heal_accounts(&self) -> Result>, StoreError> { + Ok(self + .inner() + .snap_state + .pending_storage_heal_accounts + .clone()) + } + + fn clear_pending_storage_heal_accounts(&self) -> Result<(), StoreError> { + self.inner().snap_state.pending_storage_heal_accounts = None; + Ok(()) + } + fn is_synced(&self) -> Result { Ok(self.inner().chain_data.is_synced) } diff --git a/crates/storage/store/engines/libmdbx.rs b/crates/storage/store/engines/libmdbx.rs index 997fa2f32a..703989d3c2 100644 --- a/crates/storage/store/engines/libmdbx.rs +++ b/crates/storage/store/engines/libmdbx.rs @@ -1,5 +1,5 @@ use super::api::StoreEngine; -use super::utils::ChainDataIndex; +use super::utils::{ChainDataIndex, SnapStateIndex}; use crate::error::StoreError; use crate::rlp::{ AccountCodeHashRLP, AccountCodeRLP, BlockBodyRLP, BlockHashRLP, BlockHeaderRLP, BlockRLP, @@ -72,6 +72,17 @@ impl Store { txn.get::(key).map_err(StoreError::LibmdbxError) } + // Helper method to remove a value from a libmdbx table + fn delete(&self, key: T::Key) -> Result<(), StoreError> { + let txn = self + .db + .begin_readwrite() + .map_err(StoreError::LibmdbxError)?; + txn.delete::(key, None) + .map_err(StoreError::LibmdbxError)?; + txn.commit().map_err(StoreError::LibmdbxError) + } + fn get_block_hash_by_block_number( &self, number: BlockNumber, @@ -514,6 +525,79 @@ impl StoreEngine for Store { Ok(receipts.into_iter().map(|receipt| receipt.to()).collect()) } + + fn set_header_download_checkpoint(&self, block_hash: BlockHash) -> Result<(), StoreError> { + self.write::( + SnapStateIndex::HeaderDownloadCheckpoint, + block_hash.encode_to_vec(), + ) + } + + fn get_header_download_checkpoint(&self) -> Result, StoreError> { + self.read::(SnapStateIndex::HeaderDownloadCheckpoint)? 
+            .map(|ref h| BlockHash::decode(h))
+            .transpose()
+            .map_err(StoreError::RLPDecode)
+    }
+
+    fn clear_header_download_checkpoint(&self) -> Result<(), StoreError> {
+        self.delete::<SnapState>(SnapStateIndex::HeaderDownloadCheckpoint)
+    }
+
+    fn set_state_trie_root_checkpoint(&self, current_root: H256) -> Result<(), StoreError> {
+        self.write::<SnapState>(
+            SnapStateIndex::StateTrieRootCheckpoint,
+            current_root.encode_to_vec(),
+        )
+    }
+
+    fn get_state_trie_root_checkpoint(&self) -> Result<Option<H256>, StoreError> {
+        self.read::<SnapState>(SnapStateIndex::StateTrieRootCheckpoint)?
+            .map(|ref h| H256::decode(h))
+            .transpose()
+            .map_err(StoreError::RLPDecode)
+    }
+
+    fn clear_state_trie_root_checkpoint(&self) -> Result<(), StoreError> {
+        self.delete::<SnapState>(SnapStateIndex::StateTrieRootCheckpoint)
+    }
+
+    fn set_state_trie_key_checkpoint(&self, last_key: H256) -> Result<(), StoreError> {
+        self.write::<SnapState>(
+            SnapStateIndex::StateTrieKeyCheckpoint,
+            last_key.encode_to_vec(),
+        )
+    }
+
+    fn get_state_trie_key_checkpoint(&self) -> Result<Option<H256>, StoreError> {
+        self.read::<SnapState>(SnapStateIndex::StateTrieKeyCheckpoint)?
+            .map(|ref h| H256::decode(h))
+            .transpose()
+            .map_err(StoreError::RLPDecode)
+    }
+
+    fn clear_state_trie_key_checkpoint(&self) -> Result<(), StoreError> {
+        self.delete::<SnapState>(SnapStateIndex::StateTrieKeyCheckpoint)
+    }
+
+    fn set_pending_storage_heal_accounts(&self, accounts: Vec<H256>) -> Result<(), StoreError> {
+        self.write::<SnapState>(
+            SnapStateIndex::PendingStorageHealAccounts,
+            accounts.encode_to_vec(),
+        )
+    }
+
+    fn get_pending_storage_heal_accounts(&self) -> Result<Option<Vec<H256>>, StoreError> {
+        self.read::<SnapState>(SnapStateIndex::PendingStorageHealAccounts)?
+            .map(|ref h| <Vec<H256>>::decode(h))
+            .transpose()
+            .map_err(StoreError::RLPDecode)
+    }
+
+    fn clear_pending_storage_heal_accounts(&self) -> Result<(), StoreError> {
+        self.delete::<SnapState>(SnapStateIndex::PendingStorageHealAccounts)
+    }
+
     fn is_synced(&self) -> Result<bool, StoreError> {
         match self.read::<ChainData>(ChainDataIndex::IsSynced)? {
             None => Err(StoreError::Custom("Sync status not found".to_string())),
@@ -584,6 +668,12 @@ table!(
     ( ChainData ) ChainDataIndex => Vec<u8>
 );
 
+table!(
+    /// Stores snap state, each value is unique and stored as its rlp encoding
+    /// See [SnapStateIndex] for available values
+    ( SnapState ) SnapStateIndex => Vec<u8>
+);
+
 // Trie storages
 
 table!(
@@ -668,6 +758,13 @@ impl Encodable for ChainDataIndex {
     }
 }
 
+impl Encodable for SnapStateIndex {
+    type Encoded = [u8; 4];
+
+    fn encode(self) -> Self::Encoded {
+        (self as u32).encode()
+    }
+}
 
 /// Initializes a new database with the provided path. If the path is `None`, the database
 /// will be temporary.
pub fn init_db(path: Option>) -> Database { @@ -686,6 +783,7 @@ pub fn init_db(path: Option>) -> Database { table_info!(CanonicalBlockHashes), table_info!(Payloads), table_info!(PendingBlocks), + table_info!(SnapState), ] .into_iter() .collect(); diff --git a/crates/storage/store/engines/redb.rs b/crates/storage/store/engines/redb.rs index ad88378493..c19f8c6dd5 100644 --- a/crates/storage/store/engines/redb.rs +++ b/crates/storage/store/engines/redb.rs @@ -22,6 +22,7 @@ use crate::{ }, }; +use super::utils::SnapStateIndex; use super::{api::StoreEngine, utils::ChainDataIndex}; const STATE_TRIE_NODES_TABLE: TableDefinition<&[u8], &[u8]> = @@ -52,6 +53,8 @@ const TRANSACTION_LOCATIONS_TABLE: MultimapTableDefinition< TransactionHashRLP, Rlp<(BlockNumber, BlockHash, Index)>, > = MultimapTableDefinition::new("TransactionLocations"); +const SNAP_STATE_TABLE: TableDefinition> = + TableDefinition::new("SnapState"); #[derive(Debug)] pub struct RedBStore { @@ -698,6 +701,82 @@ impl StoreEngine for RedBStore { .collect()) } + fn set_header_download_checkpoint(&self, block_hash: BlockHash) -> Result<(), StoreError> { + self.write( + SNAP_STATE_TABLE, + SnapStateIndex::HeaderDownloadCheckpoint, + block_hash.encode_to_vec(), + ) + } + + fn get_header_download_checkpoint(&self) -> Result, StoreError> { + self.read(SNAP_STATE_TABLE, SnapStateIndex::HeaderDownloadCheckpoint)? + .map(|rlp| RLPDecode::decode(&rlp.value())) + .transpose() + .map_err(StoreError::RLPDecode) + } + + fn clear_header_download_checkpoint(&self) -> Result<(), StoreError> { + self.delete(SNAP_STATE_TABLE, SnapStateIndex::HeaderDownloadCheckpoint) + } + + fn set_state_trie_root_checkpoint(&self, current_root: H256) -> Result<(), StoreError> { + self.write( + SNAP_STATE_TABLE, + SnapStateIndex::StateTrieRootCheckpoint, + current_root.encode_to_vec(), + ) + } + + fn get_state_trie_root_checkpoint(&self) -> Result, StoreError> { + self.read(SNAP_STATE_TABLE, SnapStateIndex::StateTrieRootCheckpoint)? + .map(|rlp| RLPDecode::decode(&rlp.value())) + .transpose() + .map_err(StoreError::RLPDecode) + } + + fn clear_state_trie_root_checkpoint(&self) -> Result<(), StoreError> { + self.delete(SNAP_STATE_TABLE, SnapStateIndex::StateTrieRootCheckpoint) + } + + fn set_state_trie_key_checkpoint(&self, last_key: H256) -> Result<(), StoreError> { + self.write( + SNAP_STATE_TABLE, + SnapStateIndex::StateTrieKeyCheckpoint, + last_key.encode_to_vec(), + ) + } + + fn get_state_trie_key_checkpoint(&self) -> Result, StoreError> { + self.read(SNAP_STATE_TABLE, SnapStateIndex::StateTrieKeyCheckpoint)? + .map(|rlp| RLPDecode::decode(&rlp.value())) + .transpose() + .map_err(StoreError::RLPDecode) + } + + fn clear_state_trie_key_checkpoint(&self) -> Result<(), StoreError> { + self.delete(SNAP_STATE_TABLE, SnapStateIndex::StateTrieKeyCheckpoint) + } + + fn set_pending_storage_heal_accounts(&self, accounts: Vec) -> Result<(), StoreError> { + self.write( + SNAP_STATE_TABLE, + SnapStateIndex::PendingStorageHealAccounts, + accounts.encode_to_vec(), + ) + } + + fn get_pending_storage_heal_accounts(&self) -> Result>, StoreError> { + self.read(SNAP_STATE_TABLE, SnapStateIndex::PendingStorageHealAccounts)? + .map(|rlp| RLPDecode::decode(&rlp.value())) + .transpose() + .map_err(StoreError::RLPDecode) + } + + fn clear_pending_storage_heal_accounts(&self) -> Result<(), StoreError> { + self.delete(SNAP_STATE_TABLE, SnapStateIndex::PendingStorageHealAccounts) + } + fn is_synced(&self) -> Result { match self.read(CHAIN_DATA_TABLE, ChainDataIndex::IsSynced)? 
     fn is_synced(&self) -> Result<bool, StoreError> {
         match self.read(CHAIN_DATA_TABLE, ChainDataIndex::IsSynced)? {
             None => Err(StoreError::Custom("Sync status not found".to_string())),
@@ -755,6 +834,47 @@ impl redb::Key for ChainDataIndex {
     }
 }
 
+impl redb::Value for SnapStateIndex {
+    type SelfType<'a>
+        = SnapStateIndex
+    where
+        Self: 'a;
+
+    type AsBytes<'a>
+        = [u8; 1]
+    where
+        Self: 'a;
+
+    fn fixed_width() -> Option<usize> {
+        None
+    }
+
+    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
+    where
+        Self: 'a,
+    {
+        data[0].into()
+    }
+
+    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
+    where
+        Self: 'a,
+        Self: 'b,
+    {
+        [*value as u8]
+    }
+
+    fn type_name() -> redb::TypeName {
+        TypeName::new("SnapStateIndex")
+    }
+}
+
+impl redb::Key for SnapStateIndex {
+    fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
+        data1.cmp(data2)
+    }
+}
+
 pub fn init_db() -> Result<Database, StoreError> {
     let db = Database::create("ethrex.redb")?;
@@ -770,6 +890,7 @@ pub fn init_db() -> Result<Database, StoreError> {
     table_creation_txn.open_table(PAYLOADS_TABLE)?;
     table_creation_txn.open_table(PENDING_BLOCKS_TABLE)?;
     table_creation_txn.open_multimap_table(TRANSACTION_LOCATIONS_TABLE)?;
+    table_creation_txn.open_table(SNAP_STATE_TABLE)?;
     table_creation_txn.commit()?;
 
     Ok(db)
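The `redb::Value` impl above round-trips each variant through its single-byte discriminant: `as_bytes` emits `*value as u8` and `from_bytes` sends `data[0]` back through `From<u8>`. A standalone sanity check of that round trip, using a local `Index` enum that mirrors the `SnapStateIndex` variants:

// Mirrors SnapStateIndex from utils.rs; declared locally so the snippet compiles on its own.
#[derive(Debug, Copy, Clone, PartialEq)]
enum Index {
    HeaderDownloadCheckpoint = 0,
    StateTrieRootCheckpoint = 1,
    PendingStorageHealAccounts = 2,
    StateTrieKeyCheckpoint = 3,
}

fn main() {
    for idx in [
        Index::HeaderDownloadCheckpoint,
        Index::StateTrieRootCheckpoint,
        Index::PendingStorageHealAccounts,
        Index::StateTrieKeyCheckpoint,
    ] {
        let byte = idx as u8; // what as_bytes stores
        let back = match byte {
            // what from_bytes recovers via From<u8>
            0 => Index::HeaderDownloadCheckpoint,
            1 => Index::StateTrieRootCheckpoint,
            2 => Index::PendingStorageHealAccounts,
            3 => Index::StateTrieKeyCheckpoint,
            _ => panic!("invalid discriminant"),
        };
        assert_eq!(idx, back);
    }
}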
diff --git a/crates/storage/store/engines/utils.rs b/crates/storage/store/engines/utils.rs
index 7e57aad04b..cefd4f0fe6 100644
--- a/crates/storage/store/engines/utils.rs
+++ b/crates/storage/store/engines/utils.rs
@@ -1,5 +1,5 @@
 /// Represents the key for each unique value of the chain data stored in the db
-// (TODO: Remove this comment once full) Will store chain-specific data such as chain id and latest finalized/pending/safe block number
+// Stores chain-specific data such as chain id and latest finalized/pending/safe block number
 #[derive(Debug, Copy, Clone)]
 pub enum ChainDataIndex {
     ChainConfig = 0,
@@ -36,3 +36,38 @@ impl From<u8> for ChainDataIndex {
         }
     }
 }
+
+/// Represents the key for each unique value of the snap state stored in the db
+// Stores the snap state from previous sync cycles. Currently stores the header & state trie
+// download checkpoints, but will later also include the body download checkpoint and the
+// last pivot used
+#[derive(Debug, Copy, Clone)]
+pub enum SnapStateIndex {
+    // Hash of the last downloaded header in a previous sync cycle that was aborted
+    HeaderDownloadCheckpoint = 0,
+    // Current root hash of the latest State Trie (used for both fetch & heal)
+    StateTrieRootCheckpoint = 1,
+    // Accounts whose storage needs healing
+    PendingStorageHealAccounts = 2,
+    // Last key fetched from the state trie
+    StateTrieKeyCheckpoint = 3,
+}
+
+impl From<u8> for SnapStateIndex {
+    fn from(value: u8) -> Self {
+        match value {
+            x if x == SnapStateIndex::HeaderDownloadCheckpoint as u8 => {
+                SnapStateIndex::HeaderDownloadCheckpoint
+            }
+            x if x == SnapStateIndex::StateTrieRootCheckpoint as u8 => {
+                SnapStateIndex::StateTrieRootCheckpoint
+            }
+            x if x == SnapStateIndex::PendingStorageHealAccounts as u8 => {
+                SnapStateIndex::PendingStorageHealAccounts
+            }
+            x if x == SnapStateIndex::StateTrieKeyCheckpoint as u8 => {
+                SnapStateIndex::StateTrieKeyCheckpoint
+            }
+            _ => panic!("Invalid value when casting to SnapStateIndex: {}", value),
+        }
+    }
+}
diff --git a/crates/storage/store/storage.rs b/crates/storage/store/storage.rs
index cc14f0b562..da8863b59b 100644
--- a/crates/storage/store/storage.rs
+++ b/crates/storage/store/storage.rs
@@ -1003,6 +1003,54 @@ impl Store {
             .is_some())
     }
 
+    /// Sets the hash of the last header downloaded during a snap sync
+    pub fn set_header_download_checkpoint(&self, block_hash: BlockHash) -> Result<(), StoreError> {
+        self.engine.set_header_download_checkpoint(block_hash)
+    }
+
+    /// Gets the hash of the last header downloaded during a snap sync
+    pub fn get_header_download_checkpoint(&self) -> Result<Option<BlockHash>, StoreError> {
+        self.engine.get_header_download_checkpoint()
+    }
+
+    /// Sets the current state root of the state trie being rebuilt during snap sync
+    pub fn set_state_trie_root_checkpoint(&self, current_root: H256) -> Result<(), StoreError> {
+        self.engine.set_state_trie_root_checkpoint(current_root)
+    }
+
+    /// Gets the current state root of the state trie being rebuilt during snap sync
+    pub fn get_state_trie_root_checkpoint(&self) -> Result<Option<H256>, StoreError> {
+        self.engine.get_state_trie_root_checkpoint()
+    }
+
+    /// Sets the last key fetched from the state trie during snap sync
+    pub fn set_state_trie_key_checkpoint(&self, last_key: H256) -> Result<(), StoreError> {
+        self.engine.set_state_trie_key_checkpoint(last_key)
+    }
+
+    /// Gets the last key fetched from the state trie during snap sync
+    pub fn get_state_trie_key_checkpoint(&self) -> Result<Option<H256>, StoreError> {
+        self.engine.get_state_trie_key_checkpoint()
+    }
+
+    /// Sets the list of account hashes whose storage needs healing
+    pub fn set_pending_storage_heal_accounts(&self, accounts: Vec<H256>) -> Result<(), StoreError> {
+        self.engine.set_pending_storage_heal_accounts(accounts)
+    }
+
+    /// Gets the list of account hashes whose storage needs healing
+    pub fn get_pending_storage_heal_accounts(&self) -> Result<Option<Vec<H256>>, StoreError> {
+        self.engine.get_pending_storage_heal_accounts()
+    }
+
+    /// Clears all checkpoints written during a snap sync
+    pub fn clear_snap_state(&self) -> Result<(), StoreError> {
+        self.engine.clear_header_download_checkpoint()?;
+        self.engine.clear_pending_storage_heal_accounts()?;
+        self.engine.clear_state_trie_root_checkpoint()?;
+        self.engine.clear_state_trie_key_checkpoint()
+    }
+
     pub fn is_synced(&self) -> Result<bool, StoreError> {
         self.engine.is_synced()
     }
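Taken together, the new `Store` methods let an aborted snap sync resume where it stopped instead of restarting: read the checkpoint, do work, write the new checkpoint, and wipe everything with `clear_snap_state` once the sync completes. A hedged usage sketch of that cycle, with `Store` reduced to an in-memory stand-in (the real methods persist through the engine, as shown above):

use std::cell::Cell;

// Stand-in for the real Store: one checkpoint, held in memory instead of the db.
struct Store {
    header_checkpoint: Cell<Option<u64>>, // u64 stands in for BlockHash
}

impl Store {
    fn get_header_download_checkpoint(&self) -> Option<u64> {
        self.header_checkpoint.get()
    }
    fn set_header_download_checkpoint(&self, hash: u64) {
        self.header_checkpoint.set(Some(hash));
    }
    fn clear_snap_state(&self) {
        self.header_checkpoint.set(None);
    }
}

fn main() {
    let store = Store { header_checkpoint: Cell::new(None) };
    // First cycle: starts from scratch, aborts after persisting a checkpoint.
    let start = store.get_header_download_checkpoint().unwrap_or(0);
    store.set_header_download_checkpoint(start + 5_000);
    // Next cycle: resumes from the persisted checkpoint instead of from zero.
    assert_eq!(store.get_header_download_checkpoint(), Some(5_000));
    // Sync finished: all snap-state checkpoints are cleared in one call.
    store.clear_snap_state();
    assert_eq!(store.get_header_download_checkpoint(), None);
}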
diff --git a/crates/storage/trie/db/utils.rs b/crates/storage/trie/db/utils.rs
index c1ab27e82a..10141a2e5a 100644
--- a/crates/storage/trie/db/utils.rs
+++ b/crates/storage/trie/db/utils.rs
@@ -1,4 +1,4 @@
-#[cfg(feature = "libmdbx")]
+#[cfg(any(feature = "libmdbx", feature = "redb"))]
 // In order to use NodeHash as key in a dupsort table we must encode it into a fixed size type
 pub fn node_hash_to_fixed_size(node_hash: Vec<u8>) -> [u8; 33] {
     // keep original len so we can re-construct it later
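The hunk cuts off after the first comment, but the signature and that comment pin down the contract: a node hash of at most 32 bytes becomes a fixed 33-byte key that remembers the original length. As an illustration only — a plausible scheme, not necessarily the actual implementation — a length prefix plus zero padding satisfies it:

// Hypothetical sketch: byte 0 holds the original length, bytes 1.. hold the hash.
fn to_fixed_size(node_hash: Vec<u8>) -> [u8; 33] {
    assert!(node_hash.len() <= 32, "node hashes are at most 32 bytes");
    let mut out = [0u8; 33];
    out[0] = node_hash.len() as u8; // keep original len so we can re-construct it later
    out[1..1 + node_hash.len()].copy_from_slice(&node_hash);
    out
}

fn main() {
    let key = to_fixed_size(vec![0xab; 20]);
    assert_eq!(key[0], 20); // recoverable length
    assert_eq!(&key[1..21], &[0xab; 20][..]); // original bytes, zero-padded after
}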