Skip to content

Commit

Permalink
refactor: Move network servers under publish flags
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky committed Jan 20, 2025
1 parent 86dcfbb commit 0094bb9
Show file tree
Hide file tree
Showing 24 changed files with 524 additions and 627 deletions.
25 changes: 9 additions & 16 deletions benches/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,22 +330,15 @@ impl QuilkinLoop {
.insert_default([quilkin::net::endpoint::Endpoint::new(endpoint.into())].into())
});

let proxy = quilkin::cli::Proxy {
port,
qcmp_port: runtime
.block_on(quilkin::test::available_addr(
quilkin::test::AddressType::Random,
))
.port(),
..<_>::default()
};

runtime.block_on(async move {
proxy
.run(config, Default::default(), None, shutdown_rx)
.await
.unwrap();
});
runtime
.block_on(
quilkin::cli::Publish::default()
.udp()
.udp_port(port)
.spawn_publishers(&config, &shutdown_rx)
.unwrap(),
)
.unwrap();
});

Self {
Expand Down
1 change: 1 addition & 0 deletions crates/agones/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ pub async fn quilkin_proxy_deployment(
let mut container = quilkin_container(
client,
Some(vec![
"--publish.udp".into(),
"proxy".into(),
format!("--management-server={management_server}"),
]),
Expand Down
1 change: 1 addition & 0 deletions crates/nmap-service-probes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ pub enum MatchKind {
mod tests {
use super::*;

use alloc::vec;
use pretty_assertions::assert_eq;

#[test]
Expand Down
1 change: 1 addition & 0 deletions crates/nmap-service-probes/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ pub fn fallback<'i, E: ParserError<Stream<'i>> + AddContext<Stream<'i>, StrConte
#[cfg(test)]
mod tests {
use super::*;
use alloc::vec;

#[test]
fn exclude() -> PResult<()> {
Expand Down
74 changes: 49 additions & 25 deletions crates/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use quilkin::{
Config, ShutdownTx,
};
pub use serde_json::json;
use std::{net::SocketAddr, num::NonZeroUsize, path::PathBuf, sync::Arc};
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
use tokio::sync::mpsc;

pub static BUFFER_POOL: once_cell::sync::Lazy<Arc<BufferPool>> =
Expand Down Expand Up @@ -75,7 +75,9 @@ macro_rules! trace_test {

let _guard = init_logging($crate::Level::DEBUG, mname);

$body
tokio::spawn(async move {
$body
});
}
};
}
Expand Down Expand Up @@ -307,11 +309,8 @@ impl Pail {
PailConfig::Relay(rpc) => {
use components::relay;

let xds_listener = TcpListener::bind(None).unwrap();
let mds_listener = TcpListener::bind(None).unwrap();

let xds_port = xds_listener.port();
let mds_port = mds_listener.port();
let xds_port = TcpListener::bind(None).unwrap().port();
let mds_port = TcpListener::bind(None).unwrap().port();

let path = td.join(spc.name);
let mut tc = rpc.config.unwrap_or_default();
Expand All @@ -326,10 +325,17 @@ impl Pail {
let config = Arc::new(Config::default_non_agent());
config.id.store(Arc::new(spc.name.into()));

let _task = tokio::spawn(
quilkin::cli::Publish::default()
.xds()
.xds_port(xds_port)
.mds()
.mds_port(mds_port)
.spawn_publishers(&config, &shutdown_rx)
.unwrap(),
);
let task = tokio::spawn(
relay::Relay {
xds_listener,
mds_listener,
provider: Some(Providers::File { path }),
}
.run(RunArgs {
Expand Down Expand Up @@ -392,21 +398,28 @@ impl Pail {
let (shutdown, shutdown_rx) =
quilkin::make_shutdown_channel(quilkin::ShutdownKind::Testing);

let qcmp_socket =
quilkin::net::raw_socket_with_reuse(0).expect("failed to bind qcmp socket");
let qcmp_port = quilkin::net::socket_port(&qcmp_socket);
let qcmp_port = quilkin::net::socket_port(
&quilkin::net::raw_socket_with_reuse(0).expect("failed to bind qcmp socket"),
);

let config_path = path.clone();
let config = Arc::new(Config::default_agent());
config.id.store(Arc::new(spc.name.into()));
let acfg = config.clone();

let _task = tokio::spawn(
quilkin::cli::Publish::default()
.qcmp()
.qcmp_port(qcmp_port)
.spawn_publishers(&config, &shutdown_rx)
.unwrap(),
);

let task = tokio::spawn(async move {
components::agent::Agent {
locality: None,
icao_code: Some(apc.icao_code),
relay_servers,
qcmp_socket,
provider: Some(Providers::File { path }),
address_selector: None,
}
Expand All @@ -430,14 +443,18 @@ impl Pail {
})
}
PailConfig::Proxy(ppc) => {
let socket = quilkin::net::raw_socket_with_reuse(0).expect("failed to bind socket");
let qcmp =
quilkin::net::raw_socket_with_reuse(0).expect("failed to bind qcmp socket");
let qcmp_port = quilkin::net::socket_port(&qcmp);
let phoenix = TcpListener::bind(None).expect("failed to bind phoenix socket");
let phoenix_port = phoenix.port();

let port = quilkin::net::socket_port(&socket);
let port = {
let socket =
quilkin::net::raw_socket_with_reuse(0).expect("failed to bind socket");
quilkin::net::socket_port(&socket)
};

let qcmp_port = quilkin::net::socket_port(
&quilkin::net::raw_socket_with_reuse(0).expect("failed to bind qcmp socket"),
);
let phoenix_port = TcpListener::bind(None)
.expect("failed to bind phoenix socket")
.port();

let management_servers = spc
.dependencies
Expand Down Expand Up @@ -496,16 +513,23 @@ impl Pail {

let (rttx, rtrx) = tokio::sync::mpsc::unbounded_channel();

let _task = tokio::spawn(
quilkin::cli::Publish::default()
.udp()
.udp_port(port)
.qcmp()
.qcmp_port(qcmp_port)
.phoenix()
.phoenix_port(phoenix_port)
.spawn_publishers(&config, &shutdown_rx)
.unwrap(),
);
let task = tokio::spawn(async move {
components::proxy::Proxy {
num_workers: NonZeroUsize::new(1).unwrap(),
mmdb: None,
to: Vec::new(),
to_tokens: None,
management_servers,
socket: Some(socket),
qcmp,
phoenix,
notifier: Some(rttx),
}
.run(
Expand Down
2 changes: 1 addition & 1 deletion crates/test/tests/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ trace_test!(
config,
socket,
pending_sends,
&sessions,
sessions,
BUFFER_POOL.clone(),
)
.await
Expand Down
4 changes: 2 additions & 2 deletions crates/xds/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl<C: crate::config::Configuration> ControlPlane<C> {
let server = AggregatedDiscoveryServiceServer::new(self)
.max_encoding_message_size(crate::config::max_grpc_message_size());
let server = tonic::transport::Server::builder().add_service(server);
tracing::info!("serving management server on port `{}`", listener.port());
tracing::info!(port = listener.port(), "publishing xDS server");
Ok(server
.serve_with_incoming(listener.into_stream()?)
.map_err(From::from))
Expand All @@ -98,7 +98,7 @@ impl<C: crate::config::Configuration> ControlPlane<C> {
let server = AggregatedControlPlaneDiscoveryServiceServer::new(self)
.max_encoding_message_size(crate::config::max_grpc_message_size());
let server = tonic::transport::Server::builder().add_service(server);
tracing::info!("serving relay server on port `{}`", listener.port());
tracing::info!(port = listener.port(), "publishing mDS server");
Ok(server
.serve_with_incoming(listener.into_stream()?)
.map_err(From::from))
Expand Down
24 changes: 8 additions & 16 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,10 @@ use strum_macros::{Display, EnumString};

pub use self::{
agent::Agent, generate_config_schema::GenerateConfigSchema, manage::Manage, proxy::Proxy,
qcmp::Qcmp, relay::Relay,
publish::Publish, qcmp::Qcmp, relay::Relay,
};

macro_rules! define_port {
($port:expr) => {
pub const PORT: u16 = $port;

pub fn default_port() -> u16 {
PORT
}
};
}
mod publish;

pub mod agent;
pub mod generate_config_schema;
Expand All @@ -49,7 +41,6 @@ pub mod qcmp;
pub mod relay;

const ETC_CONFIG_PATH: &str = "/etc/quilkin/quilkin.yaml";
const PORT_ENV_VAR: &str = "QUILKIN_PORT";

#[derive(Debug, clap::Parser)]
#[command(next_help_heading = "Administration Options")]
Expand Down Expand Up @@ -121,6 +112,8 @@ pub struct Cli {
pub admin: AdminCli,
#[command(flatten)]
pub locality: LocalityCli,
#[command(flatten)]
pub publish: publish::Publish,
}

/// The various log format options
Expand Down Expand Up @@ -193,12 +186,9 @@ impl Cli {
return generator.generate_config_schema();
}
Commands::Agent(_) => Admin::Agent(<_>::default()),
Commands::Proxy(proxy) => {
Commands::Proxy(_) => {
let ready = components::proxy::Ready {
idle_request_interval: proxy
.idle_request_interval_secs
.map(std::time::Duration::from_secs)
.unwrap_or(admin_server::IDLE_REQUEST_INTERVAL),
idle_request_interval: admin_server::IDLE_REQUEST_INTERVAL,
..Default::default()
};
Admin::Proxy(ready)
Expand Down Expand Up @@ -294,6 +284,8 @@ impl Cli {
shutdown_tx.send(crate::ShutdownKind::Normal).ok();
});

let _publish_task = self.publish.spawn_publishers(&config, &shutdown_rx)?;

match (self.command, mode) {
(Commands::Agent(agent), Admin::Agent(ready)) => {
agent.run(locality, config, ready, shutdown_rx).await
Expand Down
22 changes: 1 addition & 21 deletions src/cli/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,12 @@ use std::sync::Arc;
use crate::{components::agent, config::Config};
pub use agent::Ready;

define_port!(7600);

/// Runs Quilkin as a relay service that runs a Manager Discovery Service
/// (mDS) for accepting cluster and configuration information from xDS
/// management services, and exposing it as a single merged xDS service for
/// proxy services.
#[derive(clap::Args, Clone, Debug)]
#[derive(clap::Args, Clone, Debug, Default)]
pub struct Agent {
/// Port for QCMP service.
#[clap(short, long, env = "QCMP_PORT", default_value_t = PORT)]
pub qcmp_port: u16,
/// One or more `quilkin relay` endpoints to push configuration changes to.
#[clap(short, long, env = "QUILKIN_MANAGEMENT_SERVER")]
pub relay: Vec<tonic::transport::Endpoint>,
Expand Down Expand Up @@ -63,19 +58,6 @@ impl clap::ValueEnum for crate::config::AddrKind {
}
}

impl Default for Agent {
fn default() -> Self {
Self {
qcmp_port: PORT,
relay: <_>::default(),
provider: <_>::default(),
icao_code: <_>::default(),
address_type: None,
ip_kind: None,
}
}
}

impl Agent {
#[tracing::instrument(skip_all)]
pub async fn run(
Expand All @@ -85,12 +67,10 @@ impl Agent {
ready: Ready,
shutdown_rx: crate::ShutdownRx,
) -> crate::Result<()> {
let qcmp_socket = crate::net::raw_socket_with_reuse(self.qcmp_port)?;
let icao_code = Some(self.icao_code);

agent::Agent {
locality,
qcmp_socket,
icao_code,
relay_servers: self.relay,
provider: self.provider,
Expand Down
8 changes: 0 additions & 8 deletions src/cli/manage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,13 @@
use crate::components::manage;
pub use manage::Ready;

define_port!(7800);

/// Runs Quilkin as a xDS management server, using `provider` as
/// a configuration source.
#[derive(clap::Args, Clone, Debug)]
pub struct Manage {
/// One or more `quilkin relay` endpoints to push configuration changes to.
#[clap(short, long, env = "QUILKIN_MANAGEMENT_SERVER")]
pub relay: Vec<tonic::transport::Endpoint>,
/// The TCP port to listen to, to serve discovery responses.
#[clap(short, long, env = super::PORT_ENV_VAR, default_value_t = PORT)]
pub port: u16,
/// The configuration source for a management server.
#[clap(subcommand)]
pub provider: crate::config::Providers,
Expand All @@ -50,13 +45,10 @@ impl Manage {
ready: Ready,
shutdown_rx: crate::ShutdownRx,
) -> crate::Result<()> {
let listener = crate::net::TcpListener::bind(Some(self.port))?;

manage::Manage {
locality,
provider: self.provider,
relay_servers: self.relay,
listener,
address_selector: self.address_type.map(|at| crate::config::AddressSelector {
name: at,
kind: self.ip_kind,
Expand Down
Loading

0 comments on commit 0094bb9

Please sign in to comment.