Skip to content

Commit

Permalink
chore: Move to service API (#1075)
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky authored Jan 22, 2025
1 parent 0e4b949 commit 9c360eb
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 59 deletions.
21 changes: 9 additions & 12 deletions crates/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,11 +307,8 @@ impl Pail {
PailConfig::Relay(rpc) => {
use components::relay;

let xds_listener = TcpListener::bind(None).unwrap();
let mds_listener = TcpListener::bind(None).unwrap();

let xds_port = xds_listener.port();
let mds_port = mds_listener.port();
let xds_port = TcpListener::bind(None).unwrap().port();
let mds_port = TcpListener::bind(None).unwrap().port();

let path = td.join(spc.name);
let mut tc = rpc.config.unwrap_or_default();
Expand All @@ -328,8 +325,8 @@ impl Pail {

let task = tokio::spawn(
relay::Relay {
xds_listener,
mds_listener,
xds_port,
mds_port,
provider: Some(Providers::File { path }),
}
.run(RunArgs {
Expand Down Expand Up @@ -392,9 +389,9 @@ impl Pail {
let (shutdown, shutdown_rx) =
quilkin::make_shutdown_channel(quilkin::ShutdownKind::Testing);

let qcmp_socket =
quilkin::net::raw_socket_with_reuse(0).expect("failed to bind qcmp socket");
let qcmp_port = quilkin::net::socket_port(&qcmp_socket);
let port = quilkin::net::socket_port(
&quilkin::net::raw_socket_with_reuse(0).expect("failed to bind qcmp socket"),
);

let config_path = path.clone();
let config = Arc::new(Config::default_agent());
Expand All @@ -406,7 +403,7 @@ impl Pail {
locality: None,
icao_code: Some(apc.icao_code),
relay_servers,
qcmp_socket,
port,
provider: Some(Providers::File { path }),
address_selector: None,
}
Expand All @@ -419,7 +416,7 @@ impl Pail {
});

Self::Agent(AgentPail {
qcmp_port,
qcmp_port: port,
task,
shutdown,
config_file: Some(ConfigFile {
Expand Down
2 changes: 2 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,8 @@ impl Cli {
shutdown_tx.send(crate::ShutdownKind::Normal).ok();
});

self.service.spawn_services(&config, &shutdown_rx)?;

match (self.command, mode) {
(Commands::Agent(agent), Admin::Agent(ready)) => {
agent.run(locality, config, ready, shutdown_rx).await
Expand Down
3 changes: 1 addition & 2 deletions src/cli/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,11 @@ impl Agent {
ready: Ready,
shutdown_rx: crate::ShutdownRx,
) -> crate::Result<()> {
let qcmp_socket = crate::net::raw_socket_with_reuse(self.qcmp_port)?;
let icao_code = Some(self.icao_code);

agent::Agent {
locality,
qcmp_socket,
port: self.qcmp_port,
icao_code,
relay_servers: self.relay,
provider: self.provider,
Expand Down
4 changes: 1 addition & 3 deletions src/cli/manage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,11 @@ impl Manage {
ready: Ready,
shutdown_rx: crate::ShutdownRx,
) -> crate::Result<()> {
let listener = crate::net::TcpListener::bind(Some(self.port))?;

manage::Manage {
locality,
port: self.port,
provider: self.provider,
relay_servers: self.relay,
listener,
address_selector: self.address_type.map(|at| crate::config::AddressSelector {
name: at,
kind: self.ip_kind,
Expand Down
8 changes: 2 additions & 6 deletions src/cli/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::sync::Arc;
use crate::{
components::relay,
config::{Config, Providers},
net::TcpListener,
};
pub use relay::Ready;

Expand Down Expand Up @@ -63,12 +62,9 @@ impl Relay {
ready: Ready,
shutdown_rx: crate::ShutdownRx,
) -> crate::Result<()> {
let xds_listener = TcpListener::bind(Some(self.xds_port))?;
let mds_listener = TcpListener::bind(Some(self.mds_port))?;

relay::Relay {
xds_listener,
mds_listener,
xds_port: self.xds_port,
mds_port: self.mds_port,
provider: self.providers,
}
.run(crate::components::RunArgs {
Expand Down
9 changes: 6 additions & 3 deletions src/components/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl Ready {

pub struct Agent {
pub locality: Option<Locality>,
pub qcmp_socket: socket2::Socket,
pub port: u16,
pub icao_code: Option<IcaoCode>,
pub relay_servers: Vec<tonic::transport::Endpoint>,
pub provider: Option<Providers>,
Expand All @@ -66,7 +66,7 @@ impl Agent {
unreachable!("this should be an agent config");
};

qcmp_port.store(crate::net::socket_port(&self.qcmp_socket).into());
qcmp_port.store(self.port.into());
icao_code.store(self.icao_code.unwrap_or_default().into());
}

Expand Down Expand Up @@ -102,7 +102,10 @@ impl Agent {
None
};

crate::codec::qcmp::spawn(self.qcmp_socket, shutdown_rx.clone())?;
crate::cli::Service::default()
.qcmp()
.qcmp_port(self.port)
.spawn_services(&config, &shutdown_rx)?;
shutdown_rx.changed().await.map_err(From::from)
}
}
17 changes: 5 additions & 12 deletions src/components/manage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub struct Manage {
pub locality: Option<Locality>,
pub relay_servers: Vec<tonic::transport::Endpoint>,
pub provider: Providers,
pub listener: crate::net::TcpListener,
pub port: u16,
pub address_selector: Option<crate::config::AddressSelector>,
}

Expand Down Expand Up @@ -73,19 +73,12 @@ impl Manage {
None
};

use futures::TryFutureExt as _;
let server_task = tokio::spawn(
crate::net::xds::server::ControlPlane::from_arc(
config,
crate::components::admin::IDLE_REQUEST_INTERVAL,
)
.management_server(self.listener)?,
)
.map_err(From::from)
.and_then(std::future::ready);
crate::cli::Service::default()
.xds()
.xds_port(self.port)
.spawn_services(&config, &shutdown_rx)?;

tokio::select! {
result = server_task => result,
result = provider_task => result?,
result = shutdown_rx.changed() => result.map_err(From::from),
}
Expand Down
32 changes: 11 additions & 21 deletions src/components/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

use super::RunArgs;
use crate::{config::Providers, net::TcpListener};
use crate::config::Providers;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
Expand Down Expand Up @@ -44,8 +44,8 @@ impl Ready {
}

pub struct Relay {
pub xds_listener: TcpListener,
pub mds_listener: TcpListener,
pub xds_port: u16,
pub mds_port: u16,
pub provider: Option<Providers>,
}

Expand All @@ -59,15 +59,6 @@ impl Relay {
mut shutdown_rx,
}: RunArgs<Ready>,
) -> crate::Result<()> {
use crate::net::xds::server::ControlPlane;

let xds_server = ControlPlane::from_arc(config.clone(), ready.idle_request_interval)
.management_server(self.xds_listener)?;
let mds_server = tokio::spawn(
ControlPlane::from_arc(config.clone(), ready.idle_request_interval)
.relay_server(self.mds_listener)?,
);

let _provider_task = self.provider.map(|provider| {
let config = config.clone();
let provider_is_healthy = ready.provider_is_healthy.clone();
Expand Down Expand Up @@ -134,14 +125,13 @@ impl Relay {
}
});

tokio::select! {
result = xds_server => {
result
}
result = mds_server => {
result?
}
result = shutdown_rx.changed() => result.map_err(From::from),
}
crate::cli::Service::default()
.xds()
.xds_port(self.xds_port)
.mds()
.mds_port(self.mds_port)
.spawn_services(&config, &shutdown_rx)?;

shutdown_rx.changed().await.map_err(From::from)
}
}

0 comments on commit 9c360eb

Please sign in to comment.