Commit 5509788

Add infra to block ChannelMonitorUpdates on forwarded claims
When we forward a payment and receive an `update_fulfill_htlc` message from the downstream channel, we immediately claim the HTLC on the upstream channel, before even doing a `commitment_signed` dance on the downstream channel. This implies that our `ChannelMonitorUpdate`s "go out" in the right order - first we ensure we'll get our money by writing the preimage down, then we write the update that resolves giving money on the downstream node.

This is safe as long as `ChannelMonitorUpdate`s complete in the order in which they are generated, but of course looking forward we want to support asynchronous updates, which may complete in any order.

Here we add infrastructure to handle downstream `ChannelMonitorUpdate`s which are blocked on an upstream preimage-containing one. We don't yet actually do the blocking, which will come in a future commit.
1 parent 793e901 commit 5509788
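As a rough mental model of the bookkeeping this commit adds, here is a minimal standalone Rust sketch (not LDK code: `ChannelId`, `BlockingAction`, and `Ledger` are simplified stand-ins for the real `[u8; 32]` channel IDs, `RAAMonitorUpdateBlockingAction`, and the `actions_blocking_raa_monitor_updates` map on `PeerState`). It only models recording a blocker when the forwarded claim is written to the upstream (inbound) edge, and releasing the downstream channel once the preimage-containing monitor update completes:

// Standalone sketch of the block/release bookkeeping; types here are illustrative
// stand-ins, not LDK's actual structs.
use std::collections::BTreeMap;

type ChannelId = [u8; 32];

/// Stand-in for `RAAMonitorUpdateBlockingAction::ForwardedPaymentInboundClaim`.
#[derive(Clone, PartialEq, Eq, Debug)]
struct BlockingAction {
    inbound_channel_id: ChannelId,
    htlc_id: u64,
}

/// Stand-in for the `actions_blocking_raa_monitor_updates` map added to `PeerState`.
#[derive(Default)]
struct Ledger {
    blockers: BTreeMap<ChannelId, Vec<BlockingAction>>,
}

impl Ledger {
    /// Record that `outbound_channel` must not complete its RAA monitor update until
    /// the given blocker (the inbound edge persisting the preimage) has completed.
    fn block(&mut self, outbound_channel: ChannelId, action: BlockingAction) {
        self.blockers.entry(outbound_channel).or_insert_with(Vec::new).push(action);
    }

    /// Roughly mirrors `raa_monitor_updates_held`: true while any blocker remains.
    fn updates_held(&self, outbound_channel: &ChannelId) -> bool {
        self.blockers.get(outbound_channel).map(|v| !v.is_empty()).unwrap_or(false)
    }

    /// Roughly mirrors the first step of `handle_monitor_update_release`: drop the
    /// completed blocker, then report whether the channel is now free to make progress.
    fn release(&mut self, outbound_channel: &ChannelId, completed: &BlockingAction) -> bool {
        if let Some(blockers) = self.blockers.get_mut(outbound_channel) {
            blockers.retain(|b| b != completed);
        }
        !self.updates_held(outbound_channel)
    }
}

fn main() {
    let inbound = [1u8; 32];
    let outbound = [2u8; 32];
    let blocker = BlockingAction { inbound_channel_id: inbound, htlc_id: 7 };

    let mut ledger = Ledger::default();
    // Upstream claim generated: hold the downstream RAA monitor update.
    ledger.block(outbound, blocker.clone());
    assert!(ledger.updates_held(&outbound));

    // Upstream preimage-containing monitor update completed: unblock downstream.
    assert!(ledger.release(&outbound, &blocker));
    assert!(!ledger.updates_held(&outbound));
}

In the real patch this map lives per-peer and is consulted and released under the same `PeerState` mutex (see `raa_monitor_updates_held` and `handle_monitor_update_release` in the diff below); the sketch only captures the block/release lifecycle.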

1 file changed: lightning/src/ln/channelmanager.rs (+133, -27 lines)

@@ -532,13 +532,31 @@ pub(crate) enum MonitorUpdateCompletionAction {
 	/// this payment. Note that this is only best-effort. On restart it's possible such a duplicate
 	/// event can be generated.
 	PaymentClaimed { payment_hash: PaymentHash },
-	/// Indicates an [`events::Event`] should be surfaced to the user.
-	EmitEvent { event: events::Event },
+	/// Indicates an [`events::Event`] should be surfaced to the user and possibly resume the
+	/// operation of another channel.
+	///
+	/// This is usually generated when we've forwarded an HTLC and want to block the outbound edge
+	/// from completing a monitor update which removes the payment preimage until the inbound edge
+	/// completes a monitor update containing the payment preimage. In that case, after the inbound
+	/// edge completes, we will surface an [`Event::PaymentForwarded`] as well as unblock the
+	/// outbound edge.
+	EmitEventAndFreeOtherChannel {
+		event: events::Event,
+		downstream_counterparty_and_funding_outpoint: Option<(PublicKey, OutPoint, RAAMonitorUpdateBlockingAction)>,
+	},
 }
 
 impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
 	(0, PaymentClaimed) => { (0, payment_hash, required) },
-	(2, EmitEvent) => { (0, event, upgradable_required) },
+	(2, EmitEventAndFreeOtherChannel) => {
+		(0, event, upgradable_required),
+		// LDK prior to 0.0.116 did not have this field as the monitor update application order was
+		// required by clients. If we downgrade to something prior to 0.0.116 this may result in
+		// monitor updates which aren't properly blocked or resumed, however that's fine - we don't
+		// support async monitor updates even in LDK 0.0.116 and once we do we'll require no
+		// downgrades to prior versions.
+		(1, downstream_counterparty_and_funding_outpoint, option),
+	},
 );
 
 #[derive(Clone, Debug, PartialEq, Eq)]
@@ -555,6 +573,36 @@ impl_writeable_tlv_based_enum!(EventCompletionAction,
 	};
 );
 
+#[derive(Clone, PartialEq, Eq, Debug)]
+/// If something is blocked on the completion of an RAA-generated [`ChannelMonitorUpdate`] we track
+/// the blocked action here. See enum variants for more info.
+pub(crate) enum RAAMonitorUpdateBlockingAction {
+	/// A forwarded payment was claimed. We block the downstream channel completing its monitor
+	/// update which removes the HTLC preimage until the upstream channel has gotten the preimage
+	/// durably to disk.
+	ForwardedPaymentInboundClaim {
+		/// The upstream channel ID (i.e. the inbound edge).
+		channel_id: [u8; 32],
+		/// The HTLC ID on the inbound edge.
+		htlc_id: u64,
+	},
+}
+
+impl RAAMonitorUpdateBlockingAction {
+	#[allow(unused)]
+	fn from_prev_hop_data(prev_hop: &HTLCPreviousHopData) -> Self {
+		Self::ForwardedPaymentInboundClaim {
+			channel_id: prev_hop.outpoint.to_channel_id(),
+			htlc_id: prev_hop.htlc_id,
+		}
+	}
+}
+
+impl_writeable_tlv_based_enum!(RAAMonitorUpdateBlockingAction,
+	(0, ForwardedPaymentInboundClaim) => { (0, channel_id, required), (2, htlc_id, required) }
+;);
+
+
 /// State we hold per-peer.
 pub(super) struct PeerState<Signer: ChannelSigner> {
 	/// `temporary_channel_id` or `channel_id` -> `channel`.
@@ -583,6 +631,11 @@ pub(super) struct PeerState<Signer: ChannelSigner> {
 	/// to funding appearing on-chain), the downstream `ChannelMonitor` set is required to ensure
 	/// duplicates do not occur, so such channels should fail without a monitor update completing.
 	monitor_update_blocked_actions: BTreeMap<[u8; 32], Vec<MonitorUpdateCompletionAction>>,
+	/// If another channel's [`ChannelMonitorUpdate`] needs to complete before a channel we have
+	/// with this peer can complete an RAA [`ChannelMonitorUpdate`] (e.g. because the RAA update
+	/// will remove a preimage that needs to be durably in an upstream channel first), we put an
+	/// entry here to note that the channel with the key's ID is blocked on a set of actions.
+	actions_blocking_raa_monitor_updates: BTreeMap<[u8; 32], Vec<RAAMonitorUpdateBlockingAction>>,
 	/// The peer is currently connected (i.e. we've seen a
 	/// [`ChannelMessageHandler::peer_connected`] and no corresponding
 	/// [`ChannelMessageHandler::peer_disconnected`].
@@ -993,6 +1046,8 @@ where
 	/// Thus, we place them here to be handled as soon as possible once we are running normally.
 	///
 	/// See `ChannelManager` struct-level documentation for lock order requirements.
+	///
+	/// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor
 	pending_background_events: Mutex<Vec<BackgroundEvent>>,
 	/// Used when we have to take a BIG lock to make sure everything is self-consistent.
 	/// Essentially just when we're serializing ourselves out.
@@ -4488,16 +4543,16 @@ where
 					Some(claimed_htlc_value - forwarded_htlc_value)
 				} else { None };
 
-				let prev_channel_id = Some(prev_outpoint.to_channel_id());
-				let next_channel_id = Some(next_channel_id);
-
-				Some(MonitorUpdateCompletionAction::EmitEvent { event: events::Event::PaymentForwarded {
-					fee_earned_msat,
-					claim_from_onchain_tx: from_onchain,
-					prev_channel_id,
-					next_channel_id,
-					outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
-				}})
+				Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+					event: events::Event::PaymentForwarded {
+						fee_earned_msat,
+						claim_from_onchain_tx: from_onchain,
+						prev_channel_id: Some(prev_outpoint.to_channel_id()),
+						next_channel_id: Some(next_channel_id),
+						outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
+					},
+					downstream_counterparty_and_funding_outpoint: None,
+				})
 			} else { None }
 		});
 		if let Err((pk, err)) = res {
@@ -4524,8 +4579,13 @@ where
 						}, None));
 					}
 				},
-				MonitorUpdateCompletionAction::EmitEvent { event } => {
+				MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+					event, downstream_counterparty_and_funding_outpoint
+				} => {
 					self.pending_events.lock().unwrap().push_back((event, None));
+					if let Some((node_id, funding_outpoint, blocker)) = downstream_counterparty_and_funding_outpoint {
+						self.handle_monitor_update_release(node_id, funding_outpoint, Some(blocker));
+					}
 				},
 			}
 		}
@@ -5372,6 +5432,24 @@ where
 		}
 	}
 
+	/// Checks whether [`ChannelMonitorUpdate`]s generated by the receipt of a remote
+	/// [`msgs::RevokeAndACK`] should be held for the given channel until some other event
+	/// completes. Note that this needs to happen in the same [`PeerState`] mutex as any release of
+	/// the [`ChannelMonitorUpdate`] in question.
+	fn raa_monitor_updates_held(&self,
+		actions_blocking_raa_monitor_updates: &BTreeMap<[u8; 32], Vec<RAAMonitorUpdateBlockingAction>>,
+		channel_funding_outpoint: OutPoint, counterparty_node_id: PublicKey
+	) -> bool {
+		actions_blocking_raa_monitor_updates
+			.get(&channel_funding_outpoint.to_channel_id()).map(|v| !v.is_empty()).unwrap_or(false)
+		|| self.pending_events.lock().unwrap().iter().any(|(_, action)| {
+			action == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
+				channel_funding_outpoint,
+				counterparty_node_id,
+			})
+		})
+	}
+
 	fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> {
 		let (htlcs_to_fail, res) = {
 			let per_peer_state = self.per_peer_state.read().unwrap();
@@ -6036,25 +6114,37 @@ where
 		self.pending_outbound_payments.clear_pending_payments()
 	}
 
-	fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint) {
+	/// When something which was blocking a channel from updating its [`ChannelMonitor`] (e.g. an
+	/// [`Event`] being handled) completes, this should be called to restore the channel to normal
+	/// operation. It will double-check that nothing *else* is also blocking the same channel from
+	/// making progress and then let any blocked [`ChannelMonitorUpdate`]s fly.
+	fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, mut completed_blocker: Option<RAAMonitorUpdateBlockingAction>) {
 		let mut errors = Vec::new();
 		loop {
 			let per_peer_state = self.per_peer_state.read().unwrap();
 			if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) {
 				let mut peer_state_lck = peer_state_mtx.lock().unwrap();
 				let peer_state = &mut *peer_state_lck;
-				if self.pending_events.lock().unwrap().iter()
-					.any(|(_ev, action_opt)| action_opt == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
-						channel_funding_outpoint, counterparty_node_id
-					}))
-				{
-					// Check that, while holding the peer lock, we don't have another event
-					// blocking any monitor updates for this channel. If we do, let those
-					// events be the ones that ultimately release the monitor update(s).
-					log_trace!(self.logger, "Delaying monitor unlock for channel {} as another event is pending",
+
+				if let Some(blocker) = completed_blocker.take() {
+					// Only do this on the first iteration of the loop.
+					if let Some(blockers) = peer_state.actions_blocking_raa_monitor_updates
+						.get_mut(&channel_funding_outpoint.to_channel_id())
+					{
+						blockers.retain(|iter| iter != &blocker);
+					}
+				}
+
+				if self.raa_monitor_updates_held(&peer_state.actions_blocking_raa_monitor_updates,
+					channel_funding_outpoint, counterparty_node_id) {
+					// Check that, while holding the peer lock, we don't have anything else
+					// blocking monitor updates for this channel. If we do, release the monitor
+					// update(s) when those blockers complete.
+					log_trace!(self.logger, "Delaying monitor unlock for channel {} as another channel's mon update needs to complete first",
						log_bytes!(&channel_funding_outpoint.to_channel_id()[..]));
 					break;
 				}
+
 				if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(channel_funding_outpoint.to_channel_id()) {
 					debug_assert_eq!(chan.get().get_funding_txo().unwrap(), channel_funding_outpoint);
 					if let Some((monitor_update, further_update_exists)) = chan.get_mut().unblock_next_blocked_monitor_update() {
@@ -6096,7 +6186,7 @@ where
 				EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
 					channel_funding_outpoint, counterparty_node_id
 				} => {
-					self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint);
+					self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint, None);
 				}
 			}
 		}
@@ -6772,6 +6862,7 @@ where
 					latest_features: init_msg.features.clone(),
 					pending_msg_events: Vec::new(),
 					monitor_update_blocked_actions: BTreeMap::new(),
+					actions_blocking_raa_monitor_updates: BTreeMap::new(),
 					is_connected: true,
 				}));
 			},
@@ -7968,6 +8059,7 @@ where
 				latest_features: Readable::read(reader)?,
 				pending_msg_events: Vec::new(),
 				monitor_update_blocked_actions: BTreeMap::new(),
+				actions_blocking_raa_monitor_updates: BTreeMap::new(),
 				is_connected: false,
 			};
 			per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
@@ -8049,7 +8141,7 @@ where
 		let mut claimable_htlc_purposes = None;
 		let mut claimable_htlc_onion_fields = None;
 		let mut pending_claiming_payments = Some(HashMap::new());
-		let mut monitor_update_blocked_actions_per_peer = Some(Vec::new());
+		let mut monitor_update_blocked_actions_per_peer: Option<Vec<(_, BTreeMap<_, Vec<_>>)>> = Some(Vec::new());
 		let mut events_override = None;
 		read_tlv_fields!(reader, {
 			(1, pending_outbound_payments_no_retry, option),
@@ -8374,7 +8466,21 @@ where
 	}
 
 	for (node_id, monitor_update_blocked_actions) in monitor_update_blocked_actions_per_peer.unwrap() {
-		if let Some(peer_state) = per_peer_state.get_mut(&node_id) {
+		if let Some(peer_state) = per_peer_state.get(&node_id) {
+			for (_, actions) in monitor_update_blocked_actions.iter() {
+				for action in actions.iter() {
+					if let MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+						downstream_counterparty_and_funding_outpoint:
+							Some((blocked_node_id, blocked_channel_outpoint, blocking_action)), ..
+					} = action {
+						if let Some(blocked_peer_state) = per_peer_state.get(&blocked_node_id) {
+							blocked_peer_state.lock().unwrap().actions_blocking_raa_monitor_updates
+								.entry(blocked_channel_outpoint.to_channel_id())
+								.or_insert_with(Vec::new).push(blocking_action.clone());
+						}
+					}
+				}
+			}
 			peer_state.lock().unwrap().monitor_update_blocked_actions = monitor_update_blocked_actions;
 		} else {
 			log_error!(args.logger, "Got blocked actions without a per-peer-state for {}", node_id);
