Do not track HTLC IDs as separate MPP parts which need claiming #3680

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
1 change: 1 addition & 0 deletions lightning-liquidity/Cargo.toml
@@ -37,6 +37,7 @@ lightning-persister = { version = "0.2.0", path = "../lightning-persister", defa

proptest = "1.0.0"
tokio = { version = "1.35", default-features = false, features = [ "rt-multi-thread", "time", "sync", "macros" ] }
parking_lot = { version = "0.12", default-features = false }

[lints.rust.unexpected_cfgs]
level = "forbid"
2 changes: 1 addition & 1 deletion lightning-persister/src/test_utils.rs
@@ -113,7 +113,7 @@ pub(crate) fn do_test_data_migration<S: MigratableKVStore, T: MigratableKVStore>

// Integration-test the given KVStore implementation. Test relaying a few payments and check that
// the persisted data is updated the appropriate number of times.
-pub(crate) fn do_test_store<K: KVStore>(store_0: &K, store_1: &K) {
+pub(crate) fn do_test_store<K: KVStore + Sync>(store_0: &K, store_1: &K) {
let chanmon_cfgs = create_chanmon_cfgs(2);
let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let chain_mon_0 = test_utils::TestChainMonitor::new(
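The only functional change here is the added `Sync` bound: sharing a `&K` across threads requires `K: Sync`. A minimal refresher on that language rule, with an illustrative helper that is not part of LDK (requires Rust 1.63+ for `std::thread::scope`):

use std::thread;

// Sharing `&K` between threads requires `K: Sync`: a `&K` is only `Send`
// if `K` is `Sync`, and `thread::scope`'s closures must be `Send`.
fn use_from_two_threads<K: Sync>(store: &K, f: impl Fn(&K) + Sync) {
    thread::scope(|s| {
        s.spawn(|| f(store)); // `&K` crosses a thread boundary here
        s.spawn(|| f(store));
    });
}

fn main() {
    let store = vec![1u8, 2, 3]; // Vec<u8> is Sync
    use_from_two_threads(&store, |s| println!("len = {}", s.len()));
}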
1 change: 1 addition & 0 deletions lightning/Cargo.toml
@@ -54,6 +54,7 @@ inventory = { version = "0.3", optional = true }
regex = "1.5.6"
lightning-types = { version = "0.3.0", path = "../lightning-types", features = ["_test_utils"] }
lightning-macros = { path = "../lightning-macros" }
parking_lot = { version = "0.12", default-features = false }

[dev-dependencies.bitcoin]
version = "0.32.2"
2 changes: 2 additions & 0 deletions lightning/src/lib.rs
@@ -63,6 +63,8 @@ extern crate core;

#[cfg(ldk_bench)] extern crate criterion;

#[cfg(all(feature = "std", test))] extern crate parking_lot;

#[macro_use]
pub mod util;
pub mod chain;
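`parking_lot` is wired into the test build above. A plausible reason (the new test below notes that it "relies on our locks being fair"): unlike `std::sync::Mutex`, `parking_lot`'s mutex guarantees eventual fairness, so a thread contending in a tight loop cannot starve other waiters indefinitely. It also never poisons, so `lock()` returns the guard directly. A minimal usage sketch:

use parking_lot::Mutex;
use std::sync::Arc;
use std::thread;

fn main() {
    // parking_lot's lock() never poisons, so there is no unwrap().
    let counter = Arc::new(Mutex::new(0u64));

    let handles: Vec<_> = (0..4).map(|_| {
        let counter = Arc::clone(&counter);
        thread::spawn(move || {
            for _ in 0..1_000 {
                // Eventual fairness prevents one thread from starving
                // the others even under heavy contention like this.
                *counter.lock() += 1;
            }
        })
    }).collect();

    for h in handles { h.join().unwrap(); }
    assert_eq!(*counter.lock(), 4_000);
}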
222 changes: 222 additions & 0 deletions lightning/src/ln/chanmon_update_fail_tests.rs
@@ -3860,3 +3860,225 @@ fn test_claim_to_closed_channel_blocks_claimed_event() {
nodes[1].chain_monitor.complete_sole_pending_chan_update(&chan_a.2);
expect_payment_claimed!(nodes[1], payment_hash, 1_000_000);
}

#[test]
#[cfg(all(feature = "std", not(target_os = "windows")))]
fn test_single_channel_multiple_mpp() {
use std::sync::atomic::{AtomicBool, Ordering};

// Test what happens when we attempt to claim an MPP payment with many parts that came to us
// over the same channel, while using a synchronous persistence interface with very high
// latency.
//
// Previously, if a `revoke_and_ack` came in while we were still running in
// `ChannelManager::claim_funds` we'd end up hanging, waiting to apply a
// `ChannelMonitorUpdate` until after `claim_funds` completed. See the commit which introduced
// this test for more info.
let chanmon_cfgs = create_chanmon_cfgs(9);
let node_cfgs = create_node_cfgs(9, &chanmon_cfgs);
let configs = [None, None, None, None, None, None, None, None, None];
let node_chanmgrs = create_node_chanmgrs(9, &node_cfgs, &configs);
let mut nodes = create_network(9, &node_cfgs, &node_chanmgrs);

let node_7_id = nodes[7].node.get_our_node_id();
let node_8_id = nodes[8].node.get_our_node_id();

// Send an MPP payment in six parts along the path shown from top to bottom
// 0
// 1 2 3 4 5 6
// 7
// 8
//
// We can in theory reproduce this issue with fewer channels/HTLCs, but making this test
// robust is rather challenging. We rely on the main test thread waiting on locks held by
// the background `claim_funds` thread, which are released each time the `claim_funds` thread
// completes a single `ChannelMonitorUpdate`.
// The main test thread calls `get_and_clear_pending_msg_events()` and
// `handle_revoke_and_ack()`, both of which require `ChannelManager` locks, so we have to make
// sure this thread gets a chance to block on those mutexes before we let the background
// thread wake `claim_funds`, allowing the mutexes to hand off to the main thread.
// This relies on our locks being fair, but also on our threads getting runtime during the
// test run, which can be pretty competitive. Thus we do a dumb dance to be as conservative
// as possible - we have a background thread which completes a `ChannelMonitorUpdate` (by
// sending into the `write_blocker` mpsc) but which doesn't run until an mpsc send arrives
// from this main thread, and even then it sleeps a while before sending the
// `ChannelMonitorUpdate` unblocker. (A standalone sketch of this rendezvous pattern follows
// this file's diff.)
// Further, we give ourselves two chances each time, needing 4 HTLCs just to unlock our two
// `ChannelManager` calls. We then need a few remaining HTLCs to actually trigger the bug, so
// we use 6 HTLCs.
// Finally, we do not run this test on Windows, as its scheduler's timing behavior makes the
// dance above too unreliable.
const MAX_THREAD_INIT_TIME: std::time::Duration = std::time::Duration::from_secs(1);

create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 100_000, 0);
create_announced_chan_between_nodes_with_value(&nodes, 0, 2, 100_000, 0);
create_announced_chan_between_nodes_with_value(&nodes, 0, 3, 100_000, 0);
create_announced_chan_between_nodes_with_value(&nodes, 0, 4, 100_000, 0);
create_announced_chan_between_nodes_with_value(&nodes, 0, 5, 100_000, 0);
create_announced_chan_between_nodes_with_value(&nodes, 0, 6, 100_000, 0);

create_announced_chan_between_nodes_with_value(&nodes, 1, 7, 100_000, 0);
create_announced_chan_between_nodes_with_value(&nodes, 2, 7, 100_000, 0);
create_announced_chan_between_nodes_with_value(&nodes, 3, 7, 100_000, 0);
create_announced_chan_between_nodes_with_value(&nodes, 4, 7, 100_000, 0);
create_announced_chan_between_nodes_with_value(&nodes, 5, 7, 100_000, 0);
create_announced_chan_between_nodes_with_value(&nodes, 6, 7, 100_000, 0);
create_announced_chan_between_nodes_with_value(&nodes, 7, 8, 1_000_000, 0);

let (mut route, payment_hash, payment_preimage, payment_secret) = get_route_and_payment_hash!(&nodes[0], nodes[8], 50_000_000);

send_along_route_with_secret(&nodes[0], route, &[&[&nodes[1], &nodes[7], &nodes[8]], &[&nodes[2], &nodes[7], &nodes[8]], &[&nodes[3], &nodes[7], &nodes[8]], &[&nodes[4], &nodes[7], &nodes[8]], &[&nodes[5], &nodes[7], &nodes[8]], &[&nodes[6], &nodes[7], &nodes[8]]], 50_000_000, payment_hash, payment_secret);

let (do_a_write, blocker) = std::sync::mpsc::sync_channel(0);
*nodes[8].chain_monitor.write_blocker.lock().unwrap() = Some(blocker);

// Until we can use `std::thread::scope` we have to unsafe { turn off the borrow checker }.
// We do this by casting a pointer to a `TestChannelManager` to a pointer to a
// `TestChannelManager` with different (in this case 'static) lifetime.
// This is even suggested in the second example at
// https://doc.rust-lang.org/std/mem/fn.transmute.html#examples
let claim_node: &'static TestChannelManager<'static, 'static> =
unsafe { std::mem::transmute(nodes[8].node as &TestChannelManager) };
let thrd = std::thread::spawn(move || {
// Initiate the claim in a background thread as it will immediately block waiting on the
// `write_blocker` we set above.
claim_node.claim_funds(payment_preimage);
});

// First unlock one monitor so that we have a pending
// `update_fulfill_htlc`/`commitment_signed` pair to pass to our counterparty.
do_a_write.send(()).unwrap();

// Then fetch the `update_fulfill_htlc`/`commitment_signed`. Note that the
// `get_and_clear_pending_msg_events` will immediately hang trying to take a peer lock which
// `claim_funds` is holding. Thus, we release a second write after a small sleep in the
// background to give `claim_funds` a chance to step forward, unblocking
// `get_and_clear_pending_msg_events`.
let do_a_write_background = do_a_write.clone();
let block_thrd2 = AtomicBool::new(true);
let block_thrd2_read: &'static AtomicBool = unsafe { std::mem::transmute(&block_thrd2) };
let thrd2 = std::thread::spawn(move || {
while block_thrd2_read.load(Ordering::Acquire) {
std::thread::yield_now();
}
std::thread::sleep(MAX_THREAD_INIT_TIME);
do_a_write_background.send(()).unwrap();
std::thread::sleep(MAX_THREAD_INIT_TIME);
do_a_write_background.send(()).unwrap();
});
block_thrd2.store(false, Ordering::Release);
let first_updates = get_htlc_update_msgs(&nodes[8], &nodes[7].node.get_our_node_id());
thrd2.join().unwrap();

// Disconnect node 7 from all its peers so it doesn't bother to fail the HTLCs back
nodes[7].node.peer_disconnected(nodes[1].node.get_our_node_id());
nodes[7].node.peer_disconnected(nodes[2].node.get_our_node_id());
nodes[7].node.peer_disconnected(nodes[3].node.get_our_node_id());
nodes[7].node.peer_disconnected(nodes[4].node.get_our_node_id());
nodes[7].node.peer_disconnected(nodes[5].node.get_our_node_id());
nodes[7].node.peer_disconnected(nodes[6].node.get_our_node_id());

nodes[7].node.handle_update_fulfill_htlc(node_8_id, &first_updates.update_fulfill_htlcs[0]);
check_added_monitors(&nodes[7], 1);
expect_payment_forwarded!(nodes[7], nodes[1], nodes[8], Some(1000), false, false);
nodes[7].node.handle_commitment_signed(node_8_id, &first_updates.commitment_signed);
check_added_monitors(&nodes[7], 1);
let (raa, cs) = get_revoke_commit_msgs(&nodes[7], &node_8_id);

// Now, handle the `revoke_and_ack` from node 7. Note that `claim_funds` is still blocked on
// our peer lock, so we have to release a write to let it proceed.
// Prior to this fix, once this call completed the channel would end up locked and unable to
// make further progress.
let do_a_write_background = do_a_write.clone();
let block_thrd3 = AtomicBool::new(true);
let block_thrd3_read: &'static AtomicBool = unsafe { std::mem::transmute(&block_thrd3) };
let thrd3 = std::thread::spawn(move || {
while block_thrd3_read.load(Ordering::Acquire) {
std::thread::yield_now();
}
std::thread::sleep(MAX_THREAD_INIT_TIME);
do_a_write_background.send(()).unwrap();
std::thread::sleep(MAX_THREAD_INIT_TIME);
do_a_write_background.send(()).unwrap();
});
block_thrd3.store(false, Ordering::Release);
nodes[8].node.handle_revoke_and_ack(node_7_id, &raa);
thrd3.join().unwrap();
assert!(!thrd.is_finished());

let thrd4 = std::thread::spawn(move || {
do_a_write.send(()).unwrap();
do_a_write.send(()).unwrap();
});

thrd4.join().unwrap();
thrd.join().unwrap();

expect_payment_claimed!(nodes[8], payment_hash, 50_000_000);

// At the end, we should have 7 ChannelMonitorUpdates - 6 for HTLC claims, and one for the
// above `revoke_and_ack`.
check_added_monitors(&nodes[8], 7);

// Now drive everything to the end, at least as far as node 7 is concerned...
*nodes[8].chain_monitor.write_blocker.lock().unwrap() = None;
nodes[8].node.handle_commitment_signed(node_7_id, &cs);
check_added_monitors(&nodes[8], 1);

let (updates, raa) = get_updates_and_revoke(&nodes[8], &nodes[7].node.get_our_node_id());

nodes[7].node.handle_update_fulfill_htlc(node_8_id, &updates.update_fulfill_htlcs[0]);
expect_payment_forwarded!(nodes[7], nodes[2], nodes[8], Some(1000), false, false);
nodes[7].node.handle_update_fulfill_htlc(node_8_id, &updates.update_fulfill_htlcs[1]);
expect_payment_forwarded!(nodes[7], nodes[3], nodes[8], Some(1000), false, false);
let mut next_source = 4;
if let Some(update) = updates.update_fulfill_htlcs.get(2) {
nodes[7].node.handle_update_fulfill_htlc(node_8_id, update);
expect_payment_forwarded!(nodes[7], nodes[4], nodes[8], Some(1000), false, false);
next_source += 1;
}

nodes[7].node.handle_commitment_signed(node_8_id, &updates.commitment_signed);
nodes[7].node.handle_revoke_and_ack(node_8_id, &raa);
if updates.update_fulfill_htlcs.get(2).is_some() {
check_added_monitors(&nodes[7], 5);
} else {
check_added_monitors(&nodes[7], 4);
}

let (raa, cs) = get_revoke_commit_msgs(&nodes[7], &node_8_id);

nodes[8].node.handle_revoke_and_ack(node_7_id, &raa);
nodes[8].node.handle_commitment_signed(node_7_id, &cs);
check_added_monitors(&nodes[8], 2);

let (updates, raa) = get_updates_and_revoke(&nodes[8], &node_7_id);

nodes[7].node.handle_update_fulfill_htlc(node_8_id, &updates.update_fulfill_htlcs[0]);
expect_payment_forwarded!(nodes[7], nodes[next_source], nodes[8], Some(1000), false, false);
next_source += 1;
nodes[7].node.handle_update_fulfill_htlc(node_8_id, &updates.update_fulfill_htlcs[1]);
expect_payment_forwarded!(nodes[7], nodes[next_source], nodes[8], Some(1000), false, false);
next_source += 1;
if let Some(update) = updates.update_fulfill_htlcs.get(2) {
nodes[7].node.handle_update_fulfill_htlc(node_8_id, update);
expect_payment_forwarded!(nodes[7], nodes[next_source], nodes[8], Some(1000), false, false);
}

nodes[7].node.handle_commitment_signed(node_8_id, &updates.commitment_signed);
nodes[7].node.handle_revoke_and_ack(node_8_id, &raa);
if updates.update_fulfill_htlcs.get(2).is_some() {
check_added_monitors(&nodes[7], 5);
} else {
check_added_monitors(&nodes[7], 4);
}

let (raa, cs) = get_revoke_commit_msgs(&nodes[7], &node_8_id);
nodes[8].node.handle_revoke_and_ack(node_7_id, &raa);
nodes[8].node.handle_commitment_signed(node_7_id, &cs);
check_added_monitors(&nodes[8], 2);

let raa = get_event_msg!(nodes[8], MessageSendEvent::SendRevokeAndACK, node_7_id);
nodes[7].node.handle_revoke_and_ack(node_8_id, &raa);
check_added_monitors(&nodes[7], 1);
}
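The key mechanism in the test above is the zero-capacity `sync_channel`: with a bound of 0, every `send(())` rendezvouses with exactly one `recv()`, letting the test release monitor writes one at a time. A standalone sketch of that pattern (the `write_blocker` name mirrors the test hook; the rest is illustrative):

use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    // Capacity 0 makes send()/recv() a rendezvous: neither side proceeds
    // until both have arrived, just like the test's write_blocker.
    let (do_a_write, write_blocker) = sync_channel::<()>(0);

    let persister = thread::spawn(move || {
        for update_id in 0..3 {
            // The "persister" stalls here until the main thread
            // explicitly releases one write.
            write_blocker.recv().unwrap();
            println!("completed ChannelMonitorUpdate {update_id}");
        }
    });

    for _ in 0..3 {
        // Each send() unblocks exactly one pending monitor write.
        do_a_write.send(()).unwrap();
    }
    persister.join().unwrap();
}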
60 changes: 34 additions & 26 deletions lightning/src/ln/channelmanager.rs
@@ -1132,7 +1132,7 @@ pub(crate) enum MonitorUpdateCompletionAction {
/// A pending MPP claim which hasn't yet completed.
///
/// Not written to disk.
-pending_mpp_claim: Option<(PublicKey, ChannelId, u64, PendingMPPClaimPointer)>,
+pending_mpp_claim: Option<(PublicKey, ChannelId, PendingMPPClaimPointer)>,
},
/// Indicates an [`events::Event`] should be surfaced to the user and possibly resume the
/// operation of another channel.
@@ -1234,10 +1234,16 @@ impl From<&MPPClaimHTLCSource> for HTLCClaimSource {
}
}

+#[derive(Debug)]
+pub(crate) struct PendingMPPClaim {
+channels_without_preimage: Vec<(PublicKey, OutPoint, ChannelId)>,
+channels_with_preimage: Vec<(PublicKey, OutPoint, ChannelId)>,
+}

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
/// The source of an HTLC which is being claimed as a part of an incoming payment. Each part is
-/// tracked in [`PendingMPPClaim`] as well as in [`ChannelMonitor`]s, so that it can be converted
-/// to an [`HTLCClaimSource`] for claim replays on startup.
+/// tracked in [`ChannelMonitor`]s, so that it can be converted to an [`HTLCClaimSource`] for claim
+/// replays on startup.
struct MPPClaimHTLCSource {
counterparty_node_id: PublicKey,
funding_txo: OutPoint,
@@ -1252,12 +1258,6 @@ impl_writeable_tlv_based!(MPPClaimHTLCSource, {
(6, htlc_id, required),
});

-#[derive(Debug)]
-pub(crate) struct PendingMPPClaim {
-channels_without_preimage: Vec<MPPClaimHTLCSource>,
-channels_with_preimage: Vec<MPPClaimHTLCSource>,
-}

#[derive(Clone, Debug, PartialEq, Eq)]
/// When we're claiming a(n MPP) payment, we want to store information about that payment in the
/// [`ChannelMonitor`] so that we can replay the claim without any information from the
@@ -7207,8 +7207,15 @@ where
}
}).collect();
let pending_mpp_claim_ptr_opt = if sources.len() > 1 {
+let mut channels_without_preimage = Vec::with_capacity(mpp_parts.len());
+for part in mpp_parts.iter() {
+let chan = (part.counterparty_node_id, part.funding_txo, part.channel_id);
+if !channels_without_preimage.contains(&chan) {
+channels_without_preimage.push(chan);
+}
+}
Some(Arc::new(Mutex::new(PendingMPPClaim {
-channels_without_preimage: mpp_parts.clone(),
+channels_without_preimage,
channels_with_preimage: Vec::new(),
})))
} else {
@@ -7219,7 +7226,7 @@ where
let this_mpp_claim = pending_mpp_claim_ptr_opt.as_ref().and_then(|pending_mpp_claim|
if let Some(cp_id) = htlc.prev_hop.counterparty_node_id {
let claim_ptr = PendingMPPClaimPointer(Arc::clone(pending_mpp_claim));
-Some((cp_id, htlc.prev_hop.channel_id, htlc.prev_hop.htlc_id, claim_ptr))
+Some((cp_id, htlc.prev_hop.channel_id, claim_ptr))
} else {
None
}
@@ -7552,7 +7559,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
for action in actions.into_iter() {
match action {
MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim } => {
-if let Some((counterparty_node_id, chan_id, htlc_id, claim_ptr)) = pending_mpp_claim {
+if let Some((counterparty_node_id, chan_id, claim_ptr)) = pending_mpp_claim {
let per_peer_state = self.per_peer_state.read().unwrap();
per_peer_state.get(&counterparty_node_id).map(|peer_state_mutex| {
let mut peer_state = peer_state_mutex.lock().unwrap();
@@ -7563,24 +7570,17 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
if *pending_claim == claim_ptr {
let mut pending_claim_state_lock = pending_claim.0.lock().unwrap();
let pending_claim_state = &mut *pending_claim_state_lock;
-pending_claim_state.channels_without_preimage.retain(|htlc_info| {
+pending_claim_state.channels_without_preimage.retain(|(cp, op, cid)| {
let this_claim =
-htlc_info.counterparty_node_id == counterparty_node_id
-&& htlc_info.channel_id == chan_id
-&& htlc_info.htlc_id == htlc_id;
+*cp == counterparty_node_id && *cid == chan_id;
if this_claim {
-pending_claim_state.channels_with_preimage.push(htlc_info.clone());
+pending_claim_state.channels_with_preimage.push((*cp, *op, *cid));
false
} else { true }
});
if pending_claim_state.channels_without_preimage.is_empty() {
-for htlc_info in pending_claim_state.channels_with_preimage.iter() {
-let freed_chan = (
-htlc_info.counterparty_node_id,
-htlc_info.funding_txo,
-htlc_info.channel_id,
-blocker.clone()
-);
+for (cp, op, cid) in pending_claim_state.channels_with_preimage.iter() {
+let freed_chan = (*cp, *op, *cid, blocker.clone());
freed_channels.push(freed_chan);
}
}
@@ -14786,8 +14786,16 @@ where
if payment_claim.mpp_parts.is_empty() {
return Err(DecodeError::InvalidValue);
}
+let mut channels_without_preimage = payment_claim.mpp_parts.iter()
+.map(|htlc_info| (htlc_info.counterparty_node_id, htlc_info.funding_txo, htlc_info.channel_id))
+.collect::<Vec<_>>();
+// If we have multiple MPP parts which were received over the same channel, we only track the
+// channel once: once we get a preimage durably into the `ChannelMonitor` it will be used for
+// all HTLCs with a matching hash. (See the standalone model after this diff.)
+channels_without_preimage.sort_unstable();
+channels_without_preimage.dedup();
let pending_claims = PendingMPPClaim {
-channels_without_preimage: payment_claim.mpp_parts.clone(),
+channels_without_preimage,
channels_with_preimage: Vec::new(),
};
let pending_claim_ptr_opt = Some(Arc::new(Mutex::new(pending_claims)));
@@ -14820,7 +14828,7 @@ where

for part in payment_claim.mpp_parts.iter() {
let pending_mpp_claim = pending_claim_ptr_opt.as_ref().map(|ptr| (
-part.counterparty_node_id, part.channel_id, part.htlc_id,
+part.counterparty_node_id, part.channel_id,
PendingMPPClaimPointer(Arc::clone(&ptr))
));
let pending_claim_ptr = pending_claim_ptr_opt.as_ref().map(|ptr|
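To summarize the bookkeeping change in this file: MPP parts are deduplicated down to one entry per channel, and claim completion is keyed on (counterparty, channel) rather than on individual HTLC IDs. A standalone model of that logic, using plain integers in place of `PublicKey`/`OutPoint`/`ChannelId` (a sketch of the idea, not LDK's actual types):

type ChanKey = (u64, u64, u64); // (counterparty, funding outpoint, channel id)

struct PendingMppClaim {
    channels_without_preimage: Vec<ChanKey>,
    channels_with_preimage: Vec<ChanKey>,
}

impl PendingMppClaim {
    // Mirrors the read-path construction: collect one entry per channel
    // even if several HTLC parts arrived over it.
    fn new(mpp_parts: &[ChanKey]) -> Self {
        let mut channels_without_preimage = mpp_parts.to_vec();
        channels_without_preimage.sort_unstable();
        channels_without_preimage.dedup();
        PendingMppClaim { channels_without_preimage, channels_with_preimage: Vec::new() }
    }

    // Mirrors the completion path: when a channel's monitor update finishes,
    // move it to the "has preimage" set. Returns true once every channel has
    // the preimage and any blocked channels can be freed.
    fn channel_completed(&mut self, counterparty: u64, channel_id: u64) -> bool {
        let with_preimage = &mut self.channels_with_preimage;
        self.channels_without_preimage.retain(|&(cp, op, cid)| {
            if cp == counterparty && cid == channel_id {
                with_preimage.push((cp, op, cid));
                false
            } else {
                true
            }
        });
        self.channels_without_preimage.is_empty()
    }
}

fn main() {
    // Two HTLC parts arrived over channel 7, one over channel 9: only two
    // channels are tracked, so two completions finish the whole claim.
    let parts: [ChanKey; 3] = [(1, 100, 7), (1, 100, 7), (2, 200, 9)];
    let mut claim = PendingMppClaim::new(&parts);
    assert!(!claim.channel_completed(1, 7));
    assert!(claim.channel_completed(2, 9));
    println!("all channels have the preimage; blocked channels can be freed");
}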