diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index f3a88ca6a26..4badf6b0ab0 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -39,6 +39,7 @@ use lightning::events::EventHandler; use lightning::events::EventsProvider; use lightning::events::ReplayEvent; use lightning::events::{Event, PathFailure}; +use lightning::util::ser::Writeable; use lightning::ln::channelmanager::AChannelManager; use lightning::ln::msgs::OnionMessageHandler; @@ -53,7 +54,13 @@ use lightning::sign::ChangeDestinationSourceSync; use lightning::sign::EntropySource; use lightning::sign::OutputSpender; use lightning::util::logger::Logger; -use lightning::util::persist::{KVStore, Persister}; +use lightning::util::persist::{ + KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, +}; use lightning::util::sweep::OutputSweeper; #[cfg(feature = "std")] use lightning::util::sweep::OutputSweeperSync; @@ -324,15 +331,25 @@ fn update_scorer<'a, S: 'static + Deref + Send + Sync, SC: 'a + Wri true } +macro_rules! maybe_await { + (true, $e:expr) => { + $e.await + }; + (false, $e:expr) => { + $e + }; +} + macro_rules! define_run_body { ( - $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, + $kv_store: ident, + $chain_monitor: ident, $process_chain_monitor_events: expr, $channel_manager: ident, $process_channel_manager_events: expr, $onion_messenger: ident, $process_onion_message_handler_events: expr, $peer_manager: ident, $gossip_sync: ident, $process_sweeper: expr, $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr, - $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $batch_delay: expr, + $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $batch_delay: expr, $async_persist: tt, ) => { { log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); $channel_manager.get_cm().timer_tick_occurred(); @@ -404,7 +421,12 @@ macro_rules! define_run_body { if $channel_manager.get_cm().get_and_clear_needs_persistence() { log_trace!($logger, "Persisting ChannelManager..."); - $persister.persist_manager(&$channel_manager)?; + maybe_await!($async_persist, $kv_store.write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + &$channel_manager.get_cm().encode(), + ))?; log_trace!($logger, "Done persisting ChannelManager."); } if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) { @@ -465,7 +487,12 @@ macro_rules! define_run_body { log_trace!($logger, "Persisting network graph."); } - if let Err(e) = $persister.persist_graph(network_graph) { + if let Err(e) = maybe_await!($async_persist, $kv_store.write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + &network_graph.encode(), + )) { log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e) } @@ -493,7 +520,12 @@ macro_rules! 
define_run_body { } else { log_trace!($logger, "Persisting scorer"); } - if let Err(e) = $persister.persist_scorer(&scorer) { + if let Err(e) = maybe_await!($async_persist, $kv_store.write( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + &scorer.encode(), + )) { log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) } } @@ -516,16 +548,31 @@ macro_rules! define_run_body { // After we exit, ensure we persist the ChannelManager one final time - this avoids // some races where users quit while channel updates were in-flight, with // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. - $persister.persist_manager(&$channel_manager)?; + maybe_await!($async_persist, $kv_store.write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + &$channel_manager.get_cm().encode(), + ))?; // Persist Scorer on exit if let Some(ref scorer) = $scorer { - $persister.persist_scorer(&scorer)?; + maybe_await!($async_persist, $kv_store.write( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + &scorer.encode(), + ))?; } // Persist NetworkGraph on exit if let Some(network_graph) = $gossip_sync.network_graph() { - $persister.persist_graph(network_graph)?; + maybe_await!($async_persist, $kv_store.write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + &network_graph.encode(), + ))?; } Ok(()) @@ -682,22 +729,30 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput}; /// ``` /// # use lightning::io; /// # use lightning::events::ReplayEvent; -/// # use lightning::util::sweep::OutputSweeper; /// # use std::sync::{Arc, RwLock}; /// # use std::sync::atomic::{AtomicBool, Ordering}; /// # use std::time::SystemTime; /// # use lightning_background_processor::{process_events_async, GossipSync}; +/// # use core::future::Future; +/// # use core::pin::Pin; /// # struct Logger {} /// # impl lightning::util::logger::Logger for Logger { /// # fn log(&self, _record: lightning::util::logger::Record) {} /// # } -/// # struct Store {} -/// # impl lightning::util::persist::KVStore for Store { +/// # struct StoreSync {} +/// # impl lightning::util::persist::KVStoreSync for StoreSync { /// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result> { Ok(Vec::new()) } /// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) } /// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) } /// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { Ok(Vec::new()) } /// # } +/// # struct Store {} +/// # impl lightning::util::persist::KVStore for Store { +/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin, io::Error>> + 'static + Send>> { todo!() } +/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> Pin> + 'static + Send>> { todo!() } +/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Pin> + 'static + Send>> { todo!() } +/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Pin, io::Error>> + 'static + Send>> { todo!() } 
+/// # } /// # struct EventHandler {} /// # impl EventHandler { /// # async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) } @@ -708,22 +763,22 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput}; /// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 } /// # fn disconnect_socket(&mut self) {} /// # } -/// # type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc, Arc>; +/// # type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc, Arc>; /// # type NetworkGraph = lightning::routing::gossip::NetworkGraph>; /// # type P2PGossipSync
    = lightning::routing::gossip::P2PGossipSync, Arc
      , Arc>; /// # type ChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager, B, FE, Logger>; /// # type OnionMessenger = lightning::onion_message::messenger::OnionMessenger, Arc, Arc, Arc>, Arc, Arc, Arc>>, Arc>, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler>; /// # type LiquidityManager = lightning_liquidity::LiquidityManager, Arc>, Arc>; /// # type Scorer = RwLock, Arc>>; -/// # type PeerManager = lightning::ln::peer_handler::SimpleArcPeerManager, B, FE, Arc
        , Logger, F, Store>; -/// # +/// # type PeerManager = lightning::ln::peer_handler::SimpleArcPeerManager, B, FE, Arc
          , Logger, F, StoreSync>; +/// # type OutputSweeper = lightning::util::sweep::OutputSweeper, Arc, Arc, Arc, Arc, Arc, Arc>; +/// /// # struct Node< /// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static, /// # F: lightning::chain::Filter + Send + Sync + 'static, /// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static, /// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static, /// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static, -/// # K: lightning::util::persist::KVStore + Send + Sync + 'static, /// # O: lightning::sign::OutputSpender + Send + Sync + 'static, /// # > { /// # peer_manager: Arc>, @@ -736,7 +791,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput}; /// # persister: Arc, /// # logger: Arc, /// # scorer: Arc, -/// # sweeper: Arc, Arc, Arc, Arc, Arc, Arc, Arc>>, +/// # sweeper: Arc>, /// # } /// # /// # async fn setup_background_processing< @@ -745,9 +800,8 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput}; /// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static, /// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static, /// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static, -/// # K: lightning::util::persist::KVStore + Send + Sync + 'static, /// # O: lightning::sign::OutputSpender + Send + Sync + 'static, -/// # >(node: Node) { +/// # >(node: Node) { /// let background_persister = Arc::clone(&node.persister); /// let background_event_handler = Arc::clone(&node.event_handler); /// let background_chain_mon = Arc::clone(&node.chain_monitor); @@ -819,7 +873,6 @@ pub async fn process_events_async< P: 'static + Deref, EventHandlerFuture: core::future::Future>, EventHandler: Fn(Event) -> EventHandlerFuture, - PS: 'static + Deref + Send, ES: 'static + Deref + Send, M: 'static + Deref::Signer, CF, T, F, L, P, ES>> @@ -841,7 +894,7 @@ pub async fn process_events_async< Sleeper: Fn(Duration) -> SleepFuture, FetchTime: Fn() -> Option, >( - persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM, + kv_store: K, event_handler: EventHandler, chain_monitor: M, channel_manager: CM, onion_messenger: Option, gossip_sync: GossipSync, peer_manager: PM, liquidity_manager: Option, sweeper: Option, logger: L, scorer: Option, sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime, @@ -853,7 +906,6 @@ where F::Target: 'static + FeeEstimator, L::Target: 'static + Logger, P::Target: 'static + Persist<::Signer>, - PS::Target: 'static + Persister<'a, CM, L, S>, ES::Target: 'static + EntropySource, CM::Target: AChannelManager, OM::Target: AOnionMessenger, @@ -869,7 +921,7 @@ where let event_handler = &event_handler; let scorer = &scorer; let logger = &logger; - let persister = &persister; + let kv_store = &kv_store; let fetch_time = &fetch_time; // We should be able to drop the Box once our MSRV is 1.68 Box::pin(async move { @@ -880,7 +932,15 @@ where if let Some(duration_since_epoch) = fetch_time() { if update_scorer(scorer, &event, duration_since_epoch) { log_trace!(logger, "Persisting scorer after update"); - if let Err(e) = persister.persist_scorer(&*scorer) { + if let Err(e) = kv_store + .write( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + &scorer.encode(), + ) + .await + { log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e); // We opt not 
to abort early on persistence failure here as persisting // the scorer is non-critical and we still hope that it will have @@ -895,7 +955,7 @@ where }; let mut batch_delay = BatchDelay::new(); define_run_body!( - persister, + kv_store, chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await, channel_manager, @@ -968,30 +1028,105 @@ where mobile_interruptable_platform, fetch_time, batch_delay, + true, ) } +/// Async events processor that is based on [`process_events_async`] but allows for [`KVStoreSync`] to be used for +/// synchronous background persistence. +pub async fn process_events_async_with_kv_store_sync< + UL: 'static + Deref, + CF: 'static + Deref, + T: 'static + Deref, + F: 'static + Deref, + G: 'static + Deref>, + L: 'static + Deref + Send + Sync, + P: 'static + Deref, + EventHandlerFuture: core::future::Future>, + EventHandler: Fn(Event) -> EventHandlerFuture, + ES: 'static + Deref + Send, + M: 'static + + Deref::Signer, CF, T, F, L, P, ES>> + + Send + + Sync, + CM: 'static + Deref + Send + Sync, + OM: 'static + Deref, + PGS: 'static + Deref>, + RGS: 'static + Deref>, + PM: 'static + Deref, + LM: 'static + Deref, + D: 'static + Deref, + O: 'static + Deref, + K: 'static + Deref, + OS: 'static + Deref, L, O>>, + S: 'static + Deref + Send + Sync, + SC: for<'b> WriteableScore<'b>, + SleepFuture: core::future::Future + core::marker::Unpin, + Sleeper: Fn(Duration) -> SleepFuture, + FetchTime: Fn() -> Option, +>( + kv_store: K, event_handler: EventHandler, chain_monitor: M, channel_manager: CM, + onion_messenger: Option, gossip_sync: GossipSync, peer_manager: PM, + liquidity_manager: Option, sweeper: Option, logger: L, scorer: Option, + sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime, +) -> Result<(), lightning::io::Error> +where + UL::Target: 'static + UtxoLookup, + CF::Target: 'static + chain::Filter, + T::Target: 'static + BroadcasterInterface, + F::Target: 'static + FeeEstimator, + L::Target: 'static + Logger, + P::Target: 'static + Persist<::Signer>, + ES::Target: 'static + EntropySource, + CM::Target: AChannelManager, + OM::Target: AOnionMessenger, + PM::Target: APeerManager, + LM::Target: ALiquidityManager, + O::Target: 'static + OutputSpender, + D::Target: 'static + ChangeDestinationSource, + K::Target: 'static + KVStoreSync, +{ + let kv_store = KVStoreSyncWrapper(kv_store); + process_events_async( + kv_store, + event_handler, + chain_monitor, + channel_manager, + onion_messenger, + gossip_sync, + peer_manager, + liquidity_manager, + sweeper, + logger, + scorer, + sleeper, + mobile_interruptable_platform, + fetch_time, + ) + .await +} + #[cfg(feature = "std")] impl BackgroundProcessor { /// Start a background thread that takes care of responsibilities enumerated in the [top-level /// documentation]. /// /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or - /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling + /// [`KVStoreSync`] returns an error. In case of an error, the error is retrieved by calling /// either [`join`] or [`stop`]. /// /// # Data Persistence /// - /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or + /// [`KVStoreSync`] is responsible for writing out the [`ChannelManager`] to disk, and/or /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a /// [`ChannelManager`]. 
See the `lightning-persister` crate for LDK's /// provided implementation. /// - /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if + /// [`KVStoreSync`] is also responsible for writing out the [`NetworkGraph`] to disk, if /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`]. /// See the `lightning-persister` crate for LDK's provided implementation. /// - /// Typically, users should either implement [`Persister::persist_manager`] to never return an + /// Typically, users should either implement [`KVStoreSync`] to never return an /// error or call [`join`] and handle any error that may arise. For the latter case, /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error. /// @@ -1013,8 +1148,6 @@ impl BackgroundProcessor { /// [`stop`]: Self::stop /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable - /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager - /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable pub fn start< @@ -1027,7 +1160,6 @@ impl BackgroundProcessor { L: 'static + Deref + Send, P: 'static + Deref, EH: 'static + EventHandler + Send, - PS: 'static + Deref + Send, ES: 'static + Deref + Send, M: 'static + Deref< @@ -1045,10 +1177,10 @@ impl BackgroundProcessor { SC: for<'b> WriteableScore<'b>, D: 'static + Deref, O: 'static + Deref, - K: 'static + Deref, + K: 'static + Deref + Send, OS: 'static + Deref> + Send, >( - persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM, + kv_store: K, event_handler: EH, chain_monitor: M, channel_manager: CM, onion_messenger: Option, gossip_sync: GossipSync, peer_manager: PM, liquidity_manager: Option, sweeper: Option, logger: L, scorer: Option, ) -> Self @@ -1059,7 +1191,6 @@ impl BackgroundProcessor { F::Target: 'static + FeeEstimator, L::Target: 'static + Logger, P::Target: 'static + Persist<::Signer>, - PS::Target: 'static + Persister<'a, CM, L, S>, ES::Target: 'static + EntropySource, CM::Target: AChannelManager, OM::Target: AOnionMessenger, @@ -1067,7 +1198,7 @@ impl BackgroundProcessor { LM::Target: ALiquidityManager, D::Target: ChangeDestinationSourceSync, O::Target: 'static + OutputSpender, - K::Target: 'static + KVStore, + K::Target: 'static + KVStoreSync, { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = Arc::clone(&stop_thread); @@ -1084,7 +1215,12 @@ impl BackgroundProcessor { .expect("Time should be sometime after 1970"); if update_scorer(scorer, &event, duration_since_epoch) { log_trace!(logger, "Persisting scorer after update"); - if let Err(e) = persister.persist_scorer(&scorer) { + if let Err(e) = kv_store.write( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + &scorer.encode(), + ) { log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) } } @@ -1093,7 +1229,7 @@ impl BackgroundProcessor { }; let mut batch_delay = BatchDelay::new(); define_run_body!( - persister, + kv_store, chain_monitor, chain_monitor.process_pending_events(&event_handler), channel_manager, @@ -1155,6 +1291,7 @@ impl BackgroundProcessor { ) }, batch_delay, + false, ) }); 
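A note for reviewers on the `$async_persist: tt` flag threaded through `define_run_body!` above: because `maybe_await!` matches the literal token `true` or `false` at expansion time, the sync arm never emits an `.await`, which lets one macro body back both `process_events_async` (expanded with `true`) and the threaded `BackgroundProcessor::start` (expanded with `false`). A minimal self-contained sketch, reproducing the macro from this patch, with the `persist_*` functions as illustrative stand-ins for the `kv_store.write(..)` calls:

```rust
// Same dispatch trick as the `maybe_await!` macro introduced in this patch:
// the first token is matched literally at expansion time, so the sync arm
// never contains an `.await` and compiles outside of an async context.
macro_rules! maybe_await {
    (true, $e:expr) => {
        $e.await
    };
    (false, $e:expr) => {
        $e
    };
}

// Illustrative stand-ins for the kv_store.write(..) calls in define_run_body!.
async fn persist_async() -> Result<(), std::io::Error> {
    Ok(())
}
fn persist_sync() -> Result<(), std::io::Error> {
    Ok(())
}

async fn run_async() -> Result<(), std::io::Error> {
    // Expands to `persist_async().await`.
    maybe_await!(true, persist_async())
}

fn run_sync() -> Result<(), std::io::Error> {
    // Expands to plain `persist_sync()`.
    maybe_await!(false, persist_sync())
}

fn main() {
    run_sync().unwrap();
    // `run_async()` would be driven by an executor in the async code path.
    let _ = run_async();
}
```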
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } @@ -1243,7 +1380,8 @@ mod tests { use lightning::types::payment::PaymentHash; use lightning::util::config::UserConfig; use lightning::util::persist::{ - KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -1314,7 +1452,7 @@ mod tests { Arc, Arc, Arc, - Arc, + Arc, Arc, >; @@ -1372,7 +1510,7 @@ mod tests { >, liquidity_manager: Arc, chain_monitor: Arc, - kv_store: Arc, + kv_store: Arc, tx_broadcaster: Arc, network_graph: Arc>>, logger: Arc, @@ -1384,7 +1522,7 @@ mod tests { Arc, Arc, Arc, - Arc, + Arc, Arc, Arc, >, @@ -1476,9 +1614,13 @@ mod tests { fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self { Self { scorer_error: Some((error, message)), ..self } } + + pub fn get_data_dir(&self) -> PathBuf { + self.kv_store.get_data_dir() + } } - impl KVStore for Persister { + impl KVStoreSync for Persister { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> lightning::io::Result> { @@ -1720,7 +1862,7 @@ mod tests { )); let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin)); let kv_store = - Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into())); + Arc::new(Persister::new(format!("{}_persister_{}", &persist_dir, i).into())); let now = Duration::from_secs(genesis_block.header.time as u64); let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos())); let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new( @@ -2164,12 +2306,13 @@ mod tests { open_channel!(nodes[0], nodes[1], 100000); let data_dir = nodes[0].kv_store.get_data_dir(); - let persister = Arc::new( + let kv_store_sync = Arc::new( Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"), ); + let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync)); let bp_future = super::process_events_async( - persister, + kv_store, |_: _| async { Ok(()) }, Arc::clone(&nodes[0].chain_monitor), Arc::clone(&nodes[0].node), @@ -2672,11 +2815,13 @@ mod tests { let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async"); let data_dir = nodes[0].kv_store.get_data_dir(); - let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender)); + let kv_store_sync = + Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender)); + let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync)); let (exit_sender, exit_receiver) = tokio::sync::watch::channel(()); let bp_future = super::process_events_async( - persister, + kv_store, |_: _| async { Ok(()) }, Arc::clone(&nodes[0].chain_monitor), Arc::clone(&nodes[0].node), @@ -2885,12 +3030,13 @@ mod tests { let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async"); let data_dir = nodes[0].kv_store.get_data_dir(); - let persister = Arc::new(Persister::new(data_dir)); + let kv_store_sync = Arc::new(Persister::new(data_dir)); + let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync)); let (exit_sender, exit_receiver) = tokio::sync::watch::channel(()); let bp_future = super::process_events_async( - persister, + 
kv_store, event_handler, Arc::clone(&nodes[0].chain_monitor), Arc::clone(&nodes[0].node), diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs index 86c19de2144..9f490eb6fb2 100644 --- a/lightning-persister/src/fs_store.rs +++ b/lightning-persister/src/fs_store.rs @@ -2,7 +2,7 @@ use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str}; use lightning::types::string::PrintableString; -use lightning::util::persist::{KVStore, MigratableKVStore}; +use lightning::util::persist::{KVStoreSync, MigratableKVStore}; use std::collections::HashMap; use std::fs; @@ -33,7 +33,7 @@ fn path_to_windows_str>(path: &T) -> Vec { // The number of read/write/remove/list operations after which we clean up our `locks` HashMap. const GC_LOCK_INTERVAL: usize = 25; -/// A [`KVStore`] implementation that writes to and reads from the file system. +/// A [`KVStoreSync`] implementation that writes to and reads from the file system. pub struct FilesystemStore { data_dir: PathBuf, tmp_file_counter: AtomicUsize, @@ -92,7 +92,7 @@ impl FilesystemStore { } } -impl KVStore for FilesystemStore { +impl KVStoreSync for FilesystemStore { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> lightning::io::Result> { diff --git a/lightning-persister/src/test_utils.rs b/lightning-persister/src/test_utils.rs index c6617e8be1e..18d643c7443 100644 --- a/lightning-persister/src/test_utils.rs +++ b/lightning-persister/src/test_utils.rs @@ -4,7 +4,7 @@ use lightning::ln::functional_test_utils::{ create_network, create_node_cfgs, create_node_chanmgrs, send_payment, }; use lightning::util::persist::{ - migrate_kv_store_data, read_channel_monitors, KVStore, MigratableKVStore, + migrate_kv_store_data, read_channel_monitors, KVStoreSync, MigratableKVStore, KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN, }; use lightning::util::test_utils; @@ -12,7 +12,7 @@ use lightning::{check_added_monitors, check_closed_broadcast, check_closed_event use std::panic::RefUnwindSafe; -pub(crate) fn do_read_write_remove_list_persist(kv_store: &K) { +pub(crate) fn do_read_write_remove_list_persist(kv_store: &K) { let data = [42u8; 32]; let primary_namespace = "testspace"; @@ -113,7 +113,7 @@ pub(crate) fn do_test_data_migration // Integration-test the given KVStore implementation. Test relaying a few payments and check that // the persisted data is updated the appropriate number of times. 
-pub(crate) fn do_test_store(store_0: &K, store_1: &K) { +pub(crate) fn do_test_store(store_0: &K, store_1: &K) { let chanmon_cfgs = create_chanmon_cfgs(2); let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let chain_mon_0 = test_utils::TestChainMonitor::new( diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index e8e1c474736..e2dcb890fd4 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1829,7 +1829,7 @@ where /// - Perform any periodic channel and payment checks by calling [`timer_tick_occurred`] roughly /// every minute /// - Persist to disk whenever [`get_and_clear_needs_persistence`] returns `true` using a -/// [`Persister`] such as a [`KVStore`] implementation +/// [`KVStoreSync`] implementation /// - Handle [`Event`]s obtained via its [`EventsProvider`] implementation /// /// The [`Future`] returned by [`get_event_or_persistence_needed_future`] is useful in determining @@ -2411,8 +2411,7 @@ where /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events /// [`timer_tick_occurred`]: Self::timer_tick_occurred /// [`get_and_clear_needs_persistence`]: Self::get_and_clear_needs_persistence -/// [`Persister`]: crate::util::persist::Persister -/// [`KVStore`]: crate::util::persist::KVStore +/// [`KVStoreSync`]: crate::util::persist::KVStoreSync /// [`get_event_or_persistence_needed_future`]: Self::get_event_or_persistence_needed_future /// [`lightning-block-sync`]: https://docs.rs/lightning_block_sync/latest/lightning_block_sync /// [`lightning-transaction-sync`]: https://docs.rs/lightning_transaction_sync/latest/lightning_transaction_sync diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 97a7687cb7b..69415d112f3 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -4,16 +4,19 @@ // You may not use this file except in accordance with one or both of these // licenses. -//! This module contains a simple key-value store trait [`KVStore`] that +//! This module contains a simple key-value store trait [`KVStoreSync`] that //! allows one to implement the persistence for [`ChannelManager`], [`NetworkGraph`], //! and [`ChannelMonitor`] all in one place. //! //! [`ChannelManager`]: crate::ln::channelmanager::ChannelManager +//! [`NetworkGraph`]: crate::routing::gossip::NetworkGraph use bitcoin::hashes::hex::FromHex; use bitcoin::{BlockHash, Txid}; use core::cmp; +use core::future::Future; use core::ops::Deref; +use core::pin::Pin; use core::str::FromStr; use crate::prelude::*; @@ -24,10 +27,7 @@ use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use crate::chain::chainmonitor::Persist; use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate}; use crate::chain::transaction::OutPoint; -use crate::ln::channelmanager::AChannelManager; use crate::ln::types::ChannelId; -use crate::routing::gossip::NetworkGraph; -use crate::routing::scoring::WriteableScore; use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider}; use crate::util::logger::Logger; use crate::util::ser::{Readable, ReadableArgs, Writeable}; @@ -65,17 +65,29 @@ pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE: &str = "archiv pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; /// The primary namespace under which the [`NetworkGraph`] will be persisted. 
+/// +/// [`NetworkGraph`]: crate::routing::gossip::NetworkGraph pub const NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE: &str = ""; /// The secondary namespace under which the [`NetworkGraph`] will be persisted. +/// +/// [`NetworkGraph`]: crate::routing::gossip::NetworkGraph pub const NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; /// The key under which the [`NetworkGraph`] will be persisted. +/// +/// [`NetworkGraph`]: crate::routing::gossip::NetworkGraph pub const NETWORK_GRAPH_PERSISTENCE_KEY: &str = "network_graph"; /// The primary namespace under which the [`WriteableScore`] will be persisted. +/// +/// [`WriteableScore`]: crate::routing::scoring::WriteableScore pub const SCORER_PERSISTENCE_PRIMARY_NAMESPACE: &str = ""; /// The secondary namespace under which the [`WriteableScore`] will be persisted. +/// +/// [`WriteableScore`]: crate::routing::scoring::WriteableScore pub const SCORER_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; /// The key under which the [`WriteableScore`] will be persisted. +/// +/// [`WriteableScore`]: crate::routing::scoring::WriteableScore pub const SCORER_PERSISTENCE_KEY: &str = "scorer"; /// The primary namespace under which [`OutputSweeper`] state will be persisted. @@ -98,6 +110,79 @@ pub const OUTPUT_SWEEPER_PERSISTENCE_KEY: &str = "output_sweeper"; /// updates applied to be current) with another implementation. pub const MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL: &[u8] = &[0xFF; 2]; +/// A synchronous version of the [`KVStore`] trait. +pub trait KVStoreSync { + /// A synchronous version of the [`KVStore::read`] method. + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result, io::Error>; + /// A synchronous version of the [`KVStore::write`] method. + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], + ) -> Result<(), io::Error>; + /// A synchronous version of the [`KVStore::remove`] method. + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Result<(), io::Error>; + /// A synchronous version of the [`KVStore::list`] method. + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, io::Error>; +} + +/// A wrapper around a [`KVStoreSync`] that implements the [`KVStore`] trait. It is not necessary to use this type +/// directly. 
+pub struct KVStoreSyncWrapper(pub K) +where + K::Target: KVStoreSync; + +impl Deref for KVStoreSyncWrapper +where + K::Target: KVStoreSync, +{ + type Target = Self; + fn deref(&self) -> &Self::Target { + self + } +} + +impl KVStore for KVStoreSyncWrapper +where + K::Target: KVStoreSync, +{ + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Pin, io::Error>> + 'static + Send>> { + let res = self.0.read(primary_namespace, secondary_namespace, key); + + Box::pin(async move { res }) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], + ) -> Pin> + 'static + Send>> { + let res = self.0.write(primary_namespace, secondary_namespace, key, buf); + + Box::pin(async move { res }) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Pin> + 'static + Send>> { + let res = self.0.remove(primary_namespace, secondary_namespace, key, lazy); + + Box::pin(async move { res }) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Pin, io::Error>> + 'static + Send>> { + let res = self.0.list(primary_namespace, secondary_namespace); + + Box::pin(async move { res }) + } +} + /// Provides an interface that allows storage and retrieval of persisted values that are associated /// with given keys. /// @@ -129,20 +214,22 @@ pub trait KVStore { /// [`ErrorKind::NotFound`]: io::ErrorKind::NotFound fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> Result, io::Error>; - /// Persists the given data under the given `key`. + ) -> Pin, io::Error>> + 'static + Send>>; + /// Persists the given data under the given `key`. Note that the order of multiple writes calls needs to be retained + /// when persisting asynchronously. One possible way to accomplish this is by assigning a version number to each + /// write before returning the future, and then during asynchronous execution, ensuring that the writes are executed in + /// the correct order. /// - /// Will create the given `primary_namespace` and `secondary_namespace` if not already present - /// in the store. + /// Will create the given `primary_namespace` and `secondary_namespace` if not already present in the store. fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], - ) -> Result<(), io::Error>; + ) -> Pin> + 'static + Send>>; /// Removes any data that had previously been persisted under the given `key`. /// /// If the `lazy` flag is set to `true`, the backend implementation might choose to lazily /// remove the given `key` at some point in time after the method returns, e.g., as part of an /// eventual batch deletion of multiple keys. As a consequence, subsequent calls to - /// [`KVStore::list`] might include the removed key until the changes are actually persisted. + /// [`KVStoreSync::list`] might include the removed key until the changes are actually persisted. /// /// Note that while setting the `lazy` flag reduces the I/O burden of multiple subsequent /// `remove` calls, it also influences the atomicity guarantees as lazy `remove`s could @@ -154,7 +241,7 @@ pub trait KVStore { /// invokation or not. fn remove( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, - ) -> Result<(), io::Error>; + ) -> Pin> + 'static + Send>>; /// Returns a list of keys that are stored under the given `secondary_namespace` in /// `primary_namespace`. 
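To make the ordering requirement in the new `KVStore::write` documentation concrete, here is a minimal sketch of the version-number scheme it suggests. This is not LDK code and all names are illustrative; it implements the simple last-writer-wins variant, where a superseded write is dropped rather than reordered. The key point is that the sequence number is assigned synchronously, before any future is returned:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Illustrative versioned store: `begin_write` runs synchronously when the
/// `KVStore::write` future is created; `complete_write` runs later, when the
/// backing I/O finishes, possibly out of order across writes.
#[derive(Clone, Default)]
struct VersionedStore {
    inner: Arc<Mutex<Inner>>,
}

#[derive(Default)]
struct Inner {
    next_seq: u64,
    latest_seq_per_key: HashMap<String, u64>,
    data: HashMap<String, Vec<u8>>,
}

impl VersionedStore {
    /// Stamps the write with a sequence number before any future is returned.
    fn begin_write(&self, key: &str) -> u64 {
        let mut inner = self.inner.lock().unwrap();
        inner.next_seq += 1;
        let seq = inner.next_seq;
        inner.latest_seq_per_key.insert(key.to_string(), seq);
        seq
    }

    /// Applies the write only if it is still the newest one for this key, so
    /// a slow, older write can never clobber a newer value.
    fn complete_write(&self, key: &str, seq: u64, buf: Vec<u8>) {
        let mut inner = self.inner.lock().unwrap();
        if inner.latest_seq_per_key.get(key) == Some(&seq) {
            inner.data.insert(key.to_string(), buf);
        }
    }
}

fn main() {
    let store = VersionedStore::default();
    let first = store.begin_write("manager");
    let second = store.begin_write("manager");
    // Completions arrive out of order; the stale first write is dropped.
    store.complete_write("manager", second, vec![2]);
    store.complete_write("manager", first, vec![1]);
    assert_eq!(store.inner.lock().unwrap().data["manager"], vec![2]);
}
```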
/// @@ -162,15 +249,15 @@ pub trait KVStore { /// returned keys. Returns an empty list if `primary_namespace` or `secondary_namespace` is unknown. fn list( &self, primary_namespace: &str, secondary_namespace: &str, - ) -> Result, io::Error>; + ) -> Pin, io::Error>> + 'static + Send>>; } /// Provides additional interface methods that are required for [`KVStore`]-to-[`KVStore`] /// data migration. -pub trait MigratableKVStore: KVStore { +pub trait MigratableKVStore: KVStoreSync { /// Returns *all* known keys as a list of `primary_namespace`, `secondary_namespace`, `key` tuples. /// - /// This is useful for migrating data from [`KVStore`] implementation to [`KVStore`] + /// This is useful for migrating data from [`KVStoreSync`] implementation to [`KVStoreSync`] /// implementation. /// /// Must exhaustively return all entries known to the store to ensure no data is missed, but @@ -199,62 +286,7 @@ pub fn migrate_kv_store_data( Ok(()) } -/// Trait that handles persisting a [`ChannelManager`], [`NetworkGraph`], and [`WriteableScore`] to disk. -/// -/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager -pub trait Persister<'a, CM: Deref, L: Deref, S: Deref> -where - CM::Target: 'static + AChannelManager, - L::Target: 'static + Logger, - S::Target: WriteableScore<'a>, -{ - /// Persist the given ['ChannelManager'] to disk, returning an error if persistence failed. - /// - /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager - fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error>; - - /// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed. - fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), io::Error>; - - /// Persist the given [`WriteableScore`] to disk, returning an error if persistence failed. - fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error>; -} - -impl<'a, A: KVStore + ?Sized, CM: Deref, L: Deref, S: Deref> Persister<'a, CM, L, S> for A -where - CM::Target: 'static + AChannelManager, - L::Target: 'static + Logger, - S::Target: WriteableScore<'a>, -{ - fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error> { - self.write( - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - &channel_manager.get_cm().encode(), - ) - } - - fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), io::Error> { - self.write( - NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_KEY, - &network_graph.encode(), - ) - } - - fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error> { - self.write( - SCORER_PERSISTENCE_PRIMARY_NAMESPACE, - SCORER_PERSISTENCE_SECONDARY_NAMESPACE, - SCORER_PERSISTENCE_KEY, - &scorer.encode(), - ) - } -} - -impl Persist for K { +impl Persist for K { // TODO: We really need a way for the persister to inform the user that its time to crash/shut // down once these start returning failure. 
// Then we should return InProgress rather than UnrecoverableError, implying we should probably @@ -322,7 +354,7 @@ pub fn read_channel_monitors( kv_store: K, entropy_source: ES, signer_provider: SP, ) -> Result::EcdsaSigner>)>, io::Error> where - K::Target: KVStore, + K::Target: KVStoreSync, ES::Target: EntropySource + Sized, SP::Target: SignerProvider + Sized, { @@ -367,7 +399,7 @@ where /// /// # Overview /// -/// The main benefit this provides over the [`KVStore`]'s [`Persist`] implementation is decreased +/// The main benefit this provides over the [`KVStoreSync`]'s [`Persist`] implementation is decreased /// I/O bandwidth and storage churn, at the expense of more IOPS (including listing, reading, and /// deleting) and complexity. This is because it writes channel monitor differential updates, /// whereas the other (default) implementation rewrites the entire monitor on each update. For @@ -375,7 +407,7 @@ where /// of megabytes (or more). Updates can be as small as a few hundred bytes. /// /// Note that monitors written with `MonitorUpdatingPersister` are _not_ backward-compatible with -/// the default [`KVStore`]'s [`Persist`] implementation. They have a prepended byte sequence, +/// the default [`KVStoreSync`]'s [`Persist`] implementation. They have a prepended byte sequence, /// [`MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL`], applied to prevent deserialization with other /// persisters. This is because monitors written by this struct _may_ have unapplied updates. In /// order to downgrade, you must ensure that all updates are applied to the monitor, and remove the @@ -427,7 +459,7 @@ where /// /// ## EXTREMELY IMPORTANT /// -/// It is extremely important that your [`KVStore::read`] implementation uses the +/// It is extremely important that your [`KVStoreSync::read`] implementation uses the /// [`io::ErrorKind::NotFound`] variant correctly: that is, when a file is not found, and _only_ in /// that circumstance (not when there is really a permissions error, for example). This is because /// neither channel monitor reading function lists updates. Instead, either reads the monitor, and @@ -439,7 +471,7 @@ where /// Stale updates are pruned when the consolidation threshold is reached according to `maximum_pending_updates`. /// Monitor updates in the range between the latest `update_id` and `update_id - maximum_pending_updates` /// are deleted. -/// The `lazy` flag is used on the [`KVStore::remove`] method, so there are no guarantees that the deletions +/// The `lazy` flag is used on the [`KVStoreSync::remove`] method, so there are no guarantees that the deletions /// will complete. However, stale updates are not a problem for data integrity, since updates are /// only read that are higher than the stored [`ChannelMonitor`]'s `update_id`. /// @@ -448,7 +480,7 @@ where /// [`MonitorUpdatingPersister::cleanup_stale_updates`] function. pub struct MonitorUpdatingPersister where - K::Target: KVStore, + K::Target: KVStoreSync, L::Target: Logger, ES::Target: EntropySource + Sized, SP::Target: SignerProvider + Sized, @@ -468,7 +500,7 @@ where impl MonitorUpdatingPersister where - K::Target: KVStore, + K::Target: KVStoreSync, L::Target: Logger, ES::Target: EntropySource + Sized, SP::Target: SignerProvider + Sized, @@ -508,7 +540,7 @@ where /// Reads all stored channel monitors, along with any stored updates for them. 
/// - /// It is extremely important that your [`KVStore::read`] implementation uses the + /// It is extremely important that your [`KVStoreSync::read`] implementation uses the /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the /// documentation for [`MonitorUpdatingPersister`]. pub fn read_all_channel_monitors_with_updates( @@ -530,7 +562,7 @@ where /// Read a single channel monitor, along with any stored updates for it. /// - /// It is extremely important that your [`KVStore::read`] implementation uses the + /// It is extremely important that your [`KVStoreSync::read`] implementation uses the /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the /// documentation for [`MonitorUpdatingPersister`]. /// @@ -657,7 +689,7 @@ where /// This function works by first listing all monitors, and then for each of them, listing all /// updates. The updates that have an `update_id` less than or equal to than the stored monitor /// are deleted. The deletion can either be lazy or non-lazy based on the `lazy` flag; this will - /// be passed to [`KVStore::remove`]. + /// be passed to [`KVStoreSync::remove`]. pub fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> { let monitor_keys = self.kv_store.list( CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, @@ -696,7 +728,7 @@ impl< FE: Deref, > Persist for MonitorUpdatingPersister where - K::Target: KVStore, + K::Target: KVStoreSync, L::Target: Logger, ES::Target: EntropySource + Sized, SP::Target: SignerProvider + Sized, @@ -704,7 +736,7 @@ where FE::Target: FeeEstimator, { /// Persists a new channel. This means writing the entire monitor to the - /// parametrized [`KVStore`]. + /// parametrized [`KVStoreSync`]. fn persist_new_channel( &self, monitor_name: MonitorName, monitor: &ChannelMonitor, ) -> chain::ChannelMonitorUpdateStatus { @@ -737,11 +769,11 @@ where } } - /// Persists a channel update, writing only the update to the parameterized [`KVStore`] if possible. + /// Persists a channel update, writing only the update to the parameterized [`KVStoreSync`] if possible. /// /// In some cases, this will forward to [`MonitorUpdatingPersister::persist_new_channel`]: /// - /// - No full monitor is found in [`KVStore`] + /// - No full monitor is found in [`KVStoreSync`] /// - The number of pending updates exceeds `maximum_pending_updates` as given to [`Self::new`] /// - LDK commands re-persisting the entire monitor through this function, specifically when /// `update` is `None`. @@ -851,7 +883,7 @@ impl MonitorUpdatingPersister where ES::Target: EntropySource + Sized, - K::Target: KVStore, + K::Target: KVStoreSync, L::Target: Logger, SP::Target: SignerProvider + Sized, BI::Target: BroadcasterInterface, @@ -932,7 +964,7 @@ pub enum MonitorName { } impl MonitorName { - /// Attempts to construct a `MonitorName` from a storage key returned by [`KVStore::list`]. + /// Attempts to construct a `MonitorName` from a storage key returned by [`KVStoreSync::list`]. /// /// This is useful when you need to reconstruct the original data the key represents. 
fn from_str(monitor_key: &str) -> Result { @@ -1471,7 +1503,7 @@ mod tests { #[test] fn kvstore_trait_object_usage() { - let store: Arc = Arc::new(TestStore::new(false)); + let store: Arc = Arc::new(TestStore::new(false)); assert!(persist_fn::<_, TestChannelSigner>(Arc::clone(&store))); } } diff --git a/lightning/src/util/sweep.rs b/lightning/src/util/sweep.rs index 10558b3fdea..bec72112c69 100644 --- a/lightning/src/util/sweep.rs +++ b/lightning/src/util/sweep.rs @@ -5,7 +5,7 @@ // licenses. //! This module contains an [`OutputSweeper`] utility that keeps track of -//! [`SpendableOutputDescriptor`]s, i.e., persists them in a given [`KVStore`] and regularly retries +//! [`SpendableOutputDescriptor`]s, i.e., persists them in a given [`KVStoreSync`] and regularly retries //! sweeping them. use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; @@ -23,8 +23,8 @@ use crate::sync::Arc; use crate::sync::Mutex; use crate::util::logger::Logger; use crate::util::persist::{ - KVStore, OUTPUT_SWEEPER_PERSISTENCE_KEY, OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, - OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, + KVStore, KVStoreSync, KVStoreSyncWrapper, OUTPUT_SWEEPER_PERSISTENCE_KEY, + OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, }; use crate::util::ser::{Readable, ReadableArgs, Writeable}; use crate::{impl_writeable_tlv_based, log_debug, log_error}; @@ -36,6 +36,7 @@ use bitcoin::{BlockHash, ScriptBuf, Transaction, Txid}; use core::future::Future; use core::ops::Deref; +use core::pin::Pin; use core::sync::atomic::{AtomicBool, Ordering}; use core::task; @@ -328,7 +329,7 @@ impl_writeable_tlv_based_enum!(OutputSpendStatus, ); /// A utility that keeps track of [`SpendableOutputDescriptor`]s, persists them in a given -/// [`KVStore`] and regularly retries sweeping them based on a callback given to the constructor +/// [`KVStoreSync`] and regularly retries sweeping them based on a callback given to the constructor /// methods. /// /// Users should call [`Self::track_spendable_outputs`] for any [`SpendableOutputDescriptor`]s received via [`Event::SpendableOutputs`]. @@ -362,6 +363,47 @@ where logger: L, } +impl + OutputSweeper, L, O> +where + B::Target: BroadcasterInterface, + D::Target: ChangeDestinationSource, + E::Target: FeeEstimator, + F::Target: Filter + Send + Sync, + K::Target: KVStoreSync, + L::Target: Logger, + O::Target: OutputSpender, +{ + /// Constructs a new [`OutputSweeper`] based on a [`KVStoreSync`]. + pub fn new_with_kv_store_sync( + best_block: BestBlock, broadcaster: B, fee_estimator: E, chain_data_source: Option, + output_spender: O, change_destination_source: D, kv_store_sync: K, logger: L, + ) -> Self { + let kv_store = KVStoreSyncWrapper(kv_store_sync); + + Self::new( + best_block, + broadcaster, + fee_estimator, + chain_data_source, + output_spender, + change_destination_source, + kv_store, + logger, + ) + } + + /// Reads an [`OutputSweeper`] from the given reader and returns it with a synchronous [`KVStoreSync`]. + pub fn read_with_kv_store_sync( + reader: &mut R, args: (B, E, Option, O, D, K, L), + ) -> Result { + let kv_store = KVStoreSyncWrapper(args.5); + let args = (args.0, args.1, args.2, args.3, args.4, kv_store, args.6); + + Self::read(reader, args) + } +} + impl OutputSweeper where @@ -411,7 +453,7 @@ where /// Returns `Err` on persistence failure, in which case the call may be safely retried. 
/// /// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs - pub fn track_spendable_outputs( + pub async fn track_spendable_outputs( &self, output_descriptors: Vec, channel_id: Option, exclude_static_outputs: bool, delay_until_height: Option, ) -> Result<(), ()> { @@ -427,29 +469,32 @@ where return Ok(()); } - let mut state_lock = self.sweeper_state.lock().unwrap(); - for descriptor in relevant_descriptors { - let output_info = TrackedSpendableOutput { - descriptor, - channel_id, - status: OutputSpendStatus::PendingInitialBroadcast { - delayed_until_height: delay_until_height, - }, - }; - - if state_lock.outputs.iter().find(|o| o.descriptor == output_info.descriptor).is_some() - { - continue; - } + self.update_state(|state_lock| -> Result<((), bool), ()> { + for descriptor in relevant_descriptors { + let output_info = TrackedSpendableOutput { + descriptor, + channel_id, + status: OutputSpendStatus::PendingInitialBroadcast { + delayed_until_height: delay_until_height, + }, + }; - state_lock.outputs.push(output_info); - } - self.persist_state(&*state_lock).map_err(|e| { - log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - })?; - state_lock.dirty = false; + if state_lock + .outputs + .iter() + .find(|o| o.descriptor == output_info.descriptor) + .is_some() + { + continue; + } - Ok(()) + state_lock.outputs.push(output_info); + state_lock.dirty = true; + } + + Ok(((), false)) + }) + .await } /// Returns a list of the currently tracked spendable outputs. @@ -505,22 +550,18 @@ where }; // See if there is anything to sweep before requesting a change address. - { - let mut sweeper_state = self.sweeper_state.lock().unwrap(); - - let cur_height = sweeper_state.best_block.height; - let has_respends = sweeper_state.outputs.iter().any(|o| filter_fn(o, cur_height)); - if !has_respends { - // If there is nothing to sweep, we still persist the state if it is dirty. - if sweeper_state.dirty { - self.persist_state(&sweeper_state).map_err(|e| { - log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - })?; - sweeper_state.dirty = false; - } + let has_respends = self + .update_state(|sweeper_state| -> Result<(bool, bool), ()> { + let cur_height = sweeper_state.best_block.height; + let has_respends = sweeper_state.outputs.iter().any(|o| filter_fn(o, cur_height)); - return Ok(()); - } + // If there are respends, we can postpone persisting a potentially dirty state until after the sweep. + Ok((has_respends, has_respends)) + }) + .await?; + + if !has_respends { + return Ok(()); } // Request a new change address outside of the mutex to avoid the mutex crossing await. @@ -528,62 +569,59 @@ where self.change_destination_source.get_change_destination_script().await?; // Sweep the outputs. - { - let mut sweeper_state = self.sweeper_state.lock().unwrap(); - - let cur_height = sweeper_state.best_block.height; - let cur_hash = sweeper_state.best_block.block_hash; - - let respend_descriptors: Vec<&SpendableOutputDescriptor> = sweeper_state - .outputs - .iter() - .filter(|o| filter_fn(*o, cur_height)) - .map(|o| &o.descriptor) - .collect(); - - if respend_descriptors.is_empty() { - // It could be that a tx confirmed and there is now nothing to sweep anymore. We still persist the state - // if it is dirty. 
- if sweeper_state.dirty { - self.persist_state(&sweeper_state).map_err(|e| { - log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - })?; - sweeper_state.dirty = false; - } - - return Ok(()); - } + let spending_tx = self + .update_state(|sweeper_state| -> Result<(Option, bool), ()> { + let cur_height = sweeper_state.best_block.height; + let cur_hash = sweeper_state.best_block.block_hash; + + let respend_descriptors: Vec<&SpendableOutputDescriptor> = sweeper_state + .outputs + .iter() + .filter(|o| filter_fn(*o, cur_height)) + .map(|o| &o.descriptor) + .collect(); + + // Generate the spending transaction and broadcast it. + if !respend_descriptors.is_empty() { + let spending_tx = self + .spend_outputs( + &sweeper_state, + &respend_descriptors, + change_destination_script, + ) + .map_err(|e| { + log_error!(self.logger, "Error spending outputs: {:?}", e); + })?; + + log_debug!( + self.logger, + "Generating and broadcasting sweeping transaction {}", + spending_tx.compute_txid() + ); - let spending_tx = self - .spend_outputs(&sweeper_state, &respend_descriptors, change_destination_script) - .map_err(|e| { - log_error!(self.logger, "Error spending outputs: {:?}", e); - })?; - - log_debug!( - self.logger, - "Generating and broadcasting sweeping transaction {}", - spending_tx.compute_txid() - ); - - // As we didn't modify the state so far, the same filter_fn yields the same elements as - // above. - let respend_outputs = - sweeper_state.outputs.iter_mut().filter(|o| filter_fn(&**o, cur_height)); - for output_info in respend_outputs { - if let Some(filter) = self.chain_data_source.as_ref() { - let watched_output = output_info.to_watched_output(cur_hash); - filter.register_output(watched_output); + // As we didn't modify the state so far, the same filter_fn yields the same elements as + // above. + let respend_outputs = + sweeper_state.outputs.iter_mut().filter(|o| filter_fn(&**o, cur_height)); + for output_info in respend_outputs { + if let Some(filter) = self.chain_data_source.as_ref() { + let watched_output = output_info.to_watched_output(cur_hash); + filter.register_output(watched_output); + } + + output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone()); + sweeper_state.dirty = true; + } + + Ok((Some(spending_tx), false)) + } else { + Ok((None, false)) } + }) + .await?; - output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone()); - } - - self.persist_state(&sweeper_state).map_err(|e| { - log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - })?; - sweeper_state.dirty = false; - + // Persistence completely successfully. If we have a spending transaction, we broadcast it. 
+ if let Some(spending_tx) = spending_tx { self.broadcaster.broadcast_transactions(&[&spending_tx]); } @@ -612,25 +650,45 @@ where sweeper_state.dirty = true; } - fn persist_state(&self, sweeper_state: &SweeperState) -> Result<(), io::Error> { - self.kv_store - .write( - OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, - OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, - OUTPUT_SWEEPER_PERSISTENCE_KEY, - &sweeper_state.encode(), - ) - .map_err(|e| { - log_error!( - self.logger, - "Write for key {}/{}/{} failed due to: {}", - OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, - OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, - OUTPUT_SWEEPER_PERSISTENCE_KEY, - e - ); - e - }) + fn persist_state<'a>( + &self, sweeper_state: &SweeperState, + ) -> Pin> + 'a + Send>> { + let encoded = sweeper_state.encode(); + + self.kv_store.write( + OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_KEY, + &encoded, + ) + } + + /// Updates the sweeper state by executing the given callback. Persists the state afterwards if it is marked dirty, + /// unless skip_persist is true. Returning true for skip_persist allows the callback to postpone persisting a + /// potentially dirty state. + async fn update_state( + &self, callback: impl FnOnce(&mut SweeperState) -> Result<(X, bool), ()>, + ) -> Result { + let (fut, res) = { + let mut state_lock = self.sweeper_state.lock().unwrap(); + + let (res, skip_persist) = callback(&mut state_lock)?; + if !state_lock.dirty || skip_persist { + return Ok(res); + } + + state_lock.dirty = false; + + (self.persist_state(&state_lock), res) + }; + + fut.await.map_err(|e| { + self.sweeper_state.lock().unwrap().dirty = true; + + log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); + })?; + + Ok(res) } fn spend_outputs( @@ -929,11 +987,21 @@ where D::Target: ChangeDestinationSourceSync, E::Target: FeeEstimator, F::Target: Filter, - K::Target: KVStore, + K::Target: KVStoreSync, L::Target: Logger, O::Target: OutputSpender, { - sweeper: Arc>, E, F, K, L, O>>, + sweeper: Arc< + OutputSweeper< + B, + Arc>, + E, + F, + Arc>, + L, + O, + >, + >, } impl @@ -943,7 +1011,7 @@ where D::Target: ChangeDestinationSourceSync, E::Target: FeeEstimator, F::Target: Filter, - K::Target: KVStore, + K::Target: KVStoreSync, L::Target: Logger, O::Target: OutputSpender, { @@ -955,6 +1023,8 @@ where let change_destination_source = Arc::new(ChangeDestinationSourceSyncWrapper::new(change_destination_source)); + let kv_store = Arc::new(KVStoreSyncWrapper(kv_store)); + let sweeper = OutputSweeper::new( best_block, broadcaster, @@ -983,17 +1053,26 @@ where } } - /// Tells the sweeper to track the given outputs descriptors. Wraps [`OutputSweeper::track_spendable_outputs`]. + /// Wrapper around [`OutputSweeper::track_spendable_outputs`]. pub fn track_spendable_outputs( &self, output_descriptors: Vec, channel_id: Option, exclude_static_outputs: bool, delay_until_height: Option, ) -> Result<(), ()> { - self.sweeper.track_spendable_outputs( + let mut fut = Box::pin(self.sweeper.track_spendable_outputs( output_descriptors, channel_id, exclude_static_outputs, delay_until_height, - ) + )); + let mut waker = dummy_waker(); + let mut ctx = task::Context::from_waker(&mut waker); + match fut.as_mut().poll(&mut ctx) { + task::Poll::Ready(result) => result, + task::Poll::Pending => { + // In a sync context, we can't wait for the future to complete. 
+ unreachable!("OutputSweeper::track_spendable_outputs should not be pending in a sync context"); + }, + } } /// Returns a list of the currently tracked spendable outputs. Wraps [`OutputSweeper::tracked_spendable_outputs`]. @@ -1005,7 +1084,17 @@ where #[cfg(any(test, feature = "_test_utils"))] pub fn sweeper_async( &self, - ) -> Arc>, E, F, K, L, O>> { + ) -> Arc< + OutputSweeper< + B, + Arc>, + E, + F, + Arc>, + L, + O, + >, + > { Arc::clone(&self.sweeper) } } @@ -1017,7 +1106,7 @@ where D::Target: ChangeDestinationSourceSync, E::Target: FeeEstimator, F::Target: Filter + Sync + Send, - K::Target: KVStore, + K::Target: KVStoreSync, L::Target: Logger, O::Target: OutputSpender, { diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 4165afea767..c04af39d481 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -57,7 +57,7 @@ use crate::util::dyn_signer::{ use crate::util::logger::{Logger, Record}; #[cfg(feature = "std")] use crate::util::mut_global::MutGlobal; -use crate::util::persist::{KVStore, MonitorName}; +use crate::util::persist::{KVStoreSync, MonitorName}; use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer}; use crate::util::test_channel_signer::{EnforcementState, TestChannelSigner}; @@ -806,7 +806,7 @@ impl TestStore { } } -impl KVStore for TestStore { +impl KVStoreSync for TestStore { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> io::Result> { diff --git a/pending_changelog/3905-async-background-persistence.txt b/pending_changelog/3905-async-background-persistence.txt new file mode 100644 index 00000000000..caa16d34895 --- /dev/null +++ b/pending_changelog/3905-async-background-persistence.txt @@ -0,0 +1,9 @@ +## API Updates (0.2) + +* The `Persister` trait has been removed, and `KVStore` is now used directly. If you're persisting `ChannelManager`, +`NetworkGraph`, or `Scorer` to a custom location, you can maintain that behavior by intercepting and rewriting the +corresponding namespaces and keys. + +* The `KVStore` trait has been updated to be asynchronous, while the original synchronous version is now available as +`KVStoreSync`. For channel persistence, `KVStoreSync` is still mandatory. However, for background persistence, an +asynchronous `KVStore` can be provided optionally.
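For readers migrating off the removed `Persister` trait, the changelog's suggestion to intercept and rewrite the corresponding namespaces and keys could look roughly like the following sketch. `RemappingStore` and the custom location are hypothetical; the trait and constants are the real ones from this patch:

```rust
use lightning::util::persist::{
    KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
    CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
};

/// Hypothetical wrapper that redirects the ChannelManager's default
/// namespace/key triple to a custom location and forwards everything else.
struct RemappingStore<S: KVStoreSync> {
    inner: S,
}

impl<S: KVStoreSync> RemappingStore<S> {
    fn remap<'a>(
        &self, primary: &'a str, secondary: &'a str, key: &'a str,
    ) -> (&'a str, &'a str, &'a str) {
        if primary == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE
            && secondary == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE
            && key == CHANNEL_MANAGER_PERSISTENCE_KEY
        {
            // Hypothetical custom location previously written via a custom
            // `Persister::persist_manager` implementation.
            ("my_app", "backups", "channel_manager")
        } else {
            (primary, secondary, key)
        }
    }
}

impl<S: KVStoreSync> KVStoreSync for RemappingStore<S> {
    fn read(
        &self, primary: &str, secondary: &str, key: &str,
    ) -> Result<Vec<u8>, lightning::io::Error> {
        let (p, s, k) = self.remap(primary, secondary, key);
        self.inner.read(p, s, k)
    }
    fn write(
        &self, primary: &str, secondary: &str, key: &str, buf: &[u8],
    ) -> Result<(), lightning::io::Error> {
        let (p, s, k) = self.remap(primary, secondary, key);
        self.inner.write(p, s, k, buf)
    }
    fn remove(
        &self, primary: &str, secondary: &str, key: &str, lazy: bool,
    ) -> Result<(), lightning::io::Error> {
        let (p, s, k) = self.remap(primary, secondary, key);
        self.inner.remove(p, s, k, lazy)
    }
    fn list(
        &self, primary: &str, secondary: &str,
    ) -> Result<Vec<String>, lightning::io::Error> {
        // Listing is forwarded unchanged; a real implementation may also need
        // to surface the remapped key under its original namespace here.
        self.inner.list(primary, secondary)
    }
}
```

An `Arc<RemappingStore<..>>` can then be handed to `BackgroundProcessor::start` directly (it now takes a `KVStoreSync`), or wrapped in `KVStoreSyncWrapper` for `process_events_async`, mirroring what the updated tests in this patch do.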