@@ -72,7 +72,7 @@ use core::{cmp, mem};
7272use core:: cell:: RefCell ;
7373use crate :: io:: Read ;
7474use crate :: sync:: { Arc , Mutex , RwLock , RwLockReadGuard , FairRwLock , LockTestExt , LockHeldState } ;
75- use core:: sync:: atomic:: { AtomicUsize , Ordering } ;
75+ use core:: sync:: atomic:: { AtomicUsize , AtomicBool , Ordering } ;
7676use core:: time:: Duration ;
7777use core:: ops:: Deref ;
7878
@@ -926,6 +926,8 @@ where
926926
927927 /// See `ChannelManager` struct-level documentation for lock order requirements.
928928 pending_events : Mutex < Vec < events:: Event > > ,
929+ /// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
930+ pending_events_processor : AtomicBool ,
929931 /// See `ChannelManager` struct-level documentation for lock order requirements.
930932 pending_background_events : Mutex < Vec < BackgroundEvent > > ,
931933 /// Used when we have to take a BIG lock to make sure everything is self-consistent.
@@ -1680,30 +1682,47 @@ macro_rules! handle_new_monitor_update {
16801682
16811683macro_rules! process_events_body {
16821684 ( $self: expr, $event_to_handle: expr, $handle_event: expr) => {
1683- // We'll acquire our total consistency lock until the returned future completes so that
1684- // we can be sure no other persists happen while processing events.
1685- let _read_guard = $self. total_consistency_lock. read( ) . unwrap( ) ;
1685+ let mut processed_all_events = false ;
1686+ while !processed_all_events {
1687+ if $self. pending_events_processor. compare_exchange( false , true , Ordering :: Acquire , Ordering :: Relaxed ) . is_err( ) {
1688+ return ;
1689+ }
16861690
1687- let mut result = NotifyOption :: SkipPersist ;
1691+ let mut result = NotifyOption :: SkipPersist ;
16881692
1689- // TODO: This behavior should be documented. It's unintuitive that we query
1690- // ChannelMonitors when clearing other events.
1691- if $self. process_pending_monitor_events( ) {
1692- result = NotifyOption :: DoPersist ;
1693- }
1693+ {
1694+ // We'll acquire our total consistency lock so that we can be sure no other
1695+ // persists happen while processing monitor events.
1696+ let _read_guard = $self. total_consistency_lock. read( ) . unwrap( ) ;
1697+
1698+ // TODO: This behavior should be documented. It's unintuitive that we query
1699+ // ChannelMonitors when clearing other events.
1700+ if $self. process_pending_monitor_events( ) {
1701+ result = NotifyOption :: DoPersist ;
1702+ }
1703+ }
16941704
1695- let pending_events = mem:: replace( & mut * $self. pending_events. lock( ) . unwrap( ) , vec![ ] ) ;
1696- if !pending_events. is_empty( ) {
1697- result = NotifyOption :: DoPersist ;
1698- }
1705+ let pending_events = $self. pending_events. lock( ) . unwrap( ) . clone( ) ;
1706+ let num_events = pending_events. len( ) ;
1707+ if !pending_events. is_empty( ) {
1708+ result = NotifyOption :: DoPersist ;
1709+ }
16991710
1700- for event in pending_events {
1701- $event_to_handle = event;
1702- $handle_event;
1703- }
1711+ for event in pending_events {
1712+ $event_to_handle = event;
1713+ $handle_event;
1714+ }
17041715
1705- if result == NotifyOption :: DoPersist {
1706- $self. persistence_notifier. notify( ) ;
1716+ {
1717+ let mut pending_events = $self. pending_events. lock( ) . unwrap( ) ;
1718+ pending_events. drain( ..num_events) ;
1719+ processed_all_events = pending_events. is_empty( ) ;
1720+ $self. pending_events_processor. store( false , Ordering :: Release ) ;
1721+ }
1722+
1723+ if result == NotifyOption :: DoPersist {
1724+ $self. persistence_notifier. notify( ) ;
1725+ }
17071726 }
17081727 }
17091728}
@@ -1771,6 +1790,7 @@ where
17711790 per_peer_state : FairRwLock :: new ( HashMap :: new ( ) ) ,
17721791
17731792 pending_events : Mutex :: new ( Vec :: new ( ) ) ,
1793+ pending_events_processor : AtomicBool :: new ( false ) ,
17741794 pending_background_events : Mutex :: new ( Vec :: new ( ) ) ,
17751795 total_consistency_lock : RwLock :: new ( ( ) ) ,
17761796 persistence_notifier : Notifier :: new ( ) ,
@@ -7916,6 +7936,7 @@ where
79167936 per_peer_state : FairRwLock :: new ( per_peer_state) ,
79177937
79187938 pending_events : Mutex :: new ( pending_events_read) ,
7939+ pending_events_processor : AtomicBool :: new ( false ) ,
79197940 pending_background_events : Mutex :: new ( pending_background_events) ,
79207941 total_consistency_lock : RwLock :: new ( ( ) ) ,
79217942 persistence_notifier : Notifier :: new ( ) ,
0 commit comments