From 54e22f9f533c60ee58cae91b4a83be26814e38fb Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 7 Sep 2025 19:05:33 +0000 Subject: [PATCH 01/21] Correct `maximum_pending_updates` of 0 in MonitorUpdatingPersister Though users maybe shouldn't use `MonitorUpdatingPersister` if they don't actually want to persist `ChannelMonitorUpdate`s, we also shouldn't panic if `maximum_pending_updates` is set to zero. --- lightning/src/util/persist.rs | 34 +++++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 6b2ceaf4c34..b735e98cde4 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -796,6 +796,7 @@ where const LEGACY_CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX; if let Some(update) = update { let persist_update = update.update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID + && self.maximum_pending_updates != 0 && update.update_id % self.maximum_pending_updates != 0; if persist_update { let monitor_key = monitor_name.to_string(); @@ -1188,12 +1189,9 @@ mod tests { } // Exercise the `MonitorUpdatingPersister` with real channels and payments. - #[test] - fn persister_with_real_monitors() { - // This value is used later to limit how many iterations we perform. - let persister_0_max_pending_updates = 7; - // Intentionally set this to a smaller value to test a different alignment. - let persister_1_max_pending_updates = 3; + fn do_persister_with_real_monitors(persisters_max_pending_updates: (u64, u64)) { + let persister_0_max_pending_updates = persisters_max_pending_updates.0; + let persister_1_max_pending_updates = persisters_max_pending_updates.1; let chanmon_cfgs = create_chanmon_cfgs(4); let persister_0 = MonitorUpdatingPersister { kv_store: &TestStore::new(false), @@ -1256,6 +1254,11 @@ mod tests { assert_eq!(mon.get_latest_update_id(), $expected_update_id); let monitor_name = mon.persistence_key(); + let expected_updates = if persister_0_max_pending_updates == 0 { + 0 + } else { + mon.get_latest_update_id() % persister_0_max_pending_updates + }; assert_eq!( persister_0 .kv_store @@ -1265,7 +1268,7 @@ mod tests { ) .unwrap() .len() as u64, - mon.get_latest_update_id() % persister_0_max_pending_updates, + expected_updates, "Wrong number of updates stored in persister 0", ); } @@ -1275,6 +1278,11 @@ mod tests { for (_, mon) in persisted_chan_data_1.iter() { assert_eq!(mon.get_latest_update_id(), $expected_update_id); let monitor_name = mon.persistence_key(); + let expected_updates = if persister_1_max_pending_updates == 0 { + 0 + } else { + mon.get_latest_update_id() % persister_1_max_pending_updates + }; assert_eq!( persister_1 .kv_store @@ -1284,7 +1292,7 @@ mod tests { ) .unwrap() .len() as u64, - mon.get_latest_update_id() % persister_1_max_pending_updates, + expected_updates, "Wrong number of updates stored in persister 1", ); } @@ -1348,10 +1356,18 @@ mod tests { // Make sure everything is persisted as expected after close. 
check_persisted_data!( - persister_0_max_pending_updates * 2 * EXPECTED_UPDATES_PER_PAYMENT + 1 + cmp::max(2, persister_0_max_pending_updates * 2) * EXPECTED_UPDATES_PER_PAYMENT + 1 ); } + #[test] + fn persister_with_real_monitors() { + // Test various alignments + do_persister_with_real_monitors((7, 3)); + do_persister_with_real_monitors((0, 1)); + do_persister_with_real_monitors((4, 2)); + } + // Test that if the `MonitorUpdatingPersister`'s can't actually write, trying to persist a // monitor or update with it results in the persister returning an UnrecoverableError status. #[test] From 5206829d5d0173e03acb3d3827e44a7017287538 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 7 Sep 2025 23:17:32 +0000 Subject: [PATCH 02/21] Use public `MonitorUpdatingPersister` API in tests In the coming commits `MonitorUpdatingPersister`'s internal state will be reworked. To avoid spurious test diff, we instead use the public API of `MonitorUpdatingPersister` rather than internal bits in tests. --- lightning/src/util/persist.rs | 123 +++++++++++++++++----------------- 1 file changed, 62 insertions(+), 61 deletions(-) diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index b735e98cde4..d882b1df4da 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -1112,7 +1112,7 @@ mod tests { use crate::ln::msgs::BaseMessageHandler; use crate::sync::Arc; use crate::util::test_channel_signer::TestChannelSigner; - use crate::util::test_utils::{self, TestLogger, TestStore}; + use crate::util::test_utils::{self, TestStore}; use crate::{check_added_monitors, check_closed_broadcast}; use bitcoin::hashes::hex::FromHex; @@ -1193,24 +1193,26 @@ mod tests { let persister_0_max_pending_updates = persisters_max_pending_updates.0; let persister_1_max_pending_updates = persisters_max_pending_updates.1; let chanmon_cfgs = create_chanmon_cfgs(4); - let persister_0 = MonitorUpdatingPersister { - kv_store: &TestStore::new(false), - logger: &TestLogger::new(), - maximum_pending_updates: persister_0_max_pending_updates, - entropy_source: &chanmon_cfgs[0].keys_manager, - signer_provider: &chanmon_cfgs[0].keys_manager, - broadcaster: &chanmon_cfgs[0].tx_broadcaster, - fee_estimator: &chanmon_cfgs[0].fee_estimator, - }; - let persister_1 = MonitorUpdatingPersister { - kv_store: &TestStore::new(false), - logger: &TestLogger::new(), - maximum_pending_updates: persister_1_max_pending_updates, - entropy_source: &chanmon_cfgs[1].keys_manager, - signer_provider: &chanmon_cfgs[1].keys_manager, - broadcaster: &chanmon_cfgs[1].tx_broadcaster, - fee_estimator: &chanmon_cfgs[1].fee_estimator, - }; + let kv_store_0 = TestStore::new(false); + let persister_0 = MonitorUpdatingPersister::new( + &kv_store_0, + &chanmon_cfgs[0].logger, + persister_0_max_pending_updates, + &chanmon_cfgs[0].keys_manager, + &chanmon_cfgs[0].keys_manager, + &chanmon_cfgs[0].tx_broadcaster, + &chanmon_cfgs[0].fee_estimator, + ); + let kv_store_1 = TestStore::new(false); + let persister_1 = MonitorUpdatingPersister::new( + &kv_store_1, + &chanmon_cfgs[1].logger, + persister_1_max_pending_updates, + &chanmon_cfgs[1].keys_manager, + &chanmon_cfgs[1].keys_manager, + &chanmon_cfgs[1].tx_broadcaster, + &chanmon_cfgs[1].fee_estimator, + ); let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let chain_mon_0 = test_utils::TestChainMonitor::new( Some(&chanmon_cfgs[0].chain_source), @@ -1260,8 +1262,7 @@ mod tests { mon.get_latest_update_id() % persister_0_max_pending_updates }; assert_eq!( - persister_0 - .kv_store + 
kv_store_0 .list( CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, &monitor_name.to_string() @@ -1284,8 +1285,7 @@ mod tests { mon.get_latest_update_id() % persister_1_max_pending_updates }; assert_eq!( - persister_1 - .kv_store + kv_store_1 .list( CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, &monitor_name.to_string() @@ -1395,15 +1395,16 @@ mod tests { let cmu_map = nodes[1].chain_monitor.monitor_updates.lock().unwrap(); let cmu = &cmu_map.get(&added_monitors[0].1.channel_id()).unwrap()[0]; - let ro_persister = MonitorUpdatingPersister { - kv_store: &TestStore::new(true), - logger: &TestLogger::new(), - maximum_pending_updates: 11, - entropy_source: node_cfgs[0].keys_manager, - signer_provider: node_cfgs[0].keys_manager, - broadcaster: node_cfgs[0].tx_broadcaster, - fee_estimator: node_cfgs[0].fee_estimator, - }; + let store = TestStore::new(true); + let ro_persister = MonitorUpdatingPersister::new( + &store, + node_cfgs[0].logger, + 11, + node_cfgs[0].keys_manager, + node_cfgs[0].keys_manager, + node_cfgs[0].tx_broadcaster, + node_cfgs[0].fee_estimator, + ); let monitor_name = added_monitors[0].1.persistence_key(); match ro_persister.persist_new_channel(monitor_name, &added_monitors[0].1) { ChannelMonitorUpdateStatus::UnrecoverableError => { @@ -1441,24 +1442,26 @@ mod tests { fn clean_stale_updates_works() { let test_max_pending_updates = 7; let chanmon_cfgs = create_chanmon_cfgs(3); - let persister_0 = MonitorUpdatingPersister { - kv_store: &TestStore::new(false), - logger: &TestLogger::new(), - maximum_pending_updates: test_max_pending_updates, - entropy_source: &chanmon_cfgs[0].keys_manager, - signer_provider: &chanmon_cfgs[0].keys_manager, - broadcaster: &chanmon_cfgs[0].tx_broadcaster, - fee_estimator: &chanmon_cfgs[0].fee_estimator, - }; - let persister_1 = MonitorUpdatingPersister { - kv_store: &TestStore::new(false), - logger: &TestLogger::new(), - maximum_pending_updates: test_max_pending_updates, - entropy_source: &chanmon_cfgs[1].keys_manager, - signer_provider: &chanmon_cfgs[1].keys_manager, - broadcaster: &chanmon_cfgs[1].tx_broadcaster, - fee_estimator: &chanmon_cfgs[1].fee_estimator, - }; + let kv_store_0 = TestStore::new(false); + let persister_0 = MonitorUpdatingPersister::new( + &kv_store_0, + &chanmon_cfgs[0].logger, + test_max_pending_updates, + &chanmon_cfgs[0].keys_manager, + &chanmon_cfgs[0].keys_manager, + &chanmon_cfgs[0].tx_broadcaster, + &chanmon_cfgs[0].fee_estimator, + ); + let kv_store_1 = TestStore::new(false); + let persister_1 = MonitorUpdatingPersister::new( + &kv_store_1, + &chanmon_cfgs[1].logger, + test_max_pending_updates, + &chanmon_cfgs[1].keys_manager, + &chanmon_cfgs[1].keys_manager, + &chanmon_cfgs[1].tx_broadcaster, + &chanmon_cfgs[1].fee_estimator, + ); let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let chain_mon_0 = test_utils::TestChainMonitor::new( Some(&chanmon_cfgs[0].chain_source), @@ -1497,22 +1500,20 @@ mod tests { let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap(); let (_, monitor) = &persisted_chan_data[0]; let monitor_name = monitor.persistence_key(); - persister_0 - .kv_store - .write( - CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, - &monitor_name.to_string(), - UpdateName::from(1).as_str(), - vec![0u8; 1], - ) - .unwrap(); + KVStoreSync::write( + &kv_store_0, + CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, + &monitor_name.to_string(), + UpdateName::from(1).as_str(), + vec![0u8; 1], + ) + .unwrap(); // Do the stale update cleanup 
persister_0.cleanup_stale_updates(false).unwrap(); // Confirm the stale update is unreadable/gone - assert!(persister_0 - .kv_store + assert!(kv_store_0 .read( CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, &monitor_name.to_string(), From 3fd1a76d0b3a67a8ab7d51113e43a90c39dd6625 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 8 Sep 2025 00:06:23 +0000 Subject: [PATCH 03/21] Support `maximum_pending_updates` = 0 in `MonitorUpdatingPersister` In the coming commits, we'll use the `MonitorUpdatingPersister` as *the* way to do async monitor updating in the `ChainMonitor`. However, to support folks who don't actually want a `MonitorUpdatingPersister` in that case, we explicitly support them setting `maximum_pending_updates` to 0, disabling all of the update-writing behavior. --- lightning/src/util/persist.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index d882b1df4da..5fafb582d52 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -534,6 +534,10 @@ where /// less frequent "waves." /// - [`MonitorUpdatingPersister`] will potentially have more listing to do if you need to run /// [`MonitorUpdatingPersister::cleanup_stale_updates`]. + /// + /// Note that you can disable the update-writing entirely by setting `maximum_pending_updates` + /// to zero, causing this [`Persist`] implementation to behave like the blanket [`Persist`] + /// implementation for all [`KVStoreSync`]s. pub fn new( kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES, signer_provider: SP, broadcaster: BI, fee_estimator: FE, @@ -757,7 +761,12 @@ where let mut monitor_bytes = Vec::with_capacity( MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() + monitor.serialized_length(), ); - monitor_bytes.extend_from_slice(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL); + // If `maximum_pending_updates` is zero, we aren't actually writing monitor updates at all. + // Thus, there's no need to add the sentinel prefix as the monitor can be read directly + // from disk without issue. + if self.maximum_pending_updates != 0 { + monitor_bytes.extend_from_slice(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL); + } monitor.write(&mut monitor_bytes).unwrap(); match self.kv_store.write( CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, From c90374973614197cb9b673fb42916986a14544fc Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 8 Sep 2025 00:11:34 +0000 Subject: [PATCH 04/21] Migrate `MonitorUpdatingPersister` to an async + async-sync wrapper As we've done with several other structs, this adds an async variant of `MonitorUpdatingPersister` and adds an async-sync wrapper for those using `KVStoreSync`. Unlike with other structs, we leave `MonitorUpdatingPersister` as the sync variant and make the new async logic a `MonitorUpdatingPersisterAsync` as the async monitor updating flow is still considered beta. This does not yet expose the async monitor updating logic anywhere, as doing a standard `Persist` async variant would not work for ensuring the `ChannelManager` and `ChainMonitor` don't block on async writes or suddenly require a runtime. --- lightning/src/util/persist.rs | 395 +++++++++++++++++++++++----------- 1 file changed, 274 insertions(+), 121 deletions(-) diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 5fafb582d52..ba37ca426ca 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -11,13 +11,17 @@ //! 
[`ChannelManager`]: crate::ln::channelmanager::ChannelManager //! [`NetworkGraph`]: crate::routing::gossip::NetworkGraph +use alloc::sync::Arc; + use bitcoin::hashes::hex::FromHex; use bitcoin::{BlockHash, Txid}; + use core::cmp; use core::future::Future; use core::ops::Deref; use core::pin::Pin; use core::str::FromStr; +use core::task; use crate::prelude::*; use crate::{io, log_error}; @@ -29,6 +33,7 @@ use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate}; use crate::chain::transaction::OutPoint; use crate::ln::types::ChannelId; use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider}; +use crate::util::async_poll::dummy_waker; use crate::util::logger::Logger; use crate::util::ser::{Readable, ReadableArgs, Writeable}; @@ -405,6 +410,19 @@ where Ok(res) } +fn poll_sync_future(future: F) -> F::Output { + let mut waker = dummy_waker(); + let mut ctx = task::Context::from_waker(&mut waker); + // TODO A future MSRV bump to 1.68 should allow for the pin macro + match Pin::new(&mut Box::pin(future)).poll(&mut ctx) { + task::Poll::Ready(result) => result, + task::Poll::Pending => { + // In a sync context, we can't wait for the future to complete. + unreachable!("Sync KVStore-derived futures can not be pending in a sync context"); + }, + } +} + /// Implements [`Persist`] in a way that writes and reads both [`ChannelMonitor`]s and /// [`ChannelMonitorUpdate`]s. /// @@ -490,24 +508,15 @@ where /// would like to get rid of them, consider using the /// [`MonitorUpdatingPersister::cleanup_stale_updates`] function. pub struct MonitorUpdatingPersister +(MonitorUpdatingPersisterAsync, L, ES, SP, BI, FE>) where K::Target: KVStoreSync, L::Target: Logger, ES::Target: EntropySource + Sized, SP::Target: SignerProvider + Sized, BI::Target: BroadcasterInterface, - FE::Target: FeeEstimator, -{ - kv_store: K, - logger: L, - maximum_pending_updates: u64, - entropy_source: ES, - signer_provider: SP, - broadcaster: BI, - fee_estimator: FE, -} + FE::Target: FeeEstimator; -#[allow(dead_code)] impl MonitorUpdatingPersister where @@ -542,15 +551,15 @@ where kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES, signer_provider: SP, broadcaster: BI, fee_estimator: FE, ) -> Self { - MonitorUpdatingPersister { - kv_store, + MonitorUpdatingPersister(MonitorUpdatingPersisterAsync::new( + KVStoreSyncWrapper(kv_store), logger, maximum_pending_updates, entropy_source, signer_provider, broadcaster, fee_estimator, - } + )) } /// Reads all stored channel monitors, along with any stored updates for them. @@ -564,13 +573,198 @@ where Vec<(BlockHash, ChannelMonitor<::EcdsaSigner>)>, io::Error, > { - let monitor_list = self.kv_store.list( + poll_sync_future(self.0.read_all_channel_monitors_with_updates()) + } + + /// Read a single channel monitor, along with any stored updates for it. + /// + /// It is extremely important that your [`KVStoreSync::read`] implementation uses the + /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the + /// documentation for [`MonitorUpdatingPersister`]. + /// + /// For `monitor_key`, channel storage keys can be the channel's funding [`OutPoint`], with an + /// underscore `_` between txid and index for v1 channels. 
For example, given: + /// + /// - Transaction ID: `deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef` + /// - Index: `1` + /// + /// The correct `monitor_key` would be: + /// `deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1` + /// + /// For v2 channels, the hex-encoded [`ChannelId`] is used directly for `monitor_key` instead. + /// + /// Loading a large number of monitors will be faster if done in parallel. You can use this + /// function to accomplish this. Take care to limit the number of parallel readers. + pub fn read_channel_monitor_with_updates( + &self, monitor_key: &str, + ) -> Result<(BlockHash, ChannelMonitor<::EcdsaSigner>), io::Error> + { + poll_sync_future(self.0.read_channel_monitor_with_updates(monitor_key)) + } + + /// Cleans up stale updates for all monitors. + /// + /// This function works by first listing all monitors, and then for each of them, listing all + /// updates. The updates that have an `update_id` less than or equal to than the stored monitor + /// are deleted. The deletion can either be lazy or non-lazy based on the `lazy` flag; this will + /// be passed to [`KVStoreSync::remove`]. + pub fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> { + poll_sync_future(self.0.cleanup_stale_updates(lazy)) + } +} + +impl< + ChannelSigner: EcdsaChannelSigner, + K: Deref, + L: Deref, + ES: Deref, + SP: Deref, + BI: Deref, + FE: Deref, + > Persist for MonitorUpdatingPersister +where + K::Target: KVStoreSync, + L::Target: Logger, + ES::Target: EntropySource + Sized, + SP::Target: SignerProvider + Sized, + BI::Target: BroadcasterInterface, + FE::Target: FeeEstimator, +{ + /// Persists a new channel. This means writing the entire monitor to the + /// parametrized [`KVStoreSync`]. + fn persist_new_channel( + &self, monitor_name: MonitorName, monitor: &ChannelMonitor, + ) -> chain::ChannelMonitorUpdateStatus { + let res = poll_sync_future(self.0.0.persist_new_channel(monitor_name, monitor)); + match res { + Ok(_) => chain::ChannelMonitorUpdateStatus::Completed, + Err(e) => { + log_error!( + self.0.0.logger, + "Failed to write ChannelMonitor {}/{}/{} reason: {}", + CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + monitor_name, + e + ); + chain::ChannelMonitorUpdateStatus::UnrecoverableError + } + } + } + + /// Persists a channel update, writing only the update to the parameterized [`KVStoreSync`] if possible. + /// + /// In some cases, this will forward to [`MonitorUpdatingPersister::persist_new_channel`]: + /// + /// - No full monitor is found in [`KVStoreSync`] + /// - The number of pending updates exceeds `maximum_pending_updates` as given to [`Self::new`] + /// - LDK commands re-persisting the entire monitor through this function, specifically when + /// `update` is `None`. + /// - The update is at [`u64::MAX`], indicating an update generated by pre-0.1 LDK. 
+ fn update_persisted_channel( + &self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>, + monitor: &ChannelMonitor, + ) -> chain::ChannelMonitorUpdateStatus { + let res = poll_sync_future(self.0.0.update_persisted_channel(monitor_name, update, monitor)); + match res { + Ok(()) => chain::ChannelMonitorUpdateStatus::Completed, + Err(e) => { + log_error!( + self.0.0.logger, + "Failed to write ChannelMonitorUpdate {} id {} reason: {}", + monitor_name, + update.as_ref().map(|upd| upd.update_id).unwrap_or(0), + e + ); + chain::ChannelMonitorUpdateStatus::UnrecoverableError + }, + } + } + + fn archive_persisted_channel(&self, monitor_name: MonitorName) { + poll_sync_future(self.0.0.archive_persisted_channel(monitor_name)); + } +} + +/// A variant of the [`MonitorUpdatingPersister`] which utilizes the async [`KVStore`] and offers +/// async versions of the public accessors. +/// +/// Note that async monitor updating is considered beta, and bugs may be triggered by its use. +pub struct MonitorUpdatingPersisterAsync +(Arc>) +where + K::Target: KVStore, + L::Target: Logger, + ES::Target: EntropySource + Sized, + SP::Target: SignerProvider + Sized, + BI::Target: BroadcasterInterface, + FE::Target: FeeEstimator; + +struct MonitorUpdatingPersisterAsyncInner +where + K::Target: KVStore, + L::Target: Logger, + ES::Target: EntropySource + Sized, + SP::Target: SignerProvider + Sized, + BI::Target: BroadcasterInterface, + FE::Target: FeeEstimator, +{ + kv_store: K, + logger: L, + maximum_pending_updates: u64, + entropy_source: ES, + signer_provider: SP, + broadcaster: BI, + fee_estimator: FE, +} + +impl + MonitorUpdatingPersisterAsync +where + K::Target: KVStore, + L::Target: Logger, + ES::Target: EntropySource + Sized, + SP::Target: SignerProvider + Sized, + BI::Target: BroadcasterInterface, + FE::Target: FeeEstimator, +{ + /// Constructs a new [`MonitorUpdatingPersisterAsync`]. + /// + /// See [`MonitorUpdatingPersister::new`] for more info. + pub fn new( + kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES, + signer_provider: SP, broadcaster: BI, fee_estimator: FE, + ) -> Self { + MonitorUpdatingPersisterAsync(Arc::new(MonitorUpdatingPersisterAsyncInner { + kv_store, + logger, + maximum_pending_updates, + entropy_source, + signer_provider, + broadcaster, + fee_estimator, + })) + } + + /// Reads all stored channel monitors, along with any stored updates for them. + /// + /// It is extremely important that your [`KVStore::read`] implementation uses the + /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the + /// documentation for [`MonitorUpdatingPersister`]. + pub async fn read_all_channel_monitors_with_updates( + &self, + ) -> Result< + Vec<(BlockHash, ChannelMonitor<::EcdsaSigner>)>, + io::Error, + > { + let monitor_list = self.0.kv_store.list( CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, - )?; + ).await?; let mut res = Vec::with_capacity(monitor_list.len()); + // TODO: Parallelize this loop for monitor_key in monitor_list { - res.push(self.read_channel_monitor_with_updates(monitor_key.as_str())?) + res.push(self.read_channel_monitor_with_updates(monitor_key.as_str()).await?) } Ok(res) } @@ -594,20 +788,50 @@ where /// /// Loading a large number of monitors will be faster if done in parallel. You can use this /// function to accomplish this. Take care to limit the number of parallel readers. 
- pub fn read_channel_monitor_with_updates( + pub async fn read_channel_monitor_with_updates( + &self, monitor_key: &str, + ) -> Result<(BlockHash, ChannelMonitor<::EcdsaSigner>), io::Error> + { + self.0.read_channel_monitor_with_updates(monitor_key).await + } + + /// Cleans up stale updates for all monitors. + /// + /// This function works by first listing all monitors, and then for each of them, listing all + /// updates. The updates that have an `update_id` less than or equal to than the stored monitor + /// are deleted. The deletion can either be lazy or non-lazy based on the `lazy` flag; this will + /// be passed to [`KVStoreSync::remove`]. + pub async fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> { + self.0.cleanup_stale_updates(lazy).await + } +} + + +impl + MonitorUpdatingPersisterAsyncInner +where + K::Target: KVStore, + L::Target: Logger, + ES::Target: EntropySource + Sized, + SP::Target: SignerProvider + Sized, + BI::Target: BroadcasterInterface, + FE::Target: FeeEstimator, +{ + pub async fn read_channel_monitor_with_updates( &self, monitor_key: &str, ) -> Result<(BlockHash, ChannelMonitor<::EcdsaSigner>), io::Error> { let monitor_name = MonitorName::from_str(monitor_key)?; - let (block_hash, monitor) = self.read_monitor(&monitor_name, monitor_key)?; + let (block_hash, monitor) = self.read_monitor(&monitor_name, monitor_key).await?; let mut current_update_id = monitor.get_latest_update_id(); + // TODO: Parallelize this loop by speculatively reading a batch of updates loop { current_update_id = match current_update_id.checked_add(1) { Some(next_update_id) => next_update_id, None => break, }; let update_name = UpdateName::from(current_update_id); - let update = match self.read_monitor_update(monitor_key, &update_name) { + let update = match self.read_monitor_update(monitor_key, &update_name).await { Ok(update) => update, Err(err) if err.kind() == io::ErrorKind::NotFound => { // We can't find any more updates, so we are done. @@ -633,7 +857,7 @@ where } /// Read a channel monitor. - fn read_monitor( + async fn read_monitor( &self, monitor_name: &MonitorName, monitor_key: &str, ) -> Result<(BlockHash, ChannelMonitor<::EcdsaSigner>), io::Error> { @@ -641,7 +865,7 @@ where CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, monitor_key, - )?); + ).await?); // Discard the sentinel bytes if found. if monitor_cursor.get_ref().starts_with(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL) { monitor_cursor.set_position(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() as u64); @@ -678,14 +902,14 @@ where } /// Read a channel monitor update. - fn read_monitor_update( + async fn read_monitor_update( &self, monitor_key: &str, update_name: &UpdateName, ) -> Result { let update_bytes = self.kv_store.read( CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_key, update_name.as_str(), - )?; + ).await?; ChannelMonitorUpdate::read(&mut io::Cursor::new(update_bytes)).map_err(|e| { log_error!( self.logger, @@ -699,23 +923,18 @@ where }) } - /// Cleans up stale updates for all monitors. - /// - /// This function works by first listing all monitors, and then for each of them, listing all - /// updates. The updates that have an `update_id` less than or equal to than the stored monitor - /// are deleted. The deletion can either be lazy or non-lazy based on the `lazy` flag; this will - /// be passed to [`KVStoreSync::remove`]. 
- pub fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> { + async fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> { let monitor_keys = self.kv_store.list( CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, - )?; + ).await?; for monitor_key in monitor_keys { let monitor_name = MonitorName::from_str(&monitor_key)?; - let (_, current_monitor) = self.read_monitor(&monitor_name, &monitor_key)?; + let (_, current_monitor) = self.read_monitor(&monitor_name, &monitor_key).await?; let updates = self .kv_store - .list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_key.as_str())?; + .list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_key.as_str()) + .await?; for update in updates { let update_name = UpdateName::new(update)?; // if the update_id is lower than the stored monitor, delete @@ -725,36 +944,16 @@ where monitor_key.as_str(), update_name.as_str(), lazy, - )?; + ).await?; } } } Ok(()) } -} -impl< - ChannelSigner: EcdsaChannelSigner, - K: Deref, - L: Deref, - ES: Deref, - SP: Deref, - BI: Deref, - FE: Deref, - > Persist for MonitorUpdatingPersister -where - K::Target: KVStoreSync, - L::Target: Logger, - ES::Target: EntropySource + Sized, - SP::Target: SignerProvider + Sized, - BI::Target: BroadcasterInterface, - FE::Target: FeeEstimator, -{ - /// Persists a new channel. This means writing the entire monitor to the - /// parametrized [`KVStoreSync`]. - fn persist_new_channel( + async fn persist_new_channel( &self, monitor_name: MonitorName, monitor: &ChannelMonitor, - ) -> chain::ChannelMonitorUpdateStatus { + ) -> Result<(), io::Error> { // Determine the proper key for this monitor let monitor_key = monitor_name.to_string(); // Serialize and write the new monitor @@ -768,40 +967,18 @@ where monitor_bytes.extend_from_slice(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL); } monitor.write(&mut monitor_bytes).unwrap(); - match self.kv_store.write( + self.kv_store.write( CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, monitor_key.as_str(), monitor_bytes, - ) { - Ok(_) => chain::ChannelMonitorUpdateStatus::Completed, - Err(e) => { - log_error!( - self.logger, - "Failed to write ChannelMonitor {}/{}/{} reason: {}", - CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, - monitor_key.as_str(), - e - ); - chain::ChannelMonitorUpdateStatus::UnrecoverableError - }, - } + ).await } - /// Persists a channel update, writing only the update to the parameterized [`KVStoreSync`] if possible. - /// - /// In some cases, this will forward to [`MonitorUpdatingPersister::persist_new_channel`]: - /// - /// - No full monitor is found in [`KVStoreSync`] - /// - The number of pending updates exceeds `maximum_pending_updates` as given to [`Self::new`] - /// - LDK commands re-persisting the entire monitor through this function, specifically when - /// `update` is `None`. - /// - The update is at [`u64::MAX`], indicating an update generated by pre-0.1 LDK. 
- fn update_persisted_channel( + async fn update_persisted_channel( &self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor, - ) -> chain::ChannelMonitorUpdateStatus { + ) -> Result<(), io::Error> { const LEGACY_CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX; if let Some(update) = update { let persist_update = update.update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID @@ -810,40 +987,27 @@ where if persist_update { let monitor_key = monitor_name.to_string(); let update_name = UpdateName::from(update.update_id); - match self.kv_store.write( + self.kv_store.write( CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_key.as_str(), update_name.as_str(), update.encode(), - ) { - Ok(()) => chain::ChannelMonitorUpdateStatus::Completed, - Err(e) => { - log_error!( - self.logger, - "Failed to write ChannelMonitorUpdate {}/{}/{} reason: {}", - CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, - monitor_key.as_str(), - update_name.as_str(), - e - ); - chain::ChannelMonitorUpdateStatus::UnrecoverableError - }, - } + ).await } else { // In case of channel-close monitor update, we need to read old monitor before persisting // the new one in order to determine the cleanup range. let maybe_old_monitor = match monitor.get_latest_update_id() { LEGACY_CLOSED_CHANNEL_UPDATE_ID => { let monitor_key = monitor_name.to_string(); - self.read_monitor(&monitor_name, &monitor_key).ok() + self.read_monitor(&monitor_name, &monitor_key).await.ok() }, _ => None, }; // We could write this update, but it meets criteria of our design that calls for a full monitor write. - let monitor_update_status = self.persist_new_channel(monitor_name, monitor); + let write_status = self.persist_new_channel(monitor_name, monitor).await; - if let chain::ChannelMonitorUpdateStatus::Completed = monitor_update_status { + if let Ok(()) = write_status { let channel_closed_legacy = monitor.get_latest_update_id() == LEGACY_CLOSED_CHANNEL_UPDATE_ID; let cleanup_range = if channel_closed_legacy { @@ -864,21 +1028,21 @@ where }; if let Some((start, end)) = cleanup_range { - self.cleanup_in_range(monitor_name, start, end); + self.cleanup_in_range(monitor_name, start, end).await; } } - monitor_update_status + write_status } } else { // There is no update given, so we must persist a new monitor. - self.persist_new_channel(monitor_name, monitor) + self.persist_new_channel(monitor_name, monitor).await } } - fn archive_persisted_channel(&self, monitor_name: MonitorName) { + async fn archive_persisted_channel(&self, monitor_name: MonitorName) { let monitor_key = monitor_name.to_string(); - let monitor = match self.read_channel_monitor_with_updates(&monitor_key) { + let monitor = match self.read_channel_monitor_with_updates(&monitor_key).await { Ok((_block_hash, monitor)) => monitor, Err(_) => return, }; @@ -887,7 +1051,7 @@ where ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, monitor_key.as_str(), monitor.encode(), - ) { + ).await { Ok(()) => {}, Err(_e) => return, }; @@ -896,22 +1060,11 @@ where CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, monitor_key.as_str(), true, - ); + ).await; } -} -impl - MonitorUpdatingPersister -where - ES::Target: EntropySource + Sized, - K::Target: KVStoreSync, - L::Target: Logger, - SP::Target: SignerProvider + Sized, - BI::Target: BroadcasterInterface, - FE::Target: FeeEstimator, -{ // Cleans up monitor updates for given monitor in range `start..=end`. 
- fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) { + async fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) { let monitor_key = monitor_name.to_string(); for update_id in start..=end { let update_name = UpdateName::from(update_id); @@ -920,7 +1073,7 @@ where monitor_key.as_str(), update_name.as_str(), true, - ) { + ).await { log_error!( self.logger, "Failed to clean up channel monitor updates for monitor {}, reason: {}", From b8ad37186d6f23c7006cc593bdb54b1b98d7072f Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 11 Sep 2025 15:27:58 +0000 Subject: [PATCH 05/21] f rustfmt --- lightning/src/util/persist.rs | 39 +++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index ba37ca426ca..75f0750cd70 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -507,8 +507,9 @@ fn poll_sync_future(future: F) -> F::Output { /// If you have many stale updates stored (such as after a crash with pending lazy deletes), and /// would like to get rid of them, consider using the /// [`MonitorUpdatingPersister::cleanup_stale_updates`] function. -pub struct MonitorUpdatingPersister -(MonitorUpdatingPersisterAsync, L, ES, SP, BI, FE>) +pub struct MonitorUpdatingPersister( + MonitorUpdatingPersisterAsync, L, ES, SP, BI, FE>, +) where K::Target: KVStoreSync, L::Target: Logger, @@ -635,12 +636,12 @@ where fn persist_new_channel( &self, monitor_name: MonitorName, monitor: &ChannelMonitor, ) -> chain::ChannelMonitorUpdateStatus { - let res = poll_sync_future(self.0.0.persist_new_channel(monitor_name, monitor)); + let res = poll_sync_future(self.0 .0.persist_new_channel(monitor_name, monitor)); match res { Ok(_) => chain::ChannelMonitorUpdateStatus::Completed, Err(e) => { log_error!( - self.0.0.logger, + self.0 .0.logger, "Failed to write ChannelMonitor {}/{}/{} reason: {}", CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, @@ -648,7 +649,7 @@ where e ); chain::ChannelMonitorUpdateStatus::UnrecoverableError - } + }, } } @@ -665,12 +666,13 @@ where &self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor, ) -> chain::ChannelMonitorUpdateStatus { - let res = poll_sync_future(self.0.0.update_persisted_channel(monitor_name, update, monitor)); + let res = + poll_sync_future(self.0 .0.update_persisted_channel(monitor_name, update, monitor)); match res { Ok(()) => chain::ChannelMonitorUpdateStatus::Completed, Err(e) => { log_error!( - self.0.0.logger, + self.0 .0.logger, "Failed to write ChannelMonitorUpdate {} id {} reason: {}", monitor_name, update.as_ref().map(|upd| upd.update_id).unwrap_or(0), @@ -682,7 +684,7 @@ where } fn archive_persisted_channel(&self, monitor_name: MonitorName) { - poll_sync_future(self.0.0.archive_persisted_channel(monitor_name)); + poll_sync_future(self.0 .0.archive_persisted_channel(monitor_name)); } } @@ -690,8 +692,14 @@ where /// async versions of the public accessors. /// /// Note that async monitor updating is considered beta, and bugs may be triggered by its use. 
-pub struct MonitorUpdatingPersisterAsync -(Arc>) +pub struct MonitorUpdatingPersisterAsync< + K: Deref, + L: Deref, + ES: Deref, + SP: Deref, + BI: Deref, + FE: Deref, +>(Arc>) where K::Target: KVStore, L::Target: Logger, @@ -700,8 +708,14 @@ where BI::Target: BroadcasterInterface, FE::Target: FeeEstimator; -struct MonitorUpdatingPersisterAsyncInner -where +struct MonitorUpdatingPersisterAsyncInner< + K: Deref, + L: Deref, + ES: Deref, + SP: Deref, + BI: Deref, + FE: Deref, +> where K::Target: KVStore, L::Target: Logger, ES::Target: EntropySource + Sized, @@ -806,7 +820,6 @@ where } } - impl MonitorUpdatingPersisterAsyncInner where From d037526bc1243aed97740be4f55e1bd32041b6d9 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 11 Sep 2025 15:28:04 +0000 Subject: [PATCH 06/21] Clean up and rustfmt `persist.rs` --- lightning/src/util/persist.rs | 89 ++++++++++++----------------------- 1 file changed, 30 insertions(+), 59 deletions(-) diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 75f0750cd70..00c09ba1a6b 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -771,10 +771,9 @@ where Vec<(BlockHash, ChannelMonitor<::EcdsaSigner>)>, io::Error, > { - let monitor_list = self.0.kv_store.list( - CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, - ).await?; + let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE; + let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE; + let monitor_list = self.0.kv_store.list(primary, secondary).await?; let mut res = Vec::with_capacity(monitor_list.len()); // TODO: Parallelize this loop for monitor_key in monitor_list { @@ -874,11 +873,10 @@ where &self, monitor_name: &MonitorName, monitor_key: &str, ) -> Result<(BlockHash, ChannelMonitor<::EcdsaSigner>), io::Error> { - let mut monitor_cursor = io::Cursor::new(self.kv_store.read( - CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, - monitor_key, - ).await?); + let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE; + let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE; + let monitor_bytes = self.kv_store.read(primary, secondary, monitor_key).await?; + let mut monitor_cursor = io::Cursor::new(monitor_bytes); // Discard the sentinel bytes if found. 
if monitor_cursor.get_ref().starts_with(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL) { monitor_cursor.set_position(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() as u64); @@ -918,12 +916,9 @@ where async fn read_monitor_update( &self, monitor_key: &str, update_name: &UpdateName, ) -> Result { - let update_bytes = self.kv_store.read( - CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, - monitor_key, - update_name.as_str(), - ).await?; - ChannelMonitorUpdate::read(&mut io::Cursor::new(update_bytes)).map_err(|e| { + let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE; + let update_bytes = self.kv_store.read(primary, monitor_key, update_name.as_str()).await?; + ChannelMonitorUpdate::read(&mut &update_bytes[..]).map_err(|e| { log_error!( self.logger, "Failed to read ChannelMonitorUpdate {}/{}/{}, reason: {}", @@ -937,27 +932,19 @@ where } async fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> { - let monitor_keys = self.kv_store.list( - CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, - ).await?; + let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE; + let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE; + let monitor_keys = self.kv_store.list(primary, secondary).await?; for monitor_key in monitor_keys { let monitor_name = MonitorName::from_str(&monitor_key)?; let (_, current_monitor) = self.read_monitor(&monitor_name, &monitor_key).await?; - let updates = self - .kv_store - .list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_key.as_str()) - .await?; + let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE; + let updates = self.kv_store.list(primary, monitor_key.as_str()).await?; for update in updates { let update_name = UpdateName::new(update)?; // if the update_id is lower than the stored monitor, delete if update_name.0 <= current_monitor.get_latest_update_id() { - self.kv_store.remove( - CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, - monitor_key.as_str(), - update_name.as_str(), - lazy, - ).await?; + self.kv_store.remove(primary, &monitor_key, update_name.as_str(), lazy).await?; } } } @@ -980,12 +967,9 @@ where monitor_bytes.extend_from_slice(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL); } monitor.write(&mut monitor_bytes).unwrap(); - self.kv_store.write( - CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, - monitor_key.as_str(), - monitor_bytes, - ).await + let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE; + let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE; + self.kv_store.write(primary, secondary, monitor_key.as_str(), monitor_bytes).await } async fn update_persisted_channel( @@ -1000,12 +984,10 @@ where if persist_update { let monitor_key = monitor_name.to_string(); let update_name = UpdateName::from(update.update_id); - self.kv_store.write( - CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, - monitor_key.as_str(), - update_name.as_str(), - update.encode(), - ).await + let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE; + self.kv_store + .write(primary, &monitor_key, update_name.as_str(), update.encode()) + .await } else { // In case of channel-close monitor update, we need to read old monitor before persisting // the new one in order to determine the cleanup range. 
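For intuition on the `persist_update` condition shown in the hunk above, here is a standalone illustration (not part of the patch; the helper name is hypothetical): with `maximum_pending_updates` set to 5, update ids 1 through 4 are written as individual `ChannelMonitorUpdate`s, id 5 falls through to a full monitor write, the legacy `u64::MAX` close id always forces a full write, and a setting of 0 disables update-writing entirely.

// Hypothetical free function mirroring the `persist_update` check in `update_persisted_channel`.
fn writes_standalone_update(update_id: u64, maximum_pending_updates: u64) -> bool {
	const LEGACY_CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX;
	update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID
		&& maximum_pending_updates != 0
		&& update_id % maximum_pending_updates != 0
}
// writes_standalone_update(3, 5) == true         -> written as a standalone update
// writes_standalone_update(5, 5) == false        -> full monitor write instead
// writes_standalone_update(u64::MAX, 5) == false -> legacy close id, full write
// writes_standalone_update(3, 0) == false        -> update-writing disabled, always full writes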
@@ -1059,21 +1041,13 @@ where Ok((_block_hash, monitor)) => monitor, Err(_) => return, }; - match self.kv_store.write( - ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, - ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, - monitor_key.as_str(), - monitor.encode(), - ).await { + let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE; + let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE; + match self.kv_store.write(primary, secondary, &monitor_key, monitor.encode()).await { Ok(()) => {}, Err(_e) => return, }; - let _ = self.kv_store.remove( - CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, - monitor_key.as_str(), - true, - ).await; + let _ = self.kv_store.remove(primary, secondary, &monitor_key, true).await; } // Cleans up monitor updates for given monitor in range `start..=end`. @@ -1081,12 +1055,9 @@ where let monitor_key = monitor_name.to_string(); for update_id in start..=end { let update_name = UpdateName::from(update_id); - if let Err(e) = self.kv_store.remove( - CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, - monitor_key.as_str(), - update_name.as_str(), - true, - ).await { + let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE; + let res = self.kv_store.remove(primary, &monitor_key, update_name.as_str(), true).await; + if let Err(e) = res { log_error!( self.logger, "Failed to clean up channel monitor updates for monitor {}, reason: {}", From 30455d902e62ce8a00910397056c5e42514cb3ff Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 9 Sep 2025 00:22:17 +0000 Subject: [PATCH 07/21] Simplify legacy closed-channel monitor update persistence handling Pre-0.1, after a channel was closed we generated `ChannelMonitorUpdate`s with a static `update_id` of `u64::MAX`. In this case, when using `MonitorUpdatingPersister`, we had to read the persisted `ChannelMonitor` to figure out what range of monitor updates to remove from disk. However, now that we have a `list` method there's no reason to do this anymore, we can just use that. Simplifying code that we anticipate never hitting anymore is always a win. 
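Concretely, the list-based cleanup this commit moves to amounts to the following (a rough sketch of the `cleanup_stale_updates_for_monitor_to` helper added in the diff below; the free-function form and name here are illustrative only):

async fn remove_stale_updates<K: Deref>(
	kv_store: &K, monitor_key: &str, latest_update_id: u64, lazy: bool,
) -> Result<(), io::Error>
where
	K::Target: KVStore,
{
	let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE;
	// Enumerate the persisted updates for this monitor rather than re-reading the old
	// monitor to reconstruct a cleanup range.
	for key in kv_store.list(primary, monitor_key).await? {
		let update_name = UpdateName::new(key)?;
		// Anything at or below the monitor's latest update id is stale. When the monitor
		// was closed with the legacy u64::MAX id this removes every remaining update.
		if update_name.0 <= latest_update_id {
			kv_store.remove(primary, monitor_key, update_name.as_str(), lazy).await?;
		}
	}
	Ok(())
}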
--- lightning/src/util/persist.rs | 61 +++++++++++++++-------------------- 1 file changed, 26 insertions(+), 35 deletions(-) diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 00c09ba1a6b..c45cafa43ac 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -16,7 +16,6 @@ use alloc::sync::Arc; use bitcoin::hashes::hex::FromHex; use bitcoin::{BlockHash, Txid}; -use core::cmp; use core::future::Future; use core::ops::Deref; use core::pin::Pin; @@ -938,14 +937,22 @@ where for monitor_key in monitor_keys { let monitor_name = MonitorName::from_str(&monitor_key)?; let (_, current_monitor) = self.read_monitor(&monitor_name, &monitor_key).await?; - let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE; - let updates = self.kv_store.list(primary, monitor_key.as_str()).await?; - for update in updates { - let update_name = UpdateName::new(update)?; - // if the update_id is lower than the stored monitor, delete - if update_name.0 <= current_monitor.get_latest_update_id() { - self.kv_store.remove(primary, &monitor_key, update_name.as_str(), lazy).await?; - } + let latest_update_id = current_monitor.get_latest_update_id(); + self.cleanup_stale_updates_for_monitor_to(&monitor_key, latest_update_id, lazy).await; + } + Ok(()) + } + + async fn cleanup_stale_updates_for_monitor_to( + &self, monitor_key: &str, latest_update_id: u64, lazy: bool, + ) -> Result<(), io::Error> { + let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE; + let updates = self.kv_store.list(primary, monitor_key).await?; + for update in updates { + let update_name = UpdateName::new(update)?; + // if the update_id is lower than the stored monitor, delete + if update_name.0 <= latest_update_id { + self.kv_store.remove(primary, monitor_key, update_name.as_str(), lazy).await?; } } Ok(()) @@ -989,40 +996,23 @@ where .write(primary, &monitor_key, update_name.as_str(), update.encode()) .await } else { - // In case of channel-close monitor update, we need to read old monitor before persisting - // the new one in order to determine the cleanup range. - let maybe_old_monitor = match monitor.get_latest_update_id() { - LEGACY_CLOSED_CHANNEL_UPDATE_ID => { - let monitor_key = monitor_name.to_string(); - self.read_monitor(&monitor_name, &monitor_key).await.ok() - }, - _ => None, - }; - // We could write this update, but it meets criteria of our design that calls for a full monitor write. let write_status = self.persist_new_channel(monitor_name, monitor).await; if let Ok(()) = write_status { let channel_closed_legacy = monitor.get_latest_update_id() == LEGACY_CLOSED_CHANNEL_UPDATE_ID; - let cleanup_range = if channel_closed_legacy { - // If there is an error while reading old monitor, we skip clean up. 
- maybe_old_monitor.map(|(_, ref old_monitor)| { - let start = old_monitor.get_latest_update_id(); - // We never persist an update with the legacy closed update_id - let end = cmp::min( - start.saturating_add(self.maximum_pending_updates), - LEGACY_CLOSED_CHANNEL_UPDATE_ID - 1, - ); - (start, end) - }) + let latest_update_id = monitor.get_latest_update_id(); + if channel_closed_legacy { + let monitor_key = monitor_name.to_string(); + self.cleanup_stale_updates_for_monitor_to( + &monitor_key, + latest_update_id, + true, + ).await; } else { - let end = monitor.get_latest_update_id(); + let end = latest_update_id; let start = end.saturating_sub(self.maximum_pending_updates); - Some((start, end)) - }; - - if let Some((start, end)) = cleanup_range { self.cleanup_in_range(monitor_name, start, end).await; } } @@ -1261,6 +1251,7 @@ mod tests { use crate::util::test_utils::{self, TestStore}; use crate::{check_added_monitors, check_closed_broadcast}; use bitcoin::hashes::hex::FromHex; + use core::cmp; const EXPECTED_UPDATES_PER_PAYMENT: u64 = 5; From 9bb2a41c964d6c58f0bd5f0cde2142279d924e82 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 11 Sep 2025 15:51:26 +0000 Subject: [PATCH 08/21] f rustfmt --- lightning/src/util/persist.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index c45cafa43ac..f92fb99ea96 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -1009,7 +1009,8 @@ where &monitor_key, latest_update_id, true, - ).await; + ) + .await; } else { let end = latest_update_id; let start = end.saturating_sub(self.maximum_pending_updates); From 0aa3c6201f8c0e2acd579bf8f93f4b0443e47c3c Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 11 Sep 2025 14:53:59 +0000 Subject: [PATCH 09/21] Add a generic public `FutureSpawner` in LDK directly In the next commit we'll use this to spawn async persistence operations in the background, but for now we just move the `lightning-block-sync` `FutureSpawner` into `lightning`. --- lightning-block-sync/src/gossip.rs | 14 +------------- lightning/src/util/mod.rs | 1 + lightning/src/util/native_async.rs | 19 +++++++++++++++++++ 3 files changed, 21 insertions(+), 13 deletions(-) create mode 100644 lightning/src/util/native_async.rs diff --git a/lightning-block-sync/src/gossip.rs b/lightning-block-sync/src/gossip.rs index 083156baab3..0fe221b9231 100644 --- a/lightning-block-sync/src/gossip.rs +++ b/lightning-block-sync/src/gossip.rs @@ -10,11 +10,10 @@ use bitcoin::hash_types::BlockHash; use bitcoin::transaction::{OutPoint, TxOut}; use lightning::ln::peer_handler::APeerManager; - use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoLookupError, UtxoResult}; - use lightning::util::logger::Logger; +use lightning::util::native_async::FutureSpawner; use std::collections::VecDeque; use std::future::Future; @@ -43,17 +42,6 @@ pub trait UtxoSource: BlockSource + 'static { fn is_output_unspent<'a>(&'a self, outpoint: OutPoint) -> AsyncBlockSourceResult<'a, bool>; } -/// A generic trait which is able to spawn futures in the background. -/// -/// If the `tokio` feature is enabled, this is implemented on `TokioSpawner` struct which -/// delegates to `tokio::spawn()`. -pub trait FutureSpawner: Send + Sync + 'static { - /// Spawns the given future as a background task. - /// - /// This method MUST NOT block on the given future immediately. 
- fn spawn + Send + 'static>(&self, future: T); -} - #[cfg(feature = "tokio")] /// A trivial [`FutureSpawner`] which delegates to `tokio::spawn`. pub struct TokioSpawner; diff --git a/lightning/src/util/mod.rs b/lightning/src/util/mod.rs index 84c0c113f85..968f8222d9a 100644 --- a/lightning/src/util/mod.rs +++ b/lightning/src/util/mod.rs @@ -26,6 +26,7 @@ pub mod base32; pub(crate) mod base32; pub mod errors; pub mod message_signing; +pub mod native_async; pub mod persist; pub mod scid_utils; pub mod ser; diff --git a/lightning/src/util/native_async.rs b/lightning/src/util/native_async.rs new file mode 100644 index 00000000000..910e24a7b1f --- /dev/null +++ b/lightning/src/util/native_async.rs @@ -0,0 +1,19 @@ +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + +//! This module contains a few public utility which are used to run LDK in a native Rust async +//! environment. + +use crate::util::async_poll::MaybeSend; +use core::future::Future; + +/// A generic trait which is able to spawn futures in the background. +pub trait FutureSpawner: Send + Sync + 'static { + /// Spawns the given future as a background task. + /// + /// This method MUST NOT block on the given future immediately. + fn spawn + MaybeSend + 'static>(&self, future: T); +} From 56b58c8134064f3cb09b05179973d8497d794b8b Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 11 Sep 2025 15:04:48 +0000 Subject: [PATCH 10/21] Add async persistence logic in `MonitorUpdatingPersister` In the next commit we'll add the ability to use an async `KVStore` as the backing for a `ChainMonitor`. Here we tee this up by adding an async API to `MonitorUpdatingPersisterAsync`. Its not intended for public use and is thus only `pub(crate)` but allows spawning all operations via a generic `FutureSpawner` trait, initiating the write via the `KVStore` before any `await`s (or async functions). Because we aren't going to make the `ChannelManager` (or `ChainMonitor`) fully async, we need a way to alert the `ChainMonitor` when a persistence completes, but we leave that for the next commit. --- lightning/src/util/persist.rs | 197 ++++++++++++++++++++++++++-------- 1 file changed, 155 insertions(+), 42 deletions(-) diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index f92fb99ea96..11a3de700d3 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -17,6 +17,7 @@ use bitcoin::hashes::hex::FromHex; use bitcoin::{BlockHash, Txid}; use core::future::Future; +use core::mem; use core::ops::Deref; use core::pin::Pin; use core::str::FromStr; @@ -32,7 +33,8 @@ use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate}; use crate::chain::transaction::OutPoint; use crate::ln::types::ChannelId; use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider}; -use crate::util::async_poll::dummy_waker; +use crate::sync::Mutex; +use crate::util::async_poll::{dummy_waker, MaybeSend, MaybeSync}; use crate::util::logger::Logger; use crate::util::ser::{Readable, ReadableArgs, Writeable}; @@ -409,6 +411,21 @@ where Ok(res) } +/// A generic trait which is able to spawn futures in the background. +pub trait FutureSpawner: Send + Sync + 'static { + /// Spawns the given future as a background task. + /// + /// This method MUST NOT block on the given future immediately. 
+ fn spawn + MaybeSend + 'static>(&self, future: T); +} + +struct PanicingSpawner; +impl FutureSpawner for PanicingSpawner { + fn spawn + MaybeSend + 'static>(&self, _: T) { + unreachable!(); + } +} + fn poll_sync_future(future: F) -> F::Output { let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); @@ -507,7 +524,7 @@ fn poll_sync_future(future: F) -> F::Output { /// would like to get rid of them, consider using the /// [`MonitorUpdatingPersister::cleanup_stale_updates`] function. pub struct MonitorUpdatingPersister( - MonitorUpdatingPersisterAsync, L, ES, SP, BI, FE>, + MonitorUpdatingPersisterAsync, PanicingSpawner, L, ES, SP, BI, FE>, ) where K::Target: KVStoreSync, @@ -553,6 +570,7 @@ where ) -> Self { MonitorUpdatingPersister(MonitorUpdatingPersisterAsync::new( KVStoreSyncWrapper(kv_store), + PanicingSpawner, logger, maximum_pending_updates, entropy_source, @@ -665,8 +683,8 @@ where &self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor, ) -> chain::ChannelMonitorUpdateStatus { - let res = - poll_sync_future(self.0 .0.update_persisted_channel(monitor_name, update, monitor)); + let inner = Arc::clone(&self.0.0); + let res = poll_sync_future(inner.update_persisted_channel(monitor_name, update, monitor)); match res { Ok(()) => chain::ChannelMonitorUpdateStatus::Completed, Err(e) => { @@ -691,14 +709,18 @@ where /// async versions of the public accessors. /// /// Note that async monitor updating is considered beta, and bugs may be triggered by its use. +/// +/// Unlike [`MonitorUpdatingPersister`], this does not implement [`Persist`], but is instead used +/// directly by the [`ChainMonitor`]. pub struct MonitorUpdatingPersisterAsync< K: Deref, + S: FutureSpawner, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref, ->(Arc>) +>(Arc>) where K::Target: KVStore, L::Target: Logger, @@ -709,12 +731,14 @@ where struct MonitorUpdatingPersisterAsyncInner< K: Deref, + S: FutureSpawner, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref, -> where +> +where K::Target: KVStore, L::Target: Logger, ES::Target: EntropySource + Sized, @@ -723,6 +747,7 @@ struct MonitorUpdatingPersisterAsyncInner< FE::Target: FeeEstimator, { kv_store: K, + future_spawner: S, logger: L, maximum_pending_updates: u64, entropy_source: ES, @@ -731,8 +756,8 @@ struct MonitorUpdatingPersisterAsyncInner< fee_estimator: FE, } -impl - MonitorUpdatingPersisterAsync +impl + MonitorUpdatingPersisterAsync where K::Target: KVStore, L::Target: Logger, @@ -745,11 +770,12 @@ where /// /// See [`MonitorUpdatingPersister::new`] for more info. 
pub fn new( - kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES, - signer_provider: SP, broadcaster: BI, fee_estimator: FE, + kv_store: K, future_spawner: S, logger: L, maximum_pending_updates: u64, + entropy_source: ES, signer_provider: SP, broadcaster: BI, fee_estimator: FE, ) -> Self { MonitorUpdatingPersisterAsync(Arc::new(MonitorUpdatingPersisterAsyncInner { kv_store, + future_spawner, logger, maximum_pending_updates, entropy_source, @@ -818,8 +844,78 @@ where } } -impl - MonitorUpdatingPersisterAsyncInner +impl< + K: Deref + MaybeSend + MaybeSync + 'static, + S: FutureSpawner, + L: Deref + MaybeSend + MaybeSync + 'static, + ES: Deref + MaybeSend + MaybeSync + 'static, + SP: Deref + MaybeSend + MaybeSync + 'static, + BI: Deref + MaybeSend + MaybeSync + 'static, + FE: Deref + MaybeSend + MaybeSync + 'static, +> + MonitorUpdatingPersisterAsync +where + K::Target: KVStore + MaybeSync, + L::Target: Logger, + ES::Target: EntropySource + Sized, + SP::Target: SignerProvider + Sized, + BI::Target: BroadcasterInterface, + FE::Target: FeeEstimator, + ::EcdsaSigner: MaybeSend + 'static, +{ + pub(crate) fn spawn_async_persist_new_channel( + &self, monitor_name: MonitorName, monitor: &ChannelMonitor<::EcdsaSigner>, + ) { + let inner = Arc::clone(&self.0); + let future = inner.persist_new_channel(monitor_name, monitor); + let channel_id = monitor.channel_id(); + self.0.future_spawner.spawn(async move { + match future.await { + Ok(()) => {}, // TODO: expose completions + Err(e) => { + log_error!( + inner.logger, + "Failed to persist new ChannelMonitor {channel_id}: {e}. The node will now likely stall as this channel will not be able to make progress. You should restart as soon as possible.", + ); + }, + } + }); + } + + pub(crate) fn spawn_async_update_persisted_channel( + &self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>, + monitor: &ChannelMonitor<::EcdsaSigner>, + ) { + let inner = Arc::clone(&self.0); + let future = inner.update_persisted_channel(monitor_name, update, monitor); + let channel_id = monitor.channel_id(); + let inner = Arc::clone(&self.0); + self.0.future_spawner.spawn(async move { + match future.await { + Ok(()) => {}, // TODO: expose completions + Err(e) => { + log_error!( + inner.logger, + "Failed to persist new ChannelMonitor {channel_id}: {e}. The node will now likely stall as this channel will not be able to make progress. 
You should restart as soon as possible.", + ); + }, + } + }); + } + + pub(crate) fn spawn_async_archive_persisted_channel( + &self, monitor_name: MonitorName, + ) { + let inner = Arc::clone(&self.0); + self.0.future_spawner.spawn(async move { + inner.archive_persisted_channel(monitor_name).await; + }); + } +} + + +impl + MonitorUpdatingPersisterAsyncInner where K::Target: KVStore, L::Target: Logger, @@ -938,7 +1034,7 @@ where let monitor_name = MonitorName::from_str(&monitor_key)?; let (_, current_monitor) = self.read_monitor(&monitor_name, &monitor_key).await?; let latest_update_id = current_monitor.get_latest_update_id(); - self.cleanup_stale_updates_for_monitor_to(&monitor_key, latest_update_id, lazy).await; + self.cleanup_stale_updates_for_monitor_to(&monitor_key, latest_update_id, lazy).await?; } Ok(()) } @@ -958,9 +1054,9 @@ where Ok(()) } - async fn persist_new_channel( + fn persist_new_channel( &self, monitor_name: MonitorName, monitor: &ChannelMonitor, - ) -> Result<(), io::Error> { + ) -> impl Future> { // Determine the proper key for this monitor let monitor_key = monitor_name.to_string(); // Serialize and write the new monitor @@ -974,16 +1070,22 @@ where monitor_bytes.extend_from_slice(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL); } monitor.write(&mut monitor_bytes).unwrap(); + // Note that this is NOT an async function, but rather calls the *sync* KVStore write + // method, allowing it to do its queueing immediately, and then return a future for the + // completion of the write. This ensures monitor persistence ordering is preserved. let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE; let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE; - self.kv_store.write(primary, secondary, monitor_key.as_str(), monitor_bytes).await + self.kv_store.write(primary, secondary, monitor_key.as_str(), monitor_bytes) } - async fn update_persisted_channel( - &self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>, + fn update_persisted_channel<'a, ChannelSigner: EcdsaChannelSigner + 'a>( + self: Arc, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor, - ) -> Result<(), io::Error> { + ) -> impl Future> + 'a where Self: 'a { const LEGACY_CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX; + let mut res_a = None; + let mut res_b = None; + let mut res_c = None; if let Some(update) = update { let persist_update = update.update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID && self.maximum_pending_updates != 0 @@ -992,37 +1094,48 @@ where let monitor_key = monitor_name.to_string(); let update_name = UpdateName::from(update.update_id); let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE; - self.kv_store + res_a = Some(self.kv_store .write(primary, &monitor_key, update_name.as_str(), update.encode()) - .await + ); } else { // We could write this update, but it meets criteria of our design that calls for a full monitor write. 
- let write_status = self.persist_new_channel(monitor_name, monitor).await; - - if let Ok(()) = write_status { - let channel_closed_legacy = - monitor.get_latest_update_id() == LEGACY_CLOSED_CHANNEL_UPDATE_ID; - let latest_update_id = monitor.get_latest_update_id(); - if channel_closed_legacy { - let monitor_key = monitor_name.to_string(); - self.cleanup_stale_updates_for_monitor_to( - &monitor_key, - latest_update_id, - true, - ) - .await; - } else { - let end = latest_update_id; - let start = end.saturating_sub(self.maximum_pending_updates); - self.cleanup_in_range(monitor_name, start, end).await; + let write_fut = self.persist_new_channel(monitor_name, monitor); + let latest_update_id = monitor.get_latest_update_id(); + + res_b = Some(async move { + let write_status = write_fut.await; + if let Ok(()) = write_status { + if latest_update_id == LEGACY_CLOSED_CHANNEL_UPDATE_ID { + let monitor_key = monitor_name.to_string(); + self.cleanup_stale_updates_for_monitor_to(&monitor_key, latest_update_id, true).await?; + } else { + let end = latest_update_id; + let start = end.saturating_sub(self.maximum_pending_updates); + self.cleanup_in_range(monitor_name, start, end).await; + } } - } - write_status + write_status + }); } } else { // There is no update given, so we must persist a new monitor. - self.persist_new_channel(monitor_name, monitor).await + // Note that this is NOT an async function, but rather calls the *sync* KVStore write + // method, allowing it to do its queueing immediately, and then return a future for the + // completion of the write. This ensures monitor persistence ordering is preserved. + res_c = Some(self.persist_new_channel(monitor_name, monitor)); + } + async move { + if let Some(a) = res_a { + a.await?; + } + if let Some(b) = res_b { + b.await?; + } + if let Some(c) = res_c { + c.await?; + } + Ok(()) } } From d56620def4e80ce91b9ea71561d4480b897f32ce Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 9 Sep 2025 20:58:52 +0000 Subject: [PATCH 11/21] f more comments --- lightning/src/util/persist.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 11a3de700d3..e7b40cf9355 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -1094,11 +1094,19 @@ where let monitor_key = monitor_name.to_string(); let update_name = UpdateName::from(update.update_id); let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE; + // Note that this is NOT an async function, but rather calls the *sync* KVStore + // write method, allowing it to do its queueing immediately, and then return a + // future for the completion of the write. This ensures monitor persistence + // ordering is preserved. res_a = Some(self.kv_store .write(primary, &monitor_key, update_name.as_str(), update.encode()) ); } else { // We could write this update, but it meets criteria of our design that calls for a full monitor write. + // Note that this is NOT an async function, but rather calls the *sync* KVStore + // write method, allowing it to do its queueing immediately, and then return a + // future for the completion of the write. This ensures monitor persistence + // ordering is preserved. This, thus, must happen before any await we do below. let write_fut = self.persist_new_channel(monitor_name, monitor); let latest_update_id = monitor.get_latest_update_id(); @@ -1126,6 +1134,9 @@ where res_c = Some(self.persist_new_channel(monitor_name, monitor)); } async move { + // Complete any pending future(s). 
Note that to keep one return type we have to end + // with a single async move block that we return, rather than trying to return the + // individual futures themselves. if let Some(a) = res_a { a.await?; } From 20be88be99a594acf21a72d0b4395f10e8ca536d Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 11 Sep 2025 15:04:23 +0000 Subject: [PATCH 12/21] f doc link --- lightning/src/util/persist.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index e7b40cf9355..d54b31fe202 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -712,6 +712,8 @@ where /// /// Unlike [`MonitorUpdatingPersister`], this does not implement [`Persist`], but is instead used /// directly by the [`ChainMonitor`]. +/// +/// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor pub struct MonitorUpdatingPersisterAsync< K: Deref, S: FutureSpawner, From 664c6cb1ee7714bf63d0d823b013f2d907800478 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 11 Sep 2025 15:04:57 +0000 Subject: [PATCH 13/21] f move poller --- lightning/src/util/persist.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index d54b31fe202..8d72b269370 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -36,6 +36,7 @@ use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider}; use crate::sync::Mutex; use crate::util::async_poll::{dummy_waker, MaybeSend, MaybeSync}; use crate::util::logger::Logger; +use crate::util::native_async::FutureSpawner; use crate::util::ser::{Readable, ReadableArgs, Writeable}; /// The alphabet of characters allowed for namespaces and keys. @@ -411,14 +412,6 @@ where Ok(res) } -/// A generic trait which is able to spawn futures in the background. -pub trait FutureSpawner: Send + Sync + 'static { - /// Spawns the given future as a background task. - /// - /// This method MUST NOT block on the given future immediately. - fn spawn + MaybeSend + 'static>(&self, future: T); -} - struct PanicingSpawner; impl FutureSpawner for PanicingSpawner { fn spawn + MaybeSend + 'static>(&self, _: T) { From cc97a7ae7a0f008c5510402a698891c6eafa1877 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 11 Sep 2025 16:42:49 +0000 Subject: [PATCH 14/21] f rustfmt --- lightning/src/util/persist.rs | 53 ++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 23 deletions(-) diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 8d72b269370..f5794f22108 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -676,7 +676,7 @@ where &self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor, ) -> chain::ChannelMonitorUpdateStatus { - let inner = Arc::clone(&self.0.0); + let inner = Arc::clone(&self.0 .0); let res = poll_sync_future(inner.update_persisted_channel(monitor_name, update, monitor)); match res { Ok(()) => chain::ChannelMonitorUpdateStatus::Completed, @@ -706,7 +706,7 @@ where /// Unlike [`MonitorUpdatingPersister`], this does not implement [`Persist`], but is instead used /// directly by the [`ChainMonitor`]. 
/// -/// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor +/// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor pub struct MonitorUpdatingPersisterAsync< K: Deref, S: FutureSpawner, @@ -732,8 +732,7 @@ struct MonitorUpdatingPersisterAsyncInner< SP: Deref, BI: Deref, FE: Deref, -> -where +> where K::Target: KVStore, L::Target: Logger, ES::Target: EntropySource + Sized, @@ -840,15 +839,14 @@ where } impl< - K: Deref + MaybeSend + MaybeSync + 'static, - S: FutureSpawner, - L: Deref + MaybeSend + MaybeSync + 'static, - ES: Deref + MaybeSend + MaybeSync + 'static, - SP: Deref + MaybeSend + MaybeSync + 'static, - BI: Deref + MaybeSend + MaybeSync + 'static, - FE: Deref + MaybeSend + MaybeSync + 'static, -> - MonitorUpdatingPersisterAsync + K: Deref + MaybeSend + MaybeSync + 'static, + S: FutureSpawner, + L: Deref + MaybeSend + MaybeSync + 'static, + ES: Deref + MaybeSend + MaybeSync + 'static, + SP: Deref + MaybeSend + MaybeSync + 'static, + BI: Deref + MaybeSend + MaybeSync + 'static, + FE: Deref + MaybeSend + MaybeSync + 'static, + > MonitorUpdatingPersisterAsync where K::Target: KVStore + MaybeSync, L::Target: Logger, @@ -859,7 +857,8 @@ where ::EcdsaSigner: MaybeSend + 'static, { pub(crate) fn spawn_async_persist_new_channel( - &self, monitor_name: MonitorName, monitor: &ChannelMonitor<::EcdsaSigner>, + &self, monitor_name: MonitorName, + monitor: &ChannelMonitor<::EcdsaSigner>, ) { let inner = Arc::clone(&self.0); let future = inner.persist_new_channel(monitor_name, monitor); @@ -898,9 +897,7 @@ where }); } - pub(crate) fn spawn_async_archive_persisted_channel( - &self, monitor_name: MonitorName, - ) { + pub(crate) fn spawn_async_archive_persisted_channel(&self, monitor_name: MonitorName) { let inner = Arc::clone(&self.0); self.0.future_spawner.spawn(async move { inner.archive_persisted_channel(monitor_name).await; @@ -908,7 +905,6 @@ where } } - impl MonitorUpdatingPersisterAsyncInner where @@ -1076,7 +1072,10 @@ where fn update_persisted_channel<'a, ChannelSigner: EcdsaChannelSigner + 'a>( self: Arc, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor, - ) -> impl Future> + 'a where Self: 'a { + ) -> impl Future> + 'a + where + Self: 'a, + { const LEGACY_CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX; let mut res_a = None; let mut res_b = None; @@ -1093,9 +1092,12 @@ where // write method, allowing it to do its queueing immediately, and then return a // future for the completion of the write. This ensures monitor persistence // ordering is preserved. - res_a = Some(self.kv_store - .write(primary, &monitor_key, update_name.as_str(), update.encode()) - ); + res_a = Some(self.kv_store.write( + primary, + &monitor_key, + update_name.as_str(), + update.encode(), + )); } else { // We could write this update, but it meets criteria of our design that calls for a full monitor write. 
// Note that this is NOT an async function, but rather calls the *sync* KVStore @@ -1110,7 +1112,12 @@ where if let Ok(()) = write_status { if latest_update_id == LEGACY_CLOSED_CHANNEL_UPDATE_ID { let monitor_key = monitor_name.to_string(); - self.cleanup_stale_updates_for_monitor_to(&monitor_key, latest_update_id, true).await?; + self.cleanup_stale_updates_for_monitor_to( + &monitor_key, + latest_update_id, + true, + ) + .await?; } else { let end = latest_update_id; let start = end.saturating_sub(self.maximum_pending_updates); From 139ceb00aa798110b42cdd19a4f9b5bd263334c2 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 9 Sep 2025 21:04:52 +0000 Subject: [PATCH 15/21] Add support for native async `KVStore` persist to `ChainMonitor` This finally adds support for full native Rust `async` persistence to `ChainMonitor`. Way back when, before we had any other persistence, we added the `Persist` trait to persist `ChannelMonitor`s. It eventualy grew homegrown async persistence support via a simple immediate return and callback upon completion. We later added a persistence trait in `lightning-background-processor` to persist the few fields that it needed to drive writes for. Over time, we found more places where persistence was useful, and we eventually added a generic `KVStore` trait. In dc75436c673fad8b5b8ed8d5a9db1ac95650685a we removed the `lightning-background-processor` `Persister` in favor of simply using the native `KVStore` directly. Here we continue that trend, building native `async` `ChannelMonitor` persistence on top of our native `KVStore` rather than hacking support for it into the `chain::Persist` trait. Because `MonitorUpdatingPersister` already exists as a common way to wrap a `KVStore` into a `ChannelMonitor` persister, we build exclusively on that (though note that the "monitor updating" part is now optional), utilizing its new async option as our native async driver. Thus, we end up with a `ChainMonitor::new_async_beta` which takes a `MonitorUpdatingPersisterAsync` rather than a classic `chain::Persist` and then operates the same as a normal `ChainMonitor`. While the requirement that users now use a `MonitorUpdatingPersister` to wrap their `KVStore` before providing it to `ChainMonitor` is somewhat awkward, as we move towards a `KVStore`-only world it seems like `MonitorUpdatingPersister` should eventually merge into `ChainMonitor`. 
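
As an illustration (not part of this patch set), a user on a multi-threaded tokio
runtime might drive the new async path with a spawner along these lines.
`TokioSpawner` is a hypothetical user-side type, and the `MaybeSend` import path is
a best guess at where this series exposes it, so treat this as a sketch rather than
a final API:

    use core::future::Future;

    use lightning::util::async_poll::MaybeSend;
    use lightning::util::native_async::FutureSpawner;

    /// Hands each persistence future off to an ambient multi-threaded tokio runtime.
    struct TokioSpawner;

    impl FutureSpawner for TokioSpawner {
        fn spawn<T: Future<Output = ()> + MaybeSend + 'static>(&self, future: T) {
            // By the time we are called, the sync KVStore::write has already queued
            // the write, so spawning the completion future as a detached task cannot
            // reorder monitor persistence. tokio::spawn only enqueues and never
            // blocks here (it does require an ambient tokio runtime).
            tokio::spawn(future);
        }
    }

From there the persister is built with `MonitorUpdatingPersisterAsync::new(kv_store,
TokioSpawner, logger, maximum_pending_updates, entropy_source, signer_provider,
broadcaster, fee_estimator)` and handed to `ChainMonitor::new_async_beta` where a
`chain::Persist` implementation would otherwise go.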
--- lightning/src/chain/chainmonitor.rs | 133 +++++++++++++++++++++++++++- lightning/src/util/persist.rs | 20 ++++- 2 files changed, 148 insertions(+), 5 deletions(-) diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 36d26aee971..4151a36c7fc 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -46,12 +46,13 @@ use crate::ln::our_peer_storage::{DecryptedOurPeerStorage, PeerStorageMonitorHol use crate::ln::types::ChannelId; use crate::prelude::*; use crate::sign::ecdsa::EcdsaChannelSigner; -use crate::sign::{EntropySource, PeerStorageKey}; +use crate::sign::{EntropySource, PeerStorageKey, SignerProvider}; use crate::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard}; use crate::types::features::{InitFeatures, NodeFeatures}; +use crate::util::async_poll::{MaybeSend, MaybeSync}; use crate::util::errors::APIError; use crate::util::logger::{Logger, WithContext}; -use crate::util::persist::MonitorName; +use crate::util::persist::{FutureSpawner, MonitorName, MonitorUpdatingPersisterAsync, KVStore}; #[cfg(peer_storage)] use crate::util::ser::{VecWriter, Writeable}; use crate::util::wakers::{Future, Notifier}; @@ -192,6 +193,15 @@ pub trait Persist { /// restart, this method must in that case be idempotent, ensuring it can handle scenarios where /// the monitor already exists in the archive. fn archive_persisted_channel(&self, monitor_name: MonitorName); + + /// Fetches the set of [`ChannelMonitorUpdate`]s, previously persisted with + /// [`Self::update_persisted_channel`], which have completed. + /// + /// Returning an update here is equivalent to calling + /// [`ChainMonitor::channel_monitor_updated`]. Because of this, this method is defaulted and + /// hidden in the docs. + #[doc(hidden)] + fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> { Vec::new() } } struct MonitorHolder { @@ -235,6 +245,73 @@ impl Deref for LockedChannelMonitor<'_, Chann } } + +/// An unconstructable [`Persist`]er which is used under the hood when you call +/// [`ChainMonitor::new_async_beta`]. 
+pub struct AsyncPersister +where + K::Target: KVStore + MaybeSync, + L::Target: Logger, + ES::Target: EntropySource + Sized, + SP::Target: SignerProvider + Sized, + BI::Target: BroadcasterInterface, + FE::Target: FeeEstimator +{ + persister: MonitorUpdatingPersisterAsync, +} + +impl +Deref for AsyncPersister +where + K::Target: KVStore + MaybeSync, + L::Target: Logger, + ES::Target: EntropySource + Sized, + SP::Target: SignerProvider + Sized, + BI::Target: BroadcasterInterface, + FE::Target: FeeEstimator +{ + type Target = Self; + fn deref(&self) -> &Self { + self + } +} + +impl +Persist<::EcdsaSigner> for AsyncPersister +where + K::Target: KVStore + MaybeSync, + L::Target: Logger, + ES::Target: EntropySource + Sized, + SP::Target: SignerProvider + Sized, + BI::Target: BroadcasterInterface, + FE::Target: FeeEstimator, + ::EcdsaSigner: MaybeSend + 'static, +{ + fn persist_new_channel( + &self, monitor_name: MonitorName, monitor: &ChannelMonitor<::EcdsaSigner>, + ) -> ChannelMonitorUpdateStatus { + self.persister.spawn_async_persist_new_channel(monitor_name, monitor); + ChannelMonitorUpdateStatus::InProgress + } + + fn update_persisted_channel( + &self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>, + monitor: &ChannelMonitor<::EcdsaSigner>, + ) -> ChannelMonitorUpdateStatus { + self.persister.spawn_async_update_persisted_channel(monitor_name, monitor_update, monitor); + ChannelMonitorUpdateStatus::InProgress + } + + fn archive_persisted_channel(&self, monitor_name: MonitorName) { + self.persister.spawn_async_archive_persisted_channel(monitor_name); + } + + fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> { + self.persister.get_and_clear_completed_updates() + } +} + + /// An implementation of [`chain::Watch`] for monitoring channels. /// /// Connected and disconnected blocks must be provided to `ChainMonitor` as documented by @@ -291,6 +368,55 @@ pub struct ChainMonitor< our_peerstorage_encryption_key: PeerStorageKey, } +impl< + K: Deref + MaybeSend + MaybeSync + 'static, + S: FutureSpawner, + SP: Deref + MaybeSend + MaybeSync + 'static, + C: Deref, + T: Deref + MaybeSend + MaybeSync + 'static, + F: Deref + MaybeSend + MaybeSync + 'static, + L: Deref + MaybeSend + MaybeSync + 'static, + ES: Deref + MaybeSend + MaybeSync + 'static, + > ChainMonitor<::EcdsaSigner, C, T, F, L, AsyncPersister, ES> +where + K::Target: KVStore + MaybeSync, + SP::Target: SignerProvider + Sized, + C::Target: chain::Filter, + T::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, + ES::Target: EntropySource + Sized, + ::EcdsaSigner: MaybeSend + 'static, +{ + /// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels. + /// + /// This behaves the same as [`ChainMonitor::new`] except that it relies on + /// [`MonitorUpdatingPersisterAsync`] and thus allows persistence to be completed async. + /// + /// Note that async monitor updating is considered beta, and bugs may be triggered by its use. 
+ pub fn new_async_beta( + chain_source: Option, broadcaster: T, logger: L, feeest: F, + persister: MonitorUpdatingPersisterAsync, _entropy_source: ES, + _our_peerstorage_encryption_key: PeerStorageKey, + ) -> Self { + Self { + monitors: RwLock::new(new_hash_map()), + chain_source, + broadcaster, + logger, + fee_estimator: feeest, + persister: AsyncPersister { persister }, + _entropy_source, + pending_monitor_events: Mutex::new(Vec::new()), + highest_chain_height: AtomicUsize::new(0), + event_notifier: Notifier::new(), + pending_send_only_events: Mutex::new(Vec::new()), + #[cfg(peer_storage)] + our_peerstorage_encryption_key: _our_peerstorage_encryption_key, + } + } +} + impl< ChannelSigner: EcdsaChannelSigner, C: Deref, @@ -1357,6 +1483,9 @@ where fn release_pending_monitor_events( &self, ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + for (channel_id, update_id) in self.persister.get_and_clear_completed_updates() { + self.channel_monitor_updated(channel_id, update_id); + } let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0); for monitor_state in self.monitors.read().unwrap().values() { let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events(); diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index f5794f22108..738e8fc69f5 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -704,7 +704,7 @@ where /// Note that async monitor updating is considered beta, and bugs may be triggered by its use. /// /// Unlike [`MonitorUpdatingPersister`], this does not implement [`Persist`], but is instead used -/// directly by the [`ChainMonitor`]. +/// directly by the [`ChainMonitor`] via [`ChainMonitor::new_async_beta`]. /// /// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor pub struct MonitorUpdatingPersisterAsync< @@ -741,6 +741,7 @@ struct MonitorUpdatingPersisterAsyncInner< FE::Target: FeeEstimator, { kv_store: K, + async_completed_updates: Mutex>, future_spawner: S, logger: L, maximum_pending_updates: u64, @@ -769,6 +770,7 @@ where ) -> Self { MonitorUpdatingPersisterAsync(Arc::new(MonitorUpdatingPersisterAsyncInner { kv_store, + async_completed_updates: Mutex::new(Vec::new()), future_spawner, logger, maximum_pending_updates, @@ -863,9 +865,10 @@ where let inner = Arc::clone(&self.0); let future = inner.persist_new_channel(monitor_name, monitor); let channel_id = monitor.channel_id(); + let completion = (monitor.channel_id(), monitor.get_latest_update_id()); self.0.future_spawner.spawn(async move { match future.await { - Ok(()) => {}, // TODO: expose completions + Ok(()) => inner.async_completed_updates.lock().unwrap().push(completion), Err(e) => { log_error!( inner.logger, @@ -883,10 +886,17 @@ where let inner = Arc::clone(&self.0); let future = inner.update_persisted_channel(monitor_name, update, monitor); let channel_id = monitor.channel_id(); + let completion = if let Some(update) = update { + Some((monitor.channel_id(), update.update_id)) + } else { + None + }; let inner = Arc::clone(&self.0); self.0.future_spawner.spawn(async move { match future.await { - Ok(()) => {}, // TODO: expose completions + Ok(()) => if let Some(completion) = completion { + inner.async_completed_updates.lock().unwrap().push(completion); + }, Err(e) => { log_error!( inner.logger, @@ -903,6 +913,10 @@ where inner.archive_persisted_channel(monitor_name).await; }); } + + pub(crate) fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> { + mem::take(&mut 
*self.0.async_completed_updates.lock().unwrap()) + } } impl From 4b3713f4c6681bf185f0496c1226f74959c7cd01 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 9 Sep 2025 21:02:06 +0000 Subject: [PATCH 16/21] f more comments --- lightning/src/util/persist.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 738e8fc69f5..9bb67077093 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -863,6 +863,8 @@ where monitor: &ChannelMonitor<::EcdsaSigner>, ) { let inner = Arc::clone(&self.0); + // Note that `persist_new_channel` is a sync method which calls all the way through to the + // sync KVStore::write method (which returns a future) to ensure writes are well-ordered. let future = inner.persist_new_channel(monitor_name, monitor); let channel_id = monitor.channel_id(); let completion = (monitor.channel_id(), monitor.get_latest_update_id()); @@ -884,6 +886,8 @@ where monitor: &ChannelMonitor<::EcdsaSigner>, ) { let inner = Arc::clone(&self.0); + // Note that `update_persisted_channel` is a sync method which calls all the way through to + // the sync KVStore::write method (which returns a future) to ensure writes are well-ordered let future = inner.update_persisted_channel(monitor_name, update, monitor); let channel_id = monitor.channel_id(); let completion = if let Some(update) = update { From c289186830f0350a266da51369cadb4d49699df7 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 9 Sep 2025 21:06:15 +0000 Subject: [PATCH 17/21] f doc link --- lightning/src/util/persist.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 9bb67077093..56484521196 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -707,6 +707,7 @@ where /// directly by the [`ChainMonitor`] via [`ChainMonitor::new_async_beta`]. /// /// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor +/// [`ChainMonitor::new_async_beta`]: crate::chain::chainmonitor::ChainMonitor::new_async_beta pub struct MonitorUpdatingPersisterAsync< K: Deref, S: FutureSpawner, From 2ab9e83a76eb31fef0b598613498daba7aab05a5 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 11 Sep 2025 14:50:55 +0000 Subject: [PATCH 18/21] f explain why its okay to use a panicing spawner --- lightning/src/util/persist.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 56484521196..b42deb67edb 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -561,6 +561,9 @@ where kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES, signer_provider: SP, broadcaster: BI, fee_estimator: FE, ) -> Self { + // Note that calling the spawner only happens in the `pub(crate)` `spawn_*` methods defined + // with additional bounds on `MonitorUpdatingPersisterAsync`. Thus its safe to provide a + // dummy always-panic implementation here. 
MonitorUpdatingPersister(MonitorUpdatingPersisterAsync::new( KVStoreSyncWrapper(kv_store), PanicingSpawner, From 09df3eda268bb7a7c1b307137deaf2cef73591a2 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 11 Sep 2025 15:02:04 +0000 Subject: [PATCH 19/21] f ignore useless return valuet --- lightning/src/chain/chainmonitor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 4151a36c7fc..581247c4dc9 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -1484,7 +1484,7 @@ where &self, ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { for (channel_id, update_id) in self.persister.get_and_clear_completed_updates() { - self.channel_monitor_updated(channel_id, update_id); + let _ = self.channel_monitor_updated(channel_id, update_id); } let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0); for monitor_state in self.monitors.read().unwrap().values() { From 37cf6d33f582f21b0db827f1fb06440d3ea81fd4 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 11 Sep 2025 15:06:50 +0000 Subject: [PATCH 20/21] f move spawner --- lightning/src/chain/chainmonitor.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 581247c4dc9..9e4ee62f8ad 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -52,7 +52,8 @@ use crate::types::features::{InitFeatures, NodeFeatures}; use crate::util::async_poll::{MaybeSend, MaybeSync}; use crate::util::errors::APIError; use crate::util::logger::{Logger, WithContext}; -use crate::util::persist::{FutureSpawner, MonitorName, MonitorUpdatingPersisterAsync, KVStore}; +use crate::util::native_async::FutureSpawner; +use crate::util::persist::{MonitorName, MonitorUpdatingPersisterAsync, KVStore}; #[cfg(peer_storage)] use crate::util::ser::{VecWriter, Writeable}; use crate::util::wakers::{Future, Notifier}; From dfaa102f069fe9c28697748c9fe01122ce21aaf8 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 11 Sep 2025 16:54:58 +0000 Subject: [PATCH 21/21] f rustfmt --- lightning/src/chain/chainmonitor.rs | 60 +++++++++++++++++++++-------- 1 file changed, 45 insertions(+), 15 deletions(-) diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 9e4ee62f8ad..4abd0cd88c0 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -53,7 +53,7 @@ use crate::util::async_poll::{MaybeSend, MaybeSync}; use crate::util::errors::APIError; use crate::util::logger::{Logger, WithContext}; use crate::util::native_async::FutureSpawner; -use crate::util::persist::{MonitorName, MonitorUpdatingPersisterAsync, KVStore}; +use crate::util::persist::{KVStore, MonitorName, MonitorUpdatingPersisterAsync}; #[cfg(peer_storage)] use crate::util::ser::{VecWriter, Writeable}; use crate::util::wakers::{Future, Notifier}; @@ -202,7 +202,9 @@ pub trait Persist { /// [`ChainMonitor::channel_monitor_updated`]. Because of this, this method is defaulted and /// hidden in the docs. 
#[doc(hidden)] - fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> { Vec::new() } + fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> { + Vec::new() + } } struct MonitorHolder { @@ -246,30 +248,43 @@ impl Deref for LockedChannelMonitor<'_, Chann } } - /// An unconstructable [`Persist`]er which is used under the hood when you call /// [`ChainMonitor::new_async_beta`]. -pub struct AsyncPersister -where +pub struct AsyncPersister< + K: Deref + MaybeSend + MaybeSync + 'static, + S: FutureSpawner, + L: Deref + MaybeSend + MaybeSync + 'static, + ES: Deref + MaybeSend + MaybeSync + 'static, + SP: Deref + MaybeSend + MaybeSync + 'static, + BI: Deref + MaybeSend + MaybeSync + 'static, + FE: Deref + MaybeSend + MaybeSync + 'static, +> where K::Target: KVStore + MaybeSync, L::Target: Logger, ES::Target: EntropySource + Sized, SP::Target: SignerProvider + Sized, BI::Target: BroadcasterInterface, - FE::Target: FeeEstimator + FE::Target: FeeEstimator, { persister: MonitorUpdatingPersisterAsync, } -impl -Deref for AsyncPersister +impl< + K: Deref + MaybeSend + MaybeSync + 'static, + S: FutureSpawner, + L: Deref + MaybeSend + MaybeSync + 'static, + ES: Deref + MaybeSend + MaybeSync + 'static, + SP: Deref + MaybeSend + MaybeSync + 'static, + BI: Deref + MaybeSend + MaybeSync + 'static, + FE: Deref + MaybeSend + MaybeSync + 'static, + > Deref for AsyncPersister where K::Target: KVStore + MaybeSync, L::Target: Logger, ES::Target: EntropySource + Sized, SP::Target: SignerProvider + Sized, BI::Target: BroadcasterInterface, - FE::Target: FeeEstimator + FE::Target: FeeEstimator, { type Target = Self; fn deref(&self) -> &Self { @@ -277,8 +292,15 @@ where } } -impl -Persist<::EcdsaSigner> for AsyncPersister +impl< + K: Deref + MaybeSend + MaybeSync + 'static, + S: FutureSpawner, + L: Deref + MaybeSend + MaybeSync + 'static, + ES: Deref + MaybeSend + MaybeSync + 'static, + SP: Deref + MaybeSend + MaybeSync + 'static, + BI: Deref + MaybeSend + MaybeSync + 'static, + FE: Deref + MaybeSend + MaybeSync + 'static, + > Persist<::EcdsaSigner> for AsyncPersister where K::Target: KVStore + MaybeSync, L::Target: Logger, @@ -289,7 +311,8 @@ where ::EcdsaSigner: MaybeSend + 'static, { fn persist_new_channel( - &self, monitor_name: MonitorName, monitor: &ChannelMonitor<::EcdsaSigner>, + &self, monitor_name: MonitorName, + monitor: &ChannelMonitor<::EcdsaSigner>, ) -> ChannelMonitorUpdateStatus { self.persister.spawn_async_persist_new_channel(monitor_name, monitor); ChannelMonitorUpdateStatus::InProgress @@ -312,7 +335,6 @@ where } } - /// An implementation of [`chain::Watch`] for monitoring channels. /// /// Connected and disconnected blocks must be provided to `ChainMonitor` as documented by @@ -378,8 +400,16 @@ impl< F: Deref + MaybeSend + MaybeSync + 'static, L: Deref + MaybeSend + MaybeSync + 'static, ES: Deref + MaybeSend + MaybeSync + 'static, - > ChainMonitor<::EcdsaSigner, C, T, F, L, AsyncPersister, ES> -where + > + ChainMonitor< + ::EcdsaSigner, + C, + T, + F, + L, + AsyncPersister, + ES, + > where K::Target: KVStore + MaybeSync, SP::Target: SignerProvider + Sized, C::Target: chain::Filter,