diff --git a/node/src/actors/chain_manager/handlers.rs b/node/src/actors/chain_manager/handlers.rs index 6dfcdacc4..3df33536e 100644 --- a/node/src/actors/chain_manager/handlers.rs +++ b/node/src/actors/chain_manager/handlers.rs @@ -165,7 +165,7 @@ impl Handler> for ChainManager { }) = best_candidate { // Persist block and update ChainState - self.consolidate_block(ctx, block, utxo_diff, false); + self.consolidate_block(ctx, block, utxo_diff, None); } else if msg.checkpoint > 0 { let previous_epoch = msg.checkpoint - 1; log::warn!( @@ -309,7 +309,7 @@ impl Handler for ChainManager { if msg.blocks.len() == 1 && msg.blocks[0].hash() == consensus_constants.genesis_hash { let block = msg.blocks.into_iter().next().unwrap(); - match act.process_requested_block(ctx, block, false) { + match act.process_requested_block(ctx, block, None) { Ok(()) => { log::debug!("Successfully consolidated genesis block"); @@ -1024,7 +1024,7 @@ impl Handler for ChainManager { } let mut consolidated_consensus_candidate = false; for consensus_block in candidates { - match self.process_requested_block(ctx, consensus_block, false) { + match self.process_requested_block(ctx, consensus_block, None) { Ok(()) => { consolidated_consensus_candidate = true; log::info!( @@ -1720,12 +1720,20 @@ impl Handler for ChainManager { fn handle(&mut self, msg: Rewind, ctx: &mut Self::Context) -> Self::Result { // Save list of blocks that are known to be valid - let old_block_chain: VecDeque<(Epoch, Hash)> = self - .chain_state - .block_chain - .range(0..=msg.epoch) - .map(|(k, v)| (*k, *v)) - .collect(); + let old_block_chain: VecDeque<(Epoch, Hash)> = if let Some(epoch) = msg.epoch { + self.chain_state + .block_chain + .range(0..=epoch) + .map(|(k, v)| (*k, *v)) + .collect() + } else { + // If rewind epoch is None, rewind to the latest block + self.chain_state + .block_chain + .range(0..) + .map(|(k, v)| (*k, *v)) + .collect() + }; self.delete_chain_state_and_reinitialize() .map(|_res, act, ctx| { @@ -1741,7 +1749,7 @@ impl Handler for ChainManager { .into_actor(act) .map(|_res, _act, _ctx| ()) .spawn(ctx); - act.resync_from_storage(old_block_chain, ctx, |act, ctx| { + act.resync_from_storage(old_block_chain, msg, ctx, |act, ctx| { // After the resync is done: // Persist chain state to storage ctx.wait( diff --git a/node/src/actors/chain_manager/mod.rs b/node/src/actors/chain_manager/mod.rs index 4ed4dd92b..4e9c0689b 100644 --- a/node/src/actors/chain_manager/mod.rs +++ b/node/src/actors/chain_manager/mod.rs @@ -82,7 +82,7 @@ use crate::{ json_rpc::JsonRpcServer, messages::{ AddItem, AddItems, AddTransaction, Anycast, BlockNotify, Broadcast, DropOutboundPeers, - GetBlocksEpochRange, GetItemBlock, NodeStatusNotify, RemoveAddressesFromTried, + GetBlocksEpochRange, GetItemBlock, NodeStatusNotify, RemoveAddressesFromTried, Rewind, SendInventoryItem, SendInventoryRequest, SendLastBeacon, SendSuperBlockVote, SetLastBeacon, SetSuperBlockTargetBeacon, StoreInventoryItem, SuperBlockNotify, }, @@ -376,6 +376,7 @@ impl ChainManager { fn resync_from_storage( &mut self, mut block_list: VecDeque<(Epoch, Hash)>, + msg: Rewind, ctx: &mut Context, done: F, ) where @@ -403,7 +404,7 @@ impl ChainManager { last_epoch, hash ); - act.process_requested_block(ctx, block, true) + act.process_requested_block(ctx, block, Some(&msg)) .expect("resync from storage fail"); // We need to persist the chain state periodically, otherwise the entire // UTXO set will be in memory, consuming a huge amount of memory. @@ -413,7 +414,7 @@ impl ChainManager { .wait(ctx); } // Recursion - act.resync_from_storage(block_list, ctx, done); + act.resync_from_storage(block_list, msg, ctx, done); } Ok(Err(e)) => { panic!("{:?}", e); @@ -496,7 +497,7 @@ impl ChainManager { &mut self, ctx: &mut Context, block: Block, - resynchronizing: bool, + rewind: Option<&Rewind>, ) -> Result<(), failure::Error> { if let ( Some(epoch_constants), @@ -538,12 +539,12 @@ impl ChainManager { secp_ctx, block_number, &chain_info.consensus_constants, - resynchronizing, + rewind, &active_wips, )?; // Persist block and update ChainState - self.consolidate_block(ctx, block, utxo_diff, resynchronizing); + self.consolidate_block(ctx, block, utxo_diff, rewind); Ok(()) } else { @@ -672,7 +673,7 @@ impl ChainManager { .expect("No initialized SECP256K1 context"), self.chain_state.block_number(), &chain_info.consensus_constants, - false, + None, &active_wips, ) { Ok(utxo_diff) => { @@ -721,7 +722,7 @@ impl ChainManager { ctx: &mut Context, block: Block, utxo_diff: Diff, - resynchronizing: bool, + rewind: Option<&Rewind>, ) { // Update chain_info and reputation_engine let own_pkh = match self.own_pkh { @@ -732,6 +733,12 @@ impl ChainManager { } }; + let persist_to_storage = if let Some(rewind) = rewind { + rewind.mode.write_items_to_storage + } else { + true + }; + match self.chain_state { ChainState { chain_info: Some(ref mut chain_info), @@ -826,7 +833,7 @@ impl ChainManager { let to_be_stored = self.chain_state.data_request_pool.finished_data_requests(); - if !resynchronizing { + if persist_to_storage { self.persist_data_requests(ctx, to_be_stored); } @@ -844,7 +851,7 @@ impl ChainManager { }) } - if !resynchronizing { + if persist_to_storage { self.persist_items( ctx, vec![StoreInventoryItem::Block(Box::new(block))], @@ -876,7 +883,7 @@ impl ChainManager { show_tally_info(dr_info.tally.as_ref().unwrap(), block_epoch); } - if !resynchronizing { + if persist_to_storage { self.persist_data_requests(ctx, to_be_stored); } @@ -902,7 +909,7 @@ impl ChainManager { // getTransaction will show the content without any warning that the block // is not on the main chain. To fix this we could remove forked blocks when // a reorganization is detected. - if !resynchronizing { + if persist_to_storage { self.persist_items( ctx, vec![StoreInventoryItem::Block(Box::new(block.clone()))], @@ -1916,7 +1923,7 @@ impl ChainManager { let mut num_processed_blocks = 0; for block in blocks.iter() { - if let Err(e) = self.process_requested_block(ctx, block.clone(), false) { + if let Err(e) = self.process_requested_block(ctx, block.clone(), None) { log::error!("Error processing block: {}", e); if num_processed_blocks > 0 { // Restore only in case there were several blocks consolidated before @@ -2420,10 +2427,15 @@ pub fn process_validations( secp_ctx: &CryptoEngine, block_number: u32, consensus_constants: &ConsensusConstants, - resynchronizing: bool, + rewind: Option<&Rewind>, active_wips: &ActiveWips, ) -> Result { - if !resynchronizing { + let validate_signatures = if let Some(rewind) = rewind { + rewind.mode.validate_signatures + } else { + true + }; + if validate_signatures { let mut signatures_to_verify = vec![]; validate_block( block, @@ -2452,7 +2464,7 @@ pub fn process_validations( active_wips, )?; - if !resynchronizing { + if validate_signatures { verify_signatures(signatures_to_verify, vrf_ctx, secp_ctx)?; } diff --git a/node/src/actors/json_rpc/json_rpc_methods.rs b/node/src/actors/json_rpc/json_rpc_methods.rs index b425c7a43..b51da1bce 100644 --- a/node/src/actors/json_rpc/json_rpc_methods.rs +++ b/node/src/actors/json_rpc/json_rpc_methods.rs @@ -1,6 +1,7 @@ use std::{ collections::HashMap, convert::TryFrom, + future::Future, net::SocketAddr, sync::atomic::{AtomicUsize, Ordering}, sync::Arc, @@ -9,6 +10,8 @@ use std::{ use actix::MailboxError; #[cfg(not(test))] use actix::SystemService; +use futures::FutureExt; +use futures_util::compat::Compat; use itertools::Itertools; use jsonrpc_core::{MetaIoHandler, Params, Value}; use jsonrpc_pubsub::{PubSubHandler, Session, Subscriber, SubscriptionId}; @@ -35,7 +38,8 @@ use crate::{ GetBalance, GetBlocksEpochRange, GetConsolidatedPeers, GetDataRequestInfo, GetEpoch, GetHighestCheckpointBeacon, GetItemBlock, GetItemSuperblock, GetItemTransaction, GetKnownPeers, GetMemoryTransaction, GetMempool, GetNodeStats, GetReputation, - GetSignalingInfo, GetState, GetUtxoInfo, InitializePeers, IsConfirmedBlock, Rewind, + GetSignalingInfo, GetState, GetSupplyInfo, GetUtxoInfo, InitializePeers, + IsConfirmedBlock, Rewind, RewindMode, }, peers_manager::PeersManager, sessions_manager::SessionsManager, @@ -47,10 +51,6 @@ use super::Subscriptions; #[cfg(test)] use self::mock_actix::SystemService; -use crate::actors::messages::GetSupplyInfo; -use futures::FutureExt; -use futures_util::compat::Compat; -use std::future::Future; type JsonRpcResult = Result; @@ -238,7 +238,7 @@ pub fn jsonrpc_io_handler( enable_sensitive_methods, "rewind", params, - |params| rewind(params.parse()), + rewind, ))) }); @@ -1648,15 +1648,48 @@ pub async fn get_consensus_constants(params: Result<(), jsonrpc_core::Error>) -> } /// Rewind -pub async fn rewind(params: Result<(Epoch,), jsonrpc_core::Error>) -> JsonRpcResult { - let epoch = match params { - Ok((epoch,)) => epoch, - Err(e) => return Err(e), +pub async fn rewind(params: Params) -> JsonRpcResult { + let rewind_params: Rewind; + + // Handle parameters as an array with an epoch field, or an object which is deserialized as a + // Rewind struct + if let Params::Array(params) = params { + if params.len() != 1 { + return Err(jsonrpc_core::Error::invalid_params( + "Argument of `rewind` must be either a one-element array or an object", + )); + } else if let Some(Value::Number(epoch)) = params.get(0) { + // Convert Number to Epoch, return error on out of range + match epoch.as_u64().and_then(|epoch| Epoch::try_from(epoch).ok()) { + Some(epoch) => { + rewind_params = Rewind { + epoch: Some(epoch), + mode: RewindMode::default(), + } + } + None => { + return Err(jsonrpc_core::Error::invalid_params( + "First argument of `rewind` must have type `Epoch`", + )); + } + } + } else { + return Err(jsonrpc_core::Error::invalid_params( + "First argument of `rewind` must have type `Epoch`", + )); + }; + } else if let Params::Map(_map) = ¶ms { + let parsed_params = params.parse()?; + rewind_params = parsed_params; + } else { + return Err(jsonrpc_core::Error::invalid_params( + "Argument of `rewind` must be either a one-element array or an object", + )); }; let chain_manager_addr = ChainManager::from_registry(); chain_manager_addr - .send(Rewind { epoch }) + .send(rewind_params) .map(|res| { res.map_err(internal_error) .and_then(|success| match success { diff --git a/node/src/actors/messages.rs b/node/src/actors/messages.rs index e4d9e3f71..9c12d5586 100644 --- a/node/src/actors/messages.rs +++ b/node/src/actors/messages.rs @@ -378,10 +378,27 @@ impl Message for IsConfirmedBlock { type Result = Result; } -/// Rewind +/// Additional configuration for the rewind method +#[derive(Debug, Default, Clone, Deserialize, Serialize)] +pub struct RewindMode { + /// Validate block and transaction signatures + #[serde(default)] + pub validate_signatures: bool, + /// Write all the blocks, transactions, and data request reports to storage, regardless of + /// whether they already exist or not + #[serde(default)] + pub write_items_to_storage: bool, +} + +/// Rewind chain state back to some epoch +#[derive(Debug, Default, Clone, Deserialize, Serialize)] pub struct Rewind { - /// Epoch - pub epoch: u32, + /// Epoch of the last block that will be consolidated by the rewind method + #[serde(default)] + pub epoch: Option, + /// Additional configuration for the rewind method + #[serde(default)] + pub mode: RewindMode, } impl Message for Rewind {