
Commit 1071b3c

joostjager and claude committed
Add deferred ChainMonitor updates with batched persistence
Introduce a `DeferredChainMonitor` wrapper around `ChainMonitor` that queues `watch_channel` and `update_channel` operations, returning `InProgress` until `flush()` is called. This enables batched persistence of monitor updates after `ChannelManager` persistence, ensuring correct ordering where the `ChannelManager` state is never ahead of the monitor state on restart.

Key changes:

- `DeferredChainMonitor` queues monitor operations and returns `InProgress`
- `flush(count, logger)` removes and applies operations from the front of the queue one at a time, logging errors and panicking on `UnrecoverableError`
- All `ChainMonitor` traits (Listen, Confirm, EventsProvider, etc.) are passed through, allowing drop-in replacement
- Background processor updated to capture the pending count before `ChannelManager` persistence, then flush after persistence completes

Includes comprehensive tests covering the full channel lifecycle with payment flows using `DeferredChainMonitor`.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 8679d8d commit 1071b3c
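
To make the pattern the commit message describes concrete, here is a minimal self-contained sketch of the idea. Every name in it is an illustrative stand-in: the real `DeferredChainMonitor` in `lightning::chain::deferred` wraps a full `ChainMonitor`, queues real `ChannelMonitor` operations, and takes a logger in `flush()`.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

// Illustrative stand-ins for ChannelMonitorUpdateStatus and the queued operations.
enum UpdateStatus {
    Completed,
    InProgress,
    UnrecoverableError,
}

enum Op {
    WatchChannel(u64),
    UpdateChannel(u64),
}

struct DeferredMonitor {
    // Operations queued in arrival order; flush() applies them from the front.
    queue: Mutex<VecDeque<Op>>,
}

impl DeferredMonitor {
    fn new() -> Self {
        Self { queue: Mutex::new(VecDeque::new()) }
    }

    // Queue the operation and report InProgress: nothing is persisted yet, so
    // the caller holds back any messages/events that depend on it.
    fn watch_channel(&self, channel_id: u64) -> UpdateStatus {
        self.queue.lock().unwrap().push_back(Op::WatchChannel(channel_id));
        UpdateStatus::InProgress
    }

    fn update_channel(&self, update_id: u64) -> UpdateStatus {
        self.queue.lock().unwrap().push_back(Op::UpdateChannel(update_id));
        UpdateStatus::InProgress
    }

    fn pending_operation_count(&self) -> usize {
        self.queue.lock().unwrap().len()
    }

    // Pop and apply up to `count` operations from the front, one at a time,
    // panicking on an unrecoverable error (recoverable ones would be logged).
    fn flush(&self, count: usize) {
        for _ in 0..count {
            let op = match self.queue.lock().unwrap().pop_front() {
                Some(op) => op,
                None => break,
            };
            if let UpdateStatus::UnrecoverableError = apply(op) {
                panic!("unrecoverable monitor error");
            }
        }
    }
}

// Stand-in for actually persisting one queued operation.
fn apply(_op: Op) -> UpdateStatus {
    UpdateStatus::Completed
}

fn main() {
    let monitor = DeferredMonitor::new();
    assert!(matches!(monitor.watch_channel(7), UpdateStatus::InProgress));
    assert!(matches!(monitor.update_channel(1), UpdateStatus::InProgress));
    let pending = monitor.pending_operation_count(); // snapshot: 2
    monitor.flush(pending); // applies both queued operations in order
    assert_eq!(monitor.pending_operation_count(), 0);
}
```
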

File tree

3 files changed (+1071, -9 lines changed)


lightning-background-processor/src/lib.rs

Lines changed: 96 additions & 9 deletions
@@ -32,7 +32,8 @@ use fwd_batch::BatchDelay;
 
 use lightning::chain;
 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
-use lightning::chain::chainmonitor::{ChainMonitor, Persist};
+use lightning::chain::chainmonitor::Persist;
+use lightning::chain::deferred::DeferredChainMonitor;
 #[cfg(feature = "std")]
 use lightning::events::EventHandler;
 #[cfg(feature = "std")]
@@ -101,7 +102,7 @@ use alloc::vec::Vec;
 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
 ///   writing it to disk/backups by invoking the callback given to it at startup.
 ///   [`ChannelManager`] persistence should be done in the background.
-/// * Calling [`ChannelManager::timer_tick_occurred`], [`ChainMonitor::rebroadcast_pending_claims`]
+/// * Calling [`ChannelManager::timer_tick_occurred`], [`lightning::chain::chainmonitor::ChainMonitor::rebroadcast_pending_claims`]
 ///   and [`PeerManager::timer_tick_occurred`] at the appropriate intervals.
 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
 ///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
@@ -821,7 +822,7 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
 /// # fn send_data(&mut self, _data: &[u8], _continue_read: bool) -> usize { 0 }
 /// # fn disconnect_socket(&mut self) {}
 /// # }
-/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
+/// # type ChainMonitor<B, F, FE> = lightning::chain::deferred::DeferredChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
 /// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
 /// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
 /// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
@@ -932,7 +933,9 @@ pub async fn process_events_async<
     EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
     EventHandler: Fn(Event) -> EventHandlerFuture,
     ES: EntropySource,
-    M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
+    M: Deref<
+        Target = DeferredChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
+    >,
     CM: Deref,
     OM: Deref,
     PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
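
The visible change in this bound (repeated for the sync entry points further down) is only the `Deref` target: callers keep passing something that dereferences to the monitor, typically an `Arc`, exactly as the test setup below does with `Arc::new(deferred::DeferredChainMonitor::new(..))`. In terms of the toy `DeferredMonitor` sketched at the top of the page:

```rust
use std::ops::Deref;
use std::sync::Arc;

// Mirrors the new `M: Deref<Target = DeferredChainMonitor<..>>` bound using the
// toy DeferredMonitor from the earlier sketch: any smart pointer works.
fn background_step<M: Deref<Target = DeferredMonitor>>(chain_monitor: M) -> usize {
    chain_monitor.pending_operation_count()
}

fn demo() {
    let monitor = Arc::new(DeferredMonitor::new());
    let _queued = background_step(Arc::clone(&monitor));
}
```
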
@@ -1112,6 +1115,11 @@ where
 
         let mut futures = Joiner::new();
 
+        // Capture the number of pending monitor writes before persisting the channel manager.
+        // We'll only flush this many writes after the manager is persisted, to avoid flushing
+        // monitor updates that arrived after the manager state was captured.
+        let pending_monitor_writes = chain_monitor.pending_operation_count();
+
         if channel_manager.get_cm().get_and_clear_needs_persistence() {
             log_trace!(logger, "Persisting ChannelManager...");
 
@@ -1309,6 +1317,15 @@ where
             res?;
         }
 
+        // Flush the monitor writes that were pending before we persisted the channel manager.
+        // Any writes that arrived after are left in the queue for the next iteration. There's
+        // no need to "chase the tail" by processing new updates that arrive during flushing -
+        // they'll be handled in the next round.
+        if pending_monitor_writes > 0 {
+            log_trace!(logger, "Flushing {} monitor writes", pending_monitor_writes);
+            chain_monitor.flush(pending_monitor_writes, &logger);
+        }
+
         match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
             sleeper(ONION_MESSAGE_HANDLER_TIMER)
         }) {
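
The two additions above are the core ordering change: the queue depth is snapshotted before the `ChannelManager` write, and exactly that many monitor writes are flushed afterwards. In terms of the toy `DeferredMonitor` sketched earlier (with a hypothetical `persist_channel_manager` closure standing in for the kv-store write), one loop iteration does:

```rust
// Illustrative round of the background loop, not the real LDK code.
fn persist_round(chain_monitor: &DeferredMonitor, persist_channel_manager: impl FnOnce()) {
    // Snapshot first: only writes queued up to this point are flushed below.
    let pending_monitor_writes = chain_monitor.pending_operation_count();

    // The manager is serialized while those operations still read as in-progress.
    persist_channel_manager();

    // Flush the snapshot; anything queued during persistence waits for the next
    // round, so there is no need to "chase the tail".
    if pending_monitor_writes > 0 {
        chain_monitor.flush(pending_monitor_writes);
    }
}
```
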
@@ -1373,6 +1390,14 @@ where
             channel_manager.get_cm().encode(),
         )
         .await?;
+
+    // Flush all pending monitor writes after final channel manager persistence.
+    let pending_monitor_writes = chain_monitor.pending_operation_count();
+    if pending_monitor_writes > 0 {
+        log_trace!(logger, "Flushing {} monitor writes on shutdown", pending_monitor_writes);
+        chain_monitor.flush(pending_monitor_writes, &logger);
+    }
+
     if let Some(ref scorer) = scorer {
         kv_store
             .write(
@@ -1425,7 +1450,9 @@ pub async fn process_events_async_with_kv_store_sync<
     EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
     EventHandler: Fn(Event) -> EventHandlerFuture,
     ES: EntropySource,
-    M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
+    M: Deref<
+        Target = DeferredChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
+    >,
     CM: Deref,
     OM: Deref,
     PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
@@ -1533,7 +1560,15 @@ impl BackgroundProcessor {
         ES: 'static + EntropySource + Send,
         M: 'static
             + Deref<
-                Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
+                Target = DeferredChainMonitor<
+                    <CM::Target as AChannelManager>::Signer,
+                    CF,
+                    T,
+                    F,
+                    L,
+                    P,
+                    ES,
+                >,
             >
             + Send
             + Sync,
@@ -1669,6 +1704,10 @@ impl BackgroundProcessor {
                 channel_manager.get_cm().timer_tick_occurred();
                 last_freshness_call = Instant::now();
             }
+
+            // Capture the number of pending monitor writes before persisting the channel manager.
+            let pending_monitor_writes = chain_monitor.pending_operation_count();
+
             if channel_manager.get_cm().get_and_clear_needs_persistence() {
                 log_trace!(logger, "Persisting ChannelManager...");
                 (kv_store.write(
@@ -1680,6 +1719,14 @@ impl BackgroundProcessor {
                 log_trace!(logger, "Done persisting ChannelManager.");
             }
 
+            // Flush the monitor writes that were pending before we persisted the channel manager.
+            // There's no need to "chase the tail" by processing new updates that arrive during
+            // flushing - they'll be handled in the next round.
+            if pending_monitor_writes > 0 {
+                log_trace!(logger, "Flushing {} monitor writes", pending_monitor_writes);
+                chain_monitor.flush(pending_monitor_writes, &logger);
+            }
+
             if let Some(liquidity_manager) = liquidity_manager.as_ref() {
                 log_trace!(logger, "Persisting LiquidityManager...");
                 let _ = liquidity_manager.get_lm().persist().map_err(|e| {
@@ -1800,6 +1847,18 @@ impl BackgroundProcessor {
                 CHANNEL_MANAGER_PERSISTENCE_KEY,
                 channel_manager.get_cm().encode(),
             )?;
+
+            // Flush all pending monitor writes after final channel manager persistence.
+            let pending_monitor_writes = chain_monitor.pending_operation_count();
+            if pending_monitor_writes > 0 {
+                log_trace!(
+                    logger,
+                    "Flushing {} monitor writes on shutdown",
+                    pending_monitor_writes
+                );
+                chain_monitor.flush(pending_monitor_writes, &logger);
+            }
+
             if let Some(ref scorer) = scorer {
                 kv_store.write(
                     SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -1883,7 +1942,7 @@ mod tests {
     use core::sync::atomic::{AtomicBool, Ordering};
     use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
    use lightning::chain::transaction::OutPoint;
-    use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
+    use lightning::chain::{deferred, BestBlock, Confirm, Filter};
     use lightning::events::{Event, PathFailure, ReplayEvent};
     use lightning::ln::channelmanager;
     use lightning::ln::channelmanager::{
@@ -1993,7 +2052,7 @@ mod tests {
         Arc<test_utils::TestLogger>,
     >;
 
-    type ChainMonitor = chainmonitor::ChainMonitor<
+    type ChainMonitor = deferred::DeferredChainMonitor<
         InMemorySigner,
         Arc<test_utils::TestChainSource>,
         Arc<test_utils::TestBroadcaster>,
@@ -2421,7 +2480,7 @@ mod tests {
         let now = Duration::from_secs(genesis_block.header.time as u64);
         let keys_manager =
             Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos(), true));
-        let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(
+        let chain_monitor = Arc::new(deferred::DeferredChainMonitor::new(
             Some(Arc::clone(&chain_source)),
             Arc::clone(&tx_broadcaster),
             Arc::clone(&logger),
@@ -2565,19 +2624,31 @@ mod tests {
                 tx.clone(),
             )
             .unwrap();
+            // Flush deferred monitor operations so messages aren't held back
+            $node_a
+                .chain_monitor
+                .flush($node_a.chain_monitor.pending_operation_count(), &$node_a.logger);
             let msg_a = get_event_msg!(
                 $node_a,
                 MessageSendEvent::SendFundingCreated,
                 $node_b.node.get_our_node_id()
             );
             $node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a);
+            // Flush node_b's monitor so it releases the FundingSigned message
+            $node_b
+                .chain_monitor
+                .flush($node_b.chain_monitor.pending_operation_count(), &$node_b.logger);
             get_event!($node_b, Event::ChannelPending);
             let msg_b = get_event_msg!(
                 $node_b,
                 MessageSendEvent::SendFundingSigned,
                 $node_a.node.get_our_node_id()
             );
             $node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b);
+            // Flush node_a's monitor for the final update
+            $node_a
+                .chain_monitor
+                .flush($node_a.chain_monitor.pending_operation_count(), &$node_a.logger);
             get_event!($node_a, Event::ChannelPending);
             tx
         }};
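
Because each queued operation holds back the next handshake message or event, the tests flush after every step. The repeated idiom could be wrapped in a small helper along these lines (hypothetical, not part of the commit; the real call also passes the node's logger):

```rust
// Drain everything currently queued so gated messages/events are released
// immediately (toy DeferredMonitor version, without the logger argument).
fn flush_all(chain_monitor: &DeferredMonitor) {
    chain_monitor.flush(chain_monitor.pending_operation_count());
}
```
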
@@ -3024,11 +3095,23 @@ mod tests {
             .node
             .funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone())
             .unwrap();
+        // Flush node_0's deferred monitor operations so the FundingCreated message is released
+        nodes[0]
+            .chain_monitor
+            .flush(nodes[0].chain_monitor.pending_operation_count(), &nodes[0].logger);
         let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
         nodes[1].node.handle_funding_created(node_0_id, &msg_0);
+        // Flush node_1's deferred monitor operations so events and FundingSigned are released
+        nodes[1]
+            .chain_monitor
+            .flush(nodes[1].chain_monitor.pending_operation_count(), &nodes[1].logger);
         get_event!(nodes[1], Event::ChannelPending);
         let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
         nodes[0].node.handle_funding_signed(node_1_id, &msg_1);
+        // Flush node_0's monitor for the funding_signed update
+        nodes[0]
+            .chain_monitor
+            .flush(nodes[0].chain_monitor.pending_operation_count(), &nodes[0].logger);
         channel_pending_recv
             .recv_timeout(EVENT_DEADLINE)
             .expect("ChannelPending not handled within deadline");
@@ -3089,6 +3172,10 @@ mod tests {
             error_message.to_string(),
         )
         .unwrap();
+        // Flush the monitor update triggered by force close so the commitment tx is broadcasted
+        nodes[0]
+            .chain_monitor
+            .flush(nodes[0].chain_monitor.pending_operation_count(), &nodes[0].logger);
         let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
         confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
