@@ -2478,6 +2478,8 @@ where
2478
2478
// | |
2479
2479
// | |__`pending_intercepted_htlcs`
2480
2480
// |
2481
+ // |__`receive_htlcs`
2482
+ // |
2481
2483
// |__`decode_update_add_htlcs`
2482
2484
// |
2483
2485
// |__`per_peer_state`
@@ -2553,7 +2555,7 @@ pub struct ChannelManager<
2553
2555
/// See `ChannelManager` struct-level documentation for lock order requirements.
2554
2556
pending_outbound_payments: OutboundPayments,
2555
2557
2556
- /// SCID/SCID Alias -> forward infos. Key of 0 means payments received.
2558
+ /// SCID/SCID Alias -> forward infos.
2557
2559
///
2558
2560
/// Note that because we may have an SCID Alias as the key we can have two entries per channel,
2559
2561
/// though in practice we probably won't be receiving HTLCs for a channel both via the alias
@@ -2562,6 +2564,9 @@ pub struct ChannelManager<
2562
2564
/// Note that no consistency guarantees are made about the existence of a channel with the
2563
2565
/// `short_channel_id` here, nor the `short_channel_id` in the `PendingHTLCInfo`!
2564
2566
///
2567
+ /// This will also hold any [`FailHTLC`]s arising from handling [`Self::pending_intercepted_htlcs`] or
2568
+ /// [`Self::receive_htlcs`].
2569
+ ///
2565
2570
/// See `ChannelManager` struct-level documentation for lock order requirements.
2566
2571
#[cfg(test)]
2567
2572
pub(super) forward_htlcs: Mutex<HashMap<u64, Vec<HTLCForwardInfo>>>,
@@ -2570,9 +2575,21 @@ pub struct ChannelManager<
2570
2575
/// Storage for HTLCs that have been intercepted and bubbled up to the user. We hold them here
2571
2576
/// until the user tells us what we should do with them.
2572
2577
///
2578
+ /// Note that any failures that may arise from handling these will be pushed to
2579
+ /// [`Self::forward_htlcs`] with the previous hop's SCID.
2580
+ ///
2573
2581
/// See `ChannelManager` struct-level documentation for lock order requirements.
2574
2582
pending_intercepted_htlcs: Mutex<HashMap<InterceptId, PendingAddHTLCInfo>>,
2575
-
2583
+ /// Storage for HTLCs that are meant for us.
2584
+ ///
2585
+ /// Note that any failures that may arise from handling these will be pushed to
2586
+ /// [`Self::forward_htlcs`] with the previous hop's SCID.
2587
+ ///
2588
+ /// See `ChannelManager` struct-level documentation for lock order requirements.
2589
+ #[cfg(test)]
2590
+ pub(super) receive_htlcs: Mutex<Vec<HTLCForwardInfo>>,
2591
+ #[cfg(not(test))]
2592
+ receive_htlcs: Mutex<Vec<HTLCForwardInfo>>,
2576
2593
/// SCID/SCID Alias -> pending `update_add_htlc`s to decode.
2577
2594
///
2578
2595
/// Note that because we may have an SCID Alias as the key we can have two entries per channel,
@@ -3755,6 +3772,7 @@ where
3755
3772
outbound_scid_aliases: Mutex::new(new_hash_set()),
3756
3773
pending_outbound_payments: OutboundPayments::new(new_hash_map()),
3757
3774
forward_htlcs: Mutex::new(new_hash_map()),
3775
+ receive_htlcs: Mutex::new(Vec::new()),
3758
3776
decode_update_add_htlcs: Mutex::new(new_hash_map()),
3759
3777
claimable_payments: Mutex::new(ClaimablePayments { claimable_payments: new_hash_map(), pending_claiming_payments: new_hash_map() }),
3760
3778
pending_intercepted_htlcs: Mutex::new(new_hash_map()),
@@ -6494,6 +6512,9 @@ where
6494
6512
if !self.forward_htlcs.lock().unwrap().is_empty() {
6495
6513
return true;
6496
6514
}
6515
+ if !self.receive_htlcs.lock().unwrap().is_empty() {
6516
+ return true;
6517
+ }
6497
6518
if !self.decode_update_add_htlcs.lock().unwrap().is_empty() {
6498
6519
return true;
6499
6520
}
@@ -6541,20 +6562,19 @@ where
6541
6562
6542
6563
for (short_chan_id, mut pending_forwards) in forward_htlcs {
6543
6564
should_persist = NotifyOption::DoPersist;
6544
- if short_chan_id != 0 {
6545
- self.process_forward_htlcs(
6546
- short_chan_id,
6547
- &mut pending_forwards,
6548
- &mut failed_forwards,
6549
- &mut phantom_receives,
6550
- );
6551
- } else {
6552
- self.process_receive_htlcs(
6553
- &mut pending_forwards,
6554
- &mut new_events,
6555
- &mut failed_forwards,
6556
- );
6557
- }
6565
+ self.process_forward_htlcs(
6566
+ short_chan_id,
6567
+ &mut pending_forwards,
6568
+ &mut failed_forwards,
6569
+ &mut phantom_receives,
6570
+ );
6571
+ }
6572
+
6573
+ let mut receive_htlcs = Vec::new();
6574
+ mem::swap(&mut receive_htlcs, &mut self.receive_htlcs.lock().unwrap());
6575
+ if !receive_htlcs.is_empty() {
6576
+ self.process_receive_htlcs(receive_htlcs, &mut new_events, &mut failed_forwards);
6577
+ should_persist = NotifyOption::DoPersist;
6558
6578
}
6559
6579
6560
6580
let best_block_height = self.best_block.read().unwrap().height;
@@ -7068,11 +7088,11 @@ where
7068
7088
}
7069
7089
7070
7090
fn process_receive_htlcs(
7071
- &self, pending_forwards: &mut Vec<HTLCForwardInfo>,
7091
+ &self, receive_htlcs: Vec<HTLCForwardInfo>,
7072
7092
new_events: &mut VecDeque<(Event, Option<EventCompletionAction>)>,
7073
7093
failed_forwards: &mut Vec<FailedHTLCForward>,
7074
7094
) {
7075
- 'next_forwardable_htlc: for forward_info in pending_forwards.drain(.. ) {
7095
+ 'next_forwardable_htlc: for forward_info in receive_htlcs.into_iter( ) {
7076
7096
match forward_info {
7077
7097
HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
7078
7098
prev_short_channel_id,
@@ -10613,8 +10633,21 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
10613
10633
let scid = match forward_info.routing {
10614
10634
PendingHTLCRouting::Forward { short_channel_id, .. } => short_channel_id,
10615
10635
PendingHTLCRouting::TrampolineForward { .. } => 0,
10616
- PendingHTLCRouting::Receive { .. } => 0,
10617
- PendingHTLCRouting::ReceiveKeysend { .. } => 0,
10636
+ PendingHTLCRouting::Receive { .. }
10637
+ | PendingHTLCRouting::ReceiveKeysend { .. } => {
10638
+ self.receive_htlcs.lock().unwrap().push(HTLCForwardInfo::AddHTLC(
10639
+ PendingAddHTLCInfo {
10640
+ prev_short_channel_id,
10641
+ prev_counterparty_node_id,
10642
+ prev_funding_outpoint,
10643
+ prev_channel_id,
10644
+ prev_htlc_id,
10645
+ prev_user_channel_id,
10646
+ forward_info,
10647
+ },
10648
+ ));
10649
+ continue;
10650
+ },
10618
10651
};
10619
10652
// Pull this now to avoid introducing a lock order with `forward_htlcs`.
10620
10653
let is_our_scid = self.short_to_chan_info.read().unwrap().contains_key(&scid);
@@ -15279,6 +15312,8 @@ where
15279
15312
}
15280
15313
}
15281
15314
15315
+ let receive_htlcs = self.receive_htlcs.lock().unwrap();
15316
+
15282
15317
let mut decode_update_add_htlcs_opt = None;
15283
15318
let decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap();
15284
15319
if !decode_update_add_htlcs.is_empty() {
@@ -15446,6 +15481,7 @@ where
15446
15481
(17, in_flight_monitor_updates, option),
15447
15482
(19, peer_storage_dir, optional_vec),
15448
15483
(21, WithoutLength(&self.flow.writeable_async_receive_offer_cache()), required),
15484
+ (23, *receive_htlcs, required_vec),
15449
15485
});
15450
15486
15451
15487
Ok(())
@@ -16006,6 +16042,7 @@ where
16006
16042
const MAX_ALLOC_SIZE: usize = 1024 * 64;
16007
16043
let forward_htlcs_count: u64 = Readable::read(reader)?;
16008
16044
let mut forward_htlcs = hash_map_with_capacity(cmp::min(forward_htlcs_count as usize, 128));
16045
+ let mut legacy_receive_htlcs: Vec<HTLCForwardInfo> = Vec::new();
16009
16046
for _ in 0..forward_htlcs_count {
16010
16047
let short_channel_id = Readable::read(reader)?;
16011
16048
let pending_forwards_count: u64 = Readable::read(reader)?;
@@ -16014,7 +16051,26 @@ where
16014
16051
MAX_ALLOC_SIZE / mem::size_of::<HTLCForwardInfo>(),
16015
16052
));
16016
16053
for _ in 0..pending_forwards_count {
16017
- pending_forwards.push(Readable::read(reader)?);
16054
+ let pending_htlc = Readable::read(reader)?;
16055
+ // Prior to LDK 0.2, Receive HTLCs used to be stored in `forward_htlcs` under SCID == 0. Here we migrate
16056
+ // the old data if necessary.
16057
+ if short_channel_id == 0 {
16058
+ match pending_htlc {
16059
+ HTLCForwardInfo::AddHTLC(ref htlc_info) => {
16060
+ if matches!(
16061
+ htlc_info.forward_info.routing,
16062
+ PendingHTLCRouting::Receive { .. }
16063
+ | PendingHTLCRouting::ReceiveKeysend { .. }
16064
+ ) {
16065
+ legacy_receive_htlcs.push(pending_htlc);
16066
+ continue;
16067
+ }
16068
+ },
16069
+ _ => {},
16070
+ }
16071
+ }
16072
+
16073
+ pending_forwards.push(pending_htlc);
16018
16074
}
16019
16075
forward_htlcs.insert(short_channel_id, pending_forwards);
16020
16076
}
@@ -16131,6 +16187,7 @@ where
16131
16187
let mut inbound_payment_id_secret = None;
16132
16188
let mut peer_storage_dir: Option<Vec<(PublicKey, Vec<u8>)>> = None;
16133
16189
let mut async_receive_offer_cache: AsyncReceiveOfferCache = AsyncReceiveOfferCache::new();
16190
+ let mut receive_htlcs = None;
16134
16191
read_tlv_fields!(reader, {
16135
16192
(1, pending_outbound_payments_no_retry, option),
16136
16193
(2, pending_intercepted_htlcs, option),
@@ -16149,8 +16206,14 @@ where
16149
16206
(17, in_flight_monitor_updates, option),
16150
16207
(19, peer_storage_dir, optional_vec),
16151
16208
(21, async_receive_offer_cache, (default_value, async_receive_offer_cache)),
16209
+ (23, receive_htlcs, optional_vec),
16152
16210
});
16153
16211
let mut decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map());
16212
+ debug_assert!(
16213
+ receive_htlcs.as_ref().map_or(true, |r| r.is_empty())
16214
+ || legacy_receive_htlcs.is_empty()
16215
+ );
16216
+ let receive_htlcs = receive_htlcs.unwrap_or_else(|| legacy_receive_htlcs);
16154
16217
let peer_storage_dir: Vec<(PublicKey, Vec<u8>)> = peer_storage_dir.unwrap_or_else(Vec::new);
16155
16218
if fake_scid_rand_bytes.is_none() {
16156
16219
fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes());
@@ -16981,6 +17044,7 @@ where
16981
17044
pending_intercepted_htlcs: Mutex::new(pending_intercepted_htlcs.unwrap()),
16982
17045
16983
17046
forward_htlcs: Mutex::new(forward_htlcs),
17047
+ receive_htlcs: Mutex::new(receive_htlcs),
16984
17048
decode_update_add_htlcs: Mutex::new(decode_update_add_htlcs),
16985
17049
claimable_payments: Mutex::new(ClaimablePayments {
16986
17050
claimable_payments,
0 commit comments