Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 8ec1ddf

Browse files
committed Apr 30, 2023
Expose a trait impl'd for all PeerManager for use as a bound
A while back, in tests, we added an `AChannelManager` trait, which is implemented for all `ChannelManager`s, and can be used as a bound when we need a `ChannelManager`, rather than having to duplicate all the bounds of `ChannelManager` everywhere. Here we do the same thing for `PeerManager`, but make it public and use it to clean up `lightning-net-tokio` and `lightning-background-processor`. We should likely do the same for `AChannelManager`, but that's left as a followup.
1 parent 524981d commit 8ec1ddf

File tree

3 files changed

+81
-117
lines changed

3 files changed

+81
-117
lines changed
 

‎lightning-background-processor/src/lib.rs

+10-25
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,7 @@ use lightning::events::{Event, PathFailure};
3030
#[cfg(feature = "std")]
3131
use lightning::events::{EventHandler, EventsProvider};
3232
use lightning::ln::channelmanager::ChannelManager;
33-
use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
34-
use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
33+
use lightning::ln::peer_handler::APeerManager;
3534
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
3635
use lightning::routing::utxo::UtxoLookup;
3736
use lightning::routing::router::Router;
@@ -81,6 +80,8 @@ use alloc::vec::Vec;
8180
///
8281
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
8382
/// [`Event`]: lightning::events::Event
83+
/// [`PeerManager::timer_tick_occurred`]: lightning::ln::peer_handler::PeerManager::timer_tick_occurred
84+
/// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events
8485
#[cfg(feature = "std")]
8586
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
8687
pub struct BackgroundProcessor {
@@ -295,7 +296,7 @@ macro_rules! define_run_body {
295296
// ChannelManager, we want to minimize methods blocking on a ChannelManager
296297
// generally, and as a fallback place such blocking only immediately before
297298
// persistence.
298-
$peer_manager.process_events();
299+
$peer_manager.pm().process_events();
299300

300301
// Exit the loop if the background processor was requested to stop.
301302
if $loop_exit_check {
@@ -340,11 +341,11 @@ macro_rules! define_run_body {
340341
// more than a handful of seconds to complete, and shouldn't disconnect all our
341342
// peers.
342343
log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
343-
$peer_manager.disconnect_all_peers();
344+
$peer_manager.pm().disconnect_all_peers();
344345
last_ping_call = $get_timer(PING_TIMER);
345346
} else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
346347
log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
347-
$peer_manager.timer_tick_occurred();
348+
$peer_manager.pm().timer_tick_occurred();
348349
last_ping_call = $get_timer(PING_TIMER);
349350
}
350351

@@ -578,19 +579,15 @@ pub async fn process_events_async<
578579
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
579580
L: 'static + Deref + Send + Sync,
580581
P: 'static + Deref + Send + Sync,
581-
Descriptor: 'static + SocketDescriptor + Send + Sync,
582-
CMH: 'static + Deref + Send + Sync,
583-
RMH: 'static + Deref + Send + Sync,
584-
OMH: 'static + Deref + Send + Sync,
585582
EventHandlerFuture: core::future::Future<Output = ()>,
586583
EventHandler: Fn(Event) -> EventHandlerFuture,
587584
PS: 'static + Deref + Send,
588585
M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
589586
CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
590587
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
591588
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
592-
UMH: 'static + Deref + Send + Sync,
593-
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
589+
APM: APeerManager + Send + Sync,
590+
PM: 'static + Deref<Target = APM> + Send + Sync,
594591
S: 'static + Deref<Target = SC> + Send + Sync,
595592
SC: for<'b> WriteableScore<'b>,
596593
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
@@ -612,10 +609,6 @@ where
612609
R::Target: 'static + Router,
613610
L::Target: 'static + Logger,
614611
P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
615-
CMH::Target: 'static + ChannelMessageHandler,
616-
OMH::Target: 'static + OnionMessageHandler,
617-
RMH::Target: 'static + RoutingMessageHandler,
618-
UMH::Target: 'static + CustomMessageHandler,
619612
PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
620613
{
621614
let mut should_break = false;
@@ -721,18 +714,14 @@ impl BackgroundProcessor {
721714
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
722715
L: 'static + Deref + Send + Sync,
723716
P: 'static + Deref + Send + Sync,
724-
Descriptor: 'static + SocketDescriptor + Send + Sync,
725-
CMH: 'static + Deref + Send + Sync,
726-
OMH: 'static + Deref + Send + Sync,
727-
RMH: 'static + Deref + Send + Sync,
728717
EH: 'static + EventHandler + Send,
729718
PS: 'static + Deref + Send,
730719
M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
731720
CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
732721
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
733722
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
734-
UMH: 'static + Deref + Send + Sync,
735-
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
723+
APM: APeerManager + Send + Sync,
724+
PM: 'static + Deref<Target = APM> + Send + Sync,
736725
S: 'static + Deref<Target = SC> + Send + Sync,
737726
SC: for <'b> WriteableScore<'b>,
738727
>(
@@ -751,10 +740,6 @@ impl BackgroundProcessor {
751740
R::Target: 'static + Router,
752741
L::Target: 'static + Logger,
753742
P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
754-
CMH::Target: 'static + ChannelMessageHandler,
755-
OMH::Target: 'static + OnionMessageHandler,
756-
RMH::Target: 'static + RoutingMessageHandler,
757-
UMH::Target: 'static + CustomMessageHandler,
758743
PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
759744
{
760745
let stop_thread = Arc::new(AtomicBool::new(false));

‎lightning-net-tokio/src/lib.rs

+23-92
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,10 @@ use tokio::{io, time};
3636
use tokio::sync::mpsc;
3737
use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
3838

39-
use lightning::chain::keysinterface::NodeSigner;
4039
use lightning::ln::peer_handler;
4140
use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
42-
use lightning::ln::peer_handler::CustomMessageHandler;
43-
use lightning::ln::msgs::{ChannelMessageHandler, NetAddress, OnionMessageHandler, RoutingMessageHandler};
44-
use lightning::util::logger::Logger;
41+
use lightning::ln::peer_handler::APeerManager;
42+
use lightning::ln::msgs::NetAddress;
4543

4644
use std::ops::Deref;
4745
use std::task;
@@ -80,53 +78,25 @@ struct Connection {
8078
id: u64,
8179
}
8280
impl Connection {
83-
async fn poll_event_process<PM, CMH, RMH, OMH, L, UMH, NS>(
81+
async fn poll_event_process<PM: Deref + 'static + Send + Sync>(
8482
peer_manager: PM,
8583
mut event_receiver: mpsc::Receiver<()>,
86-
) where
87-
PM: Deref<Target = peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH, NS>> + 'static + Send + Sync,
88-
CMH: Deref + 'static + Send + Sync,
89-
RMH: Deref + 'static + Send + Sync,
90-
OMH: Deref + 'static + Send + Sync,
91-
L: Deref + 'static + Send + Sync,
92-
UMH: Deref + 'static + Send + Sync,
93-
NS: Deref + 'static + Send + Sync,
94-
CMH::Target: ChannelMessageHandler + Send + Sync,
95-
RMH::Target: RoutingMessageHandler + Send + Sync,
96-
OMH::Target: OnionMessageHandler + Send + Sync,
97-
L::Target: Logger + Send + Sync,
98-
UMH::Target: CustomMessageHandler + Send + Sync,
99-
NS::Target: NodeSigner + Send + Sync,
100-
{
84+
) where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
10185
loop {
10286
if event_receiver.recv().await.is_none() {
10387
return;
10488
}
105-
peer_manager.process_events();
89+
peer_manager.pm().process_events();
10690
}
10791
}
10892

109-
async fn schedule_read<PM, CMH, RMH, OMH, L, UMH, NS>(
93+
async fn schedule_read<PM: Deref + 'static + Send + Sync + Clone>(
11094
peer_manager: PM,
11195
us: Arc<Mutex<Self>>,
11296
mut reader: io::ReadHalf<TcpStream>,
11397
mut read_wake_receiver: mpsc::Receiver<()>,
11498
mut write_avail_receiver: mpsc::Receiver<()>,
115-
) where
116-
PM: Deref<Target = peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH, NS>> + 'static + Send + Sync + Clone,
117-
CMH: Deref + 'static + Send + Sync,
118-
RMH: Deref + 'static + Send + Sync,
119-
OMH: Deref + 'static + Send + Sync,
120-
L: Deref + 'static + Send + Sync,
121-
UMH: Deref + 'static + Send + Sync,
122-
NS: Deref + 'static + Send + Sync,
123-
CMH::Target: ChannelMessageHandler + 'static + Send + Sync,
124-
RMH::Target: RoutingMessageHandler + 'static + Send + Sync,
125-
OMH::Target: OnionMessageHandler + 'static + Send + Sync,
126-
L::Target: Logger + 'static + Send + Sync,
127-
UMH::Target: CustomMessageHandler + 'static + Send + Sync,
128-
NS::Target: NodeSigner + 'static + Send + Sync,
129-
{
99+
) where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
130100
// Create a waker to wake up poll_event_process, above
131101
let (event_waker, event_receiver) = mpsc::channel(1);
132102
tokio::spawn(Self::poll_event_process(peer_manager.clone(), event_receiver));
@@ -160,15 +130,15 @@ impl Connection {
160130
tokio::select! {
161131
v = write_avail_receiver.recv() => {
162132
assert!(v.is_some()); // We can't have dropped the sending end, its in the us Arc!
163-
if peer_manager.write_buffer_space_avail(&mut our_descriptor).is_err() {
133+
if peer_manager.pm().write_buffer_space_avail(&mut our_descriptor).is_err() {
164134
break Disconnect::CloseConnection;
165135
}
166136
},
167137
_ = read_wake_receiver.recv() => {},
168138
read = reader.read(&mut buf), if !read_paused => match read {
169139
Ok(0) => break Disconnect::PeerDisconnected,
170140
Ok(len) => {
171-
let read_res = peer_manager.read_event(&mut our_descriptor, &buf[0..len]);
141+
let read_res = peer_manager.pm().read_event(&mut our_descriptor, &buf[0..len]);
172142
let mut us_lock = us.lock().unwrap();
173143
match read_res {
174144
Ok(pause_read) => {
@@ -197,8 +167,8 @@ impl Connection {
197167
let _ = writer.shutdown().await;
198168
}
199169
if let Disconnect::PeerDisconnected = disconnect_type {
200-
peer_manager.socket_disconnected(&our_descriptor);
201-
peer_manager.process_events();
170+
peer_manager.pm().socket_disconnected(&our_descriptor);
171+
peer_manager.pm().process_events();
202172
}
203173
}
204174

@@ -245,30 +215,17 @@ fn get_addr_from_stream(stream: &StdTcpStream) -> Option<NetAddress> {
245215
/// The returned future will complete when the peer is disconnected and associated handling
246216
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
247217
/// not need to poll the provided future in order to make progress.
248-
pub fn setup_inbound<PM, CMH, RMH, OMH, L, UMH, NS>(
218+
pub fn setup_inbound<PM: Deref + 'static + Send + Sync + Clone>(
249219
peer_manager: PM,
250220
stream: StdTcpStream,
251-
) -> impl std::future::Future<Output=()> where
252-
PM: Deref<Target = peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH, NS>> + 'static + Send + Sync + Clone,
253-
CMH: Deref + 'static + Send + Sync,
254-
RMH: Deref + 'static + Send + Sync,
255-
OMH: Deref + 'static + Send + Sync,
256-
L: Deref + 'static + Send + Sync,
257-
UMH: Deref + 'static + Send + Sync,
258-
NS: Deref + 'static + Send + Sync,
259-
CMH::Target: ChannelMessageHandler + Send + Sync,
260-
RMH::Target: RoutingMessageHandler + Send + Sync,
261-
OMH::Target: OnionMessageHandler + Send + Sync,
262-
L::Target: Logger + Send + Sync,
263-
UMH::Target: CustomMessageHandler + Send + Sync,
264-
NS::Target: NodeSigner + Send + Sync,
265-
{
221+
) -> impl std::future::Future<Output=()>
222+
where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
266223
let remote_addr = get_addr_from_stream(&stream);
267224
let (reader, write_receiver, read_receiver, us) = Connection::new(stream);
268225
#[cfg(test)]
269226
let last_us = Arc::clone(&us);
270227

271-
let handle_opt = if peer_manager.new_inbound_connection(SocketDescriptor::new(us.clone()), remote_addr).is_ok() {
228+
let handle_opt = if peer_manager.pm().new_inbound_connection(SocketDescriptor::new(us.clone()), remote_addr).is_ok() {
272229
Some(tokio::spawn(Connection::schedule_read(peer_manager, us, reader, read_receiver, write_receiver)))
273230
} else {
274231
// Note that we will skip socket_disconnected here, in accordance with the PeerManager
@@ -300,30 +257,17 @@ pub fn setup_inbound<PM, CMH, RMH, OMH, L, UMH, NS>(
300257
/// The returned future will complete when the peer is disconnected and associated handling
301258
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
302259
/// not need to poll the provided future in order to make progress.
303-
pub fn setup_outbound<PM, CMH, RMH, OMH, L, UMH, NS>(
260+
pub fn setup_outbound<PM: Deref + 'static + Send + Sync + Clone>(
304261
peer_manager: PM,
305262
their_node_id: PublicKey,
306263
stream: StdTcpStream,
307-
) -> impl std::future::Future<Output=()> where
308-
PM: Deref<Target = peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH, NS>> + 'static + Send + Sync + Clone,
309-
CMH: Deref + 'static + Send + Sync,
310-
RMH: Deref + 'static + Send + Sync,
311-
OMH: Deref + 'static + Send + Sync,
312-
L: Deref + 'static + Send + Sync,
313-
UMH: Deref + 'static + Send + Sync,
314-
NS: Deref + 'static + Send + Sync,
315-
CMH::Target: ChannelMessageHandler + Send + Sync,
316-
RMH::Target: RoutingMessageHandler + Send + Sync,
317-
OMH::Target: OnionMessageHandler + Send + Sync,
318-
L::Target: Logger + Send + Sync,
319-
UMH::Target: CustomMessageHandler + Send + Sync,
320-
NS::Target: NodeSigner + Send + Sync,
321-
{
264+
) -> impl std::future::Future<Output=()>
265+
where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
322266
let remote_addr = get_addr_from_stream(&stream);
323267
let (reader, mut write_receiver, read_receiver, us) = Connection::new(stream);
324268
#[cfg(test)]
325269
let last_us = Arc::clone(&us);
326-
let handle_opt = if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone()), remote_addr) {
270+
let handle_opt = if let Ok(initial_send) = peer_manager.pm().new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone()), remote_addr) {
327271
Some(tokio::spawn(async move {
328272
// We should essentially always have enough room in a TCP socket buffer to send the
329273
// initial 10s of bytes. However, tokio running in single-threaded mode will always
@@ -342,7 +286,7 @@ pub fn setup_outbound<PM, CMH, RMH, OMH, L, UMH, NS>(
342286
},
343287
_ => {
344288
eprintln!("Failed to write first full message to socket!");
345-
peer_manager.socket_disconnected(&SocketDescriptor::new(Arc::clone(&us)));
289+
peer_manager.pm().socket_disconnected(&SocketDescriptor::new(Arc::clone(&us)));
346290
break Err(());
347291
}
348292
}
@@ -385,25 +329,12 @@ pub fn setup_outbound<PM, CMH, RMH, OMH, L, UMH, NS>(
385329
/// disconnected and associated handling futures are freed, though, because all processing in said
386330
/// futures are spawned with tokio::spawn, you do not need to poll the second future in order to
387331
/// make progress.
388-
pub async fn connect_outbound<PM, CMH, RMH, OMH, L, UMH, NS>(
332+
pub async fn connect_outbound<PM: Deref + 'static + Send + Sync + Clone>(
389333
peer_manager: PM,
390334
their_node_id: PublicKey,
391335
addr: SocketAddr,
392-
) -> Option<impl std::future::Future<Output=()>> where
393-
PM: Deref<Target = peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH, NS>> + 'static + Send + Sync + Clone,
394-
CMH: Deref + 'static + Send + Sync,
395-
RMH: Deref + 'static + Send + Sync,
396-
OMH: Deref + 'static + Send + Sync,
397-
L: Deref + 'static + Send + Sync,
398-
UMH: Deref + 'static + Send + Sync,
399-
NS: Deref + 'static + Send + Sync,
400-
CMH::Target: ChannelMessageHandler + Send + Sync,
401-
RMH::Target: RoutingMessageHandler + Send + Sync,
402-
OMH::Target: OnionMessageHandler + Send + Sync,
403-
L::Target: Logger + Send + Sync,
404-
UMH::Target: CustomMessageHandler + Send + Sync,
405-
NS::Target: NodeSigner + Send + Sync,
406-
{
336+
) -> Option<impl std::future::Future<Output=()>>
337+
where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
407338
if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await {
408339
Some(setup_outbound(peer_manager, their_node_id, stream))
409340
} else { None }

‎lightning/src/ln/peer_handler.rs

+48
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,54 @@ pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<SD, Arc<SimpleArc
542542
/// This is not exported to bindings users as general type aliases don't make sense in bindings.
543543
pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, 'j, 'k, 'l, 'm, SD, M, T, F, C, L> = PeerManager<SD, SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'm, M, T, F, L>, &'f P2PGossipSync<&'g NetworkGraph<&'f L>, &'h C, &'f L>, &'i SimpleRefOnionMessenger<'j, 'k, L>, &'f L, IgnoringMessageHandler, &'c KeysManager>;
544544

545+
546+
/// A generic trait which is implemented for all [`PeerManager`]s. This makes bounding functions or
547+
/// structs on any [`PeerManager`] much simpler as only this trait is needed as a bound, rather
548+
/// than the full set of bounds on [`PeerManager`] itself.
549+
#[allow(missing_docs)]
550+
pub trait APeerManager {
551+
type Descriptor: SocketDescriptor;
552+
type CMT: ChannelMessageHandler + ?Sized;
553+
type CM: Deref<Target=Self::CMT>;
554+
type RMT: RoutingMessageHandler + ?Sized;
555+
type RM: Deref<Target=Self::RMT>;
556+
type OMT: OnionMessageHandler + ?Sized;
557+
type OM: Deref<Target=Self::OMT>;
558+
type LT: Logger + ?Sized;
559+
type L: Deref<Target=Self::LT>;
560+
type CMHT: CustomMessageHandler + ?Sized;
561+
type CMH: Deref<Target=Self::CMHT>;
562+
type NST: NodeSigner + ?Sized;
563+
type NS: Deref<Target=Self::NST>;
564+
/// Gets a reference to the underlying [`PeerManager`].
565+
fn pm(&self) -> &PeerManager<Self::Descriptor, Self::CM, Self::RM, Self::OM, Self::L, Self::CMH, Self::NS>;
566+
}
567+
568+
impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CMH: Deref, NS: Deref>
569+
APeerManager for PeerManager<Descriptor, CM, RM, OM, L, CMH, NS> where
570+
CM::Target: ChannelMessageHandler,
571+
RM::Target: RoutingMessageHandler,
572+
OM::Target: OnionMessageHandler,
573+
L::Target: Logger,
574+
CMH::Target: CustomMessageHandler,
575+
NS::Target: NodeSigner,
576+
{
577+
type Descriptor = Descriptor;
578+
type CMT = <CM as Deref>::Target;
579+
type CM = CM;
580+
type RMT = <RM as Deref>::Target;
581+
type RM = RM;
582+
type OMT = <OM as Deref>::Target;
583+
type OM = OM;
584+
type LT = <L as Deref>::Target;
585+
type L = L;
586+
type CMHT = <CMH as Deref>::Target;
587+
type CMH = CMH;
588+
type NST = <NS as Deref>::Target;
589+
type NS = NS;
590+
fn pm(&self) -> &PeerManager<Descriptor, CM, RM, OM, L, CMH, NS> { self }
591+
}
592+
545593
/// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls
546594
/// socket events into messages which it passes on to its [`MessageHandler`].
547595
///

0 commit comments

Comments
 (0)
Please sign in to comment.