Skip to content

Commit 89d74d5

Browse files
committed
Add onion mailbox for async receivers
1 parent 9414343 commit 89d74d5

File tree

7 files changed

+213
-33
lines changed

7 files changed

+213
-33
lines changed

src/builder.rs

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::liquidity::{
2727
};
2828
use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger};
2929
use crate::message_handler::NodeCustomMessageHandler;
30+
use crate::om_mailbox::OnionMessageMailbox;
3031
use crate::peer_store::PeerStore;
3132
use crate::runtime::Runtime;
3233
use crate::tx_broadcaster::TransactionBroadcaster;
@@ -1378,6 +1379,10 @@ fn build_with_store_internal(
13781379
100;
13791380
}
13801381

1382+
if config.async_payment_services_enabled {
1383+
user_config.accept_forwards_to_priv_channels = true;
1384+
}
1385+
13811386
let message_router =
13821387
Arc::new(MessageRouter::new(Arc::clone(&network_graph), Arc::clone(&keys_manager)));
13831388

@@ -1448,17 +1453,31 @@ fn build_with_store_internal(
14481453
}
14491454

14501455
// Initialize the PeerManager
1451-
let onion_messenger: Arc<OnionMessenger> = Arc::new(OnionMessenger::new(
1452-
Arc::clone(&keys_manager),
1453-
Arc::clone(&keys_manager),
1454-
Arc::clone(&logger),
1455-
Arc::clone(&channel_manager),
1456-
message_router,
1457-
Arc::clone(&channel_manager),
1458-
Arc::clone(&channel_manager),
1459-
IgnoringMessageHandler {},
1460-
IgnoringMessageHandler {},
1461-
));
1456+
let onion_messenger: Arc<OnionMessenger> = if config.async_payment_services_enabled {
1457+
Arc::new(OnionMessenger::new_with_offline_peer_interception(
1458+
Arc::clone(&keys_manager),
1459+
Arc::clone(&keys_manager),
1460+
Arc::clone(&logger),
1461+
Arc::clone(&channel_manager),
1462+
message_router,
1463+
Arc::clone(&channel_manager),
1464+
Arc::clone(&channel_manager),
1465+
IgnoringMessageHandler {},
1466+
IgnoringMessageHandler {},
1467+
))
1468+
} else {
1469+
Arc::new(OnionMessenger::new(
1470+
Arc::clone(&keys_manager),
1471+
Arc::clone(&keys_manager),
1472+
Arc::clone(&logger),
1473+
Arc::clone(&channel_manager),
1474+
message_router,
1475+
Arc::clone(&channel_manager),
1476+
Arc::clone(&channel_manager),
1477+
IgnoringMessageHandler {},
1478+
IgnoringMessageHandler {},
1479+
))
1480+
};
14621481
let ephemeral_bytes: [u8; 32] = keys_manager.get_secure_random_bytes();
14631482

14641483
// Initialize the GossipSource
@@ -1645,6 +1664,8 @@ fn build_with_store_internal(
16451664
},
16461665
};
16471666

1667+
let om_mailbox = Arc::new(OnionMessageMailbox::new());
1668+
16481669
let (stop_sender, _) = tokio::sync::watch::channel(());
16491670
let (background_processor_stop_sender, _) = tokio::sync::watch::channel(());
16501671
let is_running = Arc::new(RwLock::new(false));
@@ -1677,6 +1698,7 @@ fn build_with_store_internal(
16771698
is_running,
16781699
is_listening,
16791700
node_metrics,
1701+
om_mailbox,
16801702
})
16811703
}
16821704

src/config.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,8 @@ pub(crate) fn default_user_config(config: &Config) -> UserConfig {
330330
user_config.channel_handshake_limits.force_announced_channel_preference = true;
331331
}
332332

333+
user_config.enable_htlc_hold = true;
334+
333335
user_config
334336
}
335337

src/event.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8-
use crate::types::{CustomTlvRecord, DynStore, PaymentStore, Sweeper, Wallet};
8+
use crate::om_mailbox::OnionMessageMailbox;
9+
use crate::types::{CustomTlvRecord, DynStore, OnionMessenger, PaymentStore, Sweeper, Wallet};
910
use crate::{
1011
hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore,
1112
UserChannelId,
@@ -459,6 +460,8 @@ where
459460
logger: L,
460461
config: Arc<Config>,
461462
static_invoice_store: Option<StaticInvoiceStore>,
463+
onion_messenger: Arc<OnionMessenger>,
464+
om_mailbox: Arc<OnionMessageMailbox>,
462465
}
463466

464467
impl<L: Deref + Clone + Sync + Send + 'static> EventHandler<L>
@@ -472,7 +475,8 @@ where
472475
output_sweeper: Arc<Sweeper>, network_graph: Arc<Graph>,
473476
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
474477
payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>,
475-
static_invoice_store: Option<StaticInvoiceStore>, runtime: Arc<Runtime>, logger: L,
478+
static_invoice_store: Option<StaticInvoiceStore>, onion_messenger: Arc<OnionMessenger>,
479+
om_mailbox: Arc<OnionMessageMailbox>, runtime: Arc<Runtime>, logger: L,
476480
config: Arc<Config>,
477481
) -> Self {
478482
Self {
@@ -490,6 +494,8 @@ where
490494
runtime,
491495
config,
492496
static_invoice_store,
497+
onion_messenger,
498+
om_mailbox,
493499
}
494500
}
495501

@@ -1491,11 +1497,15 @@ where
14911497

14921498
self.bump_tx_event_handler.handle_event(&bte).await;
14931499
},
1494-
LdkEvent::OnionMessageIntercepted { .. } => {
1495-
debug_assert!(false, "We currently don't support onion message interception, so this event should never be emitted.");
1500+
LdkEvent::OnionMessageIntercepted { peer_node_id, message } => {
1501+
self.om_mailbox.onion_message_intercepted(peer_node_id, message);
14961502
},
1497-
LdkEvent::OnionMessagePeerConnected { .. } => {
1498-
debug_assert!(false, "We currently don't support onion message interception, so this event should never be emitted.");
1503+
LdkEvent::OnionMessagePeerConnected { peer_node_id } => {
1504+
let messages = self.om_mailbox.onion_message_peer_connected(peer_node_id);
1505+
1506+
for message in messages {
1507+
let _ = self.onion_messenger.forward_onion_message(message, &peer_node_id);
1508+
}
14991509
},
15001510

15011511
LdkEvent::PersistStaticInvoice {

src/lib.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ pub mod io;
9292
pub mod liquidity;
9393
pub mod logger;
9494
mod message_handler;
95+
mod om_mailbox;
9596
pub mod payment;
9697
mod peer_store;
9798
mod runtime;
@@ -171,6 +172,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
171172
use std::sync::{Arc, Mutex, RwLock};
172173
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
173174

175+
use crate::om_mailbox::OnionMessageMailbox;
176+
174177
#[cfg(feature = "uniffi")]
175178
uniffi::include_scaffolding!("ldk_node");
176179

@@ -205,6 +208,7 @@ pub struct Node {
205208
is_running: Arc<RwLock<bool>>,
206209
is_listening: Arc<AtomicBool>,
207210
node_metrics: Arc<RwLock<NodeMetrics>>,
211+
om_mailbox: Arc<OnionMessageMailbox>,
208212
}
209213

210214
impl Node {
@@ -517,6 +521,8 @@ impl Node {
517521
Arc::clone(&self.payment_store),
518522
Arc::clone(&self.peer_store),
519523
static_invoice_store,
524+
Arc::clone(&self.onion_messenger),
525+
Arc::clone(&self.om_mailbox),
520526
Arc::clone(&self.runtime),
521527
Arc::clone(&self.logger),
522528
Arc::clone(&self.config),
@@ -1491,6 +1497,11 @@ impl Node {
14911497
Error::PersistenceFailed
14921498
})
14931499
}
1500+
1501+
#[allow(missing_docs)]
1502+
pub fn om_mailbox_is_empty(&self) -> bool {
1503+
self.om_mailbox.is_empty()
1504+
}
14941505
}
14951506

14961507
impl Drop for Node {

src/om_mailbox.rs

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
use std::{
2+
collections::{HashMap, VecDeque},
3+
sync::Mutex,
4+
};
5+
6+
use lightning::ln::msgs::OnionMessage;
7+
8+
pub(crate) struct OnionMessageMailbox {
9+
map: Mutex<HashMap<bitcoin::secp256k1::PublicKey, VecDeque<lightning::ln::msgs::OnionMessage>>>,
10+
}
11+
12+
impl OnionMessageMailbox {
13+
const MAX_MESSAGES_PER_PEER: usize = 100;
14+
const MAX_PEERS: usize = 100;
15+
16+
pub fn new() -> Self {
17+
Self { map: Mutex::new(HashMap::new()) }
18+
}
19+
20+
pub(crate) fn onion_message_intercepted(
21+
&self, peer_node_id: bitcoin::secp256k1::PublicKey,
22+
message: lightning::ln::msgs::OnionMessage,
23+
) {
24+
let mut map = self.map.lock().unwrap();
25+
26+
let queue = map.entry(peer_node_id).or_insert_with(VecDeque::new);
27+
if queue.len() >= Self::MAX_MESSAGES_PER_PEER {
28+
queue.pop_front();
29+
}
30+
queue.push_back(message);
31+
32+
// Enforce a peers limit. If exceeded, evict the peer with the longest queue.
33+
if map.len() > Self::MAX_PEERS {
34+
let peer_to_remove = map
35+
.iter()
36+
.max_by_key(|(_, queue)| queue.len())
37+
.map(|(peer, _)| peer.clone())
38+
.unwrap();
39+
40+
map.remove(&peer_to_remove);
41+
}
42+
}
43+
44+
pub(crate) fn onion_message_peer_connected(
45+
&self, peer_node_id: bitcoin::secp256k1::PublicKey,
46+
) -> Vec<OnionMessage> {
47+
let mut map = self.map.lock().unwrap();
48+
49+
if let Some(queue) = map.remove(&peer_node_id) {
50+
queue.into()
51+
} else {
52+
Vec::new()
53+
}
54+
}
55+
56+
pub(crate) fn is_empty(&self) -> bool {
57+
let map = self.map.lock().unwrap();
58+
map.is_empty()
59+
}
60+
}
61+
62+
#[cfg(test)]
63+
mod tests {
64+
use bitcoin::{
65+
key::Secp256k1,
66+
secp256k1::{PublicKey, SecretKey},
67+
};
68+
use lightning::onion_message;
69+
70+
use crate::om_mailbox::OnionMessageMailbox;
71+
72+
#[test]
73+
fn onion_message_mailbox() {
74+
let mailbox = OnionMessageMailbox::new();
75+
76+
let secp = Secp256k1::new();
77+
let sk_bytes = [12; 32];
78+
let sk = SecretKey::from_slice(&sk_bytes).unwrap();
79+
let peer_node_id = PublicKey::from_secret_key(&secp, &sk);
80+
81+
let blinding_sk = SecretKey::from_slice(&[13; 32]).unwrap();
82+
let blinding_point = PublicKey::from_secret_key(&secp, &blinding_sk);
83+
84+
let message_sk = SecretKey::from_slice(&[13; 32]).unwrap();
85+
let message_point = PublicKey::from_secret_key(&secp, &message_sk);
86+
87+
let message = lightning::ln::msgs::OnionMessage {
88+
blinding_point,
89+
onion_routing_packet: onion_message::packet::Packet {
90+
version: 0,
91+
public_key: message_point,
92+
hop_data: vec![1, 2, 3],
93+
hmac: [0; 32],
94+
},
95+
};
96+
mailbox.onion_message_intercepted(peer_node_id, message.clone());
97+
98+
let messages = mailbox.onion_message_peer_connected(peer_node_id);
99+
assert_eq!(messages.len(), 1);
100+
assert_eq!(messages[0], message);
101+
102+
assert!(mailbox.is_empty());
103+
104+
let messages = mailbox.onion_message_peer_connected(peer_node_id);
105+
assert_eq!(messages.len(), 0);
106+
}
107+
}

tests/common/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -589,14 +589,21 @@ pub(crate) fn bump_fee_and_broadcast<E: ElectrumApi>(
589589
pub fn open_channel(
590590
node_a: &TestNode, node_b: &TestNode, funding_amount_sat: u64, should_announce: bool,
591591
electrsd: &ElectrsD,
592+
) -> OutPoint {
593+
open_channel_push_amt(node_a, node_b, funding_amount_sat, None, should_announce, electrsd)
594+
}
595+
596+
pub fn open_channel_push_amt(
597+
node_a: &TestNode, node_b: &TestNode, funding_amount_sat: u64, push_amount_msat: Option<u64>,
598+
should_announce: bool, electrsd: &ElectrsD,
592599
) -> OutPoint {
593600
if should_announce {
594601
node_a
595602
.open_announced_channel(
596603
node_b.node_id(),
597604
node_b.listening_addresses().unwrap().first().unwrap().clone(),
598605
funding_amount_sat,
599-
None,
606+
push_amount_msat,
600607
None,
601608
)
602609
.unwrap();
@@ -606,7 +613,7 @@ pub fn open_channel(
606613
node_b.node_id(),
607614
node_b.listening_addresses().unwrap().first().unwrap().clone(),
608615
funding_amount_sat,
609-
None,
616+
push_amount_msat,
610617
None,
611618
)
612619
.unwrap();

0 commit comments

Comments
 (0)