
Commit 4df4699

Allow events processing without holding total_consistency_lock
Unfortunately, the RAII guard types returned by `RwLock` are not `Send`, which is why they can't be held across `await` boundaries. In order to allow asynchronous event processing in multi-threaded environments, we here allow events to be processed without holding the `total_consistency_lock`.
1 parent 42bda27 commit 4df4699
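
Instead of holding a read guard on the `total_consistency_lock` for the duration of the async event handling, the change below gates `process_pending_events_async` with an `AtomicBool` that is set via `compare_exchange` on entry and cleared on exit. A minimal, self-contained sketch of that guard pattern (the type and field names are illustrative, not the actual `ChannelManager` code):

use core::sync::atomic::{AtomicBool, Ordering};

struct EventProcessor {
	// Set while some task is processing events asynchronously.
	processing: AtomicBool,
}

impl EventProcessor {
	async fn process_events(&self) {
		// Atomically flip the flag from false to true. If another task already
		// holds it, return instead of processing events concurrently.
		if self.processing.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
			return;
		}

		// ... handle pending events here, freely awaiting: no lock guard is
		// held across the await points, so the future remains `Send` ...

		// Allow the next caller to process events.
		self.processing.store(false, Ordering::Release);
	}
}

Because the future returned by such a method holds no `RwLockReadGuard` across `await`, it can be driven by a multi-threaded executor.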

2 files changed: +28 -16 lines changed


lightning-background-processor/src/lib.rs  (+12 -10)
@@ -1480,10 +1480,9 @@ mod tests {
 				})
 			}, false,
 		);
-		// TODO: Drop _local and simply spawn after #2003
-		let local_set = tokio::task::LocalSet::new();
-		local_set.spawn_local(bp_future);
-		local_set.spawn_local(async move {
+
+		let t1 = tokio::spawn(bp_future);
+		let t2 = tokio::spawn(async move {
 			do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, {
 				let mut i = 0;
 				loop {
@@ -1495,7 +1494,9 @@
 			}, tokio::time::sleep(Duration::from_millis(1)).await);
 			exit_sender.send(()).unwrap();
 		});
-		local_set.await;
+		let (r1, r2) = tokio::join!(t1, t2);
+		r1.unwrap().unwrap();
+		r2.unwrap()
 	}
 
 	macro_rules! do_test_payment_path_scoring {
@@ -1649,13 +1650,14 @@ mod tests {
 				})
 			}, false,
 		);
-		// TODO: Drop _local and simply spawn after #2003
-		let local_set = tokio::task::LocalSet::new();
-		local_set.spawn_local(bp_future);
-		local_set.spawn_local(async move {
+		let t1 = tokio::spawn(bp_future);
+		let t2 = tokio::spawn(async move {
 			do_test_payment_path_scoring!(nodes, receiver.recv().await);
 			exit_sender.send(()).unwrap();
 		});
-		local_set.await;
+
+		let (r1, r2) = tokio::join!(t1, t2);
+		r1.unwrap().unwrap();
+		r2.unwrap()
 	}
 }
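
With no guard held across `await`, the background-processor future is `Send`, so these tests can use plain `tokio::spawn` (which requires `Send + 'static` futures) instead of `tokio::task::LocalSet::spawn_local`, and then `tokio::join!` the two handles. A rough standalone illustration of that spawn-and-join shape, using a stand-in future rather than the real test harness:

async fn background_task() -> Result<(), std::io::Error> {
	// Stand-in for `bp_future`: it resolves to a `Result`, which is why the
	// test unwraps the `JoinHandle` result and then the inner result.
	Ok(())
}

#[tokio::main]
async fn main() {
	let t1 = tokio::spawn(background_task());
	let t2 = tokio::spawn(async {
		// Drive the scenario, then signal the background task to exit.
	});
	let (r1, r2) = tokio::join!(t1, t2);
	r1.unwrap().unwrap();
	r2.unwrap();
}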

lightning/src/ln/channelmanager.rs  (+16 -6)
@@ -72,7 +72,7 @@ use core::{cmp, mem};
 use core::cell::RefCell;
 use crate::io::Read;
 use crate::sync::{Arc, Mutex, RwLock, RwLockReadGuard, FairRwLock, LockTestExt, LockHeldState};
-use core::sync::atomic::{AtomicUsize, Ordering};
+use core::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
 use core::time::Duration;
 use core::ops::Deref;
 
@@ -926,6 +926,8 @@ where
 
 	/// See `ChannelManager` struct-level documentation for lock order requirements.
 	pending_events: Mutex<Vec<events::Event>>,
+	/// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
+	pending_events_processor: AtomicBool,
 	/// See `ChannelManager` struct-level documentation for lock order requirements.
 	pending_background_events: Mutex<Vec<BackgroundEvent>>,
 	/// Used when we have to take a BIG lock to make sure everything is self-consistent.
@@ -1741,6 +1743,7 @@ where
 			per_peer_state: FairRwLock::new(HashMap::new()),
 
 			pending_events: Mutex::new(Vec::new()),
+			pending_events_processor: AtomicBool::new(false),
 			pending_background_events: Mutex::new(Vec::new()),
 			total_consistency_lock: RwLock::new(()),
 			persistence_notifier: Notifier::new(),
@@ -5282,7 +5285,8 @@ where
 
 	/// Process pending events from the [`chain::Watch`], returning whether any events were processed.
 	fn process_pending_monitor_events(&self) -> bool {
-		debug_assert!(self.total_consistency_lock.try_write().is_err()); // Caller holds read lock
+		debug_assert!(self.total_consistency_lock.try_write().is_err() ||
+			self.pending_events_processor.load(Ordering::Relaxed)); // Caller holds read lock or processes events asynchronously.
 
 		let mut failed_channels = Vec::new();
 		let mut pending_monitor_events = self.chain_monitor.release_pending_monitor_events();
@@ -5775,9 +5779,9 @@ where
 	pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
 		&self, handler: H
 	) {
-		// We'll acquire our total consistency lock until the returned future completes so that
-		// we can be sure no other persists happen while processing events.
-		let _read_guard = self.total_consistency_lock.read().unwrap();
+		if self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
+			return;
+		}
 
 		let mut result = NotifyOption::SkipPersist;
 
@@ -5787,7 +5791,8 @@ where
 			result = NotifyOption::DoPersist;
 		}
 
-		let pending_events = mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]);
+		let pending_events = self.pending_events.lock().unwrap().clone();
+		let num_events = pending_events.len();
 		if !pending_events.is_empty() {
 			result = NotifyOption::DoPersist;
 		}
@@ -5796,9 +5801,13 @@ where
 			handler(event).await;
 		}
 
+		self.pending_events.lock().unwrap().drain(..num_events);
+
 		if result == NotifyOption::DoPersist {
 			self.persistence_notifier.notify();
 		}
+
+		self.pending_events_processor.store(false, Ordering::Release);
 	}
 }
 
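Note that `process_pending_events_async` now clones the currently-queued events, awaits the handler for each, and only afterwards drains exactly the number it handled, so any events queued while the handlers were running stay pending for the next call. A simplified sketch of that clone-then-drain pattern, using a plain `Mutex<Vec<String>>` and illustrative names rather than LDK's event types:

use std::future::Future;
use std::sync::Mutex;

struct EventQueue {
	pending: Mutex<Vec<String>>,
}

impl EventQueue {
	async fn process<F, Fut>(&self, handler: F)
	where
		F: Fn(String) -> Fut,
		Fut: Future<Output = ()>,
	{
		// Snapshot the queue instead of emptying it up front.
		let pending = self.pending.lock().unwrap().clone();
		let num_events = pending.len();

		for event in pending {
			handler(event).await;
		}

		// Remove only the events we actually handled; anything pushed onto the
		// queue while we were awaiting above remains for the next call.
		self.pending.lock().unwrap().drain(..num_events);
	}
}
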
@@ -7926,6 +7935,7 @@ where
 			per_peer_state: FairRwLock::new(per_peer_state),
 
 			pending_events: Mutex::new(pending_events_read),
+			pending_events_processor: AtomicBool::new(false),
 			pending_background_events: Mutex::new(pending_background_events),
 			total_consistency_lock: RwLock::new(()),
 			persistence_notifier: Notifier::new(),
