diff --git a/Cargo.lock b/Cargo.lock
index 770eacd990..1521ac66f4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2104,9 +2104,9 @@ dependencies = [
 
 [[package]]
 name = "neli-proc-macros"
-version = "0.1.3"
+version = "0.1.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c168194d373b1e134786274020dae7fc5513d565ea2ebb9bc9ff17ffb69106d4"
+checksum = "0c8034b7fbb6f9455b2a96c19e6edf8dc9fc34c70449938d8ee3b4df363f61fe"
 dependencies = [
  "either",
  "proc-macro2",
@@ -4291,8 +4291,8 @@ checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51"
 
 [[package]]
 name = "xdp"
-version = "0.0.1"
-source = "git+https://github.com/Jake-Shadle/xdp?branch=impl#4ec459e29fd1fdf38f85a7d48b64d3ae2f6098de"
+version = "0.1.0"
+source = "git+https://github.com/Jake-Shadle/xdp#579d5cad8be1342c6c53dd4236bb4ac35fa026a6"
 dependencies = [
  "libc",
  "memmap2",
diff --git a/bacon.toml b/bacon.toml
new file mode 100644
index 0000000000..3e91915a83
--- /dev/null
+++ b/bacon.toml
@@ -0,0 +1,111 @@
+# This is a configuration file for the bacon tool
+#
+# Complete help on configuration: https://dystroy.org/bacon/config/
+# 
+# You may check the current default at
+#   https://github.com/Canop/bacon/blob/main/defaults/default-bacon.toml
+
+default_job = "clippy-all"
+env.CARGO_TERM_COLOR = "always"
+
+[jobs.check]
+command = ["cargo", "check"]
+need_stdout = false
+
+[jobs.check-all]
+command = ["cargo", "check", "--all-targets"]
+need_stdout = false
+
+# Run clippy on the default target
+[jobs.clippy]
+command = ["cargo", "clippy"]
+need_stdout = false
+
+# Run clippy on all targets
+# To disable some lints, you may change the job this way:
+#    [jobs.clippy-all]
+#    command = [
+#        "cargo", "clippy",
+#        "--all-targets",
+#    	 "--",
+#    	 "-A", "clippy::bool_to_int_with_if",
+#    	 "-A", "clippy::collapsible_if",
+#    	 "-A", "clippy::derive_partial_eq_without_eq",
+#    ]
+# need_stdout = false
+[jobs.clippy-all]
+command = ["cargo", "clippy", "--all-targets", "--workspace"]
+need_stdout = false
+
+# This job lets you run
+# - all tests: bacon test
+# - a specific test: bacon test -- config::test_default_files
+# - the tests of a package: bacon test -- -- -p config
+[jobs.test]
+command = ["cargo", "test"]
+need_stdout = true
+
+[jobs.nextest]
+command = [
+    "cargo", "nextest", "run",
+    "--hide-progress-bar", "--failure-output", "final"
+]
+need_stdout = true
+analyzer = "nextest"
+
+[jobs.doc]
+command = ["cargo", "doc", "--no-deps"]
+need_stdout = false
+
+# If the doc compiles, then it opens in your browser and bacon switches
+# to the previous job
+[jobs.doc-open]
+command = ["cargo", "doc", "--no-deps", "--open"]
+need_stdout = false
+on_success = "back" # so that we don't open the browser at each change
+
+# You can run your application and have the result displayed in bacon,
+# if it makes sense for this crate.
+[jobs.run]
+command = [
+    "cargo", "run",
+    # put launch parameters for your program behind a `--` separator
+]
+need_stdout = true
+allow_warnings = true
+background = true
+
+# Run your long-running application (eg server) and have the result displayed in bacon.
+# For programs that never stop (eg a server), `background` is set to false
+# to have the cargo run output immediately displayed instead of waiting for
+# program's end.
+# 'on_change_strategy' is set to `kill_then_restart` to have your program restart
+# on every change (an alternative would be to use the 'F5' key manually in bacon).
+# If you often use this job, it makes sense to override the 'r' key by adding
+# a binding `r = job:run-long` at the end of this file .
+[jobs.run-long]
+command = [
+    "cargo", "run",
+    # put launch parameters for your program behind a `--` separator
+]
+need_stdout = true
+allow_warnings = true
+background = false
+on_change_strategy = "kill_then_restart"
+
+# This parameterized job runs the example of your choice, as soon
+# as the code compiles.
+# Call it as
+#    bacon ex -- my-example
+[jobs.ex]
+command = ["cargo", "run", "--example"]
+need_stdout = true
+allow_warnings = true
+
+# You may define here keybindings that would be specific to
+# a project, for example a shortcut to launch a specific job.
+# Shortcuts to internal functions (scrolling, toggling, etc.)
+# should go in your personal global prefs.toml file instead.
+[keybindings]
+# alt-m = "job:my-job"
+c = "job:clippy-all" # comment this to have 'c' run clippy on only the default target
diff --git a/benches/shared.rs b/benches/shared.rs
index e6748c19aa..f52fc33b69 100644
--- a/benches/shared.rs
+++ b/benches/shared.rs
@@ -330,22 +330,15 @@ impl QuilkinLoop {
                     .insert_default([quilkin::net::endpoint::Endpoint::new(endpoint.into())].into())
             });
 
-            let proxy = quilkin::cli::Proxy {
-                port,
-                qcmp_port: runtime
-                    .block_on(quilkin::test::available_addr(
-                        quilkin::test::AddressType::Random,
-                    ))
-                    .port(),
-                ..<_>::default()
-            };
-
-            runtime.block_on(async move {
-                proxy
-                    .run(config, Default::default(), None, shutdown_rx)
-                    .await
-                    .unwrap();
-            });
+            runtime
+                .block_on(
+                    quilkin::cli::Publish::default()
+                        .udp()
+                        .udp_port(port)
+                        .spawn_publishers(&config, &shutdown_rx)
+                        .unwrap(),
+                )
+                .unwrap();
         });
 
         Self {
diff --git a/crates/agones/src/lib.rs b/crates/agones/src/lib.rs
index 136d53d8a7..7a4a33d370 100644
--- a/crates/agones/src/lib.rs
+++ b/crates/agones/src/lib.rs
@@ -318,6 +318,8 @@ pub async fn quilkin_proxy_deployment(
     let mut container = quilkin_container(
         client,
         Some(vec![
+            "--publish.udp".into(),
+            "--publish.qcmp".into(),
             "proxy".into(),
             format!("--management-server={management_server}"),
         ]),
diff --git a/crates/agones/src/relay.rs b/crates/agones/src/relay.rs
index b8c3626d0f..865b640766 100644
--- a/crates/agones/src/relay.rs
+++ b/crates/agones/src/relay.rs
@@ -263,6 +263,8 @@ mod tests {
 
         // Setup the relay
         let args = [
+            "--publish.mds",
+            "--publish.xds",
             "relay",
             "agones",
             "--config-namespace",
@@ -347,6 +349,7 @@ mod tests {
 
         // agent deployment
         let args = [
+            "--publish.qcmp",
             "agent",
             "--relay",
             &format!("http://{relay_name}:7900"),
diff --git a/crates/agones/src/sidecar.rs b/crates/agones/src/sidecar.rs
index 4ccf82ba0b..35a9e040ea 100644
--- a/crates/agones/src/sidecar.rs
+++ b/crates/agones/src/sidecar.rs
@@ -16,7 +16,9 @@
 
 #[cfg(test)]
 mod tests {
-    use crate::{game_server, is_gameserver_ready, quilkin_config_map, quilkin_container, Client};
+    use crate::{
+        debug_pods, game_server, is_gameserver_ready, quilkin_config_map, quilkin_container, Client,
+    };
     use k8s_openapi::api::core::v1::{ConfigMap, ConfigMapVolumeSource, Volume};
     use kube::{api::PostParams, runtime::wait::await_condition, Api, ResourceExt};
     use quilkin::{config::providers::k8s::agones::GameServer, test::TestHelper};
@@ -98,7 +100,7 @@ clusters:
         let mount_name = "config".to_string();
         template.containers.push(quilkin_container(
             &client,
-            Some(vec!["proxy".into()]),
+            Some(vec!["--publish.udp".into(), "proxy".into()]),
             Some(mount_name.clone()),
             true,
         ));
@@ -133,6 +135,8 @@ clusters:
             .await
             .expect("should receive packet")
             .unwrap();
+        debug_pods(&client, "role=proxy".into()).await;
+        debug_pods(&client, "{name}".into()).await;
         assert_eq!("ACK: hellosidecar\n", response);
     }
 }
diff --git a/crates/nmap-service-probes/src/lib.rs b/crates/nmap-service-probes/src/lib.rs
index fbbeb0c7a5..ce48f11244 100644
--- a/crates/nmap-service-probes/src/lib.rs
+++ b/crates/nmap-service-probes/src/lib.rs
@@ -209,6 +209,7 @@ pub enum MatchKind {
 mod tests {
     use super::*;
 
+    use alloc::vec;
     use pretty_assertions::assert_eq;
 
     #[test]
diff --git a/crates/nmap-service-probes/src/parser.rs b/crates/nmap-service-probes/src/parser.rs
index 72d6ebdc72..d7678c2423 100644
--- a/crates/nmap-service-probes/src/parser.rs
+++ b/crates/nmap-service-probes/src/parser.rs
@@ -531,6 +531,7 @@ pub fn fallback<'i, E: ParserError<Stream<'i>> + AddContext<Stream<'i>, StrConte
 #[cfg(test)]
 mod tests {
     use super::*;
+    use alloc::vec;
 
     #[test]
     fn exclude() -> PResult<()> {
diff --git a/crates/test/src/lib.rs b/crates/test/src/lib.rs
index 89431db513..eae4d0bd7b 100644
--- a/crates/test/src/lib.rs
+++ b/crates/test/src/lib.rs
@@ -7,7 +7,7 @@ use quilkin::{
     Config, ShutdownTx,
 };
 pub use serde_json::json;
-use std::{net::SocketAddr, num::NonZeroUsize, path::PathBuf, sync::Arc};
+use std::{net::SocketAddr, path::PathBuf, sync::Arc};
 use tokio::sync::mpsc;
 
 pub static BUFFER_POOL: once_cell::sync::Lazy<Arc<BufferPool>> =
@@ -75,7 +75,9 @@ macro_rules! trace_test {
 
             let _guard = init_logging($crate::Level::DEBUG, mname);
 
-            $body
+            tokio::spawn(async move {
+                $body
+            });
         }
     };
 }
@@ -307,11 +309,8 @@ impl Pail {
             PailConfig::Relay(rpc) => {
                 use components::relay;
 
-                let xds_listener = TcpListener::bind(None).unwrap();
-                let mds_listener = TcpListener::bind(None).unwrap();
-
-                let xds_port = xds_listener.port();
-                let mds_port = mds_listener.port();
+                let xds_port = TcpListener::bind(None).unwrap().port();
+                let mds_port = TcpListener::bind(None).unwrap().port();
 
                 let path = td.join(spc.name);
                 let mut tc = rpc.config.unwrap_or_default();
@@ -326,10 +325,17 @@ impl Pail {
                 let config = Arc::new(Config::default_non_agent());
                 config.id.store(Arc::new(spc.name.into()));
 
+                let _task = tokio::spawn(
+                    quilkin::cli::Publish::default()
+                        .xds()
+                        .xds_port(xds_port)
+                        .mds()
+                        .mds_port(mds_port)
+                        .spawn_publishers(&config, &shutdown_rx)
+                        .unwrap(),
+                );
                 let task = tokio::spawn(
                     relay::Relay {
-                        xds_listener,
-                        mds_listener,
                         provider: Some(Providers::File { path }),
                     }
                     .run(RunArgs {
@@ -392,21 +398,28 @@ impl Pail {
                 let (shutdown, shutdown_rx) =
                     quilkin::make_shutdown_channel(quilkin::ShutdownKind::Testing);
 
-                let qcmp_socket =
-                    quilkin::net::raw_socket_with_reuse(0).expect("failed to bind qcmp socket");
-                let qcmp_port = quilkin::net::socket_port(&qcmp_socket);
+                let qcmp_port = quilkin::net::socket_port(
+                    &quilkin::net::raw_socket_with_reuse(0).expect("failed to bind qcmp socket"),
+                );
 
                 let config_path = path.clone();
                 let config = Arc::new(Config::default_agent());
                 config.id.store(Arc::new(spc.name.into()));
                 let acfg = config.clone();
 
+                let _task = tokio::spawn(
+                    quilkin::cli::Publish::default()
+                        .qcmp()
+                        .qcmp_port(qcmp_port)
+                        .spawn_publishers(&config, &shutdown_rx)
+                        .unwrap(),
+                );
+
                 let task = tokio::spawn(async move {
                     components::agent::Agent {
                         locality: None,
                         icao_code: Some(apc.icao_code),
                         relay_servers,
-                        qcmp_socket,
                         provider: Some(Providers::File { path }),
                         address_selector: None,
                     }
@@ -430,14 +443,18 @@ impl Pail {
                 })
             }
             PailConfig::Proxy(ppc) => {
-                let socket = quilkin::net::raw_socket_with_reuse(0).expect("failed to bind socket");
-                let qcmp =
-                    quilkin::net::raw_socket_with_reuse(0).expect("failed to bind qcmp socket");
-                let qcmp_port = quilkin::net::socket_port(&qcmp);
-                let phoenix = TcpListener::bind(None).expect("failed to bind phoenix socket");
-                let phoenix_port = phoenix.port();
-
-                let port = quilkin::net::socket_port(&socket);
+                let port = {
+                    let socket =
+                        quilkin::net::raw_socket_with_reuse(0).expect("failed to bind socket");
+                    quilkin::net::socket_port(&socket)
+                };
+
+                let qcmp_port = quilkin::net::socket_port(
+                    &quilkin::net::raw_socket_with_reuse(0).expect("failed to bind qcmp socket"),
+                );
+                let phoenix_port = TcpListener::bind(None)
+                    .expect("failed to bind phoenix socket")
+                    .port();
 
                 let management_servers = spc
                     .dependencies
@@ -496,13 +513,20 @@ impl Pail {
 
                 let (rttx, rtrx) = tokio::sync::mpsc::unbounded_channel();
 
+                let _task = tokio::spawn(
+                    quilkin::cli::Publish::default()
+                        .udp()
+                        .udp_port(port)
+                        .qcmp()
+                        .qcmp_port(qcmp_port)
+                        .phoenix()
+                        .phoenix_port(phoenix_port)
+                        .spawn_publishers(&config, &shutdown_rx)
+                        .unwrap(),
+                );
                 let task = tokio::spawn(async move {
                     components::proxy::Proxy {
-                        num_workers: NonZeroUsize::new(1).unwrap(),
                         management_servers,
-                        socket: Some(socket),
-                        qcmp,
-                        phoenix,
                         notifier: Some(rttx),
                         ..Default::default()
                     }
diff --git a/crates/test/tests/proxy.rs b/crates/test/tests/proxy.rs
index b3dddea8f0..b125afefde 100644
--- a/crates/test/tests/proxy.rs
+++ b/crates/test/tests/proxy.rs
@@ -158,7 +158,7 @@ trace_test!(
             config,
             socket,
             pending_sends,
-            &sessions,
+            sessions,
             BUFFER_POOL.clone(),
         )
         .await
diff --git a/crates/xdp/Cargo.toml b/crates/xdp/Cargo.toml
index eec23d412b..07cb0b3f9c 100644
--- a/crates/xdp/Cargo.toml
+++ b/crates/xdp/Cargo.toml
@@ -12,8 +12,7 @@ aya-log = "0.2.1"
 libc.workspace = true
 thiserror.workspace = true
 tracing.workspace = true
-xdp = { git = "https://github.com/Jake-Shadle/xdp", branch = "impl" }
-#xdp = { path = "../../../xdp" }
+xdp = "0.1"
 
 [lints]
 workspace = true
diff --git a/crates/xds/src/server.rs b/crates/xds/src/server.rs
index 8a4b125655..480c3896ee 100644
--- a/crates/xds/src/server.rs
+++ b/crates/xds/src/server.rs
@@ -79,7 +79,7 @@ impl<C: crate::config::Configuration> ControlPlane<C> {
         let server = AggregatedDiscoveryServiceServer::new(self)
             .max_encoding_message_size(crate::config::max_grpc_message_size());
         let server = tonic::transport::Server::builder().add_service(server);
-        tracing::info!("serving management server on port `{}`", listener.port());
+        tracing::info!(port = listener.port(), "publishing xDS server");
         Ok(server
             .serve_with_incoming(listener.into_stream()?)
             .map_err(From::from))
@@ -98,7 +98,7 @@ impl<C: crate::config::Configuration> ControlPlane<C> {
         let server = AggregatedControlPlaneDiscoveryServiceServer::new(self)
             .max_encoding_message_size(crate::config::max_grpc_message_size());
         let server = tonic::transport::Server::builder().add_service(server);
-        tracing::info!("serving relay server on port `{}`", listener.port());
+        tracing::info!(port = listener.port(), "publishing mDS server");
         Ok(server
             .serve_with_incoming(listener.into_stream()?)
             .map_err(From::from))
diff --git a/src/cli.rs b/src/cli.rs
index b9137a105c..bd5c44d4ed 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -28,18 +28,10 @@ use strum_macros::{Display, EnumString};
 
 pub use self::{
     agent::Agent, generate_config_schema::GenerateConfigSchema, manage::Manage, proxy::Proxy,
-    qcmp::Qcmp, relay::Relay,
+    publish::Publish, qcmp::Qcmp, relay::Relay,
 };
 
-macro_rules! define_port {
-    ($port:expr) => {
-        pub const PORT: u16 = $port;
-
-        pub fn default_port() -> u16 {
-            PORT
-        }
-    };
-}
+mod publish;
 
 pub mod agent;
 pub mod generate_config_schema;
@@ -49,7 +41,6 @@ pub mod qcmp;
 pub mod relay;
 
 const ETC_CONFIG_PATH: &str = "/etc/quilkin/quilkin.yaml";
-const PORT_ENV_VAR: &str = "QUILKIN_PORT";
 
 #[derive(Debug, clap::Parser)]
 #[command(next_help_heading = "Administration Options")]
@@ -121,6 +112,8 @@ pub struct Cli {
     pub admin: AdminCli,
     #[command(flatten)]
     pub locality: LocalityCli,
+    #[command(flatten)]
+    pub publish: publish::Publish,
 }
 
 /// The various log format options
@@ -193,12 +186,9 @@ impl Cli {
                 return generator.generate_config_schema();
             }
             Commands::Agent(_) => Admin::Agent(<_>::default()),
-            Commands::Proxy(proxy) => {
+            Commands::Proxy(_) => {
                 let ready = components::proxy::Ready {
-                    idle_request_interval: proxy
-                        .idle_request_interval_secs
-                        .map(std::time::Duration::from_secs)
-                        .unwrap_or(admin_server::IDLE_REQUEST_INTERVAL),
+                    idle_request_interval: admin_server::IDLE_REQUEST_INTERVAL,
                     ..Default::default()
                 };
                 Admin::Proxy(ready)
@@ -294,6 +284,8 @@ impl Cli {
             shutdown_tx.send(crate::ShutdownKind::Normal).ok();
         });
 
+        let _publish_task = self.publish.spawn_publishers(&config, &shutdown_rx)?;
+
         match (self.command, mode) {
             (Commands::Agent(agent), Admin::Agent(ready)) => {
                 agent.run(locality, config, ready, shutdown_rx).await
diff --git a/src/cli/agent.rs b/src/cli/agent.rs
index b1678b7c0a..c0a7c3bcfa 100644
--- a/src/cli/agent.rs
+++ b/src/cli/agent.rs
@@ -19,17 +19,12 @@ use std::sync::Arc;
 use crate::{components::agent, config::Config};
 pub use agent::Ready;
 
-define_port!(7600);
-
 /// Runs Quilkin as a relay service that runs a Manager Discovery Service
 /// (mDS) for accepting cluster and configuration information from xDS
 /// management services, and exposing it as a single merged xDS service for
 /// proxy services.
-#[derive(clap::Args, Clone, Debug)]
+#[derive(clap::Args, Clone, Debug, Default)]
 pub struct Agent {
-    /// Port for QCMP service.
-    #[clap(short, long, env = "QCMP_PORT", default_value_t = PORT)]
-    pub qcmp_port: u16,
     /// One or more `quilkin relay` endpoints to push configuration changes to.
     #[clap(short, long, env = "QUILKIN_MANAGEMENT_SERVER")]
     pub relay: Vec<tonic::transport::Endpoint>,
@@ -63,19 +58,6 @@ impl clap::ValueEnum for crate::config::AddrKind {
     }
 }
 
-impl Default for Agent {
-    fn default() -> Self {
-        Self {
-            qcmp_port: PORT,
-            relay: <_>::default(),
-            provider: <_>::default(),
-            icao_code: <_>::default(),
-            address_type: None,
-            ip_kind: None,
-        }
-    }
-}
-
 impl Agent {
     #[tracing::instrument(skip_all)]
     pub async fn run(
@@ -85,12 +67,10 @@ impl Agent {
         ready: Ready,
         shutdown_rx: crate::ShutdownRx,
     ) -> crate::Result<()> {
-        let qcmp_socket = crate::net::raw_socket_with_reuse(self.qcmp_port)?;
         let icao_code = Some(self.icao_code);
 
         agent::Agent {
             locality,
-            qcmp_socket,
             icao_code,
             relay_servers: self.relay,
             provider: self.provider,
diff --git a/src/cli/manage.rs b/src/cli/manage.rs
index 32790dbb95..c813bbdfd5 100644
--- a/src/cli/manage.rs
+++ b/src/cli/manage.rs
@@ -17,8 +17,6 @@
 use crate::components::manage;
 pub use manage::Ready;
 
-define_port!(7800);
-
 /// Runs Quilkin as a xDS management server, using `provider` as
 /// a configuration source.
 #[derive(clap::Args, Clone, Debug)]
@@ -26,9 +24,6 @@ pub struct Manage {
     /// One or more `quilkin relay` endpoints to push configuration changes to.
     #[clap(short, long, env = "QUILKIN_MANAGEMENT_SERVER")]
     pub relay: Vec<tonic::transport::Endpoint>,
-    /// The TCP port to listen to, to serve discovery responses.
-    #[clap(short, long, env = super::PORT_ENV_VAR, default_value_t = PORT)]
-    pub port: u16,
     /// The configuration source for a management server.
     #[clap(subcommand)]
     pub provider: crate::config::Providers,
@@ -50,13 +45,10 @@ impl Manage {
         ready: Ready,
         shutdown_rx: crate::ShutdownRx,
     ) -> crate::Result<()> {
-        let listener = crate::net::TcpListener::bind(Some(self.port))?;
-
         manage::Manage {
             locality,
             provider: self.provider,
             relay_servers: self.relay,
-            listener,
             address_selector: self.address_type.map(|at| crate::config::AddressSelector {
                 name: at,
                 kind: self.ip_kind,
diff --git a/src/cli/proxy.rs b/src/cli/proxy.rs
index 677598c81d..9a2557a82e 100644
--- a/src/cli/proxy.rs
+++ b/src/cli/proxy.rs
@@ -24,12 +24,8 @@ use crate::ShutdownRx;
 
 pub use crate::components::proxy::Ready;
 
-define_port!(7777);
-
-const QCMP_PORT: u16 = 7600;
-
 /// Run Quilkin as a UDP reverse proxy.
-#[derive(clap::Args, Clone, Debug)]
+#[derive(clap::Args, Clone, Debug, Default)]
 pub struct Proxy {
     /// One or more `quilkin manage` endpoints to listen to for config changes
     #[clap(short, long, env = "QUILKIN_MANAGEMENT_SERVER", conflicts_with("to"))]
@@ -37,12 +33,6 @@ pub struct Proxy {
     /// The remote URL or local file path to retrieve the Maxmind database.
     #[clap(long, env)]
     pub mmdb: Option<crate::net::maxmind_db::Source>,
-    /// The port to listen on.
-    #[clap(short, long, env = super::PORT_ENV_VAR, default_value_t = PORT)]
-    pub port: u16,
-    /// The port to listen on.
-    #[clap(short, long, env = "QUILKIN_QCMP_PORT", default_value_t = QCMP_PORT)]
-    pub qcmp_port: u16,
     /// One or more socket addresses to forward packets to.
     #[clap(long, env = "QUILKIN_DEST")]
     pub to: Vec<SocketAddr>,
@@ -51,83 +41,6 @@ pub struct Proxy {
     /// Format is `<number of unique tokens>:<length of token suffix for each packet>`
     #[clap(long, env = "QUILKIN_DEST_TOKENS", requires("to"))]
     pub to_tokens: Option<String>,
-    /// The interval in seconds at which the relay will send a discovery request
-    /// to an management server after receiving no updates.
-    #[clap(long, env = "QUILKIN_IDLE_REQUEST_INTERVAL_SECS")]
-    pub idle_request_interval_secs: Option<u64>,
-    /// Number of worker threads used to process packets.
-    ///
-    /// If not specified defaults to number of cpus. Has no effect if XDP is used,
-    /// as the number of workers is always the same as the NIC queue size.
-    #[clap(short, long, env = "QUILKIN_WORKERS")]
-    pub workers: Option<std::num::NonZeroUsize>,
-    #[clap(flatten)]
-    pub xdp_opts: XdpOptions,
-}
-
-/// XDP (eXpress Data Path) options
-#[derive(clap::Args, Clone, Debug)]
-pub struct XdpOptions {
-    /// The name of the network interface to bind the XDP socket(s) to.
-    ///
-    /// If not specified quilkin will attempt to determine the most appropriate
-    /// network interface to use. Quilkin will exit with an error if the network
-    /// interface does not exist, or a suitable default cannot be determined.
-    #[clap(long = "publish.udp.xdp.network-interface")]
-    pub network_interface: Option<String>,
-    /// Forces the use of XDP.
-    ///
-    /// If XDP is not available on the chosen NIC, Quilkin exits with an error.
-    /// If false, io-uring will be used as the fallback implementation.
-    #[clap(long = "publish.udp.xdp")]
-    pub force_xdp: bool,
-    /// Forces the use of [`XDP_ZEROCOPY`](https://www.kernel.org/doc/html/latest/networking/af_xdp.html#xdp-copy-and-xdp-zerocopy-bind-flags)
-    ///
-    /// If zero copy is not available on the chosen NIC, Quilkin exits with an error
-    #[clap(long = "publish.udp.xdp.zerocopy")]
-    pub force_zerocopy: bool,
-    /// Forces the use of [TX checksum offload](https://docs.kernel.org/6.8/networking/xsk-tx-metadata.html)
-    ///
-    /// TX checksum offload is an optional feature allowing the data portion of
-    /// a packet to have its internet checksum calculation offloaded to the NIC,
-    /// as otherwise this is done in software
-    #[clap(long = "publish.udp.xdp.tco")]
-    pub force_tx_checksum_offload: bool,
-    /// The maximum amount of memory mapped for packet buffers, in bytes
-    ///
-    /// If not specified, this defaults to 4MiB (2k allocated packets of 2k each at a time)
-    /// per NIC queue, ie 128MiB on a 32 queue NIC
-    #[clap(long = "publish.udp.xdp.memory-limit")]
-    pub maximum_memory: Option<u64>,
-}
-
-#[allow(clippy::derivable_impls)]
-impl Default for XdpOptions {
-    fn default() -> Self {
-        Self {
-            network_interface: None,
-            force_xdp: false,
-            force_zerocopy: false,
-            force_tx_checksum_offload: false,
-            maximum_memory: None,
-        }
-    }
-}
-
-impl Default for Proxy {
-    fn default() -> Self {
-        Self {
-            management_server: <_>::default(),
-            mmdb: <_>::default(),
-            port: PORT,
-            qcmp_port: QCMP_PORT,
-            to: <_>::default(),
-            to_tokens: None,
-            idle_request_interval_secs: None,
-            workers: None,
-            xdp_opts: Default::default(),
-        }
-    }
 }
 
 impl Proxy {
@@ -140,22 +53,7 @@ impl Proxy {
         initialized: Option<tokio::sync::oneshot::Sender<()>>,
         shutdown_rx: ShutdownRx,
     ) -> crate::Result<()> {
-        tracing::info!(
-            port = self.port,
-            proxy_id = &*config.id.load(),
-            "Starting proxy"
-        );
-
-        // The number of worker tasks to spawn. Each task gets a dedicated queue to
-        // consume packets off.
-        let num_workers = self.workers.unwrap_or_else(|| {
-            std::num::NonZeroUsize::new(num_cpus::get())
-                .expect("num_cpus returned 0, which should be impossible")
-        });
-
-        let socket = crate::net::raw_socket_with_reuse(self.port)?;
-        let qcmp = crate::net::raw_socket_with_reuse(self.qcmp_port)?;
-        let phoenix = crate::net::TcpListener::bind(Some(self.qcmp_port))?;
+        tracing::info!(proxy_id = &*config.id.load(), "Starting proxy");
 
         let to_tokens = self
             .to_tokens
@@ -176,12 +74,7 @@ impl Proxy {
             mmdb: self.mmdb,
             to: self.to,
             to_tokens,
-            num_workers,
-            socket: Some(socket),
-            qcmp,
-            phoenix,
             notifier: None,
-            xdp: self.xdp_opts,
         }
         .run(
             crate::components::RunArgs {
diff --git a/src/cli/publish.rs b/src/cli/publish.rs
new file mode 100644
index 0000000000..9624c9c529
--- /dev/null
+++ b/src/cli/publish.rs
@@ -0,0 +1,443 @@
+use std::{future::Future, sync::Arc};
+
+use crate::{components::proxy::SessionPool, config::Config};
+
+#[derive(Debug, clap::Parser)]
+#[command(next_help_heading = "Publish Options")]
+pub struct Publish {
+    /// Whether to serve mDS requests.
+    #[arg(
+        long = "publish.mds",
+        env = "QUILKIN_PUBLISH_MDS",
+        default_value_t = false
+    )]
+    mds_enabled: bool,
+    /// The TCP port to listen to serve xDS requests.
+    #[clap(
+        long = "publish.mds.port",
+        env = "QUILKIN_PUBLISH_MDS_PORT",
+        default_value_t = 7900
+    )]
+    mds_port: u16,
+    /// Whether to serve UDP requests.
+    #[arg(
+        long = "publish.phoenix",
+        env = "QUILKIN_PUBLISH_PHOENIX",
+        default_value_t = false
+    )]
+    phoenix_enabled: bool,
+    /// The UDP port to listen for UDP packets.
+    #[clap(
+        long = "publish.phoenix.port",
+        env = "QUILKIN_PUBLISH_PHOENIX_PORT",
+        default_value_t = 7600
+    )]
+    phoenix_port: u16,
+    /// Whether to serve UDP requests.
+    #[arg(
+        long = "publish.qcmp",
+        env = "QUILKIN_PUBLISH_QCMP",
+        default_value_t = false
+    )]
+    qcmp_enabled: bool,
+    /// The UDP port to listen for UDP packets.
+    #[clap(
+        long = "publish.qcmp.port",
+        env = "QUILKIN_PUBLISH_QCMP_PORT",
+        default_value_t = 7600
+    )]
+    qcmp_port: u16,
+    /// Whether to serve UDP requests.
+    #[arg(
+        long = "publish.udp",
+        env = "QUILKIN_PUBLISH_UDP",
+        default_value_t = false
+    )]
+    udp_enabled: bool,
+    /// The UDP port to listen for UDP packets.
+    #[clap(
+        long = "publish.udp.port",
+        env = "QUILKIN_PUBLISH_UDP_PORT",
+        default_value_t = 7777
+    )]
+    udp_port: u16,
+    #[clap(flatten)]
+    pub xdp: XdpOptions,
+    /// Amount of UDP workers to run.
+    #[clap(long = "publish.udp.workers", env = "QUILKIN_PUBLISH_UDP_WORKERS", default_value_t = std::num::NonZeroUsize::new(num_cpus::get()).unwrap())]
+    pub udp_workers: std::num::NonZeroUsize,
+    /// Whether to serve xDS requests.
+    #[arg(
+        long = "publish.xds",
+        env = "QUILKIN_PUBLISH_XDS",
+        default_value_t = false
+    )]
+    xds_enabled: bool,
+    /// The TCP port to listen to serve xDS requests.
+    #[clap(
+        long = "publish.xds.port",
+        env = "QUILKIN_PUBLISH_XDS_PORT",
+        default_value_t = 7800
+    )]
+    xds_port: u16,
+}
+
+impl Default for Publish {
+    fn default() -> Self {
+        Self {
+            mds_enabled: <_>::default(),
+            mds_port: 7900,
+            phoenix_enabled: <_>::default(),
+            phoenix_port: 7600,
+            qcmp_enabled: <_>::default(),
+            qcmp_port: 7600,
+            udp_enabled: <_>::default(),
+            udp_port: 7777,
+            udp_workers: std::num::NonZeroUsize::new(num_cpus::get()).unwrap(),
+            xds_enabled: <_>::default(),
+            xds_port: 7800,
+            xdp_opts: <_>::default(),
+        }
+    }
+}
+
+impl Publish {
+    /// Enables the UDP publisher.
+    pub fn udp(mut self) -> Self {
+        self.udp_enabled = true;
+        self
+    }
+
+    /// Sets the UDP publisher port.
+    pub fn udp_port(mut self, port: u16) -> Self {
+        self.udp_port = port;
+        self
+    }
+
+    /// Enables the QCMP publisher.
+    pub fn qcmp(mut self) -> Self {
+        self.qcmp_enabled = true;
+        self
+    }
+
+    /// Sets the UDP publisher port.
+    pub fn qcmp_port(mut self, port: u16) -> Self {
+        self.qcmp_port = port;
+        self
+    }
+
+    /// Enables the mDS publisher.
+    pub fn mds(mut self) -> Self {
+        self.mds_enabled = true;
+        self
+    }
+
+    /// Sets the mDS publisher port.
+    pub fn mds_port(mut self, port: u16) -> Self {
+        self.mds_port = port;
+        self
+    }
+
+    /// Enables the Phoenix publisher.
+    pub fn phoenix(mut self) -> Self {
+        self.phoenix_enabled = true;
+        self
+    }
+
+    /// Sets the Phoenix publisher port.
+    pub fn phoenix_port(mut self, port: u16) -> Self {
+        self.phoenix_port = port;
+        self
+    }
+
+    /// Enables the xDS publisher.
+    pub fn xds(mut self) -> Self {
+        self.xds_enabled = true;
+        self
+    }
+
+    /// Sets the xDS publisher port.
+    pub fn xds_port(mut self, port: u16) -> Self {
+        self.xds_port = port;
+        self
+    }
+
+    /// The main entrypoint for publishing network servers. When called will
+    /// spawn any and all enabled publishers, if successful returning a future
+    /// that can be await to wait on publishers to be cancelled.
+    pub fn spawn_publishers(
+        &self,
+        config: &Arc<Config>,
+        shutdown_rx: &crate::ShutdownRx,
+    ) -> crate::Result<impl Future<Output = crate::Result<()>>> {
+        let mds_task = self.publish_mds(config)?;
+        let phoenix_task = self.publish_phoenix(config, shutdown_rx)?;
+        let qcmp_task = self.publish_qcmp(shutdown_rx)?;
+        let (udp_task, finalizer) = self.publish_udp(config)?;
+        let xds_task = self.publish_xds(config)?;
+
+        let shutdown_rx = shutdown_rx.clone();
+        Ok(async move {
+            let result = tokio::select! {
+                result = mds_task => result,
+                result = phoenix_task => result,
+                result = qcmp_task => result,
+                result = udp_task => result,
+                result = xds_task => result,
+            };
+
+            if let Some(finalizer) = finalizer {
+                (finalizer)(shutdown_rx.clone());
+            }
+
+            result
+        })
+    }
+
+    /// Spawns an QCMP server if enabled, otherwise returns a future which never completes.
+    fn publish_phoenix(
+        &self,
+        config: &Arc<Config>,
+        shutdown_rx: &crate::ShutdownRx,
+    ) -> crate::Result<impl std::future::Future<Output = crate::Result<()>>> {
+        if self.phoenix_enabled {
+            let phoenix = crate::net::TcpListener::bind(Some(self.phoenix_port))?;
+            crate::net::phoenix::spawn(
+                phoenix,
+                config.clone(),
+                shutdown_rx.clone(),
+                crate::net::phoenix::Phoenix::new(crate::codec::qcmp::QcmpMeasurement::new()?),
+            )?
+        }
+
+        Ok(std::future::pending())
+    }
+
+    /// Spawns an QCMP server if enabled, otherwise returns a future which never completes.
+    fn publish_qcmp(
+        &self,
+        shutdown_rx: &crate::ShutdownRx,
+    ) -> crate::Result<impl Future<Output = crate::Result<()>>> {
+        Ok(if self.qcmp_enabled {
+            let qcmp = crate::net::raw_socket_with_reuse(self.qcmp_port)?;
+            either::Right(crate::codec::qcmp::spawn(qcmp, shutdown_rx.clone()))
+        } else {
+            either::Left(std::future::pending())
+        })
+    }
+
+    /// Spawns an xDS server if enabled, otherwise returns a future which never completes.
+    fn publish_mds(
+        &self,
+        config: &Arc<Config>,
+    ) -> crate::Result<impl Future<Output = crate::Result<()>>> {
+        if !self.mds_enabled {
+            return Ok(either::Left(std::future::pending()));
+        }
+
+        use futures::TryFutureExt as _;
+
+        let listener = crate::net::TcpListener::bind(Some(self.mds_port))?;
+
+        Ok(either::Right(
+            tokio::spawn(
+                crate::net::xds::server::ControlPlane::from_arc(
+                    config.clone(),
+                    crate::components::admin::IDLE_REQUEST_INTERVAL,
+                )
+                .relay_server(listener)?,
+            )
+            .map_err(From::from)
+            .and_then(std::future::ready),
+        ))
+    }
+
+    /// Spawns an xDS server if enabled, otherwise returns a future which never completes.
+    fn publish_xds(
+        &self,
+        config: &Arc<Config>,
+    ) -> crate::Result<impl Future<Output = crate::Result<()>>> {
+        if !self.xds_enabled {
+            return Ok(either::Left(std::future::pending()));
+        }
+
+        use futures::TryFutureExt as _;
+
+        let listener = crate::net::TcpListener::bind(Some(self.xds_port))?;
+
+        Ok(either::Right(
+            tokio::spawn(
+                crate::net::xds::server::ControlPlane::from_arc(
+                    config.clone(),
+                    crate::components::admin::IDLE_REQUEST_INTERVAL,
+                )
+                .management_server(listener)?,
+            )
+            .map_err(From::from)
+            .and_then(std::future::ready),
+        ))
+    }
+
+    #[allow(clippy::type_complexity)]
+    pub fn publish_udp(
+        &self,
+        config: &Arc<crate::config::Config>,
+    ) -> eyre::Result<(
+        impl Future<Output = crate::Result<()>>,
+        Option<Box<dyn FnOnce(crate::ShutdownRx) + Send>>,
+    )> {
+        if !self.udp_enabled {
+            return Ok((either::Left(std::future::pending()), None));
+        }
+
+        #[cfg(target_os = "linux")]
+        {
+            match self.spawn_xdp(config.clone(), self.xdp.force_xdp) {
+                Ok(xdp) => return Ok((either::Left(std::future::pending()), Some(xdp))),
+                Err(err) => {
+                    if self.xdp.force_xdp {
+                        return Err(err);
+                    }
+
+                    tracing::warn!(
+                        ?err,
+                        "failed to spawn XDP I/O loop, falling back to io-uring"
+                    );
+                }
+            }
+        }
+
+        self.spawn_user_space_router(config.clone())
+            .map(|(fut, func)| (either::Right(fut), Some(func)))
+    }
+
+    /// Launches the user space implementation of the packet router using
+    /// sockets. This implementation uses a pool of buffers and sockets to
+    /// manage UDP sessions and sockets. On Linux this will use io-uring, where
+    /// as it will use epoll interfaces on non-Linux platforms.
+    #[allow(clippy::type_complexity)]
+    pub fn spawn_user_space_router(
+        &self,
+        config: Arc<crate::config::Config>,
+    ) -> crate::Result<(
+        impl Future<Output = crate::Result<()>>,
+        Box<dyn FnOnce(crate::ShutdownRx) + Send>,
+    )> {
+        let socket = crate::net::raw_socket_with_reuse(self.udp_port)?;
+        let workers = self.udp_workers.get();
+        let buffer_pool = Arc::new(crate::collections::BufferPool::new(workers, 2 * 1024));
+
+        let mut worker_sends = Vec::with_capacity(workers);
+        let mut session_sends = Vec::with_capacity(workers);
+        for _ in 0..workers {
+            let queue = crate::net::queue(15)?;
+            session_sends.push(queue.0.clone());
+            worker_sends.push(queue);
+        }
+
+        let sessions = SessionPool::new(config.clone(), session_sends, buffer_pool.clone());
+
+        Ok((
+            crate::components::proxy::packet_router::spawn_receivers(
+                config,
+                socket,
+                worker_sends,
+                sessions.clone(),
+                buffer_pool,
+            ),
+            Box::from(move |shutdown_rx: crate::ShutdownRx| {
+                sessions.shutdown(*shutdown_rx.borrow() == crate::ShutdownKind::Normal);
+            }),
+        ))
+    }
+
+    #[cfg(target_os = "linux")]
+    fn spawn_xdp(
+        &mut self,
+        config: Arc<crate::config::Config>,
+        force_xdp: bool,
+    ) -> eyre::Result<Box<dyn FnOnce(crate::ShutdownRx) + Send>> {
+        use crate::net::xdp;
+        use eyre::Context as _;
+
+        // TODO: remove this once it's been more stabilized
+        if !force_xdp {
+            eyre::bail!("XDP currently disabled by default");
+        }
+
+        let Some(external_port) = self.socket.as_ref().and_then(|s| {
+            s.local_addr()
+                .ok()
+                .and_then(|la| la.as_socket().map(|sa| sa.port()))
+        }) else {
+            eyre::bail!("unable to determine port");
+        };
+
+        let workers = xdp::setup_xdp_io(xdp::XdpConfig {
+            nic: self
+                .xdp
+                .network_interface
+                .as_deref()
+                .map_or(xdp::NicConfig::Default, xdp::NicConfig::Name),
+            external_port,
+            maximum_packet_memory: self.xdp.maximum_memory,
+            require_zero_copy: self.xdp.force_zerocopy,
+            require_tx_checksum: self.xdp.force_tx_checksum_offload,
+        })
+        .context("failed to setup XDP")?;
+
+        let io_loop = xdp::spawn(workers, config).context("failed to spawn XDP I/O loop")?;
+        Ok(Box::new(move |srx: crate::ShutdownRx| {
+            io_loop.shutdown(*srx.borrow() == crate::ShutdownKind::Normal);
+        }))
+    }
+}
+
+/// XDP (eXpress Data Path) options
+#[derive(clap::Args, Clone, Debug)]
+pub struct XdpOptions {
+    /// The name of the network interface to bind the XDP socket(s) to.
+    ///
+    /// If not specified quilkin will attempt to determine the most appropriate
+    /// network interface to use. Quilkin will exit with an error if the network
+    /// interface does not exist, or a suitable default cannot be determined.
+    #[clap(long = "publish.udp.xdp.network-interface")]
+    pub network_interface: Option<String>,
+    /// Forces the use of XDP.
+    ///
+    /// If XDP is not available on the chosen NIC, Quilkin exits with an error.
+    /// If false, io-uring will be used as the fallback implementation.
+    #[clap(long = "publish.udp.xdp")]
+    pub force_xdp: bool,
+    /// Forces the use of [`XDP_ZEROCOPY`](https://www.kernel.org/doc/html/latest/networking/af_xdp.html#xdp-copy-and-xdp-zerocopy-bind-flags)
+    ///
+    /// If zero copy is not available on the chosen NIC, Quilkin exits with an error
+    #[clap(long = "publish.udp.xdp.zerocopy")]
+    pub force_zerocopy: bool,
+    /// Forces the use of [TX checksum offload](https://docs.kernel.org/6.8/networking/xsk-tx-metadata.html)
+    ///
+    /// TX checksum offload is an optional feature allowing the data portion of
+    /// a packet to have its internet checksum calculation offloaded to the NIC,
+    /// as otherwise this is done in software
+    #[clap(long = "publish.udp.xdp.tco")]
+    pub force_tx_checksum_offload: bool,
+    /// The maximum amount of memory mapped for packet buffers, in bytes
+    ///
+    /// If not specified, this defaults to 4MiB (2k allocated packets of 2k each at a time)
+    /// per NIC queue, ie 128MiB on a 32 queue NIC
+    #[clap(long = "publish.udp.xdp.memory-limit")]
+    pub maximum_memory: Option<u64>,
+}
+
+#[allow(clippy::derivable_impls)]
+impl Default for XdpOptions {
+    fn default() -> Self {
+        Self {
+            network_interface: None,
+            force_xdp: false,
+            force_zerocopy: false,
+            force_tx_checksum_offload: false,
+            maximum_memory: None,
+        }
+    }
+}
diff --git a/src/cli/relay.rs b/src/cli/relay.rs
index b146c7a341..74c183c4b0 100644
--- a/src/cli/relay.rs
+++ b/src/cli/relay.rs
@@ -19,7 +19,6 @@ use std::sync::Arc;
 use crate::{
     components::relay,
     config::{Config, Providers},
-    net::TcpListener,
 };
 pub use relay::Ready;
 
@@ -29,14 +28,8 @@ pub const PORT: u16 = 7900;
 /// (mDS) for accepting cluster and configuration information from xDS
 /// management services, and exposing it as a single merged xDS service for
 /// proxy services.
-#[derive(clap::Args, Clone, Debug)]
+#[derive(clap::Args, Clone, Debug, Default)]
 pub struct Relay {
-    /// Port for mDS service.
-    #[clap(short, long, env = "QUILKIN_MDS_PORT", default_value_t = PORT)]
-    pub mds_port: u16,
-    /// Port for xDS management_server service
-    #[clap(short, long, env = super::PORT_ENV_VAR, default_value_t = super::manage::PORT)]
-    pub xds_port: u16,
     /// The interval in seconds at which the relay will send a discovery request
     /// to an management server after receiving no updates.
     #[clap(long, env = "QUILKIN_IDLE_REQUEST_INTERVAL_SECS")]
@@ -45,17 +38,6 @@ pub struct Relay {
     pub providers: Option<Providers>,
 }
 
-impl Default for Relay {
-    fn default() -> Self {
-        Self {
-            mds_port: PORT,
-            xds_port: super::manage::PORT,
-            idle_request_interval_secs: None,
-            providers: None,
-        }
-    }
-}
-
 impl Relay {
     pub async fn run(
         self,
@@ -63,12 +45,7 @@ impl Relay {
         ready: Ready,
         shutdown_rx: crate::ShutdownRx,
     ) -> crate::Result<()> {
-        let xds_listener = TcpListener::bind(Some(self.xds_port))?;
-        let mds_listener = TcpListener::bind(Some(self.mds_port))?;
-
         relay::Relay {
-            xds_listener,
-            mds_listener,
             provider: self.providers,
         }
         .run(crate::components::RunArgs {
diff --git a/src/codec/qcmp.rs b/src/codec/qcmp.rs
index 606728156f..c9232c9a70 100644
--- a/src/codec/qcmp.rs
+++ b/src/codec/qcmp.rs
@@ -192,7 +192,11 @@ impl Measurement for QcmpMeasurement {
     }
 }
 
-pub fn spawn(socket: socket2::Socket, mut shutdown_rx: crate::ShutdownRx) -> crate::Result<()> {
+pub fn spawn(
+    socket: socket2::Socket,
+    mut shutdown_rx: crate::ShutdownRx,
+) -> impl std::future::Future<Output = crate::Result<()>> {
+    use futures::TryFutureExt as _;
     use tracing::{instrument::WithSubscriber as _, Instrument as _};
 
     let port = crate::net::socket_port(&socket);
@@ -258,9 +262,7 @@ pub fn spawn(socket: socket2::Socket, mut shutdown_rx: crate::ShutdownRx) -> cra
         }
         .instrument(tracing::debug_span!("qcmp"))
         .with_current_subscriber(),
-    );
-
-    Ok(())
+    ).map_err(From::from)
 }
 
 /// The set of possible QCMP commands.
@@ -676,7 +678,7 @@ mod tests {
         let addr = socket.local_addr().unwrap().as_socket().unwrap();
 
         let (_tx, rx) = crate::make_shutdown_channel(Default::default());
-        spawn(socket, rx).unwrap();
+        let _task = spawn(socket, rx);
 
         let delay = Duration::from_millis(50);
         let node = QcmpMeasurement::with_artificial_delay(delay).unwrap();
diff --git a/src/components/agent.rs b/src/components/agent.rs
index 634a113b2a..fb9dd58013 100644
--- a/src/components/agent.rs
+++ b/src/components/agent.rs
@@ -40,7 +40,6 @@ impl Ready {
 
 pub struct Agent {
     pub locality: Option<Locality>,
-    pub qcmp_socket: socket2::Socket,
     pub icao_code: Option<IcaoCode>,
     pub relay_servers: Vec<tonic::transport::Endpoint>,
     pub provider: Option<Providers>,
@@ -58,15 +57,11 @@ impl Agent {
         }: RunArgs<Ready>,
     ) -> crate::Result<()> {
         {
-            let crate::config::DatacenterConfig::Agent {
-                icao_code,
-                qcmp_port,
-            } = &config.datacenter
+            let crate::config::DatacenterConfig::Agent { icao_code, .. } = &config.datacenter
             else {
                 unreachable!("this should be an agent config");
             };
 
-            qcmp_port.store(crate::net::socket_port(&self.qcmp_socket).into());
             icao_code.store(self.icao_code.unwrap_or_default().into());
         }
 
@@ -102,7 +97,6 @@ impl Agent {
             None
         };
 
-        crate::codec::qcmp::spawn(self.qcmp_socket, shutdown_rx.clone())?;
         shutdown_rx.changed().await.map_err(From::from)
     }
 }
diff --git a/src/components/manage.rs b/src/components/manage.rs
index b9811b5716..386fb5fb2d 100644
--- a/src/components/manage.rs
+++ b/src/components/manage.rs
@@ -25,7 +25,6 @@ pub struct Manage {
     pub locality: Option<Locality>,
     pub relay_servers: Vec<tonic::transport::Endpoint>,
     pub provider: Providers,
-    pub listener: crate::net::TcpListener,
     pub address_selector: Option<crate::config::AddressSelector>,
 }
 
@@ -73,19 +72,7 @@ impl Manage {
             None
         };
 
-        use futures::TryFutureExt as _;
-        let server_task = tokio::spawn(
-            crate::net::xds::server::ControlPlane::from_arc(
-                config,
-                crate::components::admin::IDLE_REQUEST_INTERVAL,
-            )
-            .management_server(self.listener)?,
-        )
-        .map_err(From::from)
-        .and_then(std::future::ready);
-
         tokio::select! {
-            result = server_task => result,
             result = provider_task => result?,
             result = shutdown_rx.changed() => result.map_err(From::from),
         }
diff --git a/src/components/proxy.rs b/src/components/proxy.rs
index f17903495f..e44848a605 100644
--- a/src/components/proxy.rs
+++ b/src/components/proxy.rs
@@ -62,42 +62,18 @@ pub struct ToTokens {
     pub length: usize,
 }
 
+#[derive(Default)]
 pub struct Proxy {
-    pub num_workers: std::num::NonZeroUsize,
     pub mmdb: Option<crate::net::maxmind_db::Source>,
     pub management_servers: Vec<tonic::transport::Endpoint>,
     pub to: Vec<SocketAddr>,
     pub to_tokens: Option<ToTokens>,
-    pub socket: Option<socket2::Socket>,
-    pub qcmp: socket2::Socket,
-    pub phoenix: crate::net::TcpListener,
     pub notifier: Option<tokio::sync::mpsc::UnboundedSender<String>>,
-    pub xdp: crate::cli::proxy::XdpOptions,
-}
-
-impl Default for Proxy {
-    fn default() -> Self {
-        let qcmp = crate::net::raw_socket_with_reuse(0).unwrap();
-        let phoenix = crate::net::TcpListener::bind(Some(crate::net::socket_port(&qcmp))).unwrap();
-
-        Self {
-            num_workers: std::num::NonZeroUsize::new(1).unwrap(),
-            mmdb: None,
-            management_servers: Vec::new(),
-            to: Vec::new(),
-            to_tokens: None,
-            socket: Some(crate::net::raw_socket_with_reuse(0).unwrap()),
-            qcmp,
-            phoenix,
-            notifier: None,
-            xdp: Default::default(),
-        }
-    }
 }
 
 impl Proxy {
     pub async fn run(
-        mut self,
+        self,
         RunArgs {
             config,
             ready,
@@ -270,15 +246,6 @@ impl Proxy {
                 .expect("failed to spawn proxy-subscription thread");
         }
 
-        let router_shutdown = self.spawn_packet_router(config.clone()).await?;
-        crate::codec::qcmp::spawn(self.qcmp, shutdown_rx.clone())?;
-        crate::net::phoenix::spawn(
-            self.phoenix,
-            config.clone(),
-            shutdown_rx.clone(),
-            crate::net::phoenix::Phoenix::new(crate::codec::qcmp::QcmpMeasurement::new()?),
-        )?;
-
         tracing::info!("Quilkin is ready");
         if let Some(initialized) = initialized {
             let _ = initialized.send(());
@@ -289,110 +256,6 @@ impl Proxy {
             .await
             .map_err(|error| eyre::eyre!(error))?;
 
-        (router_shutdown)(shutdown_rx);
-
         Ok(())
     }
-
-    pub async fn spawn_packet_router(
-        &mut self,
-        config: Arc<crate::config::Config>,
-    ) -> eyre::Result<Box<dyn FnOnce(crate::ShutdownRx) + Send>> {
-        #[cfg(target_os = "linux")]
-        {
-            match self.spawn_xdp(config.clone(), self.xdp.force_xdp) {
-                Ok(xdp) => {
-                    return Ok(xdp);
-                }
-                Err(err) => {
-                    if self.xdp.force_xdp {
-                        return Err(err);
-                    }
-
-                    tracing::warn!(
-                        ?err,
-                        "failed to spawn XDP I/O loop, falling back to io-uring"
-                    );
-                }
-            }
-        }
-
-        self.spawn_user_space_router(config).await
-    }
-
-    /// Launches the user space implementation of the packet router using
-    /// sockets. This implementation uses a pool of buffers and sockets to
-    /// manage UDP sessions and sockets. On Linux this will use io-uring, where
-    /// as it will use epoll interfaces on non-Linux platforms.
-    pub async fn spawn_user_space_router(
-        &mut self,
-        config: Arc<crate::config::Config>,
-    ) -> eyre::Result<Box<dyn FnOnce(crate::ShutdownRx) + Send>> {
-        let workers = self.num_workers.get();
-        let buffer_pool = Arc::new(crate::collections::BufferPool::new(workers, 2 * 1024));
-
-        let mut worker_sends = Vec::with_capacity(workers);
-        let mut session_sends = Vec::with_capacity(workers);
-        for _ in 0..workers {
-            let queue = crate::net::queue(15)?;
-            session_sends.push(queue.0.clone());
-            worker_sends.push(queue);
-        }
-
-        let sessions = SessionPool::new(config.clone(), session_sends, buffer_pool.clone());
-
-        packet_router::spawn_receivers(
-            config,
-            self.socket.take().unwrap(),
-            worker_sends,
-            &sessions,
-            buffer_pool,
-        )
-        .await?;
-
-        Ok(Box::new(move |shutdown_rx: crate::ShutdownRx| {
-            sessions.shutdown(*shutdown_rx.borrow() == crate::ShutdownKind::Normal);
-        }))
-    }
-
-    #[cfg(target_os = "linux")]
-    fn spawn_xdp(
-        &mut self,
-        config: Arc<crate::config::Config>,
-        force_xdp: bool,
-    ) -> eyre::Result<Box<dyn FnOnce(crate::ShutdownRx) + Send>> {
-        use crate::net::xdp;
-        use eyre::Context as _;
-
-        // TODO: remove this once it's been more stabilized
-        if !force_xdp {
-            eyre::bail!("XDP currently disabled by default");
-        }
-
-        let Some(external_port) = self.socket.as_ref().and_then(|s| {
-            s.local_addr()
-                .ok()
-                .and_then(|la| la.as_socket().map(|sa| sa.port()))
-        }) else {
-            eyre::bail!("unable to determine port");
-        };
-
-        let workers = xdp::setup_xdp_io(xdp::XdpConfig {
-            nic: self
-                .xdp
-                .network_interface
-                .as_deref()
-                .map_or(xdp::NicConfig::Default, xdp::NicConfig::Name),
-            external_port,
-            maximum_packet_memory: self.xdp.maximum_memory,
-            require_zero_copy: self.xdp.force_zerocopy,
-            require_tx_checksum: self.xdp.force_tx_checksum_offload,
-        })
-        .context("failed to setup XDP")?;
-
-        let io_loop = xdp::spawn(workers, config).context("failed to spawn XDP I/O loop")?;
-        Ok(Box::new(move |srx: crate::ShutdownRx| {
-            io_loop.shutdown(*srx.borrow() == crate::ShutdownKind::Normal);
-        }))
-    }
 }
diff --git a/src/components/proxy/packet_router.rs b/src/components/proxy/packet_router.rs
index fcf72547ee..a10f87d706 100644
--- a/src/components/proxy/packet_router.rs
+++ b/src/components/proxy/packet_router.rs
@@ -161,7 +161,7 @@ pub async fn spawn_receivers(
     config: Arc<Config>,
     socket: socket2::Socket,
     worker_sends: Vec<crate::net::PacketQueue>,
-    sessions: &Arc<SessionPool>,
+    sessions: Arc<SessionPool>,
     buffer_pool: Arc<crate::collections::BufferPool>,
 ) -> crate::Result<()> {
     let port = crate::net::socket_port(&socket);
diff --git a/src/components/relay.rs b/src/components/relay.rs
index ff11bbca03..2b0a8a59ae 100644
--- a/src/components/relay.rs
+++ b/src/components/relay.rs
@@ -15,7 +15,7 @@
  */
 
 use super::RunArgs;
-use crate::{config::Providers, net::TcpListener};
+use crate::config::Providers;
 use std::sync::{
     atomic::{AtomicBool, Ordering},
     Arc,
@@ -44,8 +44,6 @@ impl Ready {
 }
 
 pub struct Relay {
-    pub xds_listener: TcpListener,
-    pub mds_listener: TcpListener,
     pub provider: Option<Providers>,
 }
 
@@ -59,15 +57,6 @@ impl Relay {
             mut shutdown_rx,
         }: RunArgs<Ready>,
     ) -> crate::Result<()> {
-        use crate::net::xds::server::ControlPlane;
-
-        let xds_server = ControlPlane::from_arc(config.clone(), ready.idle_request_interval)
-            .management_server(self.xds_listener)?;
-        let mds_server = tokio::spawn(
-            ControlPlane::from_arc(config.clone(), ready.idle_request_interval)
-                .relay_server(self.mds_listener)?,
-        );
-
         let _provider_task = self.provider.map(|provider| {
             let config = config.clone();
             let provider_is_healthy = ready.provider_is_healthy.clone();
@@ -135,12 +124,6 @@ impl Relay {
         });
 
         tokio::select! {
-            result = xds_server => {
-                result
-            }
-            result = mds_server => {
-                result?
-            }
             result = shutdown_rx.changed() => result.map_err(From::from),
         }
     }
diff --git a/src/net/phoenix.rs b/src/net/phoenix.rs
index 9bbf0053fc..eb7e1ca090 100644
--- a/src/net/phoenix.rs
+++ b/src/net/phoenix.rs
@@ -848,56 +848,59 @@ mod tests {
             },
         );
 
-        let (_tx, rx) = crate::make_shutdown_channel(Default::default());
-        let socket = raw_socket_with_reuse(qcmp_port).unwrap();
-        crate::codec::qcmp::spawn(socket, rx.clone()).unwrap();
-        tokio::time::sleep(Duration::from_millis(150)).await;
-
-        let measurement =
-            crate::codec::qcmp::QcmpMeasurement::with_artificial_delay(Duration::from_millis(50))
-                .unwrap();
-
-        let phoenix = Phoenix::builder(measurement)
-            .interval_range(Duration::from_millis(10)..Duration::from_millis(15))
-            .build();
-
-        super::spawn(qcmp_listener, config.clone(), rx, phoenix).unwrap();
-        tokio::time::sleep(Duration::from_millis(150)).await;
-
-        let client =
-            hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
-                .build_http::<http_body_util::Empty<bytes::Bytes>>();
-        use http_body_util::BodyExt;
-        for _ in 0..10 {
-            let resp = tokio::time::timeout(
-                Duration::from_millis(100),
-                client
-                    .get(format!("http://localhost:{qcmp_port}/").parse().unwrap())
-                    .await
-                    .unwrap()
-                    .into_body()
-                    .collect(),
+        tokio::spawn(async move {
+            let (_tx, rx) = crate::make_shutdown_channel(Default::default());
+            let socket = raw_socket_with_reuse(qcmp_port).unwrap();
+            let _qcmp_task = crate::codec::qcmp::spawn(socket, rx.clone());
+            tokio::time::sleep(Duration::from_millis(150)).await;
+
+            let measurement = crate::codec::qcmp::QcmpMeasurement::with_artificial_delay(
+                Duration::from_millis(50),
             )
-            .await
-            .unwrap()
-            .unwrap()
-            .to_bytes();
-
-            let map = serde_json::from_slice::<serde_json::Map<_, _>>(&resp).unwrap();
+            .unwrap();
 
-            let coords = Coordinates {
-                x: std::time::Duration::from_millis(50).as_nanos() as f64 / 2.0,
-                y: std::time::Duration::from_millis(1).as_nanos() as f64 / 2.0,
-            };
+            let phoenix = Phoenix::builder(measurement)
+                .interval_range(Duration::from_millis(10)..Duration::from_millis(15))
+                .build();
+
+            super::spawn(qcmp_listener, config.clone(), rx, phoenix).unwrap();
+            tokio::time::sleep(Duration::from_millis(150)).await;
+
+            let client =
+                hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
+                    .build_http::<http_body_util::Empty<bytes::Bytes>>();
+            use http_body_util::BodyExt;
+            for _ in 0..10 {
+                let resp = tokio::time::timeout(
+                    Duration::from_millis(100),
+                    client
+                        .get(format!("http://localhost:{qcmp_port}/").parse().unwrap())
+                        .await
+                        .unwrap()
+                        .into_body()
+                        .collect(),
+                )
+                .await
+                .unwrap()
+                .unwrap()
+                .to_bytes();
+
+                let map = serde_json::from_slice::<serde_json::Map<_, _>>(&resp).unwrap();
+
+                let coords = Coordinates {
+                    x: std::time::Duration::from_millis(50).as_nanos() as f64 / 2.0,
+                    y: std::time::Duration::from_millis(1).as_nanos() as f64 / 2.0,
+                };
 
-            let min = Coordinates::ORIGIN.distance_to(&coords);
-            let max = min * 3.0;
-            let distance = map[icao_code.as_ref()].as_f64().unwrap();
+                let min = Coordinates::ORIGIN.distance_to(&coords);
+                let max = min * 3.0;
+                let distance = map[icao_code.as_ref()].as_f64().unwrap();
 
-            assert!(
-                distance > min && distance < max,
-                "expected distance {distance} to be > {min} and < {max}",
-            );
-        }
+                assert!(
+                    distance > min && distance < max,
+                    "expected distance {distance} to be > {min} and < {max}",
+                );
+            }
+        });
     }
 }
diff --git a/src/test.rs b/src/test.rs
index 6841831d0f..dc5359e612 100644
--- a/src/test.rs
+++ b/src/test.rs
@@ -296,23 +296,24 @@ impl TestHelper {
             mode.server(config.clone(), address);
         }
 
-        let server = server.unwrap_or_else(|| {
-            let qcmp = crate::net::raw_socket_with_reuse(0).unwrap();
-            let phoenix = crate::net::TcpListener::bind(None).unwrap();
-
-            crate::components::proxy::Proxy {
-                num_workers: std::num::NonZeroUsize::new(1).unwrap(),
-                socket: Some(crate::net::raw_socket_with_reuse(0).unwrap()),
-                qcmp,
-                phoenix,
-                ..Default::default()
-            }
+        let port = {
+            let socket = crate::net::raw_socket_with_reuse(0).unwrap();
+            crate::net::socket_port(&socket)
+        };
+
+        let publisher = crate::cli::Publish::default().udp().udp_port(port);
+        tokio::spawn(publisher.spawn_publishers(&config, &shutdown_rx).unwrap());
+
+        let server = server.unwrap_or_else(|| crate::components::proxy::Proxy {
+            mmdb: None,
+            management_servers: Vec::new(),
+            to: Vec::new(),
+            to_tokens: None,
+            notifier: None,
         });
 
         let (prox_tx, prox_rx) = tokio::sync::oneshot::channel();
 
-        let port = crate::net::socket_port(server.socket.as_ref().unwrap());
-
         tokio::spawn(async move {
             server
                 .run(
diff --git a/tests/capture.rs b/tests/capture.rs
index df3f132841..96532d48f8 100644
--- a/tests/capture.rs
+++ b/tests/capture.rs
@@ -31,68 +31,70 @@ use quilkin::{
 #[cfg_attr(target_os = "macos", ignore)]
 async fn token_router() {
     let mut t = TestHelper::default();
-    let mut echo = t.run_echo_server(AddressType::Random).await;
-    quilkin::test::map_to_localhost(&mut echo);
+    tokio::spawn(async move {
+        let mut echo = t.run_echo_server(AddressType::Random).await;
+        quilkin::test::map_to_localhost(&mut echo);
 
-    let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
-    server_config.filters.store(
-        quilkin::filters::FilterChain::try_create([
-            Filter {
-                name: Capture::factory().name().into(),
-                label: None,
-                config: serde_json::from_value(serde_json::json!({
-                    "regex": {
-                        "pattern": ".{3}$"
-                    }
-                }))
-                .unwrap(),
-            },
-            Filter {
-                name: TokenRouter::factory().name().into(),
-                label: None,
-                config: None,
-            },
-        ])
-        .map(std::sync::Arc::new)
-        .unwrap(),
-    );
+        let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
+        server_config.filters.store(
+            quilkin::filters::FilterChain::try_create([
+                Filter {
+                    name: Capture::factory().name().into(),
+                    label: None,
+                    config: serde_json::from_value(serde_json::json!({
+                        "regex": {
+                            "pattern": ".{3}$"
+                        }
+                    }))
+                    .unwrap(),
+                },
+                Filter {
+                    name: TokenRouter::factory().name().into(),
+                    label: None,
+                    config: None,
+                },
+            ])
+            .map(std::sync::Arc::new)
+            .unwrap(),
+        );
 
-    server_config.clusters.modify(|clusters| {
-        clusters.insert_default(
-            [Endpoint::with_metadata(
-                echo.clone(),
-                serde_json::from_value::<MetadataView<_>>(serde_json::json!({
-                    "quilkin.dev": {
-                        "tokens": ["YWJj"]
-                    }
-                }))
-                .unwrap(),
-            )]
-            .into(),
-        )
-    });
+        server_config.clusters.modify(|clusters| {
+            clusters.insert_default(
+                [Endpoint::with_metadata(
+                    echo.clone(),
+                    serde_json::from_value::<MetadataView<_>>(serde_json::json!({
+                        "quilkin.dev": {
+                            "tokens": ["YWJj"]
+                        }
+                    }))
+                    .unwrap(),
+                )]
+                .into(),
+            )
+        });
 
-    let server_port = t.run_server(server_config, None, None).await;
+        let server_port = t.run_server(server_config, None, None).await;
 
-    // valid packet
-    let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;
+        // valid packet
+        let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;
 
-    let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port);
-    let msg = b"helloabc";
-    socket.send_to(msg, &local_addr).await.unwrap();
+        let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port);
+        let msg = b"helloabc";
+        socket.send_to(msg, &local_addr).await.unwrap();
 
-    assert_eq!(
-        "helloabc",
-        timeout(Duration::from_millis(500), recv_chan.recv())
-            .await
-            .expect("should have received a packet")
-            .unwrap()
-    );
+        assert_eq!(
+            "helloabc",
+            timeout(Duration::from_millis(500), recv_chan.recv())
+                .await
+                .expect("should have received a packet")
+                .unwrap()
+        );
 
-    // send an invalid packet
-    let msg = b"helloxyz";
-    socket.send_to(msg, &local_addr).await.unwrap();
+        // send an invalid packet
+        let msg = b"helloxyz";
+        socket.send_to(msg, &local_addr).await.unwrap();
 
-    let result = timeout(Duration::from_millis(500), recv_chan.recv()).await;
-    assert!(result.is_err(), "should not have received a packet");
+        let result = timeout(Duration::from_millis(500), recv_chan.recv()).await;
+        assert!(result.is_err(), "should not have received a packet");
+    });
 }
diff --git a/tests/compress.rs b/tests/compress.rs
index 0d0d20160a..a7751d2799 100644
--- a/tests/compress.rs
+++ b/tests/compress.rs
@@ -27,59 +27,61 @@ use quilkin::{
 #[cfg_attr(target_os = "macos", ignore)]
 async fn client_and_server() {
     let mut t = TestHelper::default();
-    let echo = t.run_echo_server(AddressType::Random).await;
+    tokio::spawn(async move {
+        let echo = t.run_echo_server(AddressType::Random).await;
 
-    // create server configuration as
-    let yaml = "
+        // create server configuration as
+        let yaml = "
 on_read: DECOMPRESS
 on_write: COMPRESS
 ";
-    let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
-    server_config
-        .clusters
-        .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into()));
-    server_config.filters.store(
-        quilkin::filters::FilterChain::try_create([Filter {
-            name: Compress::factory().name().into(),
-            label: None,
-            config: serde_yaml::from_str(yaml).unwrap(),
-        }])
-        .map(std::sync::Arc::new)
-        .unwrap(),
-    );
-    // Run server proxy.
-    let server_port = t.run_server(server_config, None, None).await;
+        let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
+        server_config
+            .clusters
+            .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into()));
+        server_config.filters.store(
+            quilkin::filters::FilterChain::try_create([Filter {
+                name: Compress::factory().name().into(),
+                label: None,
+                config: serde_yaml::from_str(yaml).unwrap(),
+            }])
+            .map(std::sync::Arc::new)
+            .unwrap(),
+        );
+        // Run server proxy.
+        let server_port = t.run_server(server_config, None, None).await;
 
-    // create a local client
-    let yaml = "
+        // create a local client
+        let yaml = "
 on_read: COMPRESS
 on_write: DECOMPRESS
 ";
-    let client_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
-    client_config.clusters.modify(|clusters| {
-        clusters.insert_default([(std::net::Ipv6Addr::LOCALHOST, server_port).into()].into())
-    });
-    client_config.filters.store(
-        quilkin::filters::FilterChain::try_create([Filter {
-            name: Compress::factory().name().into(),
-            label: None,
-            config: serde_yaml::from_str(yaml).unwrap(),
-        }])
-        .map(std::sync::Arc::new)
-        .unwrap(),
-    );
-    // Run client proxy.
-    let client_port = t.run_server(client_config, None, None).await;
+        let client_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
+        client_config.clusters.modify(|clusters| {
+            clusters.insert_default([(std::net::Ipv6Addr::LOCALHOST, server_port).into()].into())
+        });
+        client_config.filters.store(
+            quilkin::filters::FilterChain::try_create([Filter {
+                name: Compress::factory().name().into(),
+                label: None,
+                config: serde_yaml::from_str(yaml).unwrap(),
+            }])
+            .map(std::sync::Arc::new)
+            .unwrap(),
+        );
+        // Run client proxy.
+        let client_port = t.run_server(client_config, None, None).await;
 
-    // let's send the packet
-    let (mut rx, tx) = t.open_socket_and_recv_multiple_packets().await;
+        // let's send the packet
+        let (mut rx, tx) = t.open_socket_and_recv_multiple_packets().await;
 
-    tx.send_to(b"hello", (std::net::Ipv6Addr::LOCALHOST, client_port))
-        .await
-        .unwrap();
-    let expected = timeout(Duration::from_millis(500), rx.recv())
-        .await
-        .expect("should have received a packet")
-        .unwrap();
-    assert_eq!("hello", expected);
+        tx.send_to(b"hello", (std::net::Ipv6Addr::LOCALHOST, client_port))
+            .await
+            .unwrap();
+        let expected = timeout(Duration::from_millis(500), rx.recv())
+            .await
+            .expect("should have received a packet")
+            .unwrap();
+        assert_eq!("hello", expected);
+    });
 }
diff --git a/tests/concatenate.rs b/tests/concatenate.rs
index 258ab18e26..51e4e7378c 100644
--- a/tests/concatenate.rs
+++ b/tests/concatenate.rs
@@ -33,34 +33,37 @@ async fn concatenate() {
 on_read: APPEND
 bytes: YWJj #abc
 ";
-    let echo = t.run_echo_server(AddressType::Random).await;
 
-    let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
-    server_config
-        .clusters
-        .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into()));
-    server_config.filters.store(
-        quilkin::filters::FilterChain::try_create([Filter {
-            name: Concatenate::factory().name().into(),
-            label: None,
-            config: serde_yaml::from_str(yaml).unwrap(),
-        }])
-        .map(std::sync::Arc::new)
-        .unwrap(),
-    );
-    let server_port = t.run_server(server_config, None, None).await;
+    tokio::spawn(async move {
+        let echo = t.run_echo_server(AddressType::Random).await;
 
-    // let's send the packet
-    let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;
+        let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
+        server_config
+            .clusters
+            .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into()));
+        server_config.filters.store(
+            quilkin::filters::FilterChain::try_create([Filter {
+                name: Concatenate::factory().name().into(),
+                label: None,
+                config: serde_yaml::from_str(yaml).unwrap(),
+            }])
+            .map(std::sync::Arc::new)
+            .unwrap(),
+        );
+        let server_port = t.run_server(server_config, None, None).await;
 
-    let local_addr = (Ipv4Addr::LOCALHOST, server_port);
-    socket.send_to(b"hello", &local_addr).await.unwrap();
+        // let's send the packet
+        let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;
 
-    assert_eq!(
-        "helloabc",
-        timeout(Duration::from_millis(250), recv_chan.recv())
-            .await
-            .expect("should have received a packet")
-            .unwrap()
-    );
+        let local_addr = (Ipv4Addr::LOCALHOST, server_port);
+        socket.send_to(b"hello", &local_addr).await.unwrap();
+
+        assert_eq!(
+            "helloabc",
+            timeout(Duration::from_millis(250), recv_chan.recv())
+                .await
+                .expect("should have received a packet")
+                .unwrap()
+        );
+    });
 }
diff --git a/tests/filter_order.rs b/tests/filter_order.rs
index b56fec61f2..616b16db33 100644
--- a/tests/filter_order.rs
+++ b/tests/filter_order.rs
@@ -45,62 +45,64 @@ on_read: COMPRESS
 on_write: DECOMPRESS
 ";
 
-    let mut echo = t
-        .run_echo_server_with_tap(AddressType::Random, move |_, bytes, _| {
-            assert!(
-                from_utf8(bytes).is_err(),
-                "Should be compressed, and therefore unable to be turned into a string"
-            );
-        })
-        .await;
-
-    quilkin::test::map_to_localhost(&mut echo);
-    let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
-    server_config
-        .clusters
-        .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into()));
-    server_config.filters.store(
-        quilkin::filters::FilterChain::try_create([
-            Filter {
-                name: Concatenate::factory().name().into(),
-                label: None,
-                config: serde_yaml::from_str(yaml_concat_read).unwrap(),
-            },
-            Filter {
-                name: Concatenate::factory().name().into(),
-                label: None,
-                config: serde_yaml::from_str(yaml_concat_write).unwrap(),
-            },
-            Filter {
-                name: Compress::factory().name().into(),
-                label: None,
-                config: serde_yaml::from_str(yaml_compress).unwrap(),
-            },
-        ])
-        .map(std::sync::Arc::new)
-        .unwrap(),
-    );
-
-    let server_port = t.run_server(server_config, None, None).await;
-
-    // let's send the packet
-    let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;
-
-    let buf = b"hello".repeat(98);
-
-    let local_addr = (Ipv4Addr::LOCALHOST, server_port);
-    socket.send_to(&buf, &local_addr).await.unwrap();
-
-    let received = timeout(Duration::from_millis(500), recv_chan.recv())
-        .await
-        .expect("should have received a packet")
-        .unwrap();
-
-    let hellos = received
-        .strip_suffix("xyzabc")
-        .expect("expected appended data");
-
-    assert_eq!(&buf, hellos.as_bytes());
+    tokio::spawn(async move {
+        let mut echo = t
+            .run_echo_server_with_tap(AddressType::Random, move |_, bytes, _| {
+                assert!(
+                    from_utf8(bytes).is_err(),
+                    "Should be compressed, and therefore unable to be turned into a string"
+                );
+            })
+            .await;
+
+        quilkin::test::map_to_localhost(&mut echo);
+        let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
+        server_config
+            .clusters
+            .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into()));
+        server_config.filters.store(
+            quilkin::filters::FilterChain::try_create([
+                Filter {
+                    name: Concatenate::factory().name().into(),
+                    label: None,
+                    config: serde_yaml::from_str(yaml_concat_read).unwrap(),
+                },
+                Filter {
+                    name: Concatenate::factory().name().into(),
+                    label: None,
+                    config: serde_yaml::from_str(yaml_concat_write).unwrap(),
+                },
+                Filter {
+                    name: Compress::factory().name().into(),
+                    label: None,
+                    config: serde_yaml::from_str(yaml_compress).unwrap(),
+                },
+            ])
+            .map(std::sync::Arc::new)
+            .unwrap(),
+        );
+
+        let server_port = t.run_server(server_config, None, None).await;
+
+        // let's send the packet
+        let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;
+
+        let buf = b"hello".repeat(98);
+
+        let local_addr = (Ipv4Addr::LOCALHOST, server_port);
+        socket.send_to(&buf, &local_addr).await.unwrap();
+
+        let received = timeout(Duration::from_millis(500), recv_chan.recv())
+            .await
+            .expect("should have received a packet")
+            .unwrap();
+
+        let hellos = received
+            .strip_suffix("xyzabc")
+            .expect("expected appended data");
+
+        assert_eq!(&buf, hellos.as_bytes());
+    });
 }
 
 #[tokio::test]
@@ -123,40 +125,42 @@ async fn multiple_mutations() {
 
     let filters: Vec<Filter> = serde_yaml::from_str(filters).unwrap();
 
-    let mut t = TestHelper::default();
-    let mut echo = t
-        .run_echo_server_with_tap(AddressType::Random, move |_, bytes, _| {
-            assert_eq!(b"hello", bytes);
-        })
-        .await;
-
-    quilkin::test::map_to_localhost(&mut echo);
-    let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
-    server_config
-        .clusters
-        .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into()));
-    server_config.filters.store(
-        quilkin::filters::FilterChain::try_create(filters)
-            .map(std::sync::Arc::new)
-            .unwrap(),
-    );
-
-    let server_port = t.run_server(server_config, None, None).await;
-
-    // let's send the packet
-    let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;
-
-    let local_addr = (Ipv4Addr::LOCALHOST, server_port);
-
-    socket
-        .send_to(b"helloxxxxxxxxxxxxxxxx6", &local_addr)
-        .await
-        .unwrap();
-
-    let received = timeout(Duration::from_millis(500), recv_chan.recv())
-        .await
-        .expect("should have received a packet")
-        .unwrap();
-
-    assert_eq!(b"hello", received.as_bytes());
+    tokio::spawn(async move {
+        let mut t = TestHelper::default();
+        let mut echo = t
+            .run_echo_server_with_tap(AddressType::Random, move |_, bytes, _| {
+                assert_eq!(b"hello", bytes);
+            })
+            .await;
+
+        quilkin::test::map_to_localhost(&mut echo);
+        let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
+        server_config
+            .clusters
+            .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into()));
+        server_config.filters.store(
+            quilkin::filters::FilterChain::try_create(filters)
+                .map(std::sync::Arc::new)
+                .unwrap(),
+        );
+
+        let server_port = t.run_server(server_config, None, None).await;
+
+        // let's send the packet
+        let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;
+
+        let local_addr = (Ipv4Addr::LOCALHOST, server_port);
+
+        socket
+            .send_to(b"helloxxxxxxxxxxxxxxxx6", &local_addr)
+            .await
+            .unwrap();
+
+        let received = timeout(Duration::from_millis(500), recv_chan.recv())
+            .await
+            .expect("should have received a packet")
+            .unwrap();
+
+        assert_eq!(b"hello", received.as_bytes());
+    });
 }
diff --git a/tests/filters.rs b/tests/filters.rs
index 26bd9a2ed1..bd85effece 100644
--- a/tests/filters.rs
+++ b/tests/filters.rs
@@ -33,77 +33,79 @@ async fn test_filter() {
     let mut t = TestHelper::default();
     load_test_filters();
 
-    // create an echo server as an endpoint.
-    let echo = t.run_echo_server(AddressType::Random).await;
-
-    // create server configuration
-    let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
-    server_config.filters.store(
-        quilkin::filters::FilterChain::try_create([Filter {
-            name: "TestFilter".to_string(),
-            label: None,
-            config: None,
-        }])
-        .map(std::sync::Arc::new)
-        .unwrap(),
-    );
-
-    server_config
-        .clusters
-        .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into()));
-
-    let server_port = t.run_server(server_config, None, None).await;
-
-    // create a local client
-    let client_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
-    client_config.clusters.modify(|clusters| {
-        clusters.insert_default(
-            [Endpoint::new(
-                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port).into(),
-            )]
-            .into(),
-        )
+    tokio::spawn(async move {
+        // create an echo server as an endpoint.
+        let echo = t.run_echo_server(AddressType::Random).await;
+
+        // create server configuration
+        let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
+        server_config.filters.store(
+            quilkin::filters::FilterChain::try_create([Filter {
+                name: "TestFilter".to_string(),
+                label: None,
+                config: None,
+            }])
+            .map(std::sync::Arc::new)
+            .unwrap(),
+        );
+
+        server_config
+            .clusters
+            .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into()));
+
+        let server_port = t.run_server(server_config, None, None).await;
+
+        // create a local client
+        let client_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
+        client_config.clusters.modify(|clusters| {
+            clusters.insert_default(
+                [Endpoint::new(
+                    SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port).into(),
+                )]
+                .into(),
+            )
+        });
+        client_config.filters.store(
+            quilkin::filters::FilterChain::try_create([Filter {
+                name: "TestFilter".to_string(),
+                label: None,
+                config: None,
+            }])
+            .map(std::sync::Arc::new)
+            .unwrap(),
+        );
+
+        // Run client proxy.
+        let client_port = t.run_server(client_config, None, None).await;
+
+        // let's send the packet
+        let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;
+
+        // game_client
+        let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), client_port);
+        tracing::info!(address = %local_addr, "Sending hello");
+        socket.send_to(b"hello", &local_addr).await.unwrap();
+
+        let result = timeout(Duration::from_secs(5), recv_chan.recv())
+            .await
+            .unwrap()
+            .unwrap();
+        // since we don't know the ephemeral ip addresses in use, we'll search for
+        // substrings for the results we expect that the TestFilter will inject in
+        // the round-tripped packets.
+        assert_eq!(
+            2,
+            result.matches("odr").count(),
+            "Should be 2 read calls in {}",
+            result
+        );
+        assert_eq!(
+            2,
+            result.matches("our").count(),
+            "Should be 2 write calls in {}",
+            result
+        );
     });
-    client_config.filters.store(
-        quilkin::filters::FilterChain::try_create([Filter {
-            name: "TestFilter".to_string(),
-            label: None,
-            config: None,
-        }])
-        .map(std::sync::Arc::new)
-        .unwrap(),
-    );
-
-    // Run client proxy.
-    let client_port = t.run_server(client_config, None, None).await;
-
-    // let's send the packet
-    let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;
-
-    // game_client
-    let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), client_port);
-    tracing::info!(address = %local_addr, "Sending hello");
-    socket.send_to(b"hello", &local_addr).await.unwrap();
-
-    let result = timeout(Duration::from_secs(5), recv_chan.recv())
-        .await
-        .unwrap()
-        .unwrap();
-    // since we don't know the ephemeral ip addresses in use, we'll search for
-    // substrings for the results we expect that the TestFilter will inject in
-    // the round-tripped packets.
-    assert_eq!(
-        2,
-        result.matches("odr").count(),
-        "Should be 2 read calls in {}",
-        result
-    );
-    assert_eq!(
-        2,
-        result.matches("our").count(),
-        "Should be 2 write calls in {}",
-        result
-    );
 }
 
 #[tokio::test]
@@ -114,60 +116,62 @@ async fn debug_filter() {
     // handy for grabbing the configuration name
     let factory = Debug::factory();
 
-    // create an echo server as an endpoint.
-    let echo = t.run_echo_server(AddressType::Random).await;
-
-    tracing::trace!(%echo, "running echo server");
-    // create server configuration
-    let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
-    server_config
-        .clusters
-        .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into()));
-    server_config.filters.store(
-        quilkin::filters::FilterChain::try_create([Filter {
-            name: factory.name().into(),
-            label: None,
-            config: Some(serde_json::json!({ "id":  "server", })),
-        }])
-        .map(std::sync::Arc::new)
-        .unwrap(),
-    );
-
-    let server_port = t.run_server(server_config, None, None).await;
-
-    // create a local client
-    let client_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
-    client_config.clusters.modify(|clusters| {
-        clusters.insert_default(
-            [Endpoint::new(
-                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port).into(),
-            )]
-            .into(),
-        )
+    tokio::spawn(async move {
+        // create an echo server as an endpoint.
+        let echo = t.run_echo_server(AddressType::Random).await;
+
+        tracing::trace!(%echo, "running echo server");
+        // create server configuration
+        let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
+        server_config
+            .clusters
+            .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into()));
+        server_config.filters.store(
+            quilkin::filters::FilterChain::try_create([Filter {
+                name: factory.name().into(),
+                label: None,
+                config: Some(serde_json::json!({ "id":  "server", })),
+            }])
+            .map(std::sync::Arc::new)
+            .unwrap(),
+        );
+
+        let server_port = t.run_server(server_config, None, None).await;
+
+        // create a local client
+        let client_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
+        client_config.clusters.modify(|clusters| {
+            clusters.insert_default(
+                [Endpoint::new(
+                    SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port).into(),
+                )]
+                .into(),
+            )
+        });
+        client_config.filters.store(
+            quilkin::filters::FilterChain::try_create([Filter {
+                name: factory.name().into(),
+                label: None,
+                config: Some(serde_json::json!({ "id":  "client" })),
+            }])
+            .map(std::sync::Arc::new)
+            .unwrap(),
+        );
+        let client_port = t.run_server(client_config, None, None).await;
+
+        // let's send the packet
+        let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;
+
+        // game client
+        let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), client_port);
+        tracing::info!(address = %local_addr, "Sending hello");
+        socket.send_to(b"hello", &local_addr).await.unwrap();
+
+        // since the debug filter doesn't change the data, it should be exactly the same
+        let value = timeout(Duration::from_millis(500), recv_chan.recv())
+            .await
+            .unwrap()
+            .unwrap();
+        assert_eq!("hello", value);
     });
-    client_config.filters.store(
-        quilkin::filters::FilterChain::try_create([Filter {
-            name: factory.name().into(),
-            label: None,
-            config: Some(serde_json::json!({ "id":  "client" })),
-        }])
-        .map(std::sync::Arc::new)
-        .unwrap(),
-    );
-    let client_port = t.run_server(client_config, None, None).await;
-
-    // let's send the packet
-    let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;
-
-    // game client
-    let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), client_port);
-    tracing::info!(address = %local_addr, "Sending hello");
-    socket.send_to(b"hello", &local_addr).await.unwrap();
-
-    // since the debug filter doesn't change the data, it should be exactly the same
-    let value = timeout(Duration::from_millis(500), recv_chan.recv())
-        .await
-        .unwrap()
-        .unwrap();
-    assert_eq!("hello", value);
 }
diff --git a/tests/firewall.rs b/tests/firewall.rs
deleted file mode 100644
index 62f2edca0c..0000000000
--- a/tests/firewall.rs
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Copyright 2021 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-use tokio::{
-    sync::mpsc,
-    time::{timeout, Duration},
-};
-
-use quilkin::{
-    config::Filter,
-    filters::{Firewall, StaticFilter},
-    net::endpoint::Endpoint,
-    test::{AddressType, TestHelper},
-};
-
-#[tokio::test]
-#[cfg_attr(target_os = "macos", ignore)]
-async fn ipv4_firewall_allow() {
-    let mut t = TestHelper::default();
-    let yaml = "
-on_read:
-  - action: ALLOW
-    sources:
-      - 127.0.0.1/32
-    ports:
-      - %1
-on_write:
-  - action: ALLOW
-    sources:
-      - 127.0.0.0/24
-    ports:
-      - %2
-";
-    let mut rx = test(&mut t, yaml, AddressType::Ipv4).await;
-
-    assert_eq!(
-        "hello",
-        timeout(Duration::from_millis(500), rx.recv())
-            .await
-            .expect("should have received a packet")
-            .unwrap()
-    );
-}
-
-#[tokio::test]
-#[cfg_attr(target_os = "macos", ignore)]
-async fn ipv6_firewall_allow() {
-    let mut t = TestHelper::default();
-    let yaml = "
-on_read:
-  - action: ALLOW
-    sources:
-      - ::1/128
-    ports:
-      - %1
-on_write:
-  - action: ALLOW
-    sources:
-      - ::1/64
-    ports:
-      - %2
-";
-    let mut rx = test(&mut t, yaml, AddressType::Ipv6).await;
-
-    assert_eq!(
-        "hello",
-        timeout(Duration::from_millis(500), rx.recv())
-            .await
-            .expect("should have received a packet")
-            .unwrap()
-    );
-}
-
-#[tokio::test]
-async fn ipv4_firewall_read_deny() {
-    let mut t = TestHelper::default();
-    let yaml = "
-on_read:
-  - action: DENY
-    sources:
-      - 127.0.0.1/32
-    ports:
-      - %1
-on_write:
-  - action: ALLOW
-    sources:
-      - 127.0.0.0/24
-    ports:
-      - %2
-";
-    let mut rx = test(&mut t, yaml, AddressType::Ipv4).await;
-
-    let result = timeout(Duration::from_millis(500), rx.recv()).await;
-    assert!(result.is_err(), "should not have received a packet");
-}
-
-#[tokio::test]
-async fn ipv6_firewall_read_deny() {
-    let mut t = TestHelper::default();
-    let yaml = "
-on_read:
-  - action: DENY
-    sources:
-       - ::1/128
-    ports:
-       - %1
-on_write:
-  - action: ALLOW
-    sources:
-      - ::1/64
-    ports:
-      - %2
-";
-    let mut rx = test(&mut t, yaml, AddressType::Ipv6).await;
-
-    let result = timeout(Duration::from_millis(500), rx.recv()).await;
-    assert!(result.is_err(), "should not have received a packet");
-}
-
-#[tokio::test]
-async fn ipv4_firewall_write_deny() {
-    let mut t = TestHelper::default();
-    let yaml = "
-on_read:
-  - action: ALLOW
-    sources:
-      - 127.0.0.1/32
-    ports:
-      - %1
-on_write:
-  - action: DENY
-    sources:
-      - 127.0.0.0/24
-    ports:
-      - %2
-";
-    let mut rx = test(&mut t, yaml, AddressType::Ipv4).await;
-
-    let result = timeout(Duration::from_millis(500), rx.recv()).await;
-    assert!(result.is_err(), "should not have received a packet");
-}
-
-#[tokio::test]
-async fn ipv6_firewall_write_deny() {
-    let mut t = TestHelper::default();
-
-    let yaml = "
-on_read:
-  - action: ALLOW
-    sources:
-      - ::1/128
-    ports:
-      - %1
-on_write:
-  - action: DENY
-    sources: 
-      - ::1/64
-    ports:
-      - %2
-";
-    let mut rx = test(&mut t, yaml, AddressType::Ipv6).await;
-
-    let result = timeout(Duration::from_millis(500), rx.recv()).await;
-    assert!(result.is_err(), "should not have received a packet");
-}
-
-async fn test(t: &mut TestHelper, yaml: &str, address_type: AddressType) -> mpsc::Receiver<String> {
-    let echo = t.run_echo_server(address_type).await;
-
-    let (rx, socket) = match address_type {
-        AddressType::Ipv4 => t.open_ipv4_socket_and_recv_multiple_packets().await,
-        AddressType::Ipv6 => t.open_socket_and_recv_multiple_packets().await,
-        AddressType::Random => unreachable!(),
-    };
-
-    let client_addr = match address_type {
-        AddressType::Ipv4 => socket.local_addr().unwrap(),
-        AddressType::Ipv6 => socket.local_addr().unwrap(),
-        AddressType::Random => unreachable!(),
-    };
-
-    let yaml = yaml
-        .replace("%1", client_addr.port().to_string().as_str())
-        .replace("%2", echo.port().to_string().as_str());
-    tracing::info!(config = yaml.as_str(), "Config");
-
-    let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
-    server_config.filters.store(
-        quilkin::filters::FilterChain::try_create([Filter {
-            name: Firewall::factory().name().into(),
-            label: None,
-            config: serde_yaml::from_str(yaml.as_str()).unwrap(),
-        }])
-        .map(std::sync::Arc::new)
-        .unwrap(),
-    );
-
-    server_config
-        .clusters
-        .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into()));
-
-    let proxy_socket =
-        quilkin::net::raw_socket_with_reuse_and_address(address_type.into()).unwrap();
-    let local_addr = proxy_socket.local_addr().unwrap().as_socket().unwrap();
-
-    let proxy = quilkin::components::proxy::Proxy {
-        socket: Some(proxy_socket),
-        ..Default::default()
-    };
-
-    t.run_server(server_config, Some(proxy), None).await;
-
-    tracing::info!(source = %client_addr, address = %local_addr, "Sending hello");
-    socket.send_to(b"hello", local_addr).await.unwrap();
-    rx
-}
diff --git a/tests/health.rs b/tests/health.rs
index 1ce9ac1f87..9a91afb994 100644
--- a/tests/health.rs
+++ b/tests/health.rs
@@ -30,34 +30,38 @@ async fn health_server() {
     server_config.clusters.modify(|clusters| {
         clusters.insert_default(["127.0.0.1:0".parse::<Endpoint>().unwrap()].into())
     });
-    t.run_server(
-        server_config,
-        None,
-        Some(Some((std::net::Ipv6Addr::UNSPECIFIED, 9093).into())),
-    )
-    .await;
-    tokio::time::sleep(std::time::Duration::from_millis(250)).await;
-
-    let client = hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
-        .build_http::<http_body_util::Empty<bytes::Bytes>>();
-    use http_body_util::BodyExt;
-    let resp = client
-        .get(Uri::from_static(LIVE_ADDRESS))
-        .await
-        .unwrap()
-        .into_body()
-        .collect()
-        .await
-        .unwrap()
-        .to_bytes()
-        .to_vec();
-
-    assert_eq!("ok", String::from_utf8(resp).unwrap());
-
-    let _ = panic::catch_unwind(|| {
-        panic!("oh no!");
-    });
 
-    let resp = client.get(Uri::from_static(LIVE_ADDRESS)).await.unwrap();
-    assert!(resp.status().is_server_error(), "Should be unhealthy");
+    tokio::spawn(async move {
+        t.run_server(
+            server_config,
+            None,
+            Some(Some((std::net::Ipv6Addr::UNSPECIFIED, 9093).into())),
+        )
+        .await;
+        tokio::time::sleep(std::time::Duration::from_millis(250)).await;
+
+        let client =
+            hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
+                .build_http::<http_body_util::Empty<bytes::Bytes>>();
+        use http_body_util::BodyExt;
+        let resp = client
+            .get(Uri::from_static(LIVE_ADDRESS))
+            .await
+            .unwrap()
+            .into_body()
+            .collect()
+            .await
+            .unwrap()
+            .to_bytes()
+            .to_vec();
+
+        assert_eq!("ok", String::from_utf8(resp).unwrap());
+
+        let _ = panic::catch_unwind(|| {
+            panic!("oh no!");
+        });
+
+        let resp = client.get(Uri::from_static(LIVE_ADDRESS)).await.unwrap();
+        assert!(resp.status().is_server_error(), "Should be unhealthy");
+    });
 }
diff --git a/tests/load_balancer.rs b/tests/load_balancer.rs
index 4f1c3755ad..8ecdedf4fa 100644
--- a/tests/load_balancer.rs
+++ b/tests/load_balancer.rs
@@ -36,47 +36,49 @@ policy: ROUND_ROBIN
 ";
     let selected_endpoint = Arc::new(Mutex::new(None::<SocketAddr>));
 
-    let mut echo_addresses = std::collections::BTreeSet::new();
-    for _ in 0..2 {
-        let selected_endpoint = selected_endpoint.clone();
-        echo_addresses.insert(
-            t.run_echo_server_with_tap(AddressType::Random, move |_, _, echo_addr| {
-                let _ = selected_endpoint.lock().unwrap().replace(echo_addr);
-            })
-            .await,
-        );
-    }
+    tokio::spawn(async move {
+        let mut echo_addresses = std::collections::BTreeSet::new();
+        for _ in 0..2 {
+            let selected_endpoint = selected_endpoint.clone();
+            echo_addresses.insert(
+                t.run_echo_server_with_tap(AddressType::Random, move |_, _, echo_addr| {
+                    let _ = selected_endpoint.lock().unwrap().replace(echo_addr);
+                })
+                .await,
+            );
+        }
 
-    let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
-    server_config.clusters.modify(|clusters| {
-        clusters.insert_default(echo_addresses.iter().cloned().map(Endpoint::new).collect())
-    });
-    server_config.filters.store(
-        quilkin::filters::FilterChain::try_create([Filter {
-            name: LoadBalancer::factory().name().into(),
-            label: None,
-            config: serde_yaml::from_str(yaml).unwrap(),
-        }])
-        .map(std::sync::Arc::new)
-        .unwrap(),
-    );
+        let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
+        server_config.clusters.modify(|clusters| {
+            clusters.insert_default(echo_addresses.iter().cloned().map(Endpoint::new).collect())
+        });
+        server_config.filters.store(
+            quilkin::filters::FilterChain::try_create([Filter {
+                name: LoadBalancer::factory().name().into(),
+                label: None,
+                config: serde_yaml::from_str(yaml).unwrap(),
+            }])
+            .map(std::sync::Arc::new)
+            .unwrap(),
+        );
 
-    let server_port = t.run_server(server_config, None, None).await;
-    let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port);
+        let server_port = t.run_server(server_config, None, None).await;
+        let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port);
 
-    let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;
+        let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;
 
-    for addr in echo_addresses {
-        socket.send_to(b"hello", &server_addr).await.unwrap();
-        let value = timeout(Duration::from_secs(5), recv_chan.recv())
-            .await
-            .unwrap()
-            .unwrap();
-        assert_eq!("hello", value);
+        for addr in echo_addresses {
+            socket.send_to(b"hello", &server_addr).await.unwrap();
+            let value = timeout(Duration::from_secs(5), recv_chan.recv())
+                .await
+                .unwrap()
+                .unwrap();
+            assert_eq!("hello", value);
 
-        assert_eq!(
-            addr,
-            selected_endpoint.lock().unwrap().take().unwrap().into()
-        );
-    }
+            assert_eq!(
+                addr,
+                selected_endpoint.lock().unwrap().take().unwrap().into()
+            );
+        }
+    });
 }
diff --git a/tests/local_rate_limit.rs b/tests/local_rate_limit.rs
index 205b3a423b..a7ce7e125d 100644
--- a/tests/local_rate_limit.rs
+++ b/tests/local_rate_limit.rs
@@ -34,47 +34,50 @@ async fn local_rate_limit_filter() {
 max_packets: 2
 period: 1
 ";
-    let echo = t.run_echo_server(AddressType::Random).await;
 
-    let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
-    server_config
-        .clusters
-        .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into()));
-    server_config.filters.store(
-        quilkin::filters::FilterChain::try_create([Filter {
-            name: LocalRateLimit::factory().name().into(),
-            label: None,
-            config: serde_yaml::from_str(yaml).unwrap(),
-        }])
-        .map(std::sync::Arc::new)
-        .unwrap(),
-    );
-    tracing::trace!("spawning server");
-    let server_port = t.run_server(server_config, None, None).await;
-    let server_addr = std::net::SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, server_port));
+    tokio::spawn(async move {
+        let echo = t.run_echo_server(AddressType::Random).await;
 
-    let msg = "hello";
-    let (mut rx, socket) = t.open_socket_and_recv_multiple_packets().await;
+        let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
+        server_config
+            .clusters
+            .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into()));
+        server_config.filters.store(
+            quilkin::filters::FilterChain::try_create([Filter {
+                name: LocalRateLimit::factory().name().into(),
+                label: None,
+                config: serde_yaml::from_str(yaml).unwrap(),
+            }])
+            .map(std::sync::Arc::new)
+            .unwrap(),
+        );
+        tracing::trace!("spawning server");
+        let server_port = t.run_server(server_config, None, None).await;
+        let server_addr = std::net::SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, server_port));
 
-    for _ in 0..3 {
-        tracing::trace!(%server_addr, %msg, "sending");
-        socket.send_to(msg.as_bytes(), &server_addr).await.unwrap();
-    }
+        let msg = "hello";
+        let (mut rx, socket) = t.open_socket_and_recv_multiple_packets().await;
 
-    for _ in 0..2 {
-        assert_eq!(
-            msg,
-            timeout(Duration::from_millis(500), rx.recv())
-                .await
-                .unwrap()
-                .unwrap()
-        );
-    }
+        for _ in 0..3 {
+            tracing::trace!(%server_addr, %msg, "sending");
+            socket.send_to(msg.as_bytes(), &server_addr).await.unwrap();
+        }
+
+        for _ in 0..2 {
+            assert_eq!(
+                msg,
+                timeout(Duration::from_millis(500), rx.recv())
+                    .await
+                    .unwrap()
+                    .unwrap()
+            );
+        }
 
-    // Allow enough time to have received any response.
-    tokio::time::sleep(Duration::from_millis(100)).await;
-    // Check that we do not get any response.
-    assert!(timeout(Duration::from_millis(500), rx.recv())
-        .await
-        .is_err());
+        // Allow enough time to have received any response.
+        tokio::time::sleep(Duration::from_millis(100)).await;
+        // Check that we do not get any response.
+        assert!(timeout(Duration::from_millis(500), rx.recv())
+            .await
+            .is_err());
+    });
 }
diff --git a/tests/match.rs b/tests/match.rs
index d131f97c1b..196fbd16e9 100644
--- a/tests/match.rs
+++ b/tests/match.rs
@@ -29,15 +29,16 @@ use quilkin::{
 #[cfg_attr(target_os = "macos", ignore)]
 async fn r#match() {
     let mut t = TestHelper::default();
-    let echo = t.run_echo_server(AddressType::Random).await;
+    tokio::spawn(async move {
+        let echo = t.run_echo_server(AddressType::Random).await;
 
-    let capture_yaml = "
+        let capture_yaml = "
 suffix:
     size: 3
     remove: true
-";
+    ";
 
-    let matches_yaml = "
+        let matches_yaml = "
 on_read:
     metadataKey: quilkin.dev/capture
     fallthrough:
@@ -56,80 +57,81 @@ on_read:
           config:
             on_read: APPEND
             bytes: YWJj # abc
-";
-
-    let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
-    server_config
-        .clusters
-        .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into()));
-    server_config.filters.store(
-        quilkin::filters::FilterChain::try_create([
-            Filter {
-                name: Capture::NAME.into(),
-                label: None,
-                config: serde_yaml::from_str(capture_yaml).unwrap(),
-            },
-            Filter {
-                name: Match::NAME.into(),
-                label: None,
-                config: serde_yaml::from_str(matches_yaml).unwrap(),
-            },
-        ])
-        .map(std::sync::Arc::new)
-        .unwrap(),
-    );
-
-    let server_port = t.run_server(server_config, None, None).await;
-
-    let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;
-
-    let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port);
-
-    // abc packet
-    let msg = b"helloabc";
-    socket.send_to(msg, &local_addr).await.unwrap();
-
-    assert_eq!(
-        "helloxyz",
-        timeout(Duration::from_millis(500), recv_chan.recv())
-            .await
-            .expect("should have received a packet")
-            .unwrap()
-    );
-
-    // send an xyz packet
-    let msg = b"helloxyz";
-    socket.send_to(msg, &local_addr).await.unwrap();
-
-    assert_eq!(
-        "helloabc",
-        timeout(Duration::from_millis(500), recv_chan.recv())
-            .await
-            .expect("should have received a packet")
-            .unwrap()
-    );
-
-    // fallthrough packet
-    let msg = b"hellodef";
-    socket.send_to(msg, &local_addr).await.unwrap();
-
-    assert_eq!(
-        "hellodef",
-        timeout(Duration::from_millis(500), recv_chan.recv())
-            .await
-            .expect("should have received a packet")
-            .unwrap()
-    );
-
-    // second fallthrough packet
-    let msg = b"hellofgh";
-    socket.send_to(msg, &local_addr).await.unwrap();
-
-    assert_eq!(
-        "hellodef",
-        timeout(Duration::from_millis(500), recv_chan.recv())
-            .await
-            .expect("should have received a packet")
-            .unwrap()
-    );
+            ";
+
+        let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
+        server_config
+            .clusters
+            .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into()));
+        server_config.filters.store(
+            quilkin::filters::FilterChain::try_create([
+                Filter {
+                    name: Capture::NAME.into(),
+                    label: None,
+                    config: serde_yaml::from_str(capture_yaml).unwrap(),
+                },
+                Filter {
+                    name: Match::NAME.into(),
+                    label: None,
+                    config: serde_yaml::from_str(matches_yaml).unwrap(),
+                },
+            ])
+            .map(std::sync::Arc::new)
+            .unwrap(),
+        );
+
+        let server_port = t.run_server(server_config, None, None).await;
+
+        let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;
+
+        let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port);
+
+        // abc packet
+        let msg = b"helloabc";
+        socket.send_to(msg, &local_addr).await.unwrap();
+
+        assert_eq!(
+            "helloxyz",
+            timeout(Duration::from_millis(500), recv_chan.recv())
+                .await
+                .expect("should have received a packet")
+                .unwrap()
+        );
+
+        // send an xyz packet
+        let msg = b"helloxyz";
+        socket.send_to(msg, &local_addr).await.unwrap();
+
+        assert_eq!(
+            "helloabc",
+            timeout(Duration::from_millis(500), recv_chan.recv())
+                .await
+                .expect("should have received a packet")
+                .unwrap()
+        );
+
+        // fallthrough packet
+        let msg = b"hellodef";
+        socket.send_to(msg, &local_addr).await.unwrap();
+
+        assert_eq!(
+            "hellodef",
+            timeout(Duration::from_millis(500), recv_chan.recv())
+                .await
+                .expect("should have received a packet")
+                .unwrap()
+        );
+
+        // second fallthrough packet
+        let msg = b"hellofgh";
+        socket.send_to(msg, &local_addr).await.unwrap();
+
+        assert_eq!(
+            "hellodef",
+            timeout(Duration::from_millis(500), recv_chan.recv())
+                .await
+                .expect("should have received a packet")
+                .unwrap()
+        );
+    });
 }
diff --git a/tests/metrics.rs b/tests/metrics.rs
index 9a3b3eb91b..85e4c97017 100644
--- a/tests/metrics.rs
+++ b/tests/metrics.rs
@@ -26,70 +26,75 @@ use quilkin::{
 async fn metrics_server() {
     let mut t = TestHelper::default();
 
-    // create an echo server as an endpoint.
-    let echo = t.run_echo_server(AddressType::Random).await;
-    let metrics_port = quilkin::test::available_addr(AddressType::Random)
-        .await
-        .port();
+    tokio::spawn(async move {
+        // create an echo server as an endpoint.
+        let echo = t.run_echo_server(AddressType::Random).await;
+        let metrics_port = quilkin::test::available_addr(AddressType::Random)
+            .await
+            .port();
 
-    // create server configuration
-    let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
-    server_config
-        .clusters
-        .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into()));
-    let server_port = t
-        .run_server(
-            server_config,
-            None,
-            Some(Some((std::net::Ipv4Addr::UNSPECIFIED, metrics_port).into())),
-        )
-        .await;
+        // create server configuration
+        let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
+        server_config
+            .clusters
+            .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into()));
+        let server_port = t
+            .run_server(
+                server_config,
+                None,
+                Some(Some((std::net::Ipv4Addr::UNSPECIFIED, metrics_port).into())),
+            )
+            .await;
 
-    // create a local client
-    let client_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
-    client_config.clusters.modify(|clusters| {
-        clusters.insert_default(
-            [Endpoint::new(
-                (std::net::Ipv6Addr::LOCALHOST, server_port).into(),
-            )]
-            .into(),
-        )
-    });
-    let client_port = t.run_server(client_config, None, None).await;
+        // create a local client
+        let client_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
+        client_config.clusters.modify(|clusters| {
+            clusters.insert_default(
+                [Endpoint::new(
+                    (std::net::Ipv6Addr::LOCALHOST, server_port).into(),
+                )]
+                .into(),
+            )
+        });
+        let client_port = t.run_server(client_config, None, None).await;
 
-    // let's send the packet
-    let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;
+        // let's send the packet
+        let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;
 
-    // game_client
-    let local_addr = SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, client_port));
-    tracing::info!(address = %local_addr, "Sending hello");
-    socket.send_to(b"hello", &local_addr).await.unwrap();
+        // game_client
+        let local_addr = SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, client_port));
+        tracing::info!(address = %local_addr, "Sending hello");
+        socket.send_to(b"hello", &local_addr).await.unwrap();
 
-    let _ = tokio::time::timeout(std::time::Duration::from_millis(100), recv_chan.recv())
-        .await
-        .unwrap()
-        .unwrap();
+        let _ = tokio::time::timeout(std::time::Duration::from_millis(100), recv_chan.recv())
+            .await
+            .unwrap()
+            .unwrap();
 
-    let client = hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
-        .build_http::<http_body_util::Empty<bytes::Bytes>>();
-    use http_body_util::BodyExt;
-    let resp = client
-        .get(
-            format!("http://localhost:{metrics_port}/metrics")
-                .parse()
-                .unwrap(),
-        )
-        .await
-        .unwrap()
-        .into_body()
-        .collect()
-        .await
-        .unwrap()
-        .to_bytes();
+        let client =
+            hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
+                .build_http::<http_body_util::Empty<bytes::Bytes>>();
+        use http_body_util::BodyExt;
+        let resp = client
+            .get(
+                format!("http://localhost:{metrics_port}/metrics")
+                    .parse()
+                    .unwrap(),
+            )
+            .await
+            .unwrap()
+            .into_body()
+            .collect()
+            .await
+            .unwrap()
+            .to_bytes();
 
-    let response = String::from_utf8(resp.to_vec()).unwrap();
-    let read_regex = regex::Regex::new(r#"quilkin_packets_total\{.*event="read".*\} 2"#).unwrap();
-    let write_regex = regex::Regex::new(r#"quilkin_packets_total\{.*event="write".*\} 2"#).unwrap();
-    assert!(read_regex.is_match(&response));
-    assert!(write_regex.is_match(&response));
+        let response = String::from_utf8(resp.to_vec()).unwrap();
+        let read_regex =
+            regex::Regex::new(r#"quilkin_packets_total\{.*event="read".*\} 2"#).unwrap();
+        let write_regex =
+            regex::Regex::new(r#"quilkin_packets_total\{.*event="write".*\} 2"#).unwrap();
+        assert!(read_regex.is_match(&response));
+        assert!(write_regex.is_match(&response));
+    });
 }
diff --git a/tests/no_filter.rs b/tests/no_filter.rs
index e159c890a4..2cadc2b69c 100644
--- a/tests/no_filter.rs
+++ b/tests/no_filter.rs
@@ -27,42 +27,44 @@ use quilkin::{
 async fn echo() {
     let mut t = TestHelper::default();
 
-    // create two echo servers as endpoints
-    let server1 = t.run_echo_server(AddressType::Random).await;
-    let server2 = t.run_echo_server(AddressType::Random).await;
+    tokio::spawn(async move {
+        // create two echo servers as endpoints
+        let server1 = t.run_echo_server(AddressType::Random).await;
+        let server2 = t.run_echo_server(AddressType::Random).await;
 
-    // create server configuration
-    let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
-    server_config.clusters.modify(|clusters| {
-        clusters.insert_default(
-            [
-                Endpoint::new(server1.clone()),
-                Endpoint::new(server2.clone()),
-            ]
-            .into(),
-        )
-    });
+        // create server configuration
+        let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
+        server_config.clusters.modify(|clusters| {
+            clusters.insert_default(
+                [
+                    Endpoint::new(server1.clone()),
+                    Endpoint::new(server2.clone()),
+                ]
+                .into(),
+            )
+        });
 
-    let local_port = t.run_server(server_config, None, None).await;
-    let local_addr = std::net::SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, local_port));
+        let local_port = t.run_server(server_config, None, None).await;
+        let local_addr = std::net::SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, local_port));
 
-    // let's send the packet
-    let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;
+        // let's send the packet
+        let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;
 
-    socket.send_to(b"hello", &local_addr).await.unwrap();
-    let value = timeout(Duration::from_millis(500), recv_chan.recv())
-        .await
-        .unwrap()
-        .unwrap();
-    assert_eq!("hello", value);
-    let value = timeout(Duration::from_millis(500), recv_chan.recv())
-        .await
-        .unwrap()
-        .unwrap();
-    assert_eq!("hello", value);
+        socket.send_to(b"hello", &local_addr).await.unwrap();
+        let value = timeout(Duration::from_millis(500), recv_chan.recv())
+            .await
+            .unwrap()
+            .unwrap();
+        assert_eq!("hello", value);
+        let value = timeout(Duration::from_millis(500), recv_chan.recv())
+            .await
+            .unwrap()
+            .unwrap();
+        assert_eq!("hello", value);
 
-    // should only be two returned items
-    assert!(timeout(Duration::from_millis(500), recv_chan.recv())
-        .await
-        .is_err());
+        // should only be two returned items
+        assert!(timeout(Duration::from_millis(500), recv_chan.recv())
+            .await
+            .is_err());
+    });
 }
diff --git a/tests/qcmp.rs b/tests/qcmp.rs
index ca089e4ab3..9ad1fff380 100644
--- a/tests/qcmp.rs
+++ b/tests/qcmp.rs
@@ -18,24 +18,15 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};
 
 use tokio::time::Duration;
 
-use quilkin::{
-    codec::qcmp::Protocol,
-    test::{AddressType, TestHelper},
-};
+use quilkin::{codec::qcmp::Protocol, test::AddressType};
 
 #[tokio::test]
 #[cfg_attr(target_os = "macos", ignore)]
 async fn proxy_ping() {
-    let mut t = TestHelper::default();
     let qcmp = quilkin::net::raw_socket_with_reuse(0).unwrap();
     let qcmp_port = quilkin::net::socket_port(&qcmp);
-    let server_proxy = quilkin::components::proxy::Proxy {
-        qcmp,
-        to: vec![(Ipv4Addr::UNSPECIFIED, 0).into()],
-        ..<_>::default()
-    };
-    let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
-    t.run_server(server_config, Some(server_proxy), None).await;
+    let (_tx, rx) = quilkin::make_shutdown_channel(quilkin::ShutdownKind::Testing);
+    let _task = quilkin::codec::qcmp::spawn(qcmp, rx);
     ping(qcmp_port).await;
 }
 
@@ -45,18 +36,15 @@ async fn agent_ping() {
     let qcmp_port = quilkin::test::available_addr(AddressType::Random)
         .await
         .port();
-    let agent = quilkin::cli::Agent {
-        qcmp_port,
-        ..<_>::default()
-    };
     let server_config = std::sync::Arc::new(quilkin::Config::default_agent());
     let (_tx, rx) = quilkin::make_shutdown_channel(quilkin::ShutdownKind::Testing);
-    tokio::spawn(async move {
-        agent
-            .run(None, server_config, Default::default(), rx)
-            .await
-            .expect("Agent should run")
-    });
+    tokio::spawn(
+        quilkin::cli::Publish::default()
+            .qcmp()
+            .qcmp_port(qcmp_port)
+            .spawn_publishers(&server_config, &rx)
+            .unwrap(),
+    );
     ping(qcmp_port).await;
 }
 
diff --git a/tests/token_router.rs b/tests/token_router.rs
index a4a91a1dc0..4e2b1139b7 100644
--- a/tests/token_router.rs
+++ b/tests/token_router.rs
@@ -31,30 +31,32 @@ use tokio::time::{timeout, Duration};
 async fn token_router() {
     let mut t = TestHelper::default();
 
-    let local_addr = echo_server(&mut t).await;
-
-    // valid packet
-    let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;
-
-    let msg = b"helloabc";
-    tracing::trace!(%local_addr, "sending echo packet");
-    socket.send_to(msg, &local_addr).await.unwrap();
-
-    tracing::trace!("awaiting echo packet");
-    assert_eq!(
-        "hello",
-        timeout(Duration::from_millis(500), recv_chan.recv())
-            .await
-            .expect("should have received a packet")
-            .unwrap()
-    );
-
-    // send an invalid packet
-    let msg = b"helloxyz";
-    socket.send_to(msg, &local_addr).await.unwrap();
-
-    let result = timeout(Duration::from_millis(500), recv_chan.recv()).await;
-    assert!(result.is_err(), "should not have received a packet");
+    tokio::spawn(async move {
+        let local_addr = echo_server(&mut t).await;
+
+        // valid packet
+        let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;
+
+        let msg = b"helloabc";
+        tracing::trace!(%local_addr, "sending echo packet");
+        socket.send_to(msg, &local_addr).await.unwrap();
+
+        tracing::trace!("awaiting echo packet");
+        assert_eq!(
+            "hello",
+            timeout(Duration::from_millis(500), recv_chan.recv())
+                .await
+                .expect("should have received a packet")
+                .unwrap()
+        );
+
+        // send an invalid packet
+        let msg = b"helloxyz";
+        socket.send_to(msg, &local_addr).await.unwrap();
+
+        let result = timeout(Duration::from_millis(500), recv_chan.recv()).await;
+        assert!(result.is_err(), "should not have received a packet");
+    });
 }
 
 // This test covers the scenario in https://github.com/googleforgames/quilkin/issues/988
@@ -64,59 +66,61 @@ async fn token_router() {
 async fn multiple_clients() {
     let limit = 10_000;
     let mut t = TestHelper::default();
-    let local_addr = echo_server(&mut t).await;
-
-    let (mut a_rx, a_socket) = t.open_socket_and_recv_multiple_packets().await;
-    let (mut b_rx, b_socket) = t.open_socket_and_recv_multiple_packets().await;
-
-    tokio::spawn(async move {
-        // some room to breath
-        tokio::time::sleep(Duration::from_millis(50)).await;
-        for _ in 0..limit {
-            a_socket.send_to(b"Aabc", &local_addr).await.unwrap();
-            tokio::time::sleep(Duration::from_nanos(5)).await;
-        }
-    });
     tokio::spawn(async move {
-        // some room to breath
-        tokio::time::sleep(Duration::from_millis(50)).await;
-        for _ in 0..limit {
-            b_socket.send_to(b"Babc", &local_addr).await.unwrap();
-            tokio::time::sleep(Duration::from_nanos(5)).await;
-        }
-    });
+        let local_addr = echo_server(&mut t).await;
 
-    let mut success = 0;
-    let mut failed = 0;
-    for _ in 0..limit {
-        match timeout(Duration::from_millis(60), a_rx.recv()).await {
-            Ok(packet) => {
-                assert_eq!("A", packet.unwrap());
-                success += 1;
+        let (mut a_rx, a_socket) = t.open_socket_and_recv_multiple_packets().await;
+        let (mut b_rx, b_socket) = t.open_socket_and_recv_multiple_packets().await;
+
+        tokio::spawn(async move {
+            // some room to breath
+            tokio::time::sleep(Duration::from_millis(50)).await;
+            for _ in 0..limit {
+                a_socket.send_to(b"Aabc", &local_addr).await.unwrap();
+                tokio::time::sleep(Duration::from_nanos(5)).await;
             }
-            Err(_) => {
-                failed += 1;
+        });
+        tokio::spawn(async move {
+            // some room to breath
+            tokio::time::sleep(Duration::from_millis(50)).await;
+            for _ in 0..limit {
+                b_socket.send_to(b"Babc", &local_addr).await.unwrap();
+                tokio::time::sleep(Duration::from_nanos(5)).await;
             }
-        }
-        match timeout(Duration::from_millis(60), b_rx.recv()).await {
-            Ok(packet) => {
-                assert_eq!("B", packet.unwrap());
-                success += 1;
+        });
+
+        let mut success = 0;
+        let mut failed = 0;
+        for _ in 0..limit {
+            match timeout(Duration::from_millis(60), a_rx.recv()).await {
+                Ok(packet) => {
+                    assert_eq!("A", packet.unwrap());
+                    success += 1;
+                }
+                Err(_) => {
+                    failed += 1;
+                }
             }
-            Err(_) => {
-                failed += 1;
+            match timeout(Duration::from_millis(60), b_rx.recv()).await {
+                Ok(packet) => {
+                    assert_eq!("B", packet.unwrap());
+                    success += 1;
+                }
+                Err(_) => {
+                    failed += 1;
+                }
             }
         }
-    }
-
-    // allow for some dropped packets, since UDP.
-    let threshold = 0.95 * (2 * limit) as f64;
-    assert!(
-        success as f64 > threshold,
-        "Success: {}, Failed: {}",
-        success,
-        failed
-    );
+
+        // allow for some dropped packets, since UDP.
+        let threshold = 0.95 * (2 * limit) as f64;
+        assert!(
+            success as f64 > threshold,
+            "Success: {}, Failed: {}",
+            success,
+            failed
+        );
+    });
 }
 
 // start an echo server and return what port it's on.