From 9c360eb9c968e3168e805fa8fe3a21e5d3e97e08 Mon Sep 17 00:00:00 2001 From: XAMPPRocky <4464295+XAMPPRocky@users.noreply.github.com> Date: Wed, 22 Jan 2025 20:17:05 +0100 Subject: [PATCH] chore: Move to service API (#1075) --- crates/test/src/lib.rs | 21 +++++++++------------ src/cli.rs | 2 ++ src/cli/agent.rs | 3 +-- src/cli/manage.rs | 4 +--- src/cli/relay.rs | 8 ++------ src/components/agent.rs | 9 ++++++--- src/components/manage.rs | 17 +++++------------ src/components/relay.rs | 32 +++++++++++--------------------- 8 files changed, 37 insertions(+), 59 deletions(-) diff --git a/crates/test/src/lib.rs b/crates/test/src/lib.rs index 89431db513..30040cb602 100644 --- a/crates/test/src/lib.rs +++ b/crates/test/src/lib.rs @@ -307,11 +307,8 @@ impl Pail { PailConfig::Relay(rpc) => { use components::relay; - let xds_listener = TcpListener::bind(None).unwrap(); - let mds_listener = TcpListener::bind(None).unwrap(); - - let xds_port = xds_listener.port(); - let mds_port = mds_listener.port(); + let xds_port = TcpListener::bind(None).unwrap().port(); + let mds_port = TcpListener::bind(None).unwrap().port(); let path = td.join(spc.name); let mut tc = rpc.config.unwrap_or_default(); @@ -328,8 +325,8 @@ impl Pail { let task = tokio::spawn( relay::Relay { - xds_listener, - mds_listener, + xds_port, + mds_port, provider: Some(Providers::File { path }), } .run(RunArgs { @@ -392,9 +389,9 @@ impl Pail { let (shutdown, shutdown_rx) = quilkin::make_shutdown_channel(quilkin::ShutdownKind::Testing); - let qcmp_socket = - quilkin::net::raw_socket_with_reuse(0).expect("failed to bind qcmp socket"); - let qcmp_port = quilkin::net::socket_port(&qcmp_socket); + let port = quilkin::net::socket_port( + &quilkin::net::raw_socket_with_reuse(0).expect("failed to bind qcmp socket"), + ); let config_path = path.clone(); let config = Arc::new(Config::default_agent()); @@ -406,7 +403,7 @@ impl Pail { locality: None, icao_code: Some(apc.icao_code), relay_servers, - qcmp_socket, + port, provider: Some(Providers::File { path }), address_selector: None, } @@ -419,7 +416,7 @@ impl Pail { }); Self::Agent(AgentPail { - qcmp_port, + qcmp_port: port, task, shutdown, config_file: Some(ConfigFile { diff --git a/src/cli.rs b/src/cli.rs index 5698f3ee5e..b6f5f3798d 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -297,6 +297,8 @@ impl Cli { shutdown_tx.send(crate::ShutdownKind::Normal).ok(); }); + self.service.spawn_services(&config, &shutdown_rx)?; + match (self.command, mode) { (Commands::Agent(agent), Admin::Agent(ready)) => { agent.run(locality, config, ready, shutdown_rx).await diff --git a/src/cli/agent.rs b/src/cli/agent.rs index b1678b7c0a..0ffb2b4e5c 100644 --- a/src/cli/agent.rs +++ b/src/cli/agent.rs @@ -85,12 +85,11 @@ impl Agent { ready: Ready, shutdown_rx: crate::ShutdownRx, ) -> crate::Result<()> { - let qcmp_socket = crate::net::raw_socket_with_reuse(self.qcmp_port)?; let icao_code = Some(self.icao_code); agent::Agent { locality, - qcmp_socket, + port: self.qcmp_port, icao_code, relay_servers: self.relay, provider: self.provider, diff --git a/src/cli/manage.rs b/src/cli/manage.rs index 32790dbb95..0575852664 100644 --- a/src/cli/manage.rs +++ b/src/cli/manage.rs @@ -50,13 +50,11 @@ impl Manage { ready: Ready, shutdown_rx: crate::ShutdownRx, ) -> crate::Result<()> { - let listener = crate::net::TcpListener::bind(Some(self.port))?; - manage::Manage { locality, + port: self.port, provider: self.provider, relay_servers: self.relay, - listener, address_selector: self.address_type.map(|at| crate::config::AddressSelector { name: at, kind: self.ip_kind, diff --git a/src/cli/relay.rs b/src/cli/relay.rs index b146c7a341..a9d46e7015 100644 --- a/src/cli/relay.rs +++ b/src/cli/relay.rs @@ -19,7 +19,6 @@ use std::sync::Arc; use crate::{ components::relay, config::{Config, Providers}, - net::TcpListener, }; pub use relay::Ready; @@ -63,12 +62,9 @@ impl Relay { ready: Ready, shutdown_rx: crate::ShutdownRx, ) -> crate::Result<()> { - let xds_listener = TcpListener::bind(Some(self.xds_port))?; - let mds_listener = TcpListener::bind(Some(self.mds_port))?; - relay::Relay { - xds_listener, - mds_listener, + xds_port: self.xds_port, + mds_port: self.mds_port, provider: self.providers, } .run(crate::components::RunArgs { diff --git a/src/components/agent.rs b/src/components/agent.rs index 634a113b2a..cfd3528de2 100644 --- a/src/components/agent.rs +++ b/src/components/agent.rs @@ -40,7 +40,7 @@ impl Ready { pub struct Agent { pub locality: Option, - pub qcmp_socket: socket2::Socket, + pub port: u16, pub icao_code: Option, pub relay_servers: Vec, pub provider: Option, @@ -66,7 +66,7 @@ impl Agent { unreachable!("this should be an agent config"); }; - qcmp_port.store(crate::net::socket_port(&self.qcmp_socket).into()); + qcmp_port.store(self.port.into()); icao_code.store(self.icao_code.unwrap_or_default().into()); } @@ -102,7 +102,10 @@ impl Agent { None }; - crate::codec::qcmp::spawn(self.qcmp_socket, shutdown_rx.clone())?; + crate::cli::Service::default() + .qcmp() + .qcmp_port(self.port) + .spawn_services(&config, &shutdown_rx)?; shutdown_rx.changed().await.map_err(From::from) } } diff --git a/src/components/manage.rs b/src/components/manage.rs index b9811b5716..0f360879b7 100644 --- a/src/components/manage.rs +++ b/src/components/manage.rs @@ -25,7 +25,7 @@ pub struct Manage { pub locality: Option, pub relay_servers: Vec, pub provider: Providers, - pub listener: crate::net::TcpListener, + pub port: u16, pub address_selector: Option, } @@ -73,19 +73,12 @@ impl Manage { None }; - use futures::TryFutureExt as _; - let server_task = tokio::spawn( - crate::net::xds::server::ControlPlane::from_arc( - config, - crate::components::admin::IDLE_REQUEST_INTERVAL, - ) - .management_server(self.listener)?, - ) - .map_err(From::from) - .and_then(std::future::ready); + crate::cli::Service::default() + .xds() + .xds_port(self.port) + .spawn_services(&config, &shutdown_rx)?; tokio::select! { - result = server_task => result, result = provider_task => result?, result = shutdown_rx.changed() => result.map_err(From::from), } diff --git a/src/components/relay.rs b/src/components/relay.rs index ff11bbca03..cf315a7d09 100644 --- a/src/components/relay.rs +++ b/src/components/relay.rs @@ -15,7 +15,7 @@ */ use super::RunArgs; -use crate::{config::Providers, net::TcpListener}; +use crate::config::Providers; use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -44,8 +44,8 @@ impl Ready { } pub struct Relay { - pub xds_listener: TcpListener, - pub mds_listener: TcpListener, + pub xds_port: u16, + pub mds_port: u16, pub provider: Option, } @@ -59,15 +59,6 @@ impl Relay { mut shutdown_rx, }: RunArgs, ) -> crate::Result<()> { - use crate::net::xds::server::ControlPlane; - - let xds_server = ControlPlane::from_arc(config.clone(), ready.idle_request_interval) - .management_server(self.xds_listener)?; - let mds_server = tokio::spawn( - ControlPlane::from_arc(config.clone(), ready.idle_request_interval) - .relay_server(self.mds_listener)?, - ); - let _provider_task = self.provider.map(|provider| { let config = config.clone(); let provider_is_healthy = ready.provider_is_healthy.clone(); @@ -134,14 +125,13 @@ impl Relay { } }); - tokio::select! { - result = xds_server => { - result - } - result = mds_server => { - result? - } - result = shutdown_rx.changed() => result.map_err(From::from), - } + crate::cli::Service::default() + .xds() + .xds_port(self.xds_port) + .mds() + .mds_port(self.mds_port) + .spawn_services(&config, &shutdown_rx)?; + + shutdown_rx.changed().await.map_err(From::from) } }