
Commit bc1b327

joostjager and claude committed
Add deferred ChainMonitor updates with batched persistence
Introduce a `DeferredChainMonitor` wrapper around `ChainMonitor` that queues `watch_channel` and `update_channel` operations, returning `InProgress` until `flush()` is called. This enables batched persistence of monitor updates after `ChannelManager` persistence, ensuring correct ordering where the `ChannelManager` state is never ahead of the monitor state on restart.

Key changes:

- `DeferredChainMonitor` queues monitor operations and returns `InProgress`
- Calling `flush()` applies pending operations and persists monitors
- All `ChainMonitor` traits (Listen, Confirm, EventsProvider, etc.) are passed through, allowing drop-in replacement
- Background processor updated to capture the pending count before `ChannelManager` persistence, then flush after persistence completes

Includes comprehensive tests covering the full channel lifecycle with payment flows using `DeferredChainMonitor`.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 8cdc86a commit bc1b327
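The commit message describes a queue-then-flush pattern. As a rough illustration, here is a minimal, self-contained Rust sketch of that pattern. This is not LDK's implementation: the real `DeferredChainMonitor` wraps a full `ChainMonitor` and returns LDK's `ChannelMonitorUpdateStatus::InProgress`, while `DeferredMonitor` and `UpdateStatus` below are simplified stand-ins; only the method names `update_channel`, `pending_operation_count`, and `flush` mirror the diff.

	use std::collections::VecDeque;
	use std::sync::Mutex;

	// Simplified stand-in for LDK's ChannelMonitorUpdateStatus.
	#[derive(Debug, PartialEq)]
	#[allow(dead_code)]
	enum UpdateStatus {
		Completed,
		InProgress,
	}

	struct DeferredMonitor {
		// Queued watch_channel/update_channel operations, applied on flush.
		pending: Mutex<VecDeque<u64>>,
	}

	impl DeferredMonitor {
		fn new() -> Self {
			Self { pending: Mutex::new(VecDeque::new()) }
		}

		// Queue the update instead of applying it; callers see InProgress.
		fn update_channel(&self, monitor_id: u64) -> UpdateStatus {
			self.pending.lock().unwrap().push_back(monitor_id);
			UpdateStatus::InProgress
		}

		fn pending_operation_count(&self) -> usize {
			self.pending.lock().unwrap().len()
		}

		// Apply the first `count` queued operations.
		fn flush(&self, count: usize) {
			let mut pending = self.pending.lock().unwrap();
			let n = count.min(pending.len());
			for monitor_id in pending.drain(..n) {
				// The real wrapper would forward the operation to the inner
				// ChainMonitor and persist the updated ChannelMonitor here.
				println!("persisted monitor {monitor_id}");
			}
		}
	}

	fn main() {
		let monitor = DeferredMonitor::new();
		assert_eq!(monitor.update_channel(1), UpdateStatus::InProgress);
		assert_eq!(monitor.update_channel(2), UpdateStatus::InProgress);
		assert_eq!(monitor.pending_operation_count(), 2);
		monitor.flush(2);
		assert_eq!(monitor.pending_operation_count(), 0);
	}

The property the background processor relies on is that `flush(n)` applies only the first `n` queued operations, so a flush can be bounded to the writes that predate a given `ChannelManager` snapshot.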

3 files changed: +1052, -9 lines changed


lightning-background-processor/src/lib.rs

Lines changed: 78 additions & 9 deletions
@@ -32,7 +32,8 @@ use fwd_batch::BatchDelay;

use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
-use lightning::chain::chainmonitor::{ChainMonitor, Persist};
+use lightning::chain::chainmonitor::Persist;
+use lightning::chain::deferred::DeferredChainMonitor;
#[cfg(feature = "std")]
use lightning::events::EventHandler;
#[cfg(feature = "std")]
@@ -101,7 +102,7 @@ use alloc::vec::Vec;
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
///   writing it to disk/backups by invoking the callback given to it at startup.
///   [`ChannelManager`] persistence should be done in the background.
-/// * Calling [`ChannelManager::timer_tick_occurred`], [`ChainMonitor::rebroadcast_pending_claims`]
+/// * Calling [`ChannelManager::timer_tick_occurred`], [`lightning::chain::chainmonitor::ChainMonitor::rebroadcast_pending_claims`]
///   and [`PeerManager::timer_tick_occurred`] at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
@@ -853,7 +854,7 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutput};
/// #     fn send_data(&mut self, _data: &[u8], _continue_read: bool) -> usize { 0 }
/// #     fn disconnect_socket(&mut self) {}
/// # }
-/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
+/// # type ChainMonitor<B, F, FE> = lightning::chain::deferred::DeferredChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
@@ -964,7 +965,9 @@ pub async fn process_events_async<
	EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	ES: Deref,
-	M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
+	M: Deref<
+		Target = DeferredChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
+	>,
	CM: Deref,
	OM: Deref,
	PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
@@ -1152,6 +1155,11 @@ where

		let mut futures = Joiner::new();

+		// Capture the number of pending monitor writes before persisting the channel manager.
+		// We'll only flush this many writes after the manager is persisted, to avoid flushing
+		// monitor updates that arrived after the manager state was captured.
+		let pending_monitor_writes = chain_monitor.pending_operation_count();
+
		if channel_manager.get_cm().get_and_clear_needs_persistence() {
			log_trace!(logger, "Persisting ChannelManager...");

@@ -1349,6 +1357,13 @@ where
			res?;
		}

+		// Flush the monitor writes that were pending before we persisted the channel manager.
+		// Any writes that arrived after are left in the queue for the next iteration.
+		if pending_monitor_writes > 0 {
+			log_trace!(logger, "Flushing {} monitor writes", pending_monitor_writes);
+			chain_monitor.flush(pending_monitor_writes);
+		}
+
		match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
			sleeper(ONION_MESSAGE_HANDLER_TIMER)
		}) {
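The two hunks above implement the ordering described in the commit message: the pending count is captured before the `ChannelManager` is serialized, and only that many writes are flushed once persistence completes. A toy model of a single loop iteration, using a plain `VecDeque` in place of the deferred queue (illustrative only):

	use std::collections::VecDeque;

	fn main() {
		// Writes already queued by the deferred monitor (toy stand-in).
		let mut pending: VecDeque<u64> = VecDeque::from([1, 2]);

		// 1. Capture the count before serializing the ChannelManager.
		let pending_monitor_writes = pending.len();

		// 2. Persist the manager (elided). A write queued while that happens
		//    belongs to state the serialized manager never saw.
		pending.push_back(3);

		// 3. Flush only the captured writes; write 3 stays queued until the
		//    manager has been persisted again on the next iteration.
		for id in pending.drain(..pending_monitor_writes) {
			println!("persisting monitor write {id}");
		}
		assert_eq!(pending.len(), 1);
	}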
@@ -1413,6 +1428,14 @@ where
			channel_manager.get_cm().encode(),
		)
		.await?;
+
+	// Flush all pending monitor writes after final channel manager persistence.
+	let pending_monitor_writes = chain_monitor.pending_operation_count();
+	if pending_monitor_writes > 0 {
+		log_trace!(logger, "Flushing {} monitor writes on shutdown", pending_monitor_writes);
+		chain_monitor.flush(pending_monitor_writes);
+	}
+
	if let Some(ref scorer) = scorer {
		kv_store
			.write(
@@ -1465,7 +1488,9 @@ pub async fn process_events_async_with_kv_store_sync<
	EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	ES: Deref,
-	M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
+	M: Deref<
+		Target = DeferredChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
+	>,
	CM: Deref,
	OM: Deref,
	PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
@@ -1580,7 +1605,15 @@ impl BackgroundProcessor {
		ES: 'static + Deref + Send,
		M: 'static
			+ Deref<
-				Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
+				Target = DeferredChainMonitor<
+					<CM::Target as AChannelManager>::Signer,
+					CF,
+					T,
+					F,
+					L,
+					P,
+					ES,
+				>,
			>
			+ Send
			+ Sync,
@@ -1722,6 +1755,10 @@ impl BackgroundProcessor {
				channel_manager.get_cm().timer_tick_occurred();
				last_freshness_call = Instant::now();
			}
+
+			// Capture the number of pending monitor writes before persisting the channel manager.
+			let pending_monitor_writes = chain_monitor.pending_operation_count();
+
			if channel_manager.get_cm().get_and_clear_needs_persistence() {
				log_trace!(logger, "Persisting ChannelManager...");
				(kv_store.write(
@@ -1733,6 +1770,12 @@ impl BackgroundProcessor {
				log_trace!(logger, "Done persisting ChannelManager.");
			}

+			// Flush the monitor writes that were pending before we persisted the channel manager.
+			if pending_monitor_writes > 0 {
+				log_trace!(logger, "Flushing {} monitor writes", pending_monitor_writes);
+				chain_monitor.flush(pending_monitor_writes);
+			}
+
			if let Some(liquidity_manager) = liquidity_manager.as_ref() {
				log_trace!(logger, "Persisting LiquidityManager...");
				let _ = liquidity_manager.get_lm().persist().map_err(|e| {
@@ -1853,6 +1896,18 @@ impl BackgroundProcessor {
			CHANNEL_MANAGER_PERSISTENCE_KEY,
			channel_manager.get_cm().encode(),
		)?;
+
+		// Flush all pending monitor writes after final channel manager persistence.
+		let pending_monitor_writes = chain_monitor.pending_operation_count();
+		if pending_monitor_writes > 0 {
+			log_trace!(
+				logger,
+				"Flushing {} monitor writes on shutdown",
+				pending_monitor_writes
+			);
+			chain_monitor.flush(pending_monitor_writes);
+		}
+
		if let Some(ref scorer) = scorer {
			kv_store.write(
				SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -1936,7 +1991,7 @@ mod tests {
	use core::sync::atomic::{AtomicBool, Ordering};
	use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
	use lightning::chain::transaction::OutPoint;
-	use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
+	use lightning::chain::{deferred, BestBlock, Confirm, Filter};
	use lightning::events::{Event, PathFailure, ReplayEvent};
	use lightning::ln::channelmanager;
	use lightning::ln::channelmanager::{
@@ -2026,7 +2081,7 @@
		Arc<test_utils::TestLogger>,
	>;

-	type ChainMonitor = chainmonitor::ChainMonitor<
+	type ChainMonitor = deferred::DeferredChainMonitor<
		InMemorySigner,
		Arc<test_utils::TestChainSource>,
		Arc<test_utils::TestBroadcaster>,
@@ -2454,7 +2509,7 @@
		let now = Duration::from_secs(genesis_block.header.time as u64);
		let keys_manager =
			Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos(), true));
-		let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(
+		let chain_monitor = Arc::new(deferred::DeferredChainMonitor::new(
			Some(Arc::clone(&chain_source)),
			Arc::clone(&tx_broadcaster),
			Arc::clone(&logger),
@@ -2598,19 +2653,25 @@
				tx.clone(),
			)
			.unwrap();
+			// Flush deferred monitor operations so messages aren't held back
+			$node_a.chain_monitor.flush_all();
			let msg_a = get_event_msg!(
				$node_a,
				MessageSendEvent::SendFundingCreated,
				$node_b.node.get_our_node_id()
			);
			$node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a);
+			// Flush node_b's monitor so it releases the FundingSigned message
+			$node_b.chain_monitor.flush_all();
			get_event!($node_b, Event::ChannelPending);
			let msg_b = get_event_msg!(
				$node_b,
				MessageSendEvent::SendFundingSigned,
				$node_a.node.get_our_node_id()
			);
			$node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b);
+			// Flush node_a's monitor for the final update
+			$node_a.chain_monitor.flush_all();
			get_event!($node_a, Event::ChannelPending);
			tx
		}};
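The tests call `flush_all()`, which does not appear in this file's diff (it presumably lives in the new `deferred` module alongside `flush(n)`). Because every queued operation reports `InProgress`, handshake messages such as `FundingCreated`/`FundingSigned` are held back until the corresponding monitor write is applied, hence the explicit flushes in the macro above. A hypothetical sketch, assuming `flush_all()` simply drains whatever is queued at call time:

	use std::collections::VecDeque;
	use std::sync::Mutex;

	struct DeferredMonitor {
		pending: Mutex<VecDeque<&'static str>>,
	}

	impl DeferredMonitor {
		// Hypothetical: drain everything queued at call time, i.e. the
		// equivalent of flush(pending_operation_count()).
		fn flush_all(&self) {
			let mut pending = self.pending.lock().unwrap();
			for op in pending.drain(..) {
				// Applying the queued write is what unblocks the next
				// handshake message (FundingCreated, FundingSigned, ...).
				println!("applied {op}");
			}
		}
	}

	fn main() {
		let monitor = DeferredMonitor {
			pending: Mutex::new(VecDeque::from(["watch_channel", "update_channel"])),
		};
		// e.g. after funding_transaction_generated() queued a watch_channel:
		monitor.flush_all();
	}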
@@ -3057,11 +3118,17 @@
			.node
			.funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone())
			.unwrap();
+		// Flush node_0's deferred monitor operations so the FundingCreated message is released
+		nodes[0].chain_monitor.flush_all();
		let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
		nodes[1].node.handle_funding_created(node_0_id, &msg_0);
+		// Flush node_1's deferred monitor operations so events and FundingSigned are released
+		nodes[1].chain_monitor.flush_all();
		get_event!(nodes[1], Event::ChannelPending);
		let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
		nodes[0].node.handle_funding_signed(node_1_id, &msg_1);
+		// Flush node_0's monitor for the funding_signed update
+		nodes[0].chain_monitor.flush_all();
		channel_pending_recv
			.recv_timeout(EVENT_DEADLINE)
			.expect("ChannelPending not handled within deadline");
@@ -3122,6 +3189,8 @@
			error_message.to_string(),
		)
		.unwrap();
+		// Flush the monitor update triggered by force close so the commitment tx is broadcasted
+		nodes[0].chain_monitor.flush_all();
		let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
		confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
