
Commit 060fbf1

Batch commitment_signed messages in PeerManager
During splicing, commitment_signed messages need to be collected into a single batch before they are handled. Rather than including this as part of the channel state machine logic, batch them when reading messages from the wire, since together they can be considered one logical message.
1 parent: eed614b
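
The heart of the change is a small aggregation state machine: buffer batched commitment_signed messages per peer, keyed by funding_txid, and emit them as one logical message once batch_size entries have arrived. Below is a minimal standalone sketch of that logic. The Txid, ChannelId, CommitmentSigned, and Batcher types here are simplified stand-ins, not LDK's API; the real message type lives in lightning::ln::msgs and carries the batch info as an optional field.

```rust
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;

// Simplified stand-ins for LDK's types; the real msgs::CommitmentSigned
// carries signatures and keeps batch_size/funding_txid in an optional
// `batch` field.
type Txid = [u8; 32];
type ChannelId = [u8; 32];

#[derive(Debug)]
struct CommitmentSigned {
    channel_id: ChannelId,
    funding_txid: Txid,
    batch_size: usize,
}

/// Collects batched commitment_signed messages for one peer, keyed by
/// funding_txid, and returns the completed batch once all expected
/// messages have arrived. Mirrors the logic added to PeerManager.
#[derive(Default)]
struct Batcher {
    pending: Option<(ChannelId, BTreeMap<Txid, CommitmentSigned>)>,
}

impl Batcher {
    fn push(
        &mut self, msg: CommitmentSigned,
    ) -> Result<Option<BTreeMap<Txid, CommitmentSigned>>, &'static str> {
        let (channel_id, buffer) = self
            .pending
            .get_or_insert_with(|| (msg.channel_id, BTreeMap::new()));

        // All messages in a batch must belong to the same channel.
        if msg.channel_id != *channel_id {
            return Err("wrong channel");
        }

        // Bound memory: reject batches that grow past a fixed limit.
        const BATCH_LIMIT: usize = 100;
        if buffer.len() == BATCH_LIMIT {
            return Err("batch limit exceeded");
        }

        // Each funding_txid may appear at most once per batch.
        let batch_size = msg.batch_size;
        match buffer.entry(msg.funding_txid) {
            Entry::Vacant(e) => { e.insert(msg); },
            Entry::Occupied(_) => return Err("duplicate funding_txid"),
        }

        // Once complete, hand the batch off as one logical message.
        if buffer.len() >= batch_size {
            Ok(self.pending.take().map(|(_, batch)| batch))
        } else {
            Ok(None)
        }
    }
}

fn main() {
    let mut batcher = Batcher::default();
    let msg = |txid_byte: u8| CommitmentSigned {
        channel_id: [0; 32],
        funding_txid: [txid_byte; 32],
        batch_size: 2,
    };
    // First of two messages is buffered; the second completes the batch.
    assert!(batcher.push(msg(1)).unwrap().is_none());
    let batch = batcher.push(msg(2)).unwrap().expect("batch complete");
    assert_eq!(batch.len(), 2);
}
```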

1 file changed (+60 −0 lines changed)

lightning/src/ln/peer_handler.rs (+60)
@@ -15,6 +15,7 @@
 //! call into the provided message handlers (probably a ChannelManager and P2PGossipSync) with
 //! messages they should handle, and encoding/sending response messages.
 
+use bitcoin::Txid;
 use bitcoin::constants::ChainHash;
 use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey};
 
@@ -41,6 +42,8 @@ use crate::util::string::PrintableString;
 #[allow(unused_imports)]
 use crate::prelude::*;
 
+use alloc::collections::{btree_map, BTreeMap};
+
 use crate::io;
 use crate::sync::{Mutex, MutexGuard, FairRwLock};
 use core::sync::atomic::{AtomicBool, AtomicU32, AtomicI32, Ordering};
@@ -608,6 +611,8 @@ struct Peer {
 	received_channel_announce_since_backlogged: bool,
 
 	inbound_connection: bool,
+
+	commitment_signed_batch: Option<(ChannelId, BTreeMap<Txid, msgs::CommitmentSigned>)>,
 }
 
 impl Peer {
@@ -860,6 +865,7 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: D
 
 enum LogicalMessage<T: core::fmt::Debug + wire::Type + wire::TestEq> {
 	FromWire(wire::Message<T>),
+	CommitmentSignedBatch(ChannelId, BTreeMap<Txid, msgs::CommitmentSigned>),
 }
 
 enum MessageHandlingError {
@@ -1144,6 +1150,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 
 			received_channel_announce_since_backlogged: false,
 			inbound_connection: false,
+
+			commitment_signed_batch: None,
 		}));
 		Ok(res)
 	}
@@ -1200,6 +1208,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 
 			received_channel_announce_since_backlogged: false,
 			inbound_connection: true,
+
+			commitment_signed_batch: None,
 		}));
 		Ok(())
 	}
@@ -1653,6 +1663,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 			Some(LogicalMessage::FromWire(message)) => {
 				self.do_handle_message_without_peer_lock(peer_mutex, message, their_node_id, &logger)
 			},
+			Some(LogicalMessage::CommitmentSignedBatch(channel_id, batch)) => {
+				log_trace!(logger, "Received commitment_signed batch {:?} from {}", batch, log_pubkey!(their_node_id));
+				self.message_handler.chan_handler.handle_commitment_signed_batch(their_node_id, channel_id, batch);
+				return Ok(None);
+			},
 			None => Ok(None),
 		}
 	}
@@ -1747,6 +1762,51 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 			return Err(PeerHandleError { }.into());
 		}
 
+		// During splicing, commitment_signed messages need to be collected into a single batch
+		// before they are handled.
+		if let wire::Message::CommitmentSigned(msg) = message {
+			if let Some(ref batch) = msg.batch {
+				let (channel_id, buffer) = peer_lock
+					.commitment_signed_batch
+					.get_or_insert_with(|| (msg.channel_id, BTreeMap::new()));
+
+				if msg.channel_id != *channel_id {
+					log_debug!(logger, "Peer {} sent batched commitment_signed for the wrong channel (expected: {}, actual: {})", log_pubkey!(their_node_id), channel_id, &msg.channel_id);
+					return Err(PeerHandleError { }.into());
+				}
+
+				const COMMITMENT_SIGNED_BATCH_LIMIT: usize = 100;
+				if buffer.len() == COMMITMENT_SIGNED_BATCH_LIMIT {
+					log_debug!(logger, "Peer {} sent batched commitment_signed for channel {} exceeding the limit", log_pubkey!(their_node_id), channel_id);
+					return Err(PeerHandleError { }.into());
+				}
+
+				let batch_size = batch.batch_size as usize;
+				match buffer.entry(batch.funding_txid) {
+					btree_map::Entry::Vacant(entry) => { entry.insert(msg); },
+					btree_map::Entry::Occupied(_) => {
+						log_debug!(logger, "Peer {} sent batched commitment_signed with duplicate funding_txid {} for channel {}", log_pubkey!(their_node_id), &batch.funding_txid, channel_id);
+						return Err(PeerHandleError { }.into());
+					}
+				}
+
+				if buffer.len() >= batch_size {
+					let (channel_id, batch) = peer_lock.commitment_signed_batch.take().expect("batch should have been inserted");
+					return Ok(Some(LogicalMessage::CommitmentSignedBatch(channel_id, batch)));
+				} else {
+					return Ok(None);
+				}
+			} else if peer_lock.commitment_signed_batch.is_some() {
+				log_debug!(logger, "Peer {} sent non-batched commitment_signed for channel {} when expecting batched commitment_signed", log_pubkey!(their_node_id), &msg.channel_id);
+				return Err(PeerHandleError { }.into());
+			} else {
+				return Ok(Some(LogicalMessage::FromWire(wire::Message::CommitmentSigned(msg))));
+			}
+		} else if peer_lock.commitment_signed_batch.is_some() {
+			log_debug!(logger, "Peer {} sent non-commitment_signed message when expecting batched commitment_signed", log_pubkey!(their_node_id));
+			return Err(PeerHandleError { }.into());
+		}
+
 		if let wire::Message::GossipTimestampFilter(_msg) = message {
 			// When supporting gossip messages, start initial gossip sync only after we receive
 			// a GossipTimestampFilter
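
Besides completing batches, the new code enforces rejection rules that disconnect a misbehaving peer: a batched commitment_signed for a different channel than the pending batch, a duplicate funding_txid within a batch, any message other than a batched commitment_signed arriving while a batch is pending, and a hard cap of 100 entries to bound memory. A short demo of the first two rules, written against the hypothetical Batcher sketch above (the interleaving rule needs the full PeerManager plumbing, so it is omitted):

```rust
// Exercises the rejection paths from the diff; append to the Batcher sketch
// above. Same simplified types; error strings stand in for PeerHandleError.
fn demo_rejections() {
    let msg = |channel: u8, txid_byte: u8, batch_size: usize| CommitmentSigned {
        channel_id: [channel; 32],
        funding_txid: [txid_byte; 32],
        batch_size,
    };

    // A batched commitment_signed for a different channel fails the batch.
    let mut b = Batcher::default();
    b.push(msg(0, 1, 2)).unwrap();
    assert_eq!(b.push(msg(9, 2, 2)).unwrap_err(), "wrong channel");

    // Repeating a funding_txid within one batch is rejected as a duplicate.
    let mut b = Batcher::default();
    b.push(msg(0, 1, 3)).unwrap();
    assert_eq!(b.push(msg(0, 1, 3)).unwrap_err(), "duplicate funding_txid");
}
```

Keying the buffer with a BTreeMap rather than a Vec is presumably deliberate: it rejects duplicate funding_txids for free and hands the completed batch to the channel handler in a deterministic order.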
