diff --git a/lightning-background-processor/Cargo.toml b/lightning-background-processor/Cargo.toml index fa89b078de5..301e58ed161 100644 --- a/lightning-background-processor/Cargo.toml +++ b/lightning-background-processor/Cargo.toml @@ -25,6 +25,7 @@ bitcoin-io = { version = "0.1.2", default-features = false } lightning = { version = "0.2.0", path = "../lightning", default-features = false } lightning-rapid-gossip-sync = { version = "0.2.0", path = "../lightning-rapid-gossip-sync", default-features = false } lightning-liquidity = { version = "0.2.0", path = "../lightning-liquidity", default-features = false } +futures = "0.3.31" [dev-dependencies] tokio = { version = "1.35", features = [ "macros", "rt", "rt-multi-thread", "sync", "time" ] } diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 239ff3f0c98..7b425124afc 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -43,6 +43,7 @@ use lightning::util::persist::{KVStore, Persister}; use lightning::util::sweep::OutputSweeper; #[cfg(feature = "std")] use lightning::util::sweep::OutputSweeperSync; +use lightning::util::wakers::Sleep; #[cfg(feature = "std")] use lightning::util::wakers::Sleeper; use lightning_rapid_gossip_sync::RapidGossipSync; @@ -310,198 +311,6 @@ fn update_scorer<'a, S: 'static + Deref + Send + Sync, SC: 'a + Wri true } -macro_rules! define_run_body { - ( - $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, - $channel_manager: ident, $process_channel_manager_events: expr, - $onion_messenger: ident, $process_onion_message_handler_events: expr, - $peer_manager: ident, $gossip_sync: ident, - $process_sweeper: expr, - $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr, - $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, - ) => { { - log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); - $channel_manager.get_cm().timer_tick_occurred(); - log_trace!($logger, "Rebroadcasting monitor's pending claims on startup"); - $chain_monitor.rebroadcast_pending_claims(); - - let mut last_freshness_call = $get_timer(FRESHNESS_TIMER); - let mut last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER); - let mut last_ping_call = $get_timer(PING_TIMER); - let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER); - let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER); - let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER); - let mut last_sweeper_call = $get_timer(SWEEPER_TIMER); - let mut have_pruned = false; - let mut have_decayed_scorer = false; - - loop { - $process_channel_manager_events; - $process_chain_monitor_events; - $process_onion_message_handler_events; - - // Note that the PeerManager::process_events may block on ChannelManager's locks, - // hence it comes last here. When the ChannelManager finishes whatever it's doing, - // we want to ensure we get into `persist_manager` as quickly as we can, especially - // without running the normal event processing above and handing events to users. - // - // Specifically, on an *extremely* slow machine, we may see ChannelManager start - // processing a message effectively at any point during this loop. 
In order to - // minimize the time between such processing completing and persisting the updated - // ChannelManager, we want to minimize methods blocking on a ChannelManager - // generally, and as a fallback place such blocking only immediately before - // persistence. - $peer_manager.as_ref().process_events(); - - // Exit the loop if the background processor was requested to stop. - if $loop_exit_check { - log_trace!($logger, "Terminating background processor."); - break; - } - - // We wait up to 100ms, but track how long it takes to detect being put to sleep, - // see `await_start`'s use below. - let mut await_start = None; - if $check_slow_await { await_start = Some($get_timer(1)); } - $await; - let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false }; - - // Exit the loop if the background processor was requested to stop. - if $loop_exit_check { - log_trace!($logger, "Terminating background processor."); - break; - } - - if $channel_manager.get_cm().get_and_clear_needs_persistence() { - log_trace!($logger, "Persisting ChannelManager..."); - $persister.persist_manager(&$channel_manager)?; - log_trace!($logger, "Done persisting ChannelManager."); - } - if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) { - log_trace!($logger, "Calling ChannelManager's timer_tick_occurred"); - $channel_manager.get_cm().timer_tick_occurred(); - last_freshness_call = $get_timer(FRESHNESS_TIMER); - } - if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) { - if let Some(om) = &$onion_messenger { - log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred"); - om.get_om().timer_tick_occurred(); - } - last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER); - } - if await_slow { - // On various platforms, we may be starved of CPU cycles for several reasons. - // E.g. on iOS, if we've been in the background, we will be entirely paused. - // Similarly, if we're on a desktop platform and the device has been asleep, we - // may not get any cycles. - // We detect this by checking if our max-100ms-sleep, above, ran longer than a - // full second, at which point we assume sockets may have been killed (they - // appear to be at least on some platforms, even if it has only been a second). - // Note that we have to take care to not get here just because user event - // processing was slow at the top of the loop. For example, the sample client - // may call Bitcoin Core RPCs during event handling, which very often takes - // more than a handful of seconds to complete, and shouldn't disconnect all our - // peers. - log_trace!($logger, "100ms sleep took more than a second, disconnecting peers."); - $peer_manager.as_ref().disconnect_all_peers(); - last_ping_call = $get_timer(PING_TIMER); - } else if $timer_elapsed(&mut last_ping_call, PING_TIMER) { - log_trace!($logger, "Calling PeerManager's timer_tick_occurred"); - $peer_manager.as_ref().timer_tick_occurred(); - last_ping_call = $get_timer(PING_TIMER); - } - - // Note that we want to run a graph prune once not long after startup before - // falling back to our usual hourly prunes. This avoids short-lived clients never - // pruning their network graph. We run once 60 seconds after startup before - // continuing our normal cadence. For RGS, since 60 seconds is likely too long, - // we prune after an initial sync completes. 
- let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }; - let prune_timer_elapsed = $timer_elapsed(&mut last_prune_call, prune_timer); - let should_prune = match $gossip_sync { - GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed, - _ => prune_timer_elapsed, - }; - if should_prune { - // The network graph must not be pruned while rapid sync completion is pending - if let Some(network_graph) = $gossip_sync.prunable_network_graph() { - if let Some(duration_since_epoch) = $time_fetch() { - log_trace!($logger, "Pruning and persisting network graph."); - network_graph.remove_stale_channels_and_tracking_with_time(duration_since_epoch.as_secs()); - } else { - log_warn!($logger, - "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually." - ); - log_trace!($logger, "Persisting network graph."); - } - - if let Err(e) = $persister.persist_graph(network_graph) { - log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e) - } - - have_pruned = true; - } - let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }; - last_prune_call = $get_timer(prune_timer); - } - - if !have_decayed_scorer { - if let Some(ref scorer) = $scorer { - if let Some(duration_since_epoch) = $time_fetch() { - log_trace!($logger, "Calling time_passed on scorer at startup"); - scorer.write_lock().time_passed(duration_since_epoch); - } - } - have_decayed_scorer = true; - } - - if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) { - if let Some(ref scorer) = $scorer { - if let Some(duration_since_epoch) = $time_fetch() { - log_trace!($logger, "Calling time_passed and persisting scorer"); - scorer.write_lock().time_passed(duration_since_epoch); - } else { - log_trace!($logger, "Persisting scorer"); - } - if let Err(e) = $persister.persist_scorer(&scorer) { - log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) - } - } - last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER); - } - - if $timer_elapsed(&mut last_rebroadcast_call, REBROADCAST_TIMER) { - log_trace!($logger, "Rebroadcasting monitor's pending claims"); - $chain_monitor.rebroadcast_pending_claims(); - last_rebroadcast_call = $get_timer(REBROADCAST_TIMER); - } - - if $timer_elapsed(&mut last_sweeper_call, SWEEPER_TIMER) { - log_trace!($logger, "Regenerating sweeper spends if necessary"); - $process_sweeper; - last_sweeper_call = $get_timer(SWEEPER_TIMER); - } - } - - // After we exit, ensure we persist the ChannelManager one final time - this avoids - // some races where users quit while channel updates were in-flight, with - // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. 
- $persister.persist_manager(&$channel_manager)?; - - // Persist Scorer on exit - if let Some(ref scorer) = $scorer { - $persister.persist_scorer(&scorer)?; - } - - // Persist NetworkGraph on exit - if let Some(network_graph) = $gossip_sync.network_graph() { - $persister.persist_graph(network_graph)?; - } - - Ok(()) - } } -} - pub(crate) mod futures_util { use core::future::Future; use core::marker::Unpin; @@ -851,26 +660,35 @@ where event_handler(event).await }) }; - define_run_body!( - persister, - chain_monitor, - chain_monitor.process_pending_events_async(async_event_handler).await, - channel_manager, - channel_manager.get_cm().process_pending_events_async(async_event_handler).await, - onion_messenger, - if let Some(om) = &onion_messenger { + + log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup"); + channel_manager.get_cm().timer_tick_occurred(); + log_trace!(logger, "Rebroadcasting monitor's pending claims on startup"); + chain_monitor.rebroadcast_pending_claims(); + let mut last_freshness_call = (|_| Instant::now())(FRESHNESS_TIMER); + let mut last_onion_message_handler_call = (|_| Instant::now())(ONION_MESSAGE_HANDLER_TIMER); + let mut last_ping_call = (|_| Instant::now())(PING_TIMER); + let mut last_prune_call = (|_| Instant::now())(FIRST_NETWORK_PRUNE_TIMER); + let mut last_scorer_persist_call = (|_| Instant::now())(SCORER_PERSIST_TIMER); + let mut last_rebroadcast_call = (|_| Instant::now())(REBROADCAST_TIMER); + let mut last_sweeper_call = (|_| Instant::now())(SWEEPER_TIMER); + let mut have_pruned = false; + let mut have_decayed_scorer = false; + loop { + (channel_manager.get_cm().process_pending_events_async(async_event_handler).await); + (chain_monitor.process_pending_events_async(async_event_handler).await); + (if let Some(om) = &onion_messenger { om.get_om().process_pending_events_async(async_event_handler).await - }, - peer_manager, - gossip_sync, - { - if let Some(ref sweeper) = sweeper { - let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await; - } - }, - logger, - scorer, - should_break, + }); + peer_manager.as_ref().process_events(); + if should_break { + log_trace!(logger, "Terminating background processor."); + break; + } + let mut await_start = None; + if mobile_interruptable_platform { + await_start = Some((|_| Instant::now())(1)); + } { let om_fut = if let Some(om) = onion_messenger.as_ref() { let fut = om.get_om().get_update_future(); @@ -901,22 +719,140 @@ where should_break = exit; }, } - }, - |t| sleeper(Duration::from_secs(t)), - |fut: &mut SleepFuture, _| { - let mut waker = dummy_waker(); - let mut ctx = task::Context::from_waker(&mut waker); - match core::pin::Pin::new(fut).poll(&mut ctx) { - task::Poll::Ready(exit) => { - should_break = exit; - true - }, - task::Poll::Pending => false, + }; + let await_slow = if mobile_interruptable_platform { + (|time: &Instant, dur| time.elapsed().as_secs() > dur)(&mut await_start.unwrap(), 1) + } else { + false + }; + if should_break { + log_trace!(logger, "Terminating background processor."); + break; + } + if channel_manager.get_cm().get_and_clear_needs_persistence() { + log_trace!(logger, "Persisting ChannelManager..."); + persister.persist_manager(&channel_manager)?; + log_trace!(logger, "Done persisting ChannelManager."); + } + if (|time: &Instant, dur| time.elapsed().as_secs() > dur)( + &mut last_freshness_call, + FRESHNESS_TIMER, + ) { + log_trace!(logger, "Calling ChannelManager's timer_tick_occurred"); + channel_manager.get_cm().timer_tick_occurred(); + last_freshness_call 
= (|_| Instant::now())(FRESHNESS_TIMER); + } + if (|time: &Instant, dur| time.elapsed().as_secs() > dur)( + &mut last_onion_message_handler_call, + ONION_MESSAGE_HANDLER_TIMER, + ) { + if let Some(om) = &onion_messenger { + log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred"); + om.get_om().timer_tick_occurred(); } - }, - mobile_interruptable_platform, - fetch_time, - ) + last_onion_message_handler_call = (|_| Instant::now())(ONION_MESSAGE_HANDLER_TIMER); + } + if await_slow { + log_trace!(logger, "100ms sleep took more than a second, disconnecting peers."); + peer_manager.as_ref().disconnect_all_peers(); + last_ping_call = (|_| Instant::now())(PING_TIMER); + } else if (|time: &Instant, dur| time.elapsed().as_secs() > dur)( + &mut last_ping_call, + PING_TIMER, + ) { + log_trace!(logger, "Calling PeerManager's timer_tick_occurred"); + peer_manager.as_ref().timer_tick_occurred(); + last_ping_call = (|_| Instant::now())(PING_TIMER); + } + let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }; + let prune_timer_elapsed = (|time: &Instant, dur| time.elapsed().as_secs() > dur)( + &mut last_prune_call, + prune_timer, + ); + let should_prune = match gossip_sync { + GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed, + _ => prune_timer_elapsed, + }; + if should_prune { + if let Some(network_graph) = gossip_sync.prunable_network_graph() { + if let Some(duration_since_epoch) = fetch_time() { + log_trace!(logger, "Pruning and persisting network graph."); + network_graph.remove_stale_channels_and_tracking_with_time( + duration_since_epoch.as_secs(), + ); + } else { + log_warn!(logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually."); + log_trace!(logger, "Persisting network graph."); + } + if let Err(e) = persister.persist_graph(network_graph) { + log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e); + } + have_pruned = true; + } + let prune_timer = + if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }; + last_prune_call = (|_| Instant::now())(prune_timer); + } + if !have_decayed_scorer { + if let Some(ref scorer) = scorer { + if let Some(duration_since_epoch) = fetch_time() { + log_trace!(logger, "Calling time_passed on scorer at startup"); + scorer.write_lock().time_passed(duration_since_epoch); + } + } + have_decayed_scorer = true; + } + if (|time: &Instant, dur| time.elapsed().as_secs() > dur)( + &mut last_scorer_persist_call, + SCORER_PERSIST_TIMER, + ) { + if let Some(ref scorer) = scorer { + if let Some(duration_since_epoch) = fetch_time() { + log_trace!(logger, "Calling time_passed and persisting scorer"); + scorer.write_lock().time_passed(duration_since_epoch); + } else { + log_trace!(logger, "Persisting scorer"); + } + if let Err(e) = persister.persist_scorer(&scorer) { + log_error!( + logger, + "Error: Failed to persist scorer, check your disk and permissions {}", + e + ); + } + } + last_scorer_persist_call = (|_| Instant::now())(SCORER_PERSIST_TIMER); + } + if (|time: &Instant, dur| time.elapsed().as_secs() > dur)( + &mut last_rebroadcast_call, + REBROADCAST_TIMER, + ) { + log_trace!(logger, "Rebroadcasting monitor's pending claims"); + chain_monitor.rebroadcast_pending_claims(); + last_rebroadcast_call = (|_| Instant::now())(REBROADCAST_TIMER); + } + if (|time: &Instant, dur| time.elapsed().as_secs() > dur)( + &mut last_sweeper_call, + SWEEPER_TIMER, + ) { + 
log_trace!(logger, "Regenerating sweeper spends if necessary"); + { + if let Some(ref sweeper) = sweeper { + let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await; + } + }; + last_sweeper_call = (|_| Instant::now())(SWEEPER_TIMER); + } + } + + persister.persist_manager(&channel_manager)?; + if let Some(ref scorer) = scorer { + persister.persist_scorer(&scorer)?; + } + if let Some(network_graph) = gossip_sync.network_graph() { + persister.persist_graph(network_graph)?; + } + Ok(()) } #[cfg(feature = "std")] @@ -991,7 +927,7 @@ impl BackgroundProcessor { D: 'static + Deref, O: 'static + Deref, K: 'static + Deref, - OS: 'static + Deref> + Send, + OS: 'static + Deref> + Send, >( persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM, onion_messenger: Option, gossip_sync: GossipSync, peer_manager: PM, @@ -1009,79 +945,33 @@ impl BackgroundProcessor { OM::Target: AOnionMessenger, PM::Target: APeerManager, LM::Target: ALiquidityManager, - D::Target: ChangeDestinationSourceSync, + D::Target: ChangeDestinationSource, O::Target: 'static + OutputSpender, K::Target: 'static + KVStore, { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); let handle = thread::spawn(move || -> Result<(), std::io::Error> { - let event_handler = |event| { - let network_graph = gossip_sync.network_graph(); - if let Some(network_graph) = network_graph { - handle_network_graph_update(network_graph, &event) - } - if let Some(ref scorer) = scorer { - use std::time::SystemTime; - let duration_since_epoch = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .expect("Time should be sometime after 1970"); - if update_scorer(scorer, &event, duration_since_epoch) { - log_trace!(logger, "Persisting scorer after update"); - if let Err(e) = persister.persist_scorer(&scorer) { - log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) - } - } - } - event_handler.handle_event(event) - }; - define_run_body!( + let fut = process_events_async( persister, + |event| async { event_handler.handle_event(event) }, chain_monitor, - chain_monitor.process_pending_events(&event_handler), channel_manager, - channel_manager.get_cm().process_pending_events(&event_handler), onion_messenger, - if let Some(om) = &onion_messenger { - om.get_om().process_pending_events(&event_handler) - }, - peer_manager, gossip_sync, - { - if let Some(ref sweeper) = sweeper { - let _ = sweeper.regenerate_and_broadcast_spend_if_necessary(); - } - }, + peer_manager, + liquidity_manager, + sweeper, logger, scorer, - stop_thread.load(Ordering::Acquire), - { - let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) { - (Some(om), Some(lm)) => Sleeper::from_four_futures( - &channel_manager.get_cm().get_event_or_persistence_needed_future(), - &chain_monitor.get_update_future(), - &om.get_om().get_update_future(), - &lm.get_lm().get_pending_msgs_future(), - ), - (Some(om), None) => Sleeper::from_three_futures( - &channel_manager.get_cm().get_event_or_persistence_needed_future(), - &chain_monitor.get_update_future(), - &om.get_om().get_update_future(), - ), - (None, Some(lm)) => Sleeper::from_three_futures( - &channel_manager.get_cm().get_event_or_persistence_needed_future(), - &chain_monitor.get_update_future(), - &lm.get_lm().get_pending_msgs_future(), - ), - (None, None) => Sleeper::from_two_futures( - &channel_manager.get_cm().get_event_or_persistence_needed_future(), - &chain_monitor.get_update_future(), - ), - }; - 
sleeper.wait_timeout(Duration::from_millis(100)); + move |dur: Duration| { + let stop_thread_clone = stop_thread.clone(); + + Box::pin(async move { + Sleep::new(dur).await; + stop_thread_clone.load(Ordering::Acquire) + }) }, - |_| Instant::now(), - |time: &Instant, dur| time.elapsed().as_secs() > dur, false, || { use std::time::SystemTime; @@ -1091,7 +981,10 @@ impl BackgroundProcessor { .expect("Time should be sometime after 1970"), ) }, - ) + ); + + // TODO: Implement simple executor in utils. + futures::executor::block_on(fut).map_err(Into::into) }); Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } } @@ -1935,7 +1828,7 @@ mod tests { nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), Some(Arc::clone(&nodes[0].liquidity_manager)), - Some(nodes[0].sweeper.clone()), + Some(nodes[0].sweeper.sweeper_async().clone()), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), ); @@ -2030,7 +1923,7 @@ mod tests { nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), Some(Arc::clone(&nodes[0].liquidity_manager)), - Some(nodes[0].sweeper.clone()), + Some(nodes[0].sweeper.sweeper_async().clone()), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), ); @@ -2074,7 +1967,7 @@ mod tests { nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), Some(Arc::clone(&nodes[0].liquidity_manager)), - Some(nodes[0].sweeper.clone()), + Some(nodes[0].sweeper.sweeper_async().clone()), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), ); @@ -2145,7 +2038,7 @@ mod tests { nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), Some(Arc::clone(&nodes[0].liquidity_manager)), - Some(nodes[0].sweeper.clone()), + Some(nodes[0].sweeper.sweeper_async().clone()), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), ); @@ -2176,7 +2069,7 @@ mod tests { nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), Some(Arc::clone(&nodes[0].liquidity_manager)), - Some(nodes[0].sweeper.clone()), + Some(nodes[0].sweeper.sweeper_async().clone()), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), ); @@ -2224,7 +2117,7 @@ mod tests { nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), Some(Arc::clone(&nodes[0].liquidity_manager)), - Some(nodes[0].sweeper.clone()), + Some(nodes[0].sweeper.sweeper_async().clone()), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), ); @@ -2288,7 +2181,7 @@ mod tests { nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), Some(Arc::clone(&nodes[0].liquidity_manager)), - Some(nodes[0].sweeper.clone()), + Some(nodes[0].sweeper.sweeper_async().clone()), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), ); @@ -2453,7 +2346,7 @@ mod tests { nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), Some(Arc::clone(&nodes[0].liquidity_manager)), - Some(nodes[0].sweeper.clone()), + Some(nodes[0].sweeper.sweeper_async().clone()), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), ); @@ -2484,7 +2377,7 @@ mod tests { nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), Some(Arc::clone(&nodes[0].liquidity_manager)), - Some(nodes[0].sweeper.clone()), + Some(nodes[0].sweeper.sweeper_async().clone()), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), ); @@ -2581,7 +2474,7 @@ mod tests { nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), Some(Arc::clone(&nodes[0].liquidity_manager)), - Some(nodes[0].sweeper.clone()), + Some(nodes[0].sweeper.sweeper_async().clone()), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), ); @@ -2778,7 +2671,7 @@ mod tests { nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), 
 			Some(Arc::clone(&nodes[0].liquidity_manager)),
-			Some(nodes[0].sweeper.clone()),
+			Some(nodes[0].sweeper.sweeper_async().clone()),
 			nodes[0].logger.clone(),
 			Some(nodes[0].scorer.clone()),
 		);
diff --git a/lightning/src/util/wakers.rs b/lightning/src/util/wakers.rs
index 2c2a13a7d5a..d933d848257 100644
--- a/lightning/src/util/wakers.rs
+++ b/lightning/src/util/wakers.rs
@@ -28,6 +28,10 @@ use std::time::Duration;
 use core::future::Future as StdFuture;
 use core::pin::Pin;
 use core::task::{Context, Poll};
+use std::{
+	sync::atomic::{AtomicBool, Ordering},
+	thread,
+};
 
 /// Used to signal to one of many waiters that the condition they're waiting on has happened.
 ///
@@ -340,6 +344,47 @@ impl Sleeper {
 	}
 }
 
+pub struct Sleep {
+	is_done: Arc<AtomicBool>,
+	waker: Arc<Mutex<Option<Waker>>>,
+}
+
+impl Sleep {
+	pub fn new(duration: Duration) -> Self {
+		let is_done = Arc::new(AtomicBool::new(false));
+		let waker: Arc<Mutex<Option<Waker>>> = Arc::new(Mutex::new(None));
+
+		let is_done_clone = is_done.clone();
+		let waker_clone = waker.clone();
+
+		thread::spawn(move || {
+			thread::sleep(duration);
+			is_done_clone.store(true, Ordering::SeqCst);
+
+			if let Some(w) = waker_clone.lock().unwrap().take() {
+				w.wake();
+			}
+		});
+
+		Self { is_done, waker }
+	}
+}
+
+impl core::future::Future for Sleep {
+	type Output = ();
+
+	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+		if self.is_done.load(Ordering::SeqCst) {
+			Poll::Ready(())
+		} else {
+			let mut waker_lock = self.waker.lock().unwrap();
+			// Store latest waker in case the task is moved or re-polled
+			*waker_lock = Some(cx.waker().clone());
+			Poll::Pending
+		}
+	}
+}
+
 #[cfg(test)]
 mod tests {
 	use super::*;
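
A note on the new `Sleep` future and the sleeper closure now wired into the synchronous `BackgroundProcessor::start`: the sketch below is a stand-alone illustration of the same pattern (not part of the diff), assuming only the `futures` crate that this diff adds as a dependency. Unlike the diff's `poll`, the sketch registers the waker before re-checking the flag, which avoids a missed wakeup if the timer thread fires between the initial flag check and the waker registration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::Duration;

/// A future that resolves once `duration` has elapsed, backed by a dedicated timer thread.
struct Sleep {
	is_done: Arc<AtomicBool>,
	waker: Arc<Mutex<Option<Waker>>>,
}

impl Sleep {
	fn new(duration: Duration) -> Self {
		let is_done = Arc::new(AtomicBool::new(false));
		let waker: Arc<Mutex<Option<Waker>>> = Arc::new(Mutex::new(None));
		let (is_done_clone, waker_clone) = (Arc::clone(&is_done), Arc::clone(&waker));
		// The timer thread flips the flag and wakes whichever task polled us last.
		thread::spawn(move || {
			thread::sleep(duration);
			is_done_clone.store(true, Ordering::SeqCst);
			if let Some(w) = waker_clone.lock().unwrap().take() {
				w.wake();
			}
		});
		Self { is_done, waker }
	}
}

impl Future for Sleep {
	type Output = ();
	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
		if self.is_done.load(Ordering::SeqCst) {
			return Poll::Ready(());
		}
		// Register the waker first, then re-check the flag, so a wakeup racing
		// with this poll is never lost.
		*self.waker.lock().unwrap() = Some(cx.waker().clone());
		if self.is_done.load(Ordering::SeqCst) {
			Poll::Ready(())
		} else {
			Poll::Pending
		}
	}
}

fn main() {
	let stop_thread = Arc::new(AtomicBool::new(false));
	// Shape of the sleeper closure handed to `process_events_async`: sleep for
	// `dur`, then report whether the background processor should exit.
	let sleeper = move |dur: Duration| {
		let stop_thread = Arc::clone(&stop_thread);
		Box::pin(async move {
			Sleep::new(dur).await;
			stop_thread.load(Ordering::Acquire)
		})
	};
	// The synchronous `start` drives its future the same way, via `block_on`.
	let should_exit = futures::executor::block_on(sleeper(Duration::from_millis(100)));
	assert!(!should_exit);
}
```

Spawning a thread per sleep is heavyweight but keeps the synchronous path free of an async runtime; when a runtime is available, a runtime-provided timer (e.g. `tokio::time::sleep`) can be supplied as the sleeper instead.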
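The `// TODO: Implement simple executor in utils.` comment left in `BackgroundProcessor::start` could be satisfied without the `futures` dependency. A hypothetical sketch (names and API are illustrative, not an existing LDK utility) of such a minimal executor, built on `std::thread::park`:

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the executor by unparking the thread that is blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
	fn wake(self: Arc<Self>) {
		self.0.unpark();
	}
}

/// Polls `fut` on the current thread, parking between polls until woken.
fn block_on<F: Future>(fut: F) -> F::Output {
	let mut fut = Box::pin(fut);
	let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
	let mut cx = Context::from_waker(&waker);
	loop {
		match fut.as_mut().poll(&mut cx) {
			Poll::Ready(output) => return output,
			// `park` may return spuriously; the loop simply polls again.
			Poll::Pending => thread::park(),
		}
	}
}

fn main() {
	assert_eq!(block_on(async { 40 + 2 }), 42);
}
```

Any waker the driven future registers simply unparks the blocked thread, which is sufficient here because the background-processing future is only ever polled from that one thread.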