Skip to content

Commit 9818d0c

Browse files
committed
Add onion mailbox for async receivers
1 parent 9414343 commit 9818d0c

File tree

7 files changed

+196
-33
lines changed

7 files changed

+196
-33
lines changed

src/builder.rs

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1378,6 +1378,10 @@ fn build_with_store_internal(
13781378
100;
13791379
}
13801380

1381+
if config.async_payment_services_enabled {
1382+
user_config.accept_forwards_to_priv_channels = true;
1383+
}
1384+
13811385
let message_router =
13821386
Arc::new(MessageRouter::new(Arc::clone(&network_graph), Arc::clone(&keys_manager)));
13831387

@@ -1448,17 +1452,31 @@ fn build_with_store_internal(
14481452
}
14491453

14501454
// Initialize the PeerManager
1451-
let onion_messenger: Arc<OnionMessenger> = Arc::new(OnionMessenger::new(
1452-
Arc::clone(&keys_manager),
1453-
Arc::clone(&keys_manager),
1454-
Arc::clone(&logger),
1455-
Arc::clone(&channel_manager),
1456-
message_router,
1457-
Arc::clone(&channel_manager),
1458-
Arc::clone(&channel_manager),
1459-
IgnoringMessageHandler {},
1460-
IgnoringMessageHandler {},
1461-
));
1455+
let onion_messenger: Arc<OnionMessenger> = if config.async_payment_services_enabled {
1456+
Arc::new(OnionMessenger::new_with_offline_peer_interception(
1457+
Arc::clone(&keys_manager),
1458+
Arc::clone(&keys_manager),
1459+
Arc::clone(&logger),
1460+
Arc::clone(&channel_manager),
1461+
message_router,
1462+
Arc::clone(&channel_manager),
1463+
Arc::clone(&channel_manager),
1464+
IgnoringMessageHandler {},
1465+
IgnoringMessageHandler {},
1466+
))
1467+
} else {
1468+
Arc::new(OnionMessenger::new(
1469+
Arc::clone(&keys_manager),
1470+
Arc::clone(&keys_manager),
1471+
Arc::clone(&logger),
1472+
Arc::clone(&channel_manager),
1473+
message_router,
1474+
Arc::clone(&channel_manager),
1475+
Arc::clone(&channel_manager),
1476+
IgnoringMessageHandler {},
1477+
IgnoringMessageHandler {},
1478+
))
1479+
};
14621480
let ephemeral_bytes: [u8; 32] = keys_manager.get_secure_random_bytes();
14631481

14641482
// Initialize the GossipSource

src/config.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,8 @@ pub(crate) fn default_user_config(config: &Config) -> UserConfig {
330330
user_config.channel_handshake_limits.force_announced_channel_preference = true;
331331
}
332332

333+
user_config.enable_htlc_hold = true;
334+
333335
user_config
334336
}
335337

src/event.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8-
use crate::types::{CustomTlvRecord, DynStore, PaymentStore, Sweeper, Wallet};
8+
use crate::om_mailbox::OnionMessageMailbox;
9+
use crate::types::{CustomTlvRecord, DynStore, OnionMessenger, PaymentStore, Sweeper, Wallet};
910
use crate::{
1011
hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore,
1112
UserChannelId,
@@ -459,6 +460,8 @@ where
459460
logger: L,
460461
config: Arc<Config>,
461462
static_invoice_store: Option<StaticInvoiceStore>,
463+
onion_messenger: Arc<OnionMessenger>,
464+
om_mailbox: OnionMessageMailbox,
462465
}
463466

464467
impl<L: Deref + Clone + Sync + Send + 'static> EventHandler<L>
@@ -472,8 +475,8 @@ where
472475
output_sweeper: Arc<Sweeper>, network_graph: Arc<Graph>,
473476
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
474477
payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>,
475-
static_invoice_store: Option<StaticInvoiceStore>, runtime: Arc<Runtime>, logger: L,
476-
config: Arc<Config>,
478+
static_invoice_store: Option<StaticInvoiceStore>, onion_messenger: Arc<OnionMessenger>,
479+
om_mailbox: OnionMessageMailbox, runtime: Arc<Runtime>, logger: L, config: Arc<Config>,
477480
) -> Self {
478481
Self {
479482
event_queue,
@@ -490,6 +493,8 @@ where
490493
runtime,
491494
config,
492495
static_invoice_store,
496+
onion_messenger,
497+
om_mailbox,
493498
}
494499
}
495500

@@ -1491,11 +1496,15 @@ where
14911496

14921497
self.bump_tx_event_handler.handle_event(&bte).await;
14931498
},
1494-
LdkEvent::OnionMessageIntercepted { .. } => {
1495-
debug_assert!(false, "We currently don't support onion message interception, so this event should never be emitted.");
1499+
LdkEvent::OnionMessageIntercepted { peer_node_id, message } => {
1500+
self.om_mailbox.onion_message_intercepted(peer_node_id, message);
14961501
},
1497-
LdkEvent::OnionMessagePeerConnected { .. } => {
1498-
debug_assert!(false, "We currently don't support onion message interception, so this event should never be emitted.");
1502+
LdkEvent::OnionMessagePeerConnected { peer_node_id } => {
1503+
let messages = self.om_mailbox.onion_message_peer_connected(peer_node_id);
1504+
1505+
for message in messages {
1506+
let _ = self.onion_messenger.forward_onion_message(message, &peer_node_id);
1507+
}
14991508
},
15001509

15011510
LdkEvent::PersistStaticInvoice {

src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ pub mod io;
9292
pub mod liquidity;
9393
pub mod logger;
9494
mod message_handler;
95+
mod om_mailbox;
9596
pub mod payment;
9697
mod peer_store;
9798
mod runtime;
@@ -171,6 +172,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
171172
use std::sync::{Arc, Mutex, RwLock};
172173
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
173174

175+
use crate::om_mailbox::OnionMessageMailbox;
176+
174177
#[cfg(feature = "uniffi")]
175178
uniffi::include_scaffolding!("ldk_node");
176179

@@ -504,6 +507,7 @@ impl Node {
504507
} else {
505508
None
506509
};
510+
let om_mailbox = OnionMessageMailbox::new();
507511

508512
let event_handler = Arc::new(EventHandler::new(
509513
Arc::clone(&self.event_queue),
@@ -517,6 +521,8 @@ impl Node {
517521
Arc::clone(&self.payment_store),
518522
Arc::clone(&self.peer_store),
519523
static_invoice_store,
524+
Arc::clone(&self.onion_messenger),
525+
om_mailbox,
520526
Arc::clone(&self.runtime),
521527
Arc::clone(&self.logger),
522528
Arc::clone(&self.config),

src/om_mailbox.rs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
use std::{
2+
collections::{hash_map::Entry, HashMap, VecDeque},
3+
sync::Mutex,
4+
};
5+
6+
use lightning::ln::msgs::OnionMessage;
7+
8+
pub(crate) struct OnionMessageMailbox {
9+
map: Mutex<HashMap<bitcoin::secp256k1::PublicKey, VecDeque<lightning::ln::msgs::OnionMessage>>>,
10+
}
11+
12+
impl OnionMessageMailbox {
13+
const MAX_MESSAGES_PER_PEER: usize = 100;
14+
const MAX_PEERS: usize = 100;
15+
16+
pub fn new() -> Self {
17+
Self { map: Mutex::new(HashMap::new()) }
18+
}
19+
20+
pub(crate) fn onion_message_intercepted(
21+
&self, peer_node_id: bitcoin::secp256k1::PublicKey,
22+
message: lightning::ln::msgs::OnionMessage,
23+
) {
24+
let mut map = self.map.lock().unwrap();
25+
26+
let queue = map.entry(peer_node_id).or_insert_with(VecDeque::new);
27+
if queue.len() >= Self::MAX_MESSAGES_PER_PEER {
28+
queue.pop_front();
29+
}
30+
queue.push_back(message);
31+
32+
// Enforce a peers limit. If exceeded, evict the peer with the longest queue.
33+
if map.len() > Self::MAX_PEERS {
34+
let peer_to_remove = map
35+
.iter()
36+
.max_by_key(|(_, queue)| queue.len())
37+
.map(|(peer, _)| peer.clone())
38+
.unwrap();
39+
40+
map.remove(&peer_to_remove);
41+
}
42+
}
43+
44+
pub(crate) fn onion_message_peer_connected(
45+
&self, peer_node_id: bitcoin::secp256k1::PublicKey,
46+
) -> Vec<OnionMessage> {
47+
let mut map = self.map.lock().unwrap();
48+
49+
match map.entry(peer_node_id) {
50+
Entry::Occupied(mut entry) => {
51+
let queue = std::mem::take(entry.get_mut());
52+
queue.into()
53+
},
54+
Entry::Vacant(_) => Vec::new(),
55+
}
56+
}
57+
}
58+
59+
#[cfg(test)]
60+
mod tests {
61+
use bitcoin::{
62+
key::Secp256k1,
63+
secp256k1::{PublicKey, SecretKey},
64+
};
65+
use lightning::onion_message;
66+
67+
use crate::om_mailbox::OnionMessageMailbox;
68+
69+
#[test]
70+
fn onion_message_mailbox() {
71+
let mailbox = OnionMessageMailbox::new();
72+
73+
let secp = Secp256k1::new();
74+
let sk_bytes = [12; 32];
75+
let sk = SecretKey::from_slice(&sk_bytes).unwrap();
76+
let peer_node_id = PublicKey::from_secret_key(&secp, &sk);
77+
78+
let blinding_sk = SecretKey::from_slice(&[13; 32]).unwrap();
79+
let blinding_point = PublicKey::from_secret_key(&secp, &blinding_sk);
80+
81+
let message_sk = SecretKey::from_slice(&[13; 32]).unwrap();
82+
let message_point = PublicKey::from_secret_key(&secp, &message_sk);
83+
84+
let message = lightning::ln::msgs::OnionMessage {
85+
blinding_point,
86+
onion_routing_packet: onion_message::packet::Packet {
87+
version: 0,
88+
public_key: message_point,
89+
hop_data: vec![1, 2, 3],
90+
hmac: [0; 32],
91+
},
92+
};
93+
mailbox.onion_message_intercepted(peer_node_id, message.clone());
94+
95+
let messages = mailbox.onion_message_peer_connected(peer_node_id);
96+
assert_eq!(messages.len(), 1);
97+
assert_eq!(messages[0], message);
98+
99+
let messages = mailbox.onion_message_peer_connected(peer_node_id);
100+
assert_eq!(messages.len(), 0);
101+
}
102+
}

tests/common/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -589,14 +589,21 @@ pub(crate) fn bump_fee_and_broadcast<E: ElectrumApi>(
589589
pub fn open_channel(
590590
node_a: &TestNode, node_b: &TestNode, funding_amount_sat: u64, should_announce: bool,
591591
electrsd: &ElectrsD,
592+
) -> OutPoint {
593+
open_channel_push_amt(node_a, node_b, funding_amount_sat, None, should_announce, electrsd)
594+
}
595+
596+
pub fn open_channel_push_amt(
597+
node_a: &TestNode, node_b: &TestNode, funding_amount_sat: u64, push_amount_msat: Option<u64>,
598+
should_announce: bool, electrsd: &ElectrsD,
592599
) -> OutPoint {
593600
if should_announce {
594601
node_a
595602
.open_announced_channel(
596603
node_b.node_id(),
597604
node_b.listening_addresses().unwrap().first().unwrap().clone(),
598605
funding_amount_sat,
599-
None,
606+
push_amount_msat,
600607
None,
601608
)
602609
.unwrap();
@@ -606,7 +613,7 @@ pub fn open_channel(
606613
node_b.node_id(),
607614
node_b.listening_addresses().unwrap().first().unwrap().clone(),
608615
funding_amount_sat,
609-
None,
616+
push_amount_msat,
610617
None,
611618
)
612619
.unwrap();

tests/integration_tests_rust.rs

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ use common::{
1313
expect_payment_claimable_event, expect_payment_received_event, expect_payment_successful_event,
1414
generate_blocks_and_wait,
1515
logging::{init_log_logger, validate_log_entry, TestLogWriter},
16-
open_channel, premine_and_distribute_funds, premine_blocks, prepare_rbf, random_config,
17-
random_listening_addresses, setup_bitcoind_and_electrsd, setup_builder, setup_node,
18-
setup_two_nodes, wait_for_tx, TestChainSource, TestSyncStore,
16+
open_channel, open_channel_push_amt, premine_and_distribute_funds, premine_blocks, prepare_rbf,
17+
random_config, random_listening_addresses, setup_bitcoind_and_electrsd, setup_builder,
18+
setup_node, setup_two_nodes, wait_for_tx, TestChainSource, TestSyncStore,
1919
};
2020

2121
use ldk_node::config::EsploraSyncConfig;
@@ -1135,17 +1135,22 @@ fn static_invoice_server() {
11351135
let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();
11361136
let chain_source = TestChainSource::Esplora(&electrsd);
11371137

1138-
let config_sender = random_config(true);
1138+
let mut config_sender = random_config(true);
1139+
config_sender.node_config.listening_addresses = None;
1140+
config_sender.node_config.node_alias = None;
11391141
let node_sender = setup_node(&chain_source, config_sender, None);
11401142

1141-
let config_sender_lsp = random_config(true);
1143+
let mut config_sender_lsp = random_config(true);
1144+
config_sender_lsp.node_config.async_payment_services_enabled = true;
11421145
let node_sender_lsp = setup_node(&chain_source, config_sender_lsp, None);
11431146

11441147
let mut config_receiver_lsp = random_config(true);
11451148
config_receiver_lsp.node_config.async_payment_services_enabled = true;
11461149
let node_receiver_lsp = setup_node(&chain_source, config_receiver_lsp, None);
11471150

1148-
let config_receiver = random_config(true);
1151+
let mut config_receiver = random_config(true);
1152+
config_receiver.node_config.listening_addresses = None;
1153+
config_receiver.node_config.node_alias = None;
11491154
let node_receiver = setup_node(&chain_source, config_receiver, None);
11501155

11511156
let address_sender = node_sender.onchain_payment().new_address().unwrap();
@@ -1165,9 +1170,16 @@ fn static_invoice_server() {
11651170
node_receiver_lsp.sync_wallets().unwrap();
11661171
node_receiver.sync_wallets().unwrap();
11671172

1168-
open_channel(&node_sender, &node_sender_lsp, 400_000, true, &electrsd);
1173+
open_channel(&node_sender, &node_sender_lsp, 400_000, false, &electrsd);
11691174
open_channel(&node_sender_lsp, &node_receiver_lsp, 400_000, true, &electrsd);
1170-
open_channel(&node_receiver_lsp, &node_receiver, 400_000, true, &electrsd);
1175+
open_channel_push_amt(
1176+
&node_receiver,
1177+
&node_receiver_lsp,
1178+
400_000,
1179+
Some(200_000_000),
1180+
false,
1181+
&electrsd,
1182+
);
11711183

11721184
generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6);
11731185

@@ -1190,14 +1202,14 @@ fn static_invoice_server() {
11901202
.filter(|n| {
11911203
node.network_graph().node(n).map_or(false, |info| info.announcement_info.is_some())
11921204
})
1193-
.count() >= 4
1205+
.count() >= 2
11941206
};
11951207

11961208
// Wait for everyone to see all channels and node announcements.
1197-
while node_sender.network_graph().list_channels().len() < 3
1198-
|| node_sender_lsp.network_graph().list_channels().len() < 3
1199-
|| node_receiver_lsp.network_graph().list_channels().len() < 3
1200-
|| node_receiver.network_graph().list_channels().len() < 3
1209+
while node_sender.network_graph().list_channels().len() < 1
1210+
|| node_sender_lsp.network_graph().list_channels().len() < 1
1211+
|| node_receiver_lsp.network_graph().list_channels().len() < 1
1212+
|| node_receiver.network_graph().list_channels().len() < 1
12011213
|| !has_node_announcements(&node_sender)
12021214
|| !has_node_announcements(&node_sender_lsp)
12031215
|| !has_node_announcements(&node_receiver_lsp)
@@ -1219,9 +1231,16 @@ fn static_invoice_server() {
12191231
std::thread::sleep(std::time::Duration::from_millis(100));
12201232
};
12211233

1234+
node_receiver.stop().unwrap();
1235+
12221236
let payment_id =
12231237
node_sender.bolt12_payment().send_using_amount(&offer, 5_000, None, None).unwrap();
12241238

1239+
// Sleep to allow the payment reach a state where the htlc is held and waiting for the receiver to come online.
1240+
std::thread::sleep(std::time::Duration::from_millis(5000));
1241+
1242+
node_receiver.start().unwrap();
1243+
12251244
expect_payment_successful_event!(node_sender, Some(payment_id), None);
12261245
}
12271246

0 commit comments

Comments
 (0)