Trivial PeerManager cleanups #2249

Merged
3 changes: 2 additions & 1 deletion fuzz/src/full_stack.rs
@@ -458,7 +458,8 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
chan_handler: channelmanager.clone(),
route_handler: gossip_sync.clone(),
onion_message_handler: IgnoringMessageHandler {},
- }, 0, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0], Arc::clone(&logger), IgnoringMessageHandler{}, keys_manager.clone()));
+ custom_message_handler: IgnoringMessageHandler {},
+ }, 0, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0], Arc::clone(&logger), keys_manager.clone()));

let mut should_forward = false;
let mut payments_received: Vec<PaymentHash> = Vec::new();
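Note: this hunk shows the API change driving the PR — the custom message handler now lives inside `MessageHandler` rather than being a separate trailing argument to `PeerManager::new`. A minimal sketch of the new call shape (the `channel_manager`, `gossip_sync`, `logger`, `ephemeral_bytes`, and `node_signer` bindings are assumed for illustration, not part of this diff):

```rust
use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler, PeerManager};

let peer_manager = PeerManager::new(
	MessageHandler {
		chan_handler: channel_manager.clone(),
		route_handler: gossip_sync.clone(),
		onion_message_handler: IgnoringMessageHandler {},
		// Moved into the struct; previously passed directly to `new()`:
		custom_message_handler: IgnoringMessageHandler {},
	},
	0,                   // current-time value, as in the fuzz harness above
	&ephemeral_bytes,    // assumed: &[u8; 32] of ephemeral randomness
	Arc::clone(&logger),
	node_signer,         // e.g. `keys_manager.clone()` above
);
```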
43 changes: 16 additions & 27 deletions lightning-background-processor/src/lib.rs
@@ -30,8 +30,7 @@ use lightning::events::{Event, PathFailure};
#[cfg(feature = "std")]
use lightning::events::{EventHandler, EventsProvider};
use lightning::ln::channelmanager::ChannelManager;
- use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
- use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
+ use lightning::ln::peer_handler::APeerManager;
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::routing::utxo::UtxoLookup;
use lightning::routing::router::Router;
@@ -81,6 +80,8 @@ use alloc::vec::Vec;
///
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::events::Event
+ /// [`PeerManager::timer_tick_occurred`]: lightning::ln::peer_handler::PeerManager::timer_tick_occurred
+ /// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events
#[cfg(feature = "std")]
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
@@ -295,7 +296,7 @@ macro_rules! define_run_body {
// ChannelManager, we want to minimize methods blocking on a ChannelManager
// generally, and as a fallback place such blocking only immediately before
// persistence.
- $peer_manager.process_events();
+ $peer_manager.as_ref().process_events();

// Exit the loop if the background processor was requested to stop.
if $loop_exit_check {
@@ -340,11 +341,11 @@
// more than a handful of seconds to complete, and shouldn't disconnect all our
// peers.
log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
- $peer_manager.disconnect_all_peers();
+ $peer_manager.as_ref().disconnect_all_peers();
last_ping_call = $get_timer(PING_TIMER);
} else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
- $peer_manager.timer_tick_occurred();
+ $peer_manager.as_ref().timer_tick_occurred();
last_ping_call = $get_timer(PING_TIMER);
}

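Aside: these hunks only reroute calls through the new `as_ref()` accessor; the underlying timer policy is untouched. A standalone sketch of that policy (the trait and constant are stand-ins assumed for illustration):

```rust
use std::time::{Duration, Instant};

const PING_TIMER: Duration = Duration::from_secs(10);

/// Hypothetical stand-in for the peer-manager calls made by `define_run_body`.
trait PeerTimers {
	fn disconnect_all_peers(&self);
	fn timer_tick_occurred(&self);
}

// A nominal 100ms sleep that overran by more than a second suggests the host
// was suspended, so drop all peers rather than letting them fail pings;
// otherwise fire the regular ping timer once `PING_TIMER` has elapsed.
fn on_wakeup<PM: PeerTimers>(pm: &PM, last_ping: &mut Instant, sleep_took: Duration) {
	if sleep_took > Duration::from_secs(1) {
		pm.disconnect_all_peers();
		*last_ping = Instant::now();
	} else if last_ping.elapsed() >= PING_TIMER {
		pm.timer_tick_occurred();
		*last_ping = Instant::now();
	}
}
```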
@@ -578,19 +579,15 @@ pub async fn process_events_async<
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
L: 'static + Deref + Send + Sync,
P: 'static + Deref + Send + Sync,
- Descriptor: 'static + SocketDescriptor + Send + Sync,
- CMH: 'static + Deref + Send + Sync,
- RMH: 'static + Deref + Send + Sync,
- OMH: 'static + Deref + Send + Sync,
EventHandlerFuture: core::future::Future<Output = ()>,
EventHandler: Fn(Event) -> EventHandlerFuture,
PS: 'static + Deref + Send,
M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
- UMH: 'static + Deref + Send + Sync,
- PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
+ APM: APeerManager + Send + Sync,
+ PM: 'static + Deref<Target = APM> + Send + Sync,
S: 'static + Deref<Target = SC> + Send + Sync,
SC: for<'b> WriteableScore<'b>,
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
@@ -612,10 +609,6 @@ where
R::Target: 'static + Router,
L::Target: 'static + Logger,
P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
- CMH::Target: 'static + ChannelMessageHandler,
- OMH::Target: 'static + OnionMessageHandler,
- RMH::Target: 'static + RoutingMessageHandler,
- UMH::Target: 'static + CustomMessageHandler,
PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
{
let mut should_break = false;
@@ -721,18 +714,14 @@ impl BackgroundProcessor {
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
L: 'static + Deref + Send + Sync,
P: 'static + Deref + Send + Sync,
- Descriptor: 'static + SocketDescriptor + Send + Sync,
- CMH: 'static + Deref + Send + Sync,
- OMH: 'static + Deref + Send + Sync,
- RMH: 'static + Deref + Send + Sync,
EH: 'static + EventHandler + Send,
PS: 'static + Deref + Send,
M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
- UMH: 'static + Deref + Send + Sync,
- PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
+ APM: APeerManager + Send + Sync,
+ PM: 'static + Deref<Target = APM> + Send + Sync,
S: 'static + Deref<Target = SC> + Send + Sync,
SC: for <'b> WriteableScore<'b>,
>(
@@ -751,10 +740,6 @@
R::Target: 'static + Router,
L::Target: 'static + Logger,
P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
- CMH::Target: 'static + ChannelMessageHandler,
- OMH::Target: 'static + OnionMessageHandler,
- RMH::Target: 'static + RoutingMessageHandler,
- UMH::Target: 'static + CustomMessageHandler,
PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
{
let stop_thread = Arc::new(AtomicBool::new(false));
@@ -1140,8 +1125,12 @@ mod tests {
let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone()));
- let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
- let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), IgnoringMessageHandler{}, keys_manager.clone()));
+ let msg_handler = MessageHandler {
+ chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()),
+ route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()),
+ onion_message_handler: IgnoringMessageHandler{}, custom_message_handler: IgnoringMessageHandler{}
+ };
+ let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), keys_manager.clone()));
let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
nodes.push(node);
}
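The net effect in this file: every signature that previously threaded `Descriptor`/`CMH`/`RMH`/`OMH`/`UMH` plus their `Target` bounds now takes a single `PM: Deref` whose target implements `APeerManager`. A reduced sketch of the pattern (the trait here is a simplified stand-in; judging from this diff, the real `APeerManager` also exposes a `Descriptor` associated type and an `as_ref()` accessor to the concrete `PeerManager`):

```rust
use std::ops::Deref;

/// Simplified stand-in trait, for illustration only.
trait APeerManager {
	fn process_events(&self);
	fn timer_tick_occurred(&self);
}

// One `PM` parameter replaces the old seven-parameter `PeerManager<...>` bound;
// callers still pass e.g. an `Arc` of their concrete peer manager unchanged.
fn background_tick<PM: Deref>(peer_manager: &PM)
where
	PM::Target: APeerManager,
{
	peer_manager.process_events();      // resolved through `Deref`
	peer_manager.timer_tick_occurred();
}
```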
124 changes: 29 additions & 95 deletions lightning-net-tokio/src/lib.rs
@@ -36,12 +36,10 @@ use tokio::{io, time};
use tokio::sync::mpsc;
use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};

- use lightning::chain::keysinterface::NodeSigner;
use lightning::ln::peer_handler;
use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
- use lightning::ln::peer_handler::CustomMessageHandler;
- use lightning::ln::msgs::{ChannelMessageHandler, NetAddress, OnionMessageHandler, RoutingMessageHandler};
- use lightning::util::logger::Logger;
+ use lightning::ln::peer_handler::APeerManager;
+ use lightning::ln::msgs::NetAddress;

use std::ops::Deref;
use std::task;
@@ -80,53 +78,25 @@ struct Connection {
id: u64,
}
impl Connection {
- async fn poll_event_process<PM, CMH, RMH, OMH, L, UMH, NS>(
+ async fn poll_event_process<PM: Deref + 'static + Send + Sync>(
peer_manager: PM,
mut event_receiver: mpsc::Receiver<()>,
- ) where
- PM: Deref<Target = peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH, NS>> + 'static + Send + Sync,
- CMH: Deref + 'static + Send + Sync,
- RMH: Deref + 'static + Send + Sync,
- OMH: Deref + 'static + Send + Sync,
- L: Deref + 'static + Send + Sync,
- UMH: Deref + 'static + Send + Sync,
- NS: Deref + 'static + Send + Sync,
- CMH::Target: ChannelMessageHandler + Send + Sync,
- RMH::Target: RoutingMessageHandler + Send + Sync,
- OMH::Target: OnionMessageHandler + Send + Sync,
- L::Target: Logger + Send + Sync,
- UMH::Target: CustomMessageHandler + Send + Sync,
- NS::Target: NodeSigner + Send + Sync,
- {
+ ) where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
loop {
if event_receiver.recv().await.is_none() {
return;
}
- peer_manager.process_events();
+ peer_manager.as_ref().process_events();
}
}

- async fn schedule_read<PM, CMH, RMH, OMH, L, UMH, NS>(
+ async fn schedule_read<PM: Deref + 'static + Send + Sync + Clone>(
peer_manager: PM,
us: Arc<Mutex<Self>>,
mut reader: io::ReadHalf<TcpStream>,
mut read_wake_receiver: mpsc::Receiver<()>,
mut write_avail_receiver: mpsc::Receiver<()>,
- ) where
- PM: Deref<Target = peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH, NS>> + 'static + Send + Sync + Clone,
- CMH: Deref + 'static + Send + Sync,
- RMH: Deref + 'static + Send + Sync,
- OMH: Deref + 'static + Send + Sync,
- L: Deref + 'static + Send + Sync,
- UMH: Deref + 'static + Send + Sync,
- NS: Deref + 'static + Send + Sync,
- CMH::Target: ChannelMessageHandler + 'static + Send + Sync,
- RMH::Target: RoutingMessageHandler + 'static + Send + Sync,
- OMH::Target: OnionMessageHandler + 'static + Send + Sync,
- L::Target: Logger + 'static + Send + Sync,
- UMH::Target: CustomMessageHandler + 'static + Send + Sync,
- NS::Target: NodeSigner + 'static + Send + Sync,
- {
+ ) where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
// Create a waker to wake up poll_event_process, above
let (event_waker, event_receiver) = mpsc::channel(1);
tokio::spawn(Self::poll_event_process(peer_manager.clone(), event_receiver));
@@ -160,15 +130,15 @@ impl Connection {
tokio::select! {
v = write_avail_receiver.recv() => {
assert!(v.is_some()); // We can't have dropped the sending end, its in the us Arc!
- if peer_manager.write_buffer_space_avail(&mut our_descriptor).is_err() {
+ if peer_manager.as_ref().write_buffer_space_avail(&mut our_descriptor).is_err() {
break Disconnect::CloseConnection;
}
},
_ = read_wake_receiver.recv() => {},
read = reader.read(&mut buf), if !read_paused => match read {
Ok(0) => break Disconnect::PeerDisconnected,
Ok(len) => {
- let read_res = peer_manager.read_event(&mut our_descriptor, &buf[0..len]);
+ let read_res = peer_manager.as_ref().read_event(&mut our_descriptor, &buf[0..len]);
let mut us_lock = us.lock().unwrap();
match read_res {
Ok(pause_read) => {
@@ -197,8 +167,8 @@ impl Connection {
let _ = writer.shutdown().await;
}
if let Disconnect::PeerDisconnected = disconnect_type {
- peer_manager.socket_disconnected(&our_descriptor);
- peer_manager.process_events();
+ peer_manager.as_ref().socket_disconnected(&our_descriptor);
+ peer_manager.as_ref().process_events();
}
}

@@ -245,30 +215,17 @@ fn get_addr_from_stream(stream: &StdTcpStream) -> Option<NetAddress> {
/// The returned future will complete when the peer is disconnected and associated handling
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
/// not need to poll the provided future in order to make progress.
- pub fn setup_inbound<PM, CMH, RMH, OMH, L, UMH, NS>(
+ pub fn setup_inbound<PM: Deref + 'static + Send + Sync + Clone>(
peer_manager: PM,
stream: StdTcpStream,
- ) -> impl std::future::Future<Output=()> where
- PM: Deref<Target = peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH, NS>> + 'static + Send + Sync + Clone,
- CMH: Deref + 'static + Send + Sync,
- RMH: Deref + 'static + Send + Sync,
- OMH: Deref + 'static + Send + Sync,
- L: Deref + 'static + Send + Sync,
- UMH: Deref + 'static + Send + Sync,
- NS: Deref + 'static + Send + Sync,
- CMH::Target: ChannelMessageHandler + Send + Sync,
- RMH::Target: RoutingMessageHandler + Send + Sync,
- OMH::Target: OnionMessageHandler + Send + Sync,
- L::Target: Logger + Send + Sync,
- UMH::Target: CustomMessageHandler + Send + Sync,
- NS::Target: NodeSigner + Send + Sync,
- {
+ ) -> impl std::future::Future<Output=()>
+ where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
let remote_addr = get_addr_from_stream(&stream);
let (reader, write_receiver, read_receiver, us) = Connection::new(stream);
#[cfg(test)]
let last_us = Arc::clone(&us);

- let handle_opt = if peer_manager.new_inbound_connection(SocketDescriptor::new(us.clone()), remote_addr).is_ok() {
+ let handle_opt = if peer_manager.as_ref().new_inbound_connection(SocketDescriptor::new(us.clone()), remote_addr).is_ok() {
Some(tokio::spawn(Connection::schedule_read(peer_manager, us, reader, read_receiver, write_receiver)))
} else {
// Note that we will skip socket_disconnected here, in accordance with the PeerManager
@@ -300,30 +257,17 @@ pub fn setup_inbound<PM, CMH, RMH, OMH, L, UMH, NS>(
/// The returned future will complete when the peer is disconnected and associated handling
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
/// not need to poll the provided future in order to make progress.
- pub fn setup_outbound<PM, CMH, RMH, OMH, L, UMH, NS>(
+ pub fn setup_outbound<PM: Deref + 'static + Send + Sync + Clone>(
peer_manager: PM,
their_node_id: PublicKey,
stream: StdTcpStream,
- ) -> impl std::future::Future<Output=()> where
- PM: Deref<Target = peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH, NS>> + 'static + Send + Sync + Clone,
- CMH: Deref + 'static + Send + Sync,
- RMH: Deref + 'static + Send + Sync,
- OMH: Deref + 'static + Send + Sync,
- L: Deref + 'static + Send + Sync,
- UMH: Deref + 'static + Send + Sync,
- NS: Deref + 'static + Send + Sync,
- CMH::Target: ChannelMessageHandler + Send + Sync,
- RMH::Target: RoutingMessageHandler + Send + Sync,
- OMH::Target: OnionMessageHandler + Send + Sync,
- L::Target: Logger + Send + Sync,
- UMH::Target: CustomMessageHandler + Send + Sync,
- NS::Target: NodeSigner + Send + Sync,
- {
+ ) -> impl std::future::Future<Output=()>
+ where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
let remote_addr = get_addr_from_stream(&stream);
let (reader, mut write_receiver, read_receiver, us) = Connection::new(stream);
#[cfg(test)]
let last_us = Arc::clone(&us);
- let handle_opt = if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone()), remote_addr) {
+ let handle_opt = if let Ok(initial_send) = peer_manager.as_ref().new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone()), remote_addr) {
Some(tokio::spawn(async move {
// We should essentially always have enough room in a TCP socket buffer to send the
// initial 10s of bytes. However, tokio running in single-threaded mode will always
Expand All @@ -342,7 +286,7 @@ pub fn setup_outbound<PM, CMH, RMH, OMH, L, UMH, NS>(
},
_ => {
eprintln!("Failed to write first full message to socket!");
- peer_manager.socket_disconnected(&SocketDescriptor::new(Arc::clone(&us)));
+ peer_manager.as_ref().socket_disconnected(&SocketDescriptor::new(Arc::clone(&us)));
break Err(());
}
}
@@ -385,25 +329,12 @@
/// disconnected and associated handling futures are freed, though, because all processing in said
/// futures are spawned with tokio::spawn, you do not need to poll the second future in order to
/// make progress.
- pub async fn connect_outbound<PM, CMH, RMH, OMH, L, UMH, NS>(
+ pub async fn connect_outbound<PM: Deref + 'static + Send + Sync + Clone>(
peer_manager: PM,
their_node_id: PublicKey,
addr: SocketAddr,
- ) -> Option<impl std::future::Future<Output=()>> where
- PM: Deref<Target = peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH, NS>> + 'static + Send + Sync + Clone,
- CMH: Deref + 'static + Send + Sync,
- RMH: Deref + 'static + Send + Sync,
- OMH: Deref + 'static + Send + Sync,
- L: Deref + 'static + Send + Sync,
- UMH: Deref + 'static + Send + Sync,
- NS: Deref + 'static + Send + Sync,
- CMH::Target: ChannelMessageHandler + Send + Sync,
- RMH::Target: RoutingMessageHandler + Send + Sync,
- OMH::Target: OnionMessageHandler + Send + Sync,
- L::Target: Logger + Send + Sync,
- UMH::Target: CustomMessageHandler + Send + Sync,
- NS::Target: NodeSigner + Send + Sync,
- {
+ ) -> Option<impl std::future::Future<Output=()>>
+ where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await {
Some(setup_outbound(peer_manager, their_node_id, stream))
} else { None }
@@ -659,7 +590,8 @@ mod tests {
chan_handler: Arc::clone(&a_handler),
route_handler: Arc::clone(&a_handler),
onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
- }, 0, &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}), Arc::new(TestNodeSigner::new(a_key))));
+ custom_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
+ }, 0, &[1; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(a_key))));

let (b_connected_sender, mut b_connected) = mpsc::channel(1);
let (b_disconnected_sender, mut b_disconnected) = mpsc::channel(1);
@@ -674,7 +606,8 @@
chan_handler: Arc::clone(&b_handler),
route_handler: Arc::clone(&b_handler),
onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
- }, 0, &[2; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}), Arc::new(TestNodeSigner::new(b_key))));
+ custom_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
+ }, 0, &[2; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(b_key))));

// We bind on localhost, hoping the environment is properly configured with a local
// address. This may not always be the case in containers and the like, so if this test is
@@ -727,7 +660,8 @@ mod tests {
chan_handler: Arc::new(lightning::ln::peer_handler::ErroringMessageHandler::new()),
onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
route_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
- }, 0, &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}), Arc::new(TestNodeSigner::new(a_key))));
+ custom_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
+ }, 0, &[1; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(a_key))));

// Make two connections, one for an inbound and one for an outbound connection
let conn_a = {
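Taken together, `setup_inbound`, `setup_outbound`, and `connect_outbound` now share the single bound `PM::Target: APeerManager<Descriptor = SocketDescriptor>`, so callers never name handler types. A usage sketch (the `peer_manager`, `their_node_id`, and `addr` bindings are assumed to exist in the caller):

```rust
use std::sync::Arc;

// Sketch, not part of the diff: any handle whose target implements
// `APeerManager` (e.g. `Arc<PeerManager<...>>`) can be passed directly.
tokio::spawn(async move {
	if let Some(connection) =
		lightning_net_tokio::connect_outbound(Arc::clone(&peer_manager), their_node_id, addr).await
	{
		// Completes when the peer disconnects; polling is optional because all
		// processing futures are spawned internally with tokio::spawn.
		connection.await;
	}
});
```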