diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 46d990bb37e..39da6e95460 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -44,6 +44,7 @@ use lightning_rapid_gossip_sync::RapidGossipSync; use core::ops::Deref; use core::time::Duration; +use std::marker::PhantomData; #[cfg(feature = "std")] use core::sync::atomic::{AtomicBool, Ordering}; @@ -751,10 +752,27 @@ pub async fn process_events_async< Sleeper: Fn(Duration) -> SleepFuture, FetchTime: Fn() -> Option, >( - persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM, - onion_messenger: Option, gossip_sync: GossipSync, peer_manager: PM, - logger: L, scorer: Option, sleeper: Sleeper, mobile_interruptable_platform: bool, - fetch_time: FetchTime, + #[rustfmt::skip] config: BackgroundProcessorConfig< + 'a, + UL, + CF, + T, + F, + G, + L, + P, + EventHandler, + PS, + M, + CM, + OM, + PGS, + RGS, + PM, + S, + SC, + >, + sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime, ) -> Result<(), lightning::io::Error> where UL::Target: 'static + UtxoLookup, @@ -770,11 +788,11 @@ where { let mut should_break = false; let async_event_handler = |event| { - let network_graph = gossip_sync.network_graph(); - let event_handler = &event_handler; - let scorer = &scorer; - let logger = &logger; - let persister = &persister; + let network_graph = config.gossip_sync.network_graph(); + let event_handler = &config.event_handler; + let scorer = &config.scorer; + let logger = &config.logger; + let persister = &config.persister; let fetch_time = &fetch_time; // We should be able to drop the Box once our MSRV is 1.68 Box::pin(async move { @@ -798,6 +816,16 @@ where event_handler(event).await }) }; + // We should extract these out of config because the macro expects individual arguments + let persister = config.persister; + let chain_monitor = config.chain_monitor; + let channel_manager = config.channel_manager; + let onion_messenger = config.onion_messenger; + let peer_manager = config.peer_manager; + let gossip_sync = config.gossip_sync; + let logger = config.logger; + let scorer = config.scorer; + define_run_body!( persister, chain_monitor, @@ -1007,6 +1035,81 @@ impl BackgroundProcessor { Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } } + /// Creates a new [`BackgroundProcessor`] from a [`BackgroundProcessorConfig`]. + /// This provides a more structured approach to configuration. The processor will start processing events immediately upon creation. + /// + /// This method is functionally equivalent to [`BackgroundProcessor::start`], but takes a configuration + /// object instead of individual parameters. + /// + pub fn from_config< + 'a, + UL: 'static + Deref + Send + Sync, + CF: 'static + Deref + Send + Sync, + T: 'static + Deref + Send + Sync, + F: 'static + Deref + Send + Sync, + G: 'static + Deref> + Send + Sync, + L: 'static + Deref + Send + Sync, + P: 'static + Deref + Send + Sync, + EH: 'static + EventHandler + Send, + PS: 'static + Deref + Send, + M: 'static + + Deref::Signer, CF, T, F, L, P>> + + Send + + Sync, + CM: 'static + Deref + Send + Sync, + OM: 'static + Deref + Send + Sync, + PGS: 'static + Deref> + Send + Sync, + RGS: 'static + Deref> + Send, + PM: 'static + Deref + Send + Sync, + S: 'static + Deref + Send + Sync, + SC: for<'b> WriteableScore<'b>, + >( + #[rustfmt::skip] config: BackgroundProcessorConfig< + 'a, + UL, + CF, + T, + F, + G, + L, + P, + EH, + PS, + M, + CM, + OM, + PGS, + RGS, + PM, + S, + SC, + >, + ) -> Self + where + UL::Target: 'static + UtxoLookup, + CF::Target: 'static + chain::Filter, + T::Target: 'static + BroadcasterInterface, + F::Target: 'static + FeeEstimator, + L::Target: 'static + Logger, + P::Target: 'static + Persist<::Signer>, + PS::Target: 'static + Persister<'a, CM, L, S>, + CM::Target: AChannelManager + Send + Sync, + OM::Target: AOnionMessenger + Send + Sync, + PM::Target: APeerManager + Send + Sync, + { + Self::start( + config.persister, + config.event_handler, + config.chain_monitor, + config.channel_manager, + config.onion_messenger, + config.gossip_sync, + config.peer_manager, + config.logger, + config.scorer, + ) + } + /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting /// [`ChannelManager`]. /// @@ -1048,6 +1151,222 @@ impl BackgroundProcessor { } } +/// Configuration for synchronous [`BackgroundProcessor`] +#[cfg_attr(feature = "futures", doc = "and asynchronous [`process_events_async`]")] +/// event processing. +/// +/// This configuration holds all components needed for background processing, +/// including required components (like the channel manager and peer manager) and optional +/// components (like the onion messenger and scorer). +/// +/// The configuration can be constructed using [`BackgroundProcessorConfigBuilder`], which provides +/// a convenient builder pattern for setting up both required and optional components. +/// +/// This same configuration can be used for +#[cfg_attr( + not(feature = "futures"), + doc = "creating a [`BackgroundProcessor`] via [`BackgroundProcessor::from_config`]." +)] +#[cfg_attr( + feature = "futures", + doc = "both: +/// * Creating a [`BackgroundProcessor`] via [`BackgroundProcessor::from_config`] +/// * Running the async variant of the background processor via [`process_events_async`]" +)] +/// +#[cfg(any(feature = "std", feature = "futures"))] +pub struct BackgroundProcessorConfig< + 'a, + UL: 'static + Deref + Send + Sync, + CF: 'static + Deref + Send + Sync, + T: 'static + Deref + Send + Sync, + F: 'static + Deref + Send + Sync, + G: 'static + Deref> + Send + Sync, + L: 'static + Deref + Send + Sync, + P: 'static + Deref + Send + Sync, + #[cfg(feature = "std")] EH: 'static + EventHandler + Send, + #[cfg(feature = "futures")] EventHandlerFuture: core::future::Future>, + #[cfg(feature = "futures")] EH: 'static + Fn(Event) -> EventHandlerFuture, + PS: 'static + Deref + Send, + M: 'static + + Deref::Signer, CF, T, F, L, P>> + + Send + + Sync, + CM: 'static + Deref + Send + Sync, + OM: 'static + Deref + Send + Sync, + PGS: 'static + Deref> + Send + Sync, + RGS: 'static + Deref> + Send, + PM: 'static + Deref + Send + Sync, + S: 'static + Deref + Send + Sync, + SC: for<'b> WriteableScore<'b>, +> where + UL::Target: 'static + UtxoLookup, + CF::Target: 'static + chain::Filter, + T::Target: 'static + BroadcasterInterface, + F::Target: 'static + FeeEstimator, + L::Target: 'static + Logger, + P::Target: 'static + Persist<::Signer>, + PS::Target: 'static + Persister<'a, CM, L, S>, + CM::Target: AChannelManager + Send + Sync, + OM::Target: AOnionMessenger + Send + Sync, + PM::Target: APeerManager + Send + Sync, +{ + persister: PS, + event_handler: EH, + chain_monitor: M, + channel_manager: CM, + onion_messenger: Option, + gossip_sync: GossipSync, + peer_manager: PM, + logger: L, + scorer: Option, + _phantom: PhantomData<(&'a (), CF, T, F, P)>, +} + +/// A builder for constructing a [`BackgroundProcessorConfig`] with optional components. +/// +/// This builder provides a flexible and type-safe way to construct a [`BackgroundProcessorConfig`] +/// with optional components like `onion_messenger` and `scorer`. It helps avoid specifying +/// concrete types for components that aren't being used. +#[cfg(any(feature = "std", feature = "futures"))] +pub struct BackgroundProcessorConfigBuilder< + 'a, + UL: 'static + Deref + Send + Sync, + CF: 'static + Deref + Send + Sync, + T: 'static + Deref + Send + Sync, + F: 'static + Deref + Send + Sync, + G: 'static + Deref> + Send + Sync, + L: 'static + Deref + Send + Sync, + P: 'static + Deref + Send + Sync, + #[cfg(feature = "std")] EH: 'static + EventHandler + Send, + #[cfg(feature = "futures")] EventHandlerFuture: core::future::Future>, + #[cfg(feature = "futures")] EH: 'static + Fn(Event) -> EventHandlerFuture, + PS: 'static + Deref + Send, + M: 'static + + Deref::Signer, CF, T, F, L, P>> + + Send + + Sync, + CM: 'static + Deref + Send + Sync, + OM: 'static + Deref + Send + Sync, + PGS: 'static + Deref> + Send + Sync, + RGS: 'static + Deref> + Send, + PM: 'static + Deref + Send + Sync, + S: 'static + Deref + Send + Sync, + SC: for<'b> WriteableScore<'b>, +> where + UL::Target: 'static + UtxoLookup, + CF::Target: 'static + chain::Filter, + T::Target: 'static + BroadcasterInterface, + F::Target: 'static + FeeEstimator, + L::Target: 'static + Logger, + P::Target: 'static + Persist<::Signer>, + PS::Target: 'static + Persister<'a, CM, L, S>, + CM::Target: AChannelManager + Send + Sync, + OM::Target: AOnionMessenger + Send + Sync, + PM::Target: APeerManager + Send + Sync, +{ + persister: PS, + event_handler: EH, + chain_monitor: M, + channel_manager: CM, + onion_messenger: Option, + gossip_sync: GossipSync, + peer_manager: PM, + logger: L, + scorer: Option, + _phantom: PhantomData<(&'a (), CF, T, F, P)>, +} + +#[cfg(any(feature = "std", feature = "futures"))] +impl< + 'a, + UL: 'static + Deref + Send + Sync, + CF: 'static + Deref + Send + Sync, + T: 'static + Deref + Send + Sync, + F: 'static + Deref + Send + Sync, + G: 'static + Deref> + Send + Sync, + L: 'static + Deref + Send + Sync, + P: 'static + Deref + Send + Sync, + #[cfg(feature = "std")] EH: 'static + EventHandler + Send, + #[cfg(feature = "futures")] EventHandlerFuture: core::future::Future>, + #[cfg(feature = "futures")] EH: 'static + Fn(Event) -> EventHandlerFuture, + PS: 'static + Deref + Send, + M: 'static + + Deref::Signer, CF, T, F, L, P>> + + Send + + Sync, + CM: 'static + Deref + Send + Sync, + OM: 'static + Deref + Send + Sync, + PGS: 'static + Deref> + Send + Sync, + RGS: 'static + Deref> + Send, + PM: 'static + Deref + Send + Sync, + S: 'static + Deref + Send + Sync, + SC: for<'b> WriteableScore<'b>, + > + BackgroundProcessorConfigBuilder<'a, UL, CF, T, F, G, L, P, EH, PS, M, CM, OM, PGS, RGS, PM, S, SC> +where + UL::Target: 'static + UtxoLookup, + CF::Target: 'static + chain::Filter, + T::Target: 'static + BroadcasterInterface, + F::Target: 'static + FeeEstimator, + L::Target: 'static + Logger, + P::Target: 'static + Persist<::Signer>, + PS::Target: 'static + Persister<'a, CM, L, S>, + CM::Target: AChannelManager + Send + Sync, + OM::Target: AOnionMessenger + Send + Sync, + PM::Target: APeerManager + Send + Sync, +{ + /// Creates a new builder instance. + pub fn new( + persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM, + gossip_sync: GossipSync, peer_manager: PM, logger: L, + ) -> Self { + Self { + persister, + event_handler, + chain_monitor, + channel_manager, + onion_messenger: None, + gossip_sync, + peer_manager, + logger, + scorer: None, + _phantom: PhantomData, + } + } + + /// Sets the optional onion messenger component. + pub fn with_onion_messenger(&mut self, onion_messenger: OM) -> &mut Self { + self.onion_messenger = Some(onion_messenger); + self + } + + /// Sets the optional scorer component. + pub fn with_scorer(&mut self, scorer: S) -> &mut Self { + self.scorer = Some(scorer); + self + } + + /// Builds and returns a [`BackgroundProcessorConfig`] object. + pub fn build( + self, + ) -> BackgroundProcessorConfig<'a, UL, CF, T, F, G, L, P, EH, PS, M, CM, OM, PGS, RGS, PM, S, SC> + { + BackgroundProcessorConfig { + persister: self.persister, + event_handler: self.event_handler, + chain_monitor: self.chain_monitor, + channel_manager: self.channel_manager, + onion_messenger: self.onion_messenger, + gossip_sync: self.gossip_sync, + peer_manager: self.peer_manager, + logger: self.logger, + scorer: self.scorer, + _phantom: PhantomData, + } + } +} + #[cfg(feature = "std")] impl Drop for BackgroundProcessor { fn drop(&mut self) { @@ -1057,7 +1376,9 @@ impl Drop for BackgroundProcessor { #[cfg(all(feature = "std", test))] mod tests { - use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER}; + use super::{ + BackgroundProcessor, BackgroundProcessorConfigBuilder, GossipSync, FRESHNESS_TIMER, + }; use bitcoin::constants::{genesis_block, ChainHash}; use bitcoin::hashes::Hash; use bitcoin::locktime::absolute::LockTime; @@ -1990,16 +2311,22 @@ mod tests { Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"), ); - let bp_future = super::process_events_async( + let mut builder = BackgroundProcessorConfigBuilder::new( persister, |_: _| async { Ok(()) }, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), - Some(nodes[0].messenger.clone()), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), - Some(nodes[0].scorer.clone()), + ); + builder + .with_onion_messenger(nodes[0].messenger.clone()) + .with_scorer(nodes[0].scorer.clone()); + let config = builder.build(); + + let bp_future = super::process_events_async( + config, move |dur: Duration| { Box::pin(async move { tokio::time::sleep(dur).await; @@ -2469,17 +2796,23 @@ mod tests { let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender)); - let (exit_sender, exit_receiver) = tokio::sync::watch::channel(()); - let bp_future = super::process_events_async( + let mut builder = BackgroundProcessorConfigBuilder::new( persister, |_: _| async { Ok(()) }, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), - Some(nodes[0].messenger.clone()), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), - Some(nodes[0].scorer.clone()), + ); + builder + .with_onion_messenger(nodes[0].messenger.clone()) + .with_scorer(nodes[0].scorer.clone()); + let config = builder.build(); + + let (exit_sender, exit_receiver) = tokio::sync::watch::channel(()); + let bp_future = super::process_events_async( + config, move |dur: Duration| { let mut exit_receiver = exit_receiver.clone(); Box::pin(async move { @@ -2682,16 +3015,22 @@ mod tests { let (exit_sender, exit_receiver) = tokio::sync::watch::channel(()); - let bp_future = super::process_events_async( + let mut builder = BackgroundProcessorConfigBuilder::new( persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), - Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), - Some(nodes[0].scorer.clone()), + ); + builder + .with_onion_messenger(nodes[0].messenger.clone()) + .with_scorer(nodes[0].scorer.clone()); + let config = builder.build(); + + let bp_future = super::process_events_async( + config, move |dur: Duration| { let mut exit_receiver = exit_receiver.clone(); Box::pin(async move { @@ -2721,4 +3060,106 @@ mod tests { r1.unwrap().unwrap(); r2.unwrap() } + + #[test] + fn test_background_processor_config_builder() { + // Test that when a new channel is created, the ChannelManager needs to be re-persisted with + // updates. Also test that when new updates are available, the manager signals that it needs + // re-persistence and is successfully re-persisted. + let (persist_dir, nodes) = create_nodes(2, "test_background_processor_config_builder"); + + // Go through the channel creation process so that each node has something to persist. Since + // open_channel consumes events, it must complete before starting BackgroundProcessor to + // avoid a race with processing events. + let tx = open_channel!(nodes[0], nodes[1], 100000); + + // Initiate the background processors to watch each node. + let data_dir = nodes[0].kv_store.get_data_dir(); + let persister = Arc::new(Persister::new(data_dir)); + let event_handler = |_: _| Ok(()); + let mut builder = BackgroundProcessorConfigBuilder::new( + persister, + event_handler, + nodes[0].chain_monitor.clone(), + nodes[0].node.clone(), + nodes[0].p2p_gossip_sync(), + nodes[0].peer_manager.clone(), + nodes[0].logger.clone(), + ); + builder + .with_onion_messenger(nodes[0].messenger.clone()) + .with_scorer(nodes[0].scorer.clone()); + let config = builder.build(); + + let bg_processor = BackgroundProcessor::from_config(config); + + macro_rules! check_persisted_data { + ($node: expr, $filepath: expr) => { + let mut expected_bytes = Vec::new(); + loop { + expected_bytes.clear(); + match $node.write(&mut expected_bytes) { + Ok(()) => match std::fs::read($filepath) { + Ok(bytes) => { + if bytes == expected_bytes { + break; + } else { + continue; + } + }, + Err(_) => continue, + }, + Err(e) => panic!("Unexpected error: {}", e), + } + } + }; + } + + // Check that the initial data is persisted as expected + let filepath = + get_full_filepath(format!("{}_persister_0", &persist_dir), "manager".to_string()); + check_persisted_data!(nodes[0].node, filepath.clone()); + + loop { + if !nodes[0].node.get_event_or_persist_condvar_value() { + break; + } + } + + // Force-close the channel. + let error_message = "Channel force-closed"; + nodes[0] + .node + .force_close_broadcasting_latest_txn( + &ChannelId::v1_from_funding_outpoint(OutPoint { + txid: tx.compute_txid(), + index: 0, + }), + &nodes[1].node.get_our_node_id(), + error_message.to_string(), + ) + .unwrap(); + + // Check that the force-close updates are persisted + check_persisted_data!(nodes[0].node, filepath.clone()); + loop { + if !nodes[0].node.get_event_or_persist_condvar_value() { + break; + } + } + + // Check network graph is persisted + let filepath = + get_full_filepath(format!("{}_persister_0", &persist_dir), "network_graph".to_string()); + check_persisted_data!(nodes[0].network_graph, filepath.clone()); + + // Check scorer is persisted + let filepath = + get_full_filepath(format!("{}_persister_0", &persist_dir), "scorer".to_string()); + check_persisted_data!(nodes[0].scorer, filepath.clone()); + + if !std::thread::panicking() { + bg_processor.stop().unwrap(); + } + } }