Commit 5f96d13

Merge pull request #2199 from tnull/2023-04-fix-async-event-processing
Allow async events processing without holding `total_consistency_lock`
2 parents 02ae5cb + f2453b7
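
Background for the diffs below: `process_events_body` previously took the `total_consistency_lock` read guard for the whole event-processing run, including across the user's async event handler, so no persist could happen in the meantime. Holding a `std::sync` guard across an `.await` also makes the future `!Send`, which is why the background-processor tests needed a `tokio::task::LocalSet`. A minimal sketch of that `Send` issue, with invented names (not LDK API):

use std::sync::RwLock;

async fn process_events_async(consistency_lock: &RwLock<()>) {
    // The guard lives across the `.await`, so this future captures a
    // std::sync::RwLockReadGuard, which is !Send.
    let _read_guard = consistency_lock.read().unwrap();
    user_event_handler().await;
}

async fn user_event_handler() {}

fn assert_send<T: Send>(_: T) {}

fn main() {
    let lock = RwLock::new(());
    // Uncommenting the next line fails to compile ("future cannot be sent
    // between threads safely"): tokio::spawn rejects such a future, while a
    // LocalSet can still run it on the current thread.
    // assert_send(process_events_async(&lock));
    let _ = lock;
}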

4 files changed: +63 −37 lines changed


ci/ci-tests.sh

+1 −1

@@ -87,7 +87,7 @@ fi
 
 echo -e "\n\nTest futures builds"
 pushd lightning-background-processor
-cargo test --verbose --color always --no-default-features --features futures
+cargo test --verbose --color always --features futures
 popd
 
 if [ "$RUSTC_MINOR_VERSION" -gt 55 ]; then

lightning-background-processor/src/lib.rs

+20 −13

@@ -350,7 +350,8 @@ macro_rules! define_run_body {
 				// falling back to our usual hourly prunes. This avoids short-lived clients never
 				// pruning their network graph. We run once 60 seconds after startup before
 				// continuing our normal cadence.
-				if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
+				let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
+				if $timer_elapsed(&mut last_prune_call, prune_timer) {
 					// The network graph must not be pruned while rapid sync completion is pending
 					if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
 						#[cfg(feature = "std")] {
@@ -368,7 +369,8 @@ macro_rules! define_run_body {
 
 						have_pruned = true;
 					}
-					last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
+					let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
+					last_prune_call = $get_timer(prune_timer);
 				}
 
 				if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
@@ -881,7 +883,10 @@ mod tests {
 
 			if key == "network_graph" {
 				if let Some(sender) = &self.graph_persistence_notifier {
-					sender.send(()).unwrap();
+					match sender.send(()) {
+						Ok(()) => {},
+						Err(std::sync::mpsc::SendError(())) => println!("Persister failed to notify as receiver went away."),
+					}
 				};
 
 				if let Some((error, message)) = self.graph_error {
@@ -1497,10 +1502,9 @@ mod tests {
 			})
 		}, false,
 		);
-		// TODO: Drop _local and simply spawn after #2003
-		let local_set = tokio::task::LocalSet::new();
-		local_set.spawn_local(bp_future);
-		local_set.spawn_local(async move {
+
+		let t1 = tokio::spawn(bp_future);
+		let t2 = tokio::spawn(async move {
 			do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, {
 				let mut i = 0;
 				loop {
@@ -1512,7 +1516,9 @@ mod tests {
 			}, tokio::time::sleep(Duration::from_millis(1)).await);
 			exit_sender.send(()).unwrap();
 		});
-		local_set.await;
+		let (r1, r2) = tokio::join!(t1, t2);
+		r1.unwrap().unwrap();
+		r2.unwrap()
 	}
 
 	macro_rules! do_test_payment_path_scoring {
@@ -1666,13 +1672,14 @@ mod tests {
 			})
 		}, false,
 		);
-		// TODO: Drop _local and simply spawn after #2003
-		let local_set = tokio::task::LocalSet::new();
-		local_set.spawn_local(bp_future);
-		local_set.spawn_local(async move {
+		let t1 = tokio::spawn(bp_future);
+		let t2 = tokio::spawn(async move {
 			do_test_payment_path_scoring!(nodes, receiver.recv().await);
 			exit_sender.send(()).unwrap();
 		});
-		local_set.await;
+
+		let (r1, r2) = tokio::join!(t1, t2);
+		r1.unwrap().unwrap();
+		r2.unwrap()
 	}
 }
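
Since the locking change makes the background-processor future `Send`, these tests can replace `LocalSet::spawn_local` with plain `tokio::spawn`. A self-contained sketch of the spawn-and-join pattern, using stand-in futures rather than LDK's:

// Requires the tokio crate with the "rt-multi-thread" and "macros" features.
#[tokio::main]
async fn main() {
    // Stand-in for bp_future: a Send future returning a Result, like the
    // background processor's run future.
    let bp_future = async { Ok::<(), ()>(()) };

    let t1 = tokio::spawn(bp_future);
    let t2 = tokio::spawn(async move {
        // test body / exit signalling would go here
    });

    // join! drives both tasks to completion. The outer Result is tokio's
    // JoinError (e.g. a panicked task); the inner one is the future's own.
    let (r1, r2) = tokio::join!(t1, t2);
    r1.unwrap().unwrap();
    r2.unwrap();
}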

lightning-net-tokio/src/lib.rs

+1 −1

@@ -189,7 +189,7 @@ impl Connection {
 				// our timeslice to another task we may just spin on this peer, starving other peers
 				// and eventually disconnecting them for ping timeouts. Instead, we explicitly yield
 				// here.
-				tokio::task::yield_now().await;
+				let _ = tokio::task::yield_now().await;
 			};
 			let writer_option = us.lock().unwrap().writer.take();
 			if let Some(mut writer) = writer_option {
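
The comment in this hunk is about cooperative scheduling: a peer whose socket is always readable could otherwise hog the worker thread and starve other connections. A standalone sketch of the same explicit-yield pattern over an in-memory duplex stream (tokio only, no LDK types):

use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() {
    let (mut a, mut b) = duplex(64);
    b.write_all(b"ping").await.unwrap();
    drop(b); // close the peer end so the loop terminates

    let mut buf = [0u8; 64];
    loop {
        match a.read(&mut buf).await {
            Ok(0) | Err(_) => break, // peer disconnected
            Ok(n) => {
                println!("read {n} bytes");
                // Hand the worker thread back to the scheduler before looping
                // again, so one busy peer cannot monopolize it.
                tokio::task::yield_now().await;
            }
        }
    }
}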

lightning/src/ln/channelmanager.rs

+41 −22

@@ -72,7 +72,7 @@ use core::{cmp, mem};
 use core::cell::RefCell;
 use crate::io::Read;
 use crate::sync::{Arc, Mutex, RwLock, RwLockReadGuard, FairRwLock, LockTestExt, LockHeldState};
-use core::sync::atomic::{AtomicUsize, Ordering};
+use core::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
 use core::time::Duration;
 use core::ops::Deref;
 
@@ -934,6 +934,8 @@ where
 
 	/// See `ChannelManager` struct-level documentation for lock order requirements.
 	pending_events: Mutex<Vec<events::Event>>,
+	/// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
+	pending_events_processor: AtomicBool,
 	/// See `ChannelManager` struct-level documentation for lock order requirements.
 	pending_background_events: Mutex<Vec<BackgroundEvent>>,
 	/// Used when we have to take a BIG lock to make sure everything is self-consistent.
@@ -1696,30 +1698,47 @@ macro_rules! handle_new_monitor_update {
 
 macro_rules! process_events_body {
 	($self: expr, $event_to_handle: expr, $handle_event: expr) => {
-		// We'll acquire our total consistency lock until the returned future completes so that
-		// we can be sure no other persists happen while processing events.
-		let _read_guard = $self.total_consistency_lock.read().unwrap();
+		let mut processed_all_events = false;
+		while !processed_all_events {
+			if $self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
+				return;
+			}
 
-		let mut result = NotifyOption::SkipPersist;
+			let mut result = NotifyOption::SkipPersist;
 
-		// TODO: This behavior should be documented. It's unintuitive that we query
-		// ChannelMonitors when clearing other events.
-		if $self.process_pending_monitor_events() {
-			result = NotifyOption::DoPersist;
-		}
+			{
+				// We'll acquire our total consistency lock so that we can be sure no other
+				// persists happen while processing monitor events.
+				let _read_guard = $self.total_consistency_lock.read().unwrap();
+
+				// TODO: This behavior should be documented. It's unintuitive that we query
+				// ChannelMonitors when clearing other events.
+				if $self.process_pending_monitor_events() {
+					result = NotifyOption::DoPersist;
+				}
+			}
 
-		let pending_events = mem::replace(&mut *$self.pending_events.lock().unwrap(), vec![]);
-		if !pending_events.is_empty() {
-			result = NotifyOption::DoPersist;
-		}
+			let pending_events = $self.pending_events.lock().unwrap().clone();
+			let num_events = pending_events.len();
+			if !pending_events.is_empty() {
+				result = NotifyOption::DoPersist;
+			}
 
-		for event in pending_events {
-			$event_to_handle = event;
-			$handle_event;
-		}
+			for event in pending_events {
+				$event_to_handle = event;
+				$handle_event;
+			}
 
-		if result == NotifyOption::DoPersist {
-			$self.persistence_notifier.notify();
+			{
+				let mut pending_events = $self.pending_events.lock().unwrap();
+				pending_events.drain(..num_events);
+				processed_all_events = pending_events.is_empty();
+				$self.pending_events_processor.store(false, Ordering::Release);
+			}
+
+			if result == NotifyOption::DoPersist {
+				$self.persistence_notifier.notify();
+			}
 		}
 	}
 }
@@ -1787,6 +1806,7 @@ where
 			per_peer_state: FairRwLock::new(HashMap::new()),
 
 			pending_events: Mutex::new(Vec::new()),
+			pending_events_processor: AtomicBool::new(false),
 			pending_background_events: Mutex::new(Vec::new()),
 			total_consistency_lock: RwLock::new(()),
 			persistence_notifier: Notifier::new(),
@@ -8026,6 +8046,7 @@ where
 			per_peer_state: FairRwLock::new(per_peer_state),
 
 			pending_events: Mutex::new(pending_events_read),
+			pending_events_processor: AtomicBool::new(false),
 			pending_background_events: Mutex::new(pending_background_events),
 			total_consistency_lock: RwLock::new(()),
 			persistence_notifier: Notifier::new(),
@@ -8057,8 +8078,6 @@ mod tests {
 	use bitcoin::hashes::Hash;
 	use bitcoin::hashes::sha256::Hash as Sha256;
 	use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
-	#[cfg(feature = "std")]
-	use core::time::Duration;
 	use core::sync::atomic::Ordering;
 	use crate::events::{Event, HTLCDestination, MessageSendEvent, MessageSendEventsProvider, ClosureReason};
 	use crate::ln::{PaymentPreimage, PaymentHash, PaymentSecret};