
Commit 02ae5cb

Merge pull request #2208 from wpaulino/monitor-rebroadcast-pending-claims
Implement pending claim rebroadcast on force-closed channels
2 parents 7a07049 + 453b3a1 commit 02ae5cb

File tree

8 files changed (+358 -41 lines)

lightning-background-processor/src/lib.rs (+26 -9)

@@ -64,8 +64,8 @@ use alloc::vec::Vec;
 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
 ///   writing it to disk/backups by invoking the callback given to it at startup.
 ///   [`ChannelManager`] persistence should be done in the background.
-/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
-///   at the appropriate intervals.
+/// * Calling [`ChannelManager::timer_tick_occurred`], [`ChainMonitor::rebroadcast_pending_claims`]
+///   and [`PeerManager::timer_tick_occurred`] at the appropriate intervals.
 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
 ///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
 ///
@@ -116,12 +116,17 @@ const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
 #[cfg(test)]
 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;

+#[cfg(not(test))]
+const REBROADCAST_TIMER: u64 = 30;
+#[cfg(test)]
+const REBROADCAST_TIMER: u64 = 1;
+
 #[cfg(feature = "futures")]
 /// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
 const fn min_u64(a: u64, b: u64) -> u64 { if a < b { a } else { b } }
 #[cfg(feature = "futures")]
 const FASTEST_TIMER: u64 = min_u64(min_u64(FRESHNESS_TIMER, PING_TIMER),
-    min_u64(SCORER_PERSIST_TIMER, FIRST_NETWORK_PRUNE_TIMER));
+    min_u64(SCORER_PERSIST_TIMER, min_u64(FIRST_NETWORK_PRUNE_TIMER, REBROADCAST_TIMER)));

 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
 pub enum GossipSync<
@@ -270,11 +275,14 @@ macro_rules! define_run_body {
     => { {
         log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
         $channel_manager.timer_tick_occurred();
+        log_trace!($logger, "Rebroadcasting monitor's pending claims on startup");
+        $chain_monitor.rebroadcast_pending_claims();

         let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
         let mut last_ping_call = $get_timer(PING_TIMER);
         let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
         let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
+        let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
         let mut have_pruned = false;

         loop {
@@ -372,6 +380,12 @@ macro_rules! define_run_body {
                 }
                 last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
             }
+
+            if $timer_elapsed(&mut last_rebroadcast_call, REBROADCAST_TIMER) {
+                log_trace!($logger, "Rebroadcasting monitor's pending claims");
+                $chain_monitor.rebroadcast_pending_claims();
+                last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
+            }
         }

         // After we exit, ensure we persist the ChannelManager one final time - this avoids
@@ -1189,19 +1203,22 @@ mod tests {

     #[test]
     fn test_timer_tick_called() {
-        // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
-        // `FRESHNESS_TIMER`.
+        // Test that `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`,
+        // `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, and
+        // `PeerManager::timer_tick_occurred` every `PING_TIMER`.
         let nodes = create_nodes(1, "test_timer_tick_called".to_string());
         let data_dir = nodes[0].persister.get_data_dir();
         let persister = Arc::new(Persister::new(data_dir));
         let event_handler = |_: _| {};
         let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
         loop {
             let log_entries = nodes[0].logger.lines.lock().unwrap();
-            let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
-            let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
-            if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
-                    log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
+            let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string();
+            let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string();
+            let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string();
+            if log_entries.get(&("lightning_background_processor".to_string(), desired_log_1)).is_some() &&
+                log_entries.get(&("lightning_background_processor".to_string(), desired_log_2)).is_some() &&
+                log_entries.get(&("lightning_background_processor".to_string(), desired_log_3)).is_some() {
                 break
             }
         }
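The BackgroundProcessor changes above wire `rebroadcast_pending_claims` into the existing timer loop: once at startup, then every `REBROADCAST_TIMER` (30 seconds outside tests). Nodes that do not use `BackgroundProcessor` have to drive the same cadence themselves. A minimal sketch, assuming a tokio runtime and a `rebroadcast` closure that captures the application's `Arc<ChainMonitor<..>>` and calls `rebroadcast_pending_claims()` on it (neither the function nor the closure is part of this commit):

use core::time::Duration;

// Hypothetical driver, not part of this PR. tokio's interval fires immediately on its
// first tick, covering the "on startup" rebroadcast, then every 30 seconds thereafter,
// mirroring REBROADCAST_TIMER above.
async fn drive_pending_claim_rebroadcasts(rebroadcast: impl Fn()) {
    let mut timer = tokio::time::interval(Duration::from_secs(30));
    loop {
        timer.tick().await;
        // In a real node this closure would call chain_monitor.rebroadcast_pending_claims().
        rebroadcast();
    }
}

An application would typically spawn this alongside its other periodic tasks (peer ticks, scorer persistence), exactly as the macro above does for BackgroundProcessor users.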

lightning/src/chain/chainmonitor.rs (+21)

@@ -217,8 +217,15 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> Deref for LockedChannelMonitor<
 /// or used independently to monitor channels remotely. See the [module-level documentation] for
 /// details.
 ///
+/// Note that `ChainMonitor` should regularly trigger rebroadcasts/fee bumps of pending claims from
+/// a force-closed channel. This is crucial in preventing certain classes of pinning attacks,
+/// detecting substantial mempool feerate changes between blocks, and ensuring reliability if
+/// broadcasting fails. We recommend invoking this every 30 seconds, or lower if running in an
+/// environment with spotty connections, like on mobile.
+///
 /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
 /// [module-level documentation]: crate::chain::chainmonitor
+/// [`rebroadcast_pending_claims`]: Self::rebroadcast_pending_claims
 pub struct ChainMonitor<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
     where C::Target: chain::Filter,
           T::Target: BroadcasterInterface,
@@ -533,6 +540,20 @@ where C::Target: chain::Filter,
     pub fn get_update_future(&self) -> Future {
         self.event_notifier.get_future()
     }
+
+    /// Triggers rebroadcasts/fee-bumps of pending claims from a force-closed channel. This is
+    /// crucial in preventing certain classes of pinning attacks, detecting substantial mempool
+    /// feerate changes between blocks, and ensuring reliability if broadcasting fails. We recommend
+    /// invoking this every 30 seconds, or lower if running in an environment with spotty
+    /// connections, like on mobile.
+    pub fn rebroadcast_pending_claims(&self) {
+        let monitors = self.monitors.read().unwrap();
+        for (_, monitor_holder) in &*monitors {
+            monitor_holder.monitor.rebroadcast_pending_claims(
+                &*self.broadcaster, &*self.fee_estimator, &*self.logger
+            )
+        }
+    }
 }

 impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
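The new `ChainMonitor::rebroadcast_pending_claims` is a simple fan-out: it takes a read lock on the monitor map and forwards to every tracked `ChannelMonitor`. Because the underlying rebroadcast path only bumps fees when the estimator reports a higher feerate (it passes `force_feerate_bump = false`, see onchaintx.rs below), calling it more often than the recommended 30 seconds mostly results in plain rebroadcasts of the same transactions. A stand-in sketch of that shape, using placeholder types rather than LDK's generics:

use std::collections::HashMap;
use std::sync::RwLock;

// StubMonitor and the u64 key are placeholders for ChannelMonitor and the channel's
// funding outpoint; only the locking/iteration shape mirrors the method above.
struct StubMonitor;
impl StubMonitor {
    fn rebroadcast_pending_claims(&self) { /* re-sign and re-broadcast this channel's claims */ }
}

struct StubChainMonitor {
    monitors: RwLock<HashMap<u64, StubMonitor>>,
}

impl StubChainMonitor {
    fn rebroadcast_pending_claims(&self) {
        // A read lock suffices: rebroadcasting never adds or removes monitors, so adding a
        // newly watched channel is only blocked for the duration of this loop.
        let monitors = self.monitors.read().unwrap();
        for (_, monitor) in monitors.iter() {
            monitor.rebroadcast_pending_claims();
        }
    }
}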

lightning/src/chain/channelmonitor.rs (+21)

@@ -1467,6 +1467,27 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
     pub fn current_best_block(&self) -> BestBlock {
         self.inner.lock().unwrap().best_block.clone()
     }
+
+    /// Triggers rebroadcasts/fee-bumps of pending claims from a force-closed channel. This is
+    /// crucial in preventing certain classes of pinning attacks, detecting substantial mempool
+    /// feerate changes between blocks, and ensuring reliability if broadcasting fails. We recommend
+    /// invoking this every 30 seconds, or lower if running in an environment with spotty
+    /// connections, like on mobile.
+    pub fn rebroadcast_pending_claims<B: Deref, F: Deref, L: Deref>(
+        &self, broadcaster: B, fee_estimator: F, logger: L,
+    )
+    where
+        B::Target: BroadcasterInterface,
+        F::Target: FeeEstimator,
+        L::Target: Logger,
+    {
+        let fee_estimator = LowerBoundedFeeEstimator::new(fee_estimator);
+        let mut inner = self.inner.lock().unwrap();
+        let current_height = inner.best_block.height;
+        inner.onchain_tx_handler.rebroadcast_pending_claims(
+            current_height, &broadcaster, &fee_estimator, &logger,
+        );
+    }
 }

 impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitorImpl<Signer> {
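The per-monitor entry point takes its broadcaster, fee estimator and logger as `Deref` generics rather than concrete references, which is why the `ChainMonitor` wrapper above can pass `&*self.broadcaster` and friends while other callers could pass an `Arc` or a plain reference. A self-contained illustration of that parameter style, using a stand-in trait rather than LDK's `BroadcasterInterface`:

use std::sync::Arc;

// Stand-in trait; only the Deref-bound parameter pattern mirrors rebroadcast_pending_claims.
trait Broadcast { fn broadcast(&self); }

struct StubBroadcaster;
impl Broadcast for StubBroadcaster {
    fn broadcast(&self) { /* no-op for the sketch */ }
}

// The bound is on B::Target, not on B itself, so &T, Arc<T>, Box<T>, etc. all qualify.
fn call_broadcast<B: std::ops::Deref>(broadcaster: B) where B::Target: Broadcast {
    broadcaster.broadcast();
}

fn main() {
    let stub = StubBroadcaster;
    call_broadcast(&stub);                      // plain reference
    call_broadcast(Arc::new(StubBroadcaster));  // smart pointer also satisfies the bound
}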

lightning/src/chain/onchaintx.rs (+75 -10)

@@ -481,6 +481,59 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
         events.into_iter().map(|(_, event)| event).collect()
     }

+    /// Triggers rebroadcasts/fee-bumps of pending claims from a force-closed channel. This is
+    /// crucial in preventing certain classes of pinning attacks, detecting substantial mempool
+    /// feerate changes between blocks, and ensuring reliability if broadcasting fails. We recommend
+    /// invoking this every 30 seconds, or lower if running in an environment with spotty
+    /// connections, like on mobile.
+    pub(crate) fn rebroadcast_pending_claims<B: Deref, F: Deref, L: Deref>(
+        &mut self, current_height: u32, broadcaster: &B, fee_estimator: &LowerBoundedFeeEstimator<F>,
+        logger: &L,
+    )
+    where
+        B::Target: BroadcasterInterface,
+        F::Target: FeeEstimator,
+        L::Target: Logger,
+    {
+        let mut bump_requests = Vec::with_capacity(self.pending_claim_requests.len());
+        for (package_id, request) in self.pending_claim_requests.iter() {
+            let inputs = request.outpoints();
+            log_info!(logger, "Triggering rebroadcast/fee-bump for request with inputs {:?}", inputs);
+            bump_requests.push((*package_id, request.clone()));
+        }
+        for (package_id, request) in bump_requests {
+            self.generate_claim(current_height, &request, false /* force_feerate_bump */, fee_estimator, logger)
+                .map(|(_, new_feerate, claim)| {
+                    let mut bumped_feerate = false;
+                    if let Some(mut_request) = self.pending_claim_requests.get_mut(&package_id) {
+                        bumped_feerate = request.previous_feerate() > new_feerate;
+                        mut_request.set_feerate(new_feerate);
+                    }
+                    match claim {
+                        OnchainClaim::Tx(tx) => {
+                            let log_start = if bumped_feerate { "Broadcasting RBF-bumped" } else { "Rebroadcasting" };
+                            log_info!(logger, "{} onchain {}", log_start, log_tx!(tx));
+                            broadcaster.broadcast_transaction(&tx);
+                        },
+                        #[cfg(anchors)]
+                        OnchainClaim::Event(event) => {
+                            let log_start = if bumped_feerate { "Yielding fee-bumped" } else { "Replaying" };
+                            log_info!(logger, "{} onchain event to spend inputs {:?}", log_start,
+                                request.outpoints());
+                            #[cfg(debug_assertions)] {
+                                debug_assert!(request.requires_external_funding());
+                                let num_existing = self.pending_claim_events.iter()
+                                    .filter(|entry| entry.0 == package_id).count();
+                                assert!(num_existing == 0 || num_existing == 1);
+                            }
+                            self.pending_claim_events.retain(|event| event.0 != package_id);
+                            self.pending_claim_events.push((package_id, event));
+                        }
+                    }
+                });
+        }
+    }
+
     /// Lightning security model (i.e being able to redeem/timeout HTLC or penalize counterparty
     /// onchain) lays on the assumption of claim transactions getting confirmed before timelock
     /// expiration (CSV or CLTV following cases). In case of high-fee spikes, claim tx may get stuck
@@ -489,9 +542,13 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
     ///
     /// Panics if there are signing errors, because signing operations in reaction to on-chain
    /// events are not expected to fail, and if they do, we may lose funds.
-    fn generate_claim<F: Deref, L: Deref>(&mut self, cur_height: u32, cached_request: &PackageTemplate, fee_estimator: &LowerBoundedFeeEstimator<F>, logger: &L) -> Option<(u32, u64, OnchainClaim)>
-        where F::Target: FeeEstimator,
-              L::Target: Logger,
+    fn generate_claim<F: Deref, L: Deref>(
+        &mut self, cur_height: u32, cached_request: &PackageTemplate, force_feerate_bump: bool,
+        fee_estimator: &LowerBoundedFeeEstimator<F>, logger: &L,
+    ) -> Option<(u32, u64, OnchainClaim)>
+    where
+        F::Target: FeeEstimator,
+        L::Target: Logger,
     {
         let request_outpoints = cached_request.outpoints();
         if request_outpoints.is_empty() {
@@ -538,8 +595,9 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
         #[cfg(anchors)]
         { // Attributes are not allowed on if expressions on our current MSRV of 1.41.
             if cached_request.requires_external_funding() {
-                let target_feerate_sat_per_1000_weight = cached_request
-                    .compute_package_feerate(fee_estimator, ConfirmationTarget::HighPriority);
+                let target_feerate_sat_per_1000_weight = cached_request.compute_package_feerate(
+                    fee_estimator, ConfirmationTarget::HighPriority, force_feerate_bump
+                );
                 if let Some(htlcs) = cached_request.construct_malleable_package_with_external_funding(self) {
                     return Some((
                         new_timer,
@@ -558,7 +616,8 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>

         let predicted_weight = cached_request.package_weight(&self.destination_script);
         if let Some((output_value, new_feerate)) = cached_request.compute_package_output(
-            predicted_weight, self.destination_script.dust_value().to_sat(), fee_estimator, logger,
+            predicted_weight, self.destination_script.dust_value().to_sat(),
+            force_feerate_bump, fee_estimator, logger,
         ) {
             assert!(new_feerate != 0);

@@ -601,7 +660,7 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
             // counterparty's latest commitment don't have any HTLCs present.
             let conf_target = ConfirmationTarget::HighPriority;
             let package_target_feerate_sat_per_1000_weight = cached_request
-                .compute_package_feerate(fee_estimator, conf_target);
+                .compute_package_feerate(fee_estimator, conf_target, force_feerate_bump);
             Some((
                 new_timer,
                 package_target_feerate_sat_per_1000_weight as u64,
@@ -700,7 +759,9 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
         // Generate claim transactions and track them to bump if necessary at
         // height timer expiration (i.e in how many blocks we're going to take action).
         for mut req in preprocessed_requests {
-            if let Some((new_timer, new_feerate, claim)) = self.generate_claim(cur_height, &req, &*fee_estimator, &*logger) {
+            if let Some((new_timer, new_feerate, claim)) = self.generate_claim(
+                cur_height, &req, true /* force_feerate_bump */, &*fee_estimator, &*logger,
+            ) {
                 req.set_timer(new_timer);
                 req.set_feerate(new_feerate);
                 let package_id = match claim {
@@ -893,7 +954,9 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
             // Build, bump and rebroadcast tx accordingly
             log_trace!(logger, "Bumping {} candidates", bump_candidates.len());
             for (package_id, request) in bump_candidates.iter() {
-                if let Some((new_timer, new_feerate, bump_claim)) = self.generate_claim(cur_height, &request, &*fee_estimator, &*logger) {
+                if let Some((new_timer, new_feerate, bump_claim)) = self.generate_claim(
+                    cur_height, &request, true /* force_feerate_bump */, &*fee_estimator, &*logger,
+                ) {
                     match bump_claim {
                         OnchainClaim::Tx(bump_tx) => {
                             log_info!(logger, "Broadcasting RBF-bumped onchain {}", log_tx!(bump_tx));
@@ -973,7 +1036,9 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
             }
         }
         for ((_package_id, _), ref mut request) in bump_candidates.iter_mut() {
-            if let Some((new_timer, new_feerate, bump_claim)) = self.generate_claim(height, &request, fee_estimator, &&*logger) {
+            if let Some((new_timer, new_feerate, bump_claim)) = self.generate_claim(
+                height, &request, true /* force_feerate_bump */, fee_estimator, &&*logger
+            ) {
                 request.set_timer(new_timer);
                 request.set_feerate(new_feerate);
                 match bump_claim {
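The thread running through these call sites is the new `force_feerate_bump` flag: the pre-existing block-connection paths pass `true`, so their behaviour is unchanged, while the new rebroadcast path passes `false` so that an unchanged fee estimate yields a plain rebroadcast rather than an RBF replacement. A simplified, illustrative model of that branch structure; the 25% increment is an assumption of this sketch, not LDK's exact `compute_package_feerate` policy:

// Illustrative only; not code from this PR.
fn pick_feerate(previous_sat_per_kw: u64, estimated_sat_per_kw: u64, force_feerate_bump: bool) -> u64 {
    if estimated_sat_per_kw > previous_sat_per_kw {
        // Mempool feerates rose since the last attempt: move up to the new estimate.
        estimated_sat_per_kw
    } else if force_feerate_bump {
        // Block-connection call sites (true): produce a strictly higher feerate so the
        // replacement can satisfy RBF rules even when the estimate is flat.
        previous_sat_per_kw + previous_sat_per_kw / 4
    } else {
        // Periodic rebroadcast (false): keep the previous feerate and simply re-announce.
        previous_sat_per_kw
    }
}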
