diff --git a/Cargo.lock b/Cargo.lock index 770eacd99..e4194e493 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -136,12 +136,6 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" -[[package]] -name = "assert_matches" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" - [[package]] name = "async-broadcast" version = "0.7.1" @@ -173,7 +167,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -184,7 +178,7 @@ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -246,61 +240,6 @@ dependencies = [ "tower-service", ] -[[package]] -name = "aya" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d18bc4e506fbb85ab7392ed993a7db4d1a452c71b75a246af4a80ab8c9d2dd50" -dependencies = [ - "assert_matches", - "aya-obj", - "bitflags 2.6.0", - "bytes", - "libc", - "log", - "object 0.36.7", - "once_cell", - "thiserror 1.0.69", - "tokio", -] - -[[package]] -name = "aya-log" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b600d806c1d07d3b81ab5f4a2a95fd80f479a0d3f1d68f29064d660865f85f02" -dependencies = [ - "aya", - "aya-log-common", - "bytes", - "log", - "thiserror 1.0.69", - "tokio", -] - -[[package]] -name = "aya-log-common" -version = "0.1.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "befef9fe882e63164a2ba0161874e954648a72b0e1c4b361f532d590638c4eec" -dependencies = [ - "num_enum", -] - -[[package]] -name = "aya-obj" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c51b96c5a8ed8705b40d655273bc4212cbbf38d4e3be2788f36306f154523ec7" -dependencies = [ - "bytes", - "core-error", - "hashbrown 0.15.1", - "log", - "object 0.36.7", - "thiserror 1.0.69", -] - [[package]] name = "backoff" version = "0.4.0" @@ -323,7 +262,7 @@ dependencies = [ "cfg-if", "libc", "miniz_oxide", - "object 0.32.2", + "object", "rustc-demangle", ] @@ -456,7 +395,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -498,15 +437,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "core-error" -version = "0.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efcdb2972eb64230b4c50646d8498ff73f5128d196a90c7236eec4cbe8619b8f" -dependencies = [ - "version_check", -] - [[package]] name = "core-foundation" version = "0.9.4" @@ -596,7 +526,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.89", + "syn", ] [[package]] @@ -607,7 +537,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -679,7 +609,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -704,7 +634,7 @@ checksum = "1b4464d46ce68bfc7cb76389248c7c254def7baca8bece0693b02b83842c4c88" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -722,7 +652,7 @@ dependencies = [ "enum-ordinalize", "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -746,7 +676,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -766,7 +696,7 @@ checksum = "f282cfdfe92516eb26c2af8589c274c7c17681f5ecc03c18255fe741c6aa64eb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -786,7 +716,7 @@ checksum = "0d28318a75d4aead5c4db25382e8ef717932d0346600cacae6357eb5941bc5ff" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -798,7 +728,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -1000,7 +930,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -1527,7 +1457,7 @@ checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -1843,7 +1773,7 @@ dependencies = [ "proc-macro2", "quote", "serde_json", - "syn 2.0.89", + "syn", ] [[package]] @@ -2090,31 +2020,6 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" -[[package]] -name = "neli" -version = "0.6.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1100229e06604150b3becd61a4965d5c70f3be1759544ea7274166f4be41ef43" -dependencies = [ - "byteorder", - "libc", - "log", - "neli-proc-macros", -] - -[[package]] -name = "neli-proc-macros" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c168194d373b1e134786274020dae7fc5513d565ea2ebb9bc9ff17ffb69106d4" -dependencies = [ - "either", - "proc-macro2", - "quote", - "serde", - "syn 1.0.109", -] - [[package]] name = "nix" version = "0.27.1" @@ -2198,26 +2103,6 @@ dependencies = [ "libc", ] -[[package]] -name = "num_enum" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e613fc340b2220f734a8595782c551f1250e969d87d3be1ae0579e8d4065179" -dependencies = [ - "num_enum_derive", -] - -[[package]] -name = "num_enum_derive" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.89", -] - [[package]] name = "object" version = "0.32.2" @@ -2227,18 +2112,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "object" -version = "0.36.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" -dependencies = [ - "crc32fast", - "hashbrown 0.15.1", - "indexmap 2.6.0", - "memchr", -] - [[package]] name = "once_cell" version = "1.20.2" @@ -2268,7 +2141,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -2380,7 +2253,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -2421,7 +2294,7 @@ checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -2498,7 +2371,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64d1ec885c64d0457d564db4ec299b2dae3f9c02808b8ad9c3a089c591b18033" dependencies = [ "proc-macro2", - "syn 2.0.89", + "syn", ] [[package]] @@ -2551,7 +2424,7 @@ dependencies = [ "prost", "prost-types", "regex", - "syn 2.0.89", + "syn", "tempfile", ] @@ -2565,7 +2438,7 @@ dependencies = [ "itertools", "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -2664,7 +2537,6 @@ dependencies = [ "prost-types", "quilkin-macros", "quilkin-proto", - "quilkin-xdp", "quilkin-xds", "rand", "regex", @@ -2705,7 +2577,7 @@ version = "0.10.0-dev" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -2717,18 +2589,6 @@ dependencies = [ "tonic", ] -[[package]] -name = "quilkin-xdp" -version = "0.1.0" -dependencies = [ - "aya", - "aya-log", - "libc", - "thiserror 2.0.3", - "tracing", - "xdp", -] - [[package]] name = "quilkin-xds" version = "0.10.0-dev" @@ -3069,7 +2929,7 @@ dependencies = [ "proc-macro2", "quote", "serde_derive_internals", - "syn 2.0.89", + "syn", ] [[package]] @@ -3159,7 +3019,7 @@ checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -3170,7 +3030,7 @@ checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -3240,7 +3100,7 @@ checksum = "5d69265a08751de7844521fd15003ae0a888e035773ba05695c5c759a6f89eef" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -3384,7 +3244,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.89", + "syn", ] [[package]] @@ -3416,17 +3276,6 @@ dependencies = [ "symbolic-common", ] -[[package]] -name = "syn" -version = "1.0.109" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" -dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", -] - [[package]] name = "syn" version = "2.0.89" @@ -3458,7 +3307,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -3520,7 +3369,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -3531,7 +3380,7 @@ checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -3615,7 +3464,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -3782,7 +3631,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -3867,7 +3716,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" dependencies = [ "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -4021,7 +3870,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.89", + "syn", "wasm-bindgen-shared", ] @@ -4043,7 +3892,7 @@ checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -4289,16 +4138,6 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" -[[package]] -name = "xdp" -version = "0.0.1" -source = "git+https://github.com/Jake-Shadle/xdp?branch=impl#4ec459e29fd1fdf38f85a7d48b64d3ae2f6098de" -dependencies = [ - "libc", - "memmap2", - "neli", -] - [[package]] name = "xxhash-rust" version = "0.8.12" @@ -4331,7 +4170,7 @@ checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn", "synstructure", ] @@ -4353,7 +4192,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] [[package]] @@ -4373,7 +4212,7 @@ checksum = "0ea7b4a3637ea8669cedf0f1fd5c286a17f3de97b8dd5a70a6c167a1730e63a5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn", "synstructure", ] @@ -4402,5 +4241,5 @@ checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn", ] diff --git a/bacon.toml b/bacon.toml new file mode 100644 index 000000000..3e91915a8 --- /dev/null +++ b/bacon.toml @@ -0,0 +1,111 @@ +# This is a configuration file for the bacon tool +# +# Complete help on configuration: https://dystroy.org/bacon/config/ +# +# You may check the current default at +# https://github.com/Canop/bacon/blob/main/defaults/default-bacon.toml + +default_job = "clippy-all" +env.CARGO_TERM_COLOR = "always" + +[jobs.check] +command = ["cargo", "check"] +need_stdout = false + +[jobs.check-all] +command = ["cargo", "check", "--all-targets"] +need_stdout = false + +# Run clippy on the default target +[jobs.clippy] +command = ["cargo", "clippy"] +need_stdout = false + +# Run clippy on all targets +# To disable some lints, you may change the job this way: +# [jobs.clippy-all] +# command = [ +# "cargo", "clippy", +# "--all-targets", +# "--", +# "-A", "clippy::bool_to_int_with_if", +# "-A", "clippy::collapsible_if", +# "-A", "clippy::derive_partial_eq_without_eq", +# ] +# need_stdout = false +[jobs.clippy-all] +command = ["cargo", "clippy", "--all-targets", "--workspace"] +need_stdout = false + +# This job lets you run +# - all tests: bacon test +# - a specific test: bacon test -- config::test_default_files +# - the tests of a package: bacon test -- -- -p config +[jobs.test] +command = ["cargo", "test"] +need_stdout = true + +[jobs.nextest] +command = [ + "cargo", "nextest", "run", + "--hide-progress-bar", "--failure-output", "final" +] +need_stdout = true +analyzer = "nextest" + +[jobs.doc] +command = ["cargo", "doc", "--no-deps"] +need_stdout = false + +# If the doc compiles, then it opens in your browser and bacon switches +# to the previous job +[jobs.doc-open] +command = ["cargo", "doc", "--no-deps", "--open"] +need_stdout = false +on_success = "back" # so that we don't open the browser at each change + +# You can run your application and have the result displayed in bacon, +# if it makes sense for this crate. +[jobs.run] +command = [ + "cargo", "run", + # put launch parameters for your program behind a `--` separator +] +need_stdout = true +allow_warnings = true +background = true + +# Run your long-running application (eg server) and have the result displayed in bacon. +# For programs that never stop (eg a server), `background` is set to false +# to have the cargo run output immediately displayed instead of waiting for +# program's end. +# 'on_change_strategy' is set to `kill_then_restart` to have your program restart +# on every change (an alternative would be to use the 'F5' key manually in bacon). +# If you often use this job, it makes sense to override the 'r' key by adding +# a binding `r = job:run-long` at the end of this file . +[jobs.run-long] +command = [ + "cargo", "run", + # put launch parameters for your program behind a `--` separator +] +need_stdout = true +allow_warnings = true +background = false +on_change_strategy = "kill_then_restart" + +# This parameterized job runs the example of your choice, as soon +# as the code compiles. +# Call it as +# bacon ex -- my-example +[jobs.ex] +command = ["cargo", "run", "--example"] +need_stdout = true +allow_warnings = true + +# You may define here keybindings that would be specific to +# a project, for example a shortcut to launch a specific job. +# Shortcuts to internal functions (scrolling, toggling, etc.) +# should go in your personal global prefs.toml file instead. +[keybindings] +# alt-m = "job:my-job" +c = "job:clippy-all" # comment this to have 'c' run clippy on only the default target diff --git a/benches/shared.rs b/benches/shared.rs index e6748c19a..f52fc33b6 100644 --- a/benches/shared.rs +++ b/benches/shared.rs @@ -330,22 +330,15 @@ impl QuilkinLoop { .insert_default([quilkin::net::endpoint::Endpoint::new(endpoint.into())].into()) }); - let proxy = quilkin::cli::Proxy { - port, - qcmp_port: runtime - .block_on(quilkin::test::available_addr( - quilkin::test::AddressType::Random, - )) - .port(), - ..<_>::default() - }; - - runtime.block_on(async move { - proxy - .run(config, Default::default(), None, shutdown_rx) - .await - .unwrap(); - }); + runtime + .block_on( + quilkin::cli::Publish::default() + .udp() + .udp_port(port) + .spawn_publishers(&config, &shutdown_rx) + .unwrap(), + ) + .unwrap(); }); Self { diff --git a/crates/agones/src/lib.rs b/crates/agones/src/lib.rs index 136d53d8a..7a4a33d37 100644 --- a/crates/agones/src/lib.rs +++ b/crates/agones/src/lib.rs @@ -318,6 +318,8 @@ pub async fn quilkin_proxy_deployment( let mut container = quilkin_container( client, Some(vec![ + "--publish.udp".into(), + "--publish.qcmp".into(), "proxy".into(), format!("--management-server={management_server}"), ]), diff --git a/crates/agones/src/relay.rs b/crates/agones/src/relay.rs index b8c3626d0..865b64076 100644 --- a/crates/agones/src/relay.rs +++ b/crates/agones/src/relay.rs @@ -263,6 +263,8 @@ mod tests { // Setup the relay let args = [ + "--publish.mds", + "--publish.xds", "relay", "agones", "--config-namespace", @@ -347,6 +349,7 @@ mod tests { // agent deployment let args = [ + "--publish.qcmp", "agent", "--relay", &format!("http://{relay_name}:7900"), diff --git a/crates/agones/src/sidecar.rs b/crates/agones/src/sidecar.rs index 4ccf82ba0..35a9e040e 100644 --- a/crates/agones/src/sidecar.rs +++ b/crates/agones/src/sidecar.rs @@ -16,7 +16,9 @@ #[cfg(test)] mod tests { - use crate::{game_server, is_gameserver_ready, quilkin_config_map, quilkin_container, Client}; + use crate::{ + debug_pods, game_server, is_gameserver_ready, quilkin_config_map, quilkin_container, Client, + }; use k8s_openapi::api::core::v1::{ConfigMap, ConfigMapVolumeSource, Volume}; use kube::{api::PostParams, runtime::wait::await_condition, Api, ResourceExt}; use quilkin::{config::providers::k8s::agones::GameServer, test::TestHelper}; @@ -98,7 +100,7 @@ clusters: let mount_name = "config".to_string(); template.containers.push(quilkin_container( &client, - Some(vec!["proxy".into()]), + Some(vec!["--publish.udp".into(), "proxy".into()]), Some(mount_name.clone()), true, )); @@ -133,6 +135,8 @@ clusters: .await .expect("should receive packet") .unwrap(); + debug_pods(&client, "role=proxy".into()).await; + debug_pods(&client, "{name}".into()).await; assert_eq!("ACK: hellosidecar\n", response); } } diff --git a/crates/nmap-service-probes/src/lib.rs b/crates/nmap-service-probes/src/lib.rs index fbbeb0c7a..ce48f1124 100644 --- a/crates/nmap-service-probes/src/lib.rs +++ b/crates/nmap-service-probes/src/lib.rs @@ -209,6 +209,7 @@ pub enum MatchKind { mod tests { use super::*; + use alloc::vec; use pretty_assertions::assert_eq; #[test] diff --git a/crates/nmap-service-probes/src/parser.rs b/crates/nmap-service-probes/src/parser.rs index 72d6ebdc7..d7678c242 100644 --- a/crates/nmap-service-probes/src/parser.rs +++ b/crates/nmap-service-probes/src/parser.rs @@ -531,6 +531,7 @@ pub fn fallback<'i, E: ParserError> + AddContext, StrConte #[cfg(test)] mod tests { use super::*; + use alloc::vec; #[test] fn exclude() -> PResult<()> { diff --git a/crates/test/src/lib.rs b/crates/test/src/lib.rs index 89431db51..eae4d0bd7 100644 --- a/crates/test/src/lib.rs +++ b/crates/test/src/lib.rs @@ -7,7 +7,7 @@ use quilkin::{ Config, ShutdownTx, }; pub use serde_json::json; -use std::{net::SocketAddr, num::NonZeroUsize, path::PathBuf, sync::Arc}; +use std::{net::SocketAddr, path::PathBuf, sync::Arc}; use tokio::sync::mpsc; pub static BUFFER_POOL: once_cell::sync::Lazy> = @@ -75,7 +75,9 @@ macro_rules! trace_test { let _guard = init_logging($crate::Level::DEBUG, mname); - $body + tokio::spawn(async move { + $body + }); } }; } @@ -307,11 +309,8 @@ impl Pail { PailConfig::Relay(rpc) => { use components::relay; - let xds_listener = TcpListener::bind(None).unwrap(); - let mds_listener = TcpListener::bind(None).unwrap(); - - let xds_port = xds_listener.port(); - let mds_port = mds_listener.port(); + let xds_port = TcpListener::bind(None).unwrap().port(); + let mds_port = TcpListener::bind(None).unwrap().port(); let path = td.join(spc.name); let mut tc = rpc.config.unwrap_or_default(); @@ -326,10 +325,17 @@ impl Pail { let config = Arc::new(Config::default_non_agent()); config.id.store(Arc::new(spc.name.into())); + let _task = tokio::spawn( + quilkin::cli::Publish::default() + .xds() + .xds_port(xds_port) + .mds() + .mds_port(mds_port) + .spawn_publishers(&config, &shutdown_rx) + .unwrap(), + ); let task = tokio::spawn( relay::Relay { - xds_listener, - mds_listener, provider: Some(Providers::File { path }), } .run(RunArgs { @@ -392,21 +398,28 @@ impl Pail { let (shutdown, shutdown_rx) = quilkin::make_shutdown_channel(quilkin::ShutdownKind::Testing); - let qcmp_socket = - quilkin::net::raw_socket_with_reuse(0).expect("failed to bind qcmp socket"); - let qcmp_port = quilkin::net::socket_port(&qcmp_socket); + let qcmp_port = quilkin::net::socket_port( + &quilkin::net::raw_socket_with_reuse(0).expect("failed to bind qcmp socket"), + ); let config_path = path.clone(); let config = Arc::new(Config::default_agent()); config.id.store(Arc::new(spc.name.into())); let acfg = config.clone(); + let _task = tokio::spawn( + quilkin::cli::Publish::default() + .qcmp() + .qcmp_port(qcmp_port) + .spawn_publishers(&config, &shutdown_rx) + .unwrap(), + ); + let task = tokio::spawn(async move { components::agent::Agent { locality: None, icao_code: Some(apc.icao_code), relay_servers, - qcmp_socket, provider: Some(Providers::File { path }), address_selector: None, } @@ -430,14 +443,18 @@ impl Pail { }) } PailConfig::Proxy(ppc) => { - let socket = quilkin::net::raw_socket_with_reuse(0).expect("failed to bind socket"); - let qcmp = - quilkin::net::raw_socket_with_reuse(0).expect("failed to bind qcmp socket"); - let qcmp_port = quilkin::net::socket_port(&qcmp); - let phoenix = TcpListener::bind(None).expect("failed to bind phoenix socket"); - let phoenix_port = phoenix.port(); - - let port = quilkin::net::socket_port(&socket); + let port = { + let socket = + quilkin::net::raw_socket_with_reuse(0).expect("failed to bind socket"); + quilkin::net::socket_port(&socket) + }; + + let qcmp_port = quilkin::net::socket_port( + &quilkin::net::raw_socket_with_reuse(0).expect("failed to bind qcmp socket"), + ); + let phoenix_port = TcpListener::bind(None) + .expect("failed to bind phoenix socket") + .port(); let management_servers = spc .dependencies @@ -496,13 +513,20 @@ impl Pail { let (rttx, rtrx) = tokio::sync::mpsc::unbounded_channel(); + let _task = tokio::spawn( + quilkin::cli::Publish::default() + .udp() + .udp_port(port) + .qcmp() + .qcmp_port(qcmp_port) + .phoenix() + .phoenix_port(phoenix_port) + .spawn_publishers(&config, &shutdown_rx) + .unwrap(), + ); let task = tokio::spawn(async move { components::proxy::Proxy { - num_workers: NonZeroUsize::new(1).unwrap(), management_servers, - socket: Some(socket), - qcmp, - phoenix, notifier: Some(rttx), ..Default::default() } diff --git a/crates/test/tests/proxy.rs b/crates/test/tests/proxy.rs index b3dddea8f..b125afefd 100644 --- a/crates/test/tests/proxy.rs +++ b/crates/test/tests/proxy.rs @@ -158,7 +158,7 @@ trace_test!( config, socket, pending_sends, - &sessions, + sessions, BUFFER_POOL.clone(), ) .await diff --git a/crates/xdp/Cargo.toml b/crates/xdp/Cargo.toml index eec23d412..75d5fcd1d 100644 --- a/crates/xdp/Cargo.toml +++ b/crates/xdp/Cargo.toml @@ -12,7 +12,7 @@ aya-log = "0.2.1" libc.workspace = true thiserror.workspace = true tracing.workspace = true -xdp = { git = "https://github.com/Jake-Shadle/xdp", branch = "impl" } +xdp = { git = "https://github.com/Jake-Shadle/xdp" } #xdp = { path = "../../../xdp" } [lints] diff --git a/crates/xds/src/server.rs b/crates/xds/src/server.rs index 8a4b12565..480c3896e 100644 --- a/crates/xds/src/server.rs +++ b/crates/xds/src/server.rs @@ -79,7 +79,7 @@ impl ControlPlane { let server = AggregatedDiscoveryServiceServer::new(self) .max_encoding_message_size(crate::config::max_grpc_message_size()); let server = tonic::transport::Server::builder().add_service(server); - tracing::info!("serving management server on port `{}`", listener.port()); + tracing::info!(port = listener.port(), "publishing xDS server"); Ok(server .serve_with_incoming(listener.into_stream()?) .map_err(From::from)) @@ -98,7 +98,7 @@ impl ControlPlane { let server = AggregatedControlPlaneDiscoveryServiceServer::new(self) .max_encoding_message_size(crate::config::max_grpc_message_size()); let server = tonic::transport::Server::builder().add_service(server); - tracing::info!("serving relay server on port `{}`", listener.port()); + tracing::info!(port = listener.port(), "publishing mDS server"); Ok(server .serve_with_incoming(listener.into_stream()?) .map_err(From::from)) diff --git a/src/cli.rs b/src/cli.rs index b9137a105..bd5c44d4e 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -28,18 +28,10 @@ use strum_macros::{Display, EnumString}; pub use self::{ agent::Agent, generate_config_schema::GenerateConfigSchema, manage::Manage, proxy::Proxy, - qcmp::Qcmp, relay::Relay, + publish::Publish, qcmp::Qcmp, relay::Relay, }; -macro_rules! define_port { - ($port:expr) => { - pub const PORT: u16 = $port; - - pub fn default_port() -> u16 { - PORT - } - }; -} +mod publish; pub mod agent; pub mod generate_config_schema; @@ -49,7 +41,6 @@ pub mod qcmp; pub mod relay; const ETC_CONFIG_PATH: &str = "/etc/quilkin/quilkin.yaml"; -const PORT_ENV_VAR: &str = "QUILKIN_PORT"; #[derive(Debug, clap::Parser)] #[command(next_help_heading = "Administration Options")] @@ -121,6 +112,8 @@ pub struct Cli { pub admin: AdminCli, #[command(flatten)] pub locality: LocalityCli, + #[command(flatten)] + pub publish: publish::Publish, } /// The various log format options @@ -193,12 +186,9 @@ impl Cli { return generator.generate_config_schema(); } Commands::Agent(_) => Admin::Agent(<_>::default()), - Commands::Proxy(proxy) => { + Commands::Proxy(_) => { let ready = components::proxy::Ready { - idle_request_interval: proxy - .idle_request_interval_secs - .map(std::time::Duration::from_secs) - .unwrap_or(admin_server::IDLE_REQUEST_INTERVAL), + idle_request_interval: admin_server::IDLE_REQUEST_INTERVAL, ..Default::default() }; Admin::Proxy(ready) @@ -294,6 +284,8 @@ impl Cli { shutdown_tx.send(crate::ShutdownKind::Normal).ok(); }); + let _publish_task = self.publish.spawn_publishers(&config, &shutdown_rx)?; + match (self.command, mode) { (Commands::Agent(agent), Admin::Agent(ready)) => { agent.run(locality, config, ready, shutdown_rx).await diff --git a/src/cli/agent.rs b/src/cli/agent.rs index b1678b7c0..c0a7c3bcf 100644 --- a/src/cli/agent.rs +++ b/src/cli/agent.rs @@ -19,17 +19,12 @@ use std::sync::Arc; use crate::{components::agent, config::Config}; pub use agent::Ready; -define_port!(7600); - /// Runs Quilkin as a relay service that runs a Manager Discovery Service /// (mDS) for accepting cluster and configuration information from xDS /// management services, and exposing it as a single merged xDS service for /// proxy services. -#[derive(clap::Args, Clone, Debug)] +#[derive(clap::Args, Clone, Debug, Default)] pub struct Agent { - /// Port for QCMP service. - #[clap(short, long, env = "QCMP_PORT", default_value_t = PORT)] - pub qcmp_port: u16, /// One or more `quilkin relay` endpoints to push configuration changes to. #[clap(short, long, env = "QUILKIN_MANAGEMENT_SERVER")] pub relay: Vec, @@ -63,19 +58,6 @@ impl clap::ValueEnum for crate::config::AddrKind { } } -impl Default for Agent { - fn default() -> Self { - Self { - qcmp_port: PORT, - relay: <_>::default(), - provider: <_>::default(), - icao_code: <_>::default(), - address_type: None, - ip_kind: None, - } - } -} - impl Agent { #[tracing::instrument(skip_all)] pub async fn run( @@ -85,12 +67,10 @@ impl Agent { ready: Ready, shutdown_rx: crate::ShutdownRx, ) -> crate::Result<()> { - let qcmp_socket = crate::net::raw_socket_with_reuse(self.qcmp_port)?; let icao_code = Some(self.icao_code); agent::Agent { locality, - qcmp_socket, icao_code, relay_servers: self.relay, provider: self.provider, diff --git a/src/cli/manage.rs b/src/cli/manage.rs index 32790dbb9..c813bbdfd 100644 --- a/src/cli/manage.rs +++ b/src/cli/manage.rs @@ -17,8 +17,6 @@ use crate::components::manage; pub use manage::Ready; -define_port!(7800); - /// Runs Quilkin as a xDS management server, using `provider` as /// a configuration source. #[derive(clap::Args, Clone, Debug)] @@ -26,9 +24,6 @@ pub struct Manage { /// One or more `quilkin relay` endpoints to push configuration changes to. #[clap(short, long, env = "QUILKIN_MANAGEMENT_SERVER")] pub relay: Vec, - /// The TCP port to listen to, to serve discovery responses. - #[clap(short, long, env = super::PORT_ENV_VAR, default_value_t = PORT)] - pub port: u16, /// The configuration source for a management server. #[clap(subcommand)] pub provider: crate::config::Providers, @@ -50,13 +45,10 @@ impl Manage { ready: Ready, shutdown_rx: crate::ShutdownRx, ) -> crate::Result<()> { - let listener = crate::net::TcpListener::bind(Some(self.port))?; - manage::Manage { locality, provider: self.provider, relay_servers: self.relay, - listener, address_selector: self.address_type.map(|at| crate::config::AddressSelector { name: at, kind: self.ip_kind, diff --git a/src/cli/proxy.rs b/src/cli/proxy.rs index 677598c81..9a2557a82 100644 --- a/src/cli/proxy.rs +++ b/src/cli/proxy.rs @@ -24,12 +24,8 @@ use crate::ShutdownRx; pub use crate::components::proxy::Ready; -define_port!(7777); - -const QCMP_PORT: u16 = 7600; - /// Run Quilkin as a UDP reverse proxy. -#[derive(clap::Args, Clone, Debug)] +#[derive(clap::Args, Clone, Debug, Default)] pub struct Proxy { /// One or more `quilkin manage` endpoints to listen to for config changes #[clap(short, long, env = "QUILKIN_MANAGEMENT_SERVER", conflicts_with("to"))] @@ -37,12 +33,6 @@ pub struct Proxy { /// The remote URL or local file path to retrieve the Maxmind database. #[clap(long, env)] pub mmdb: Option, - /// The port to listen on. - #[clap(short, long, env = super::PORT_ENV_VAR, default_value_t = PORT)] - pub port: u16, - /// The port to listen on. - #[clap(short, long, env = "QUILKIN_QCMP_PORT", default_value_t = QCMP_PORT)] - pub qcmp_port: u16, /// One or more socket addresses to forward packets to. #[clap(long, env = "QUILKIN_DEST")] pub to: Vec, @@ -51,83 +41,6 @@ pub struct Proxy { /// Format is `:` #[clap(long, env = "QUILKIN_DEST_TOKENS", requires("to"))] pub to_tokens: Option, - /// The interval in seconds at which the relay will send a discovery request - /// to an management server after receiving no updates. - #[clap(long, env = "QUILKIN_IDLE_REQUEST_INTERVAL_SECS")] - pub idle_request_interval_secs: Option, - /// Number of worker threads used to process packets. - /// - /// If not specified defaults to number of cpus. Has no effect if XDP is used, - /// as the number of workers is always the same as the NIC queue size. - #[clap(short, long, env = "QUILKIN_WORKERS")] - pub workers: Option, - #[clap(flatten)] - pub xdp_opts: XdpOptions, -} - -/// XDP (eXpress Data Path) options -#[derive(clap::Args, Clone, Debug)] -pub struct XdpOptions { - /// The name of the network interface to bind the XDP socket(s) to. - /// - /// If not specified quilkin will attempt to determine the most appropriate - /// network interface to use. Quilkin will exit with an error if the network - /// interface does not exist, or a suitable default cannot be determined. - #[clap(long = "publish.udp.xdp.network-interface")] - pub network_interface: Option, - /// Forces the use of XDP. - /// - /// If XDP is not available on the chosen NIC, Quilkin exits with an error. - /// If false, io-uring will be used as the fallback implementation. - #[clap(long = "publish.udp.xdp")] - pub force_xdp: bool, - /// Forces the use of [`XDP_ZEROCOPY`](https://www.kernel.org/doc/html/latest/networking/af_xdp.html#xdp-copy-and-xdp-zerocopy-bind-flags) - /// - /// If zero copy is not available on the chosen NIC, Quilkin exits with an error - #[clap(long = "publish.udp.xdp.zerocopy")] - pub force_zerocopy: bool, - /// Forces the use of [TX checksum offload](https://docs.kernel.org/6.8/networking/xsk-tx-metadata.html) - /// - /// TX checksum offload is an optional feature allowing the data portion of - /// a packet to have its internet checksum calculation offloaded to the NIC, - /// as otherwise this is done in software - #[clap(long = "publish.udp.xdp.tco")] - pub force_tx_checksum_offload: bool, - /// The maximum amount of memory mapped for packet buffers, in bytes - /// - /// If not specified, this defaults to 4MiB (2k allocated packets of 2k each at a time) - /// per NIC queue, ie 128MiB on a 32 queue NIC - #[clap(long = "publish.udp.xdp.memory-limit")] - pub maximum_memory: Option, -} - -#[allow(clippy::derivable_impls)] -impl Default for XdpOptions { - fn default() -> Self { - Self { - network_interface: None, - force_xdp: false, - force_zerocopy: false, - force_tx_checksum_offload: false, - maximum_memory: None, - } - } -} - -impl Default for Proxy { - fn default() -> Self { - Self { - management_server: <_>::default(), - mmdb: <_>::default(), - port: PORT, - qcmp_port: QCMP_PORT, - to: <_>::default(), - to_tokens: None, - idle_request_interval_secs: None, - workers: None, - xdp_opts: Default::default(), - } - } } impl Proxy { @@ -140,22 +53,7 @@ impl Proxy { initialized: Option>, shutdown_rx: ShutdownRx, ) -> crate::Result<()> { - tracing::info!( - port = self.port, - proxy_id = &*config.id.load(), - "Starting proxy" - ); - - // The number of worker tasks to spawn. Each task gets a dedicated queue to - // consume packets off. - let num_workers = self.workers.unwrap_or_else(|| { - std::num::NonZeroUsize::new(num_cpus::get()) - .expect("num_cpus returned 0, which should be impossible") - }); - - let socket = crate::net::raw_socket_with_reuse(self.port)?; - let qcmp = crate::net::raw_socket_with_reuse(self.qcmp_port)?; - let phoenix = crate::net::TcpListener::bind(Some(self.qcmp_port))?; + tracing::info!(proxy_id = &*config.id.load(), "Starting proxy"); let to_tokens = self .to_tokens @@ -176,12 +74,7 @@ impl Proxy { mmdb: self.mmdb, to: self.to, to_tokens, - num_workers, - socket: Some(socket), - qcmp, - phoenix, notifier: None, - xdp: self.xdp_opts, } .run( crate::components::RunArgs { diff --git a/src/cli/publish.rs b/src/cli/publish.rs new file mode 100644 index 000000000..13ab60a02 --- /dev/null +++ b/src/cli/publish.rs @@ -0,0 +1,443 @@ +use std::{future::Future, sync::Arc}; + +use crate::{components::proxy::SessionPool, config::Config}; + +#[derive(Debug, clap::Parser)] +#[command(next_help_heading = "Publish Options")] +pub struct Publish { + /// Whether to serve mDS requests. + #[arg( + long = "publish.mds", + env = "QUILKIN_PUBLISH_MDS", + default_value_t = false + )] + mds_enabled: bool, + /// The TCP port to listen to serve xDS requests. + #[clap( + long = "publish.mds.port", + env = "QUILKIN_PUBLISH_MDS_PORT", + default_value_t = 7900 + )] + mds_port: u16, + /// Whether to serve UDP requests. + #[arg( + long = "publish.phoenix", + env = "QUILKIN_PUBLISH_PHOENIX", + default_value_t = false + )] + phoenix_enabled: bool, + /// The UDP port to listen for UDP packets. + #[clap( + long = "publish.phoenix.port", + env = "QUILKIN_PUBLISH_PHOENIX_PORT", + default_value_t = 7600 + )] + phoenix_port: u16, + /// Whether to serve UDP requests. + #[arg( + long = "publish.qcmp", + env = "QUILKIN_PUBLISH_QCMP", + default_value_t = false + )] + qcmp_enabled: bool, + /// The UDP port to listen for UDP packets. + #[clap( + long = "publish.qcmp.port", + env = "QUILKIN_PUBLISH_QCMP_PORT", + default_value_t = 7600 + )] + qcmp_port: u16, + /// Whether to serve UDP requests. + #[arg( + long = "publish.udp", + env = "QUILKIN_PUBLISH_UDP", + default_value_t = false + )] + udp_enabled: bool, + /// The UDP port to listen for UDP packets. + #[clap( + long = "publish.udp.port", + env = "QUILKIN_PUBLISH_UDP_PORT", + default_value_t = 7777 + )] + udp_port: u16, + #[clap(flatten)] + pub xdp_opts: XdpOptions, + /// Amount of UDP workers to run. + #[clap(long = "publish.udp.workers", env = "QUILKIN_PUBLISH_UDP_WORKERS", default_value_t = std::num::NonZeroUsize::new(num_cpus::get()).unwrap())] + pub udp_workers: std::num::NonZeroUsize, + /// Whether to serve xDS requests. + #[arg( + long = "publish.xds", + env = "QUILKIN_PUBLISH_XDS", + default_value_t = false + )] + xds_enabled: bool, + /// The TCP port to listen to serve xDS requests. + #[clap( + long = "publish.xds.port", + env = "QUILKIN_PUBLISH_XDS_PORT", + default_value_t = 7800 + )] + xds_port: u16, +} + +impl Default for Publish { + fn default() -> Self { + Self { + mds_enabled: <_>::default(), + mds_port: 7900, + phoenix_enabled: <_>::default(), + phoenix_port: 7600, + qcmp_enabled: <_>::default(), + qcmp_port: 7600, + udp_enabled: <_>::default(), + udp_port: 7777, + udp_workers: std::num::NonZeroUsize::new(num_cpus::get()).unwrap(), + xds_enabled: <_>::default(), + xds_port: 7800, + xdp_opts: <_>::default(), + } + } +} + +impl Publish { + /// Enables the UDP publisher. + pub fn udp(mut self) -> Self { + self.udp_enabled = true; + self + } + + /// Sets the UDP publisher port. + pub fn udp_port(mut self, port: u16) -> Self { + self.udp_port = port; + self + } + + /// Enables the QCMP publisher. + pub fn qcmp(mut self) -> Self { + self.qcmp_enabled = true; + self + } + + /// Sets the UDP publisher port. + pub fn qcmp_port(mut self, port: u16) -> Self { + self.qcmp_port = port; + self + } + + /// Enables the mDS publisher. + pub fn mds(mut self) -> Self { + self.mds_enabled = true; + self + } + + /// Sets the mDS publisher port. + pub fn mds_port(mut self, port: u16) -> Self { + self.mds_port = port; + self + } + + /// Enables the Phoenix publisher. + pub fn phoenix(mut self) -> Self { + self.phoenix_enabled = true; + self + } + + /// Sets the Phoenix publisher port. + pub fn phoenix_port(mut self, port: u16) -> Self { + self.phoenix_port = port; + self + } + + /// Enables the xDS publisher. + pub fn xds(mut self) -> Self { + self.xds_enabled = true; + self + } + + /// Sets the xDS publisher port. + pub fn xds_port(mut self, port: u16) -> Self { + self.xds_port = port; + self + } + + /// The main entrypoint for publishing network servers. When called will + /// spawn any and all enabled publishers, if successful returning a future + /// that can be await to wait on publishers to be cancelled. + pub fn spawn_publishers( + &self, + config: &Arc, + shutdown_rx: &crate::ShutdownRx, + ) -> crate::Result>> { + let mds_task = self.publish_mds(config)?; + let phoenix_task = self.publish_phoenix(config, shutdown_rx)?; + let qcmp_task = self.publish_qcmp(shutdown_rx)?; + let (udp_task, finalizer) = self.publish_udp(config)?; + let xds_task = self.publish_xds(config)?; + + let shutdown_rx = shutdown_rx.clone(); + Ok(async move { + let result = tokio::select! { + result = mds_task => result, + result = phoenix_task => result, + result = qcmp_task => result, + result = udp_task => result, + result = xds_task => result, + }; + + if let Some(finalizer) = finalizer { + (finalizer)(shutdown_rx.clone()); + } + + result + }) + } + + /// Spawns an QCMP server if enabled, otherwise returns a future which never completes. + fn publish_phoenix( + &self, + config: &Arc, + shutdown_rx: &crate::ShutdownRx, + ) -> crate::Result>> { + if self.phoenix_enabled { + let phoenix = crate::net::TcpListener::bind(Some(self.phoenix_port))?; + crate::net::phoenix::spawn( + phoenix, + config.clone(), + shutdown_rx.clone(), + crate::net::phoenix::Phoenix::new(crate::codec::qcmp::QcmpMeasurement::new()?), + )? + } + + Ok(std::future::pending()) + } + + /// Spawns an QCMP server if enabled, otherwise returns a future which never completes. + fn publish_qcmp( + &self, + shutdown_rx: &crate::ShutdownRx, + ) -> crate::Result>> { + Ok(if self.qcmp_enabled { + let qcmp = crate::net::raw_socket_with_reuse(self.qcmp_port)?; + either::Right(crate::codec::qcmp::spawn(qcmp, shutdown_rx.clone())) + } else { + either::Left(std::future::pending()) + }) + } + + /// Spawns an xDS server if enabled, otherwise returns a future which never completes. + fn publish_mds( + &self, + config: &Arc, + ) -> crate::Result>> { + if !self.mds_enabled { + return Ok(either::Left(std::future::pending())); + } + + use futures::TryFutureExt as _; + + let listener = crate::net::TcpListener::bind(Some(self.mds_port))?; + + Ok(either::Right( + tokio::spawn( + crate::net::xds::server::ControlPlane::from_arc( + config.clone(), + crate::components::admin::IDLE_REQUEST_INTERVAL, + ) + .relay_server(listener)?, + ) + .map_err(From::from) + .and_then(std::future::ready), + )) + } + + /// Spawns an xDS server if enabled, otherwise returns a future which never completes. + fn publish_xds( + &self, + config: &Arc, + ) -> crate::Result>> { + if !self.xds_enabled { + return Ok(either::Left(std::future::pending())); + } + + use futures::TryFutureExt as _; + + let listener = crate::net::TcpListener::bind(Some(self.xds_port))?; + + Ok(either::Right( + tokio::spawn( + crate::net::xds::server::ControlPlane::from_arc( + config.clone(), + crate::components::admin::IDLE_REQUEST_INTERVAL, + ) + .management_server(listener)?, + ) + .map_err(From::from) + .and_then(std::future::ready), + )) + } + + #[allow(clippy::type_complexity)] + pub fn publish_udp( + &self, + config: &Arc, + ) -> eyre::Result<( + impl Future>, + Option>, + )> { + if !self.udp_enabled { + return Ok((either::Left(std::future::pending()), None)); + } + + #[cfg(target_os = "linux")] + { + match self.spawn_xdp(config.clone(), self.xdp.force_xdp) { + Ok(xdp) => return Ok((either::Left(std::future::pending()), Some(xdp))), + Err(err) => { + if self.xdp.force_xdp { + return Err(err); + } + + tracing::warn!( + ?err, + "failed to spawn XDP I/O loop, falling back to io-uring" + ); + } + } + } + + self.spawn_user_space_router(config.clone()) + .map(|(fut, func)| (either::Right(fut), Some(func))) + } + + /// Launches the user space implementation of the packet router using + /// sockets. This implementation uses a pool of buffers and sockets to + /// manage UDP sessions and sockets. On Linux this will use io-uring, where + /// as it will use epoll interfaces on non-Linux platforms. + #[allow(clippy::type_complexity)] + pub fn spawn_user_space_router( + &self, + config: Arc, + ) -> crate::Result<( + impl Future>, + Box, + )> { + let socket = crate::net::raw_socket_with_reuse(self.udp_port)?; + let workers = self.udp_workers.get(); + let buffer_pool = Arc::new(crate::collections::BufferPool::new(workers, 2 * 1024)); + + let mut worker_sends = Vec::with_capacity(workers); + let mut session_sends = Vec::with_capacity(workers); + for _ in 0..workers { + let queue = crate::net::queue(15)?; + session_sends.push(queue.0.clone()); + worker_sends.push(queue); + } + + let sessions = SessionPool::new(config.clone(), session_sends, buffer_pool.clone()); + + Ok(( + crate::components::proxy::packet_router::spawn_receivers( + config, + socket, + worker_sends, + sessions.clone(), + buffer_pool, + ), + Box::from(move |shutdown_rx: crate::ShutdownRx| { + sessions.shutdown(*shutdown_rx.borrow() == crate::ShutdownKind::Normal); + }), + )) + } + + #[cfg(target_os = "linux")] + fn spawn_xdp( + &mut self, + config: Arc, + force_xdp: bool, + ) -> eyre::Result> { + use crate::net::xdp; + use eyre::Context as _; + + // TODO: remove this once it's been more stabilized + if !force_xdp { + eyre::bail!("XDP currently disabled by default"); + } + + let Some(external_port) = self.socket.as_ref().and_then(|s| { + s.local_addr() + .ok() + .and_then(|la| la.as_socket().map(|sa| sa.port())) + }) else { + eyre::bail!("unable to determine port"); + }; + + let workers = xdp::setup_xdp_io(xdp::XdpConfig { + nic: self + .xdp + .network_interface + .as_deref() + .map_or(xdp::NicConfig::Default, xdp::NicConfig::Name), + external_port, + maximum_packet_memory: self.xdp.maximum_memory, + require_zero_copy: self.xdp.force_zerocopy, + require_tx_checksum: self.xdp.force_tx_checksum_offload, + }) + .context("failed to setup XDP")?; + + let io_loop = xdp::spawn(workers, config).context("failed to spawn XDP I/O loop")?; + Ok(Box::new(move |srx: crate::ShutdownRx| { + io_loop.shutdown(*srx.borrow() == crate::ShutdownKind::Normal); + })) + } +} + +/// XDP (eXpress Data Path) options +#[derive(clap::Args, Clone, Debug)] +pub struct XdpOptions { + /// The name of the network interface to bind the XDP socket(s) to. + /// + /// If not specified quilkin will attempt to determine the most appropriate + /// network interface to use. Quilkin will exit with an error if the network + /// interface does not exist, or a suitable default cannot be determined. + #[clap(long = "publish.udp.xdp.network-interface")] + pub network_interface: Option, + /// Forces the use of XDP. + /// + /// If XDP is not available on the chosen NIC, Quilkin exits with an error. + /// If false, io-uring will be used as the fallback implementation. + #[clap(long = "publish.udp.xdp")] + pub force_xdp: bool, + /// Forces the use of [`XDP_ZEROCOPY`](https://www.kernel.org/doc/html/latest/networking/af_xdp.html#xdp-copy-and-xdp-zerocopy-bind-flags) + /// + /// If zero copy is not available on the chosen NIC, Quilkin exits with an error + #[clap(long = "publish.udp.xdp.zerocopy")] + pub force_zerocopy: bool, + /// Forces the use of [TX checksum offload](https://docs.kernel.org/6.8/networking/xsk-tx-metadata.html) + /// + /// TX checksum offload is an optional feature allowing the data portion of + /// a packet to have its internet checksum calculation offloaded to the NIC, + /// as otherwise this is done in software + #[clap(long = "publish.udp.xdp.tco")] + pub force_tx_checksum_offload: bool, + /// The maximum amount of memory mapped for packet buffers, in bytes + /// + /// If not specified, this defaults to 4MiB (2k allocated packets of 2k each at a time) + /// per NIC queue, ie 128MiB on a 32 queue NIC + #[clap(long = "publish.udp.xdp.memory-limit")] + pub maximum_memory: Option, +} + +#[allow(clippy::derivable_impls)] +impl Default for XdpOptions { + fn default() -> Self { + Self { + network_interface: None, + force_xdp: false, + force_zerocopy: false, + force_tx_checksum_offload: false, + maximum_memory: None, + } + } +} diff --git a/src/cli/relay.rs b/src/cli/relay.rs index b146c7a34..74c183c4b 100644 --- a/src/cli/relay.rs +++ b/src/cli/relay.rs @@ -19,7 +19,6 @@ use std::sync::Arc; use crate::{ components::relay, config::{Config, Providers}, - net::TcpListener, }; pub use relay::Ready; @@ -29,14 +28,8 @@ pub const PORT: u16 = 7900; /// (mDS) for accepting cluster and configuration information from xDS /// management services, and exposing it as a single merged xDS service for /// proxy services. -#[derive(clap::Args, Clone, Debug)] +#[derive(clap::Args, Clone, Debug, Default)] pub struct Relay { - /// Port for mDS service. - #[clap(short, long, env = "QUILKIN_MDS_PORT", default_value_t = PORT)] - pub mds_port: u16, - /// Port for xDS management_server service - #[clap(short, long, env = super::PORT_ENV_VAR, default_value_t = super::manage::PORT)] - pub xds_port: u16, /// The interval in seconds at which the relay will send a discovery request /// to an management server after receiving no updates. #[clap(long, env = "QUILKIN_IDLE_REQUEST_INTERVAL_SECS")] @@ -45,17 +38,6 @@ pub struct Relay { pub providers: Option, } -impl Default for Relay { - fn default() -> Self { - Self { - mds_port: PORT, - xds_port: super::manage::PORT, - idle_request_interval_secs: None, - providers: None, - } - } -} - impl Relay { pub async fn run( self, @@ -63,12 +45,7 @@ impl Relay { ready: Ready, shutdown_rx: crate::ShutdownRx, ) -> crate::Result<()> { - let xds_listener = TcpListener::bind(Some(self.xds_port))?; - let mds_listener = TcpListener::bind(Some(self.mds_port))?; - relay::Relay { - xds_listener, - mds_listener, provider: self.providers, } .run(crate::components::RunArgs { diff --git a/src/codec/qcmp.rs b/src/codec/qcmp.rs index 606728156..c9232c9a7 100644 --- a/src/codec/qcmp.rs +++ b/src/codec/qcmp.rs @@ -192,7 +192,11 @@ impl Measurement for QcmpMeasurement { } } -pub fn spawn(socket: socket2::Socket, mut shutdown_rx: crate::ShutdownRx) -> crate::Result<()> { +pub fn spawn( + socket: socket2::Socket, + mut shutdown_rx: crate::ShutdownRx, +) -> impl std::future::Future> { + use futures::TryFutureExt as _; use tracing::{instrument::WithSubscriber as _, Instrument as _}; let port = crate::net::socket_port(&socket); @@ -258,9 +262,7 @@ pub fn spawn(socket: socket2::Socket, mut shutdown_rx: crate::ShutdownRx) -> cra } .instrument(tracing::debug_span!("qcmp")) .with_current_subscriber(), - ); - - Ok(()) + ).map_err(From::from) } /// The set of possible QCMP commands. @@ -676,7 +678,7 @@ mod tests { let addr = socket.local_addr().unwrap().as_socket().unwrap(); let (_tx, rx) = crate::make_shutdown_channel(Default::default()); - spawn(socket, rx).unwrap(); + let _task = spawn(socket, rx); let delay = Duration::from_millis(50); let node = QcmpMeasurement::with_artificial_delay(delay).unwrap(); diff --git a/src/components/agent.rs b/src/components/agent.rs index 634a113b2..fb9dd5801 100644 --- a/src/components/agent.rs +++ b/src/components/agent.rs @@ -40,7 +40,6 @@ impl Ready { pub struct Agent { pub locality: Option, - pub qcmp_socket: socket2::Socket, pub icao_code: Option, pub relay_servers: Vec, pub provider: Option, @@ -58,15 +57,11 @@ impl Agent { }: RunArgs, ) -> crate::Result<()> { { - let crate::config::DatacenterConfig::Agent { - icao_code, - qcmp_port, - } = &config.datacenter + let crate::config::DatacenterConfig::Agent { icao_code, .. } = &config.datacenter else { unreachable!("this should be an agent config"); }; - qcmp_port.store(crate::net::socket_port(&self.qcmp_socket).into()); icao_code.store(self.icao_code.unwrap_or_default().into()); } @@ -102,7 +97,6 @@ impl Agent { None }; - crate::codec::qcmp::spawn(self.qcmp_socket, shutdown_rx.clone())?; shutdown_rx.changed().await.map_err(From::from) } } diff --git a/src/components/manage.rs b/src/components/manage.rs index b9811b571..386fb5fb2 100644 --- a/src/components/manage.rs +++ b/src/components/manage.rs @@ -25,7 +25,6 @@ pub struct Manage { pub locality: Option, pub relay_servers: Vec, pub provider: Providers, - pub listener: crate::net::TcpListener, pub address_selector: Option, } @@ -73,19 +72,7 @@ impl Manage { None }; - use futures::TryFutureExt as _; - let server_task = tokio::spawn( - crate::net::xds::server::ControlPlane::from_arc( - config, - crate::components::admin::IDLE_REQUEST_INTERVAL, - ) - .management_server(self.listener)?, - ) - .map_err(From::from) - .and_then(std::future::ready); - tokio::select! { - result = server_task => result, result = provider_task => result?, result = shutdown_rx.changed() => result.map_err(From::from), } diff --git a/src/components/proxy.rs b/src/components/proxy.rs index f17903495..e44848a60 100644 --- a/src/components/proxy.rs +++ b/src/components/proxy.rs @@ -62,42 +62,18 @@ pub struct ToTokens { pub length: usize, } +#[derive(Default)] pub struct Proxy { - pub num_workers: std::num::NonZeroUsize, pub mmdb: Option, pub management_servers: Vec, pub to: Vec, pub to_tokens: Option, - pub socket: Option, - pub qcmp: socket2::Socket, - pub phoenix: crate::net::TcpListener, pub notifier: Option>, - pub xdp: crate::cli::proxy::XdpOptions, -} - -impl Default for Proxy { - fn default() -> Self { - let qcmp = crate::net::raw_socket_with_reuse(0).unwrap(); - let phoenix = crate::net::TcpListener::bind(Some(crate::net::socket_port(&qcmp))).unwrap(); - - Self { - num_workers: std::num::NonZeroUsize::new(1).unwrap(), - mmdb: None, - management_servers: Vec::new(), - to: Vec::new(), - to_tokens: None, - socket: Some(crate::net::raw_socket_with_reuse(0).unwrap()), - qcmp, - phoenix, - notifier: None, - xdp: Default::default(), - } - } } impl Proxy { pub async fn run( - mut self, + self, RunArgs { config, ready, @@ -270,15 +246,6 @@ impl Proxy { .expect("failed to spawn proxy-subscription thread"); } - let router_shutdown = self.spawn_packet_router(config.clone()).await?; - crate::codec::qcmp::spawn(self.qcmp, shutdown_rx.clone())?; - crate::net::phoenix::spawn( - self.phoenix, - config.clone(), - shutdown_rx.clone(), - crate::net::phoenix::Phoenix::new(crate::codec::qcmp::QcmpMeasurement::new()?), - )?; - tracing::info!("Quilkin is ready"); if let Some(initialized) = initialized { let _ = initialized.send(()); @@ -289,110 +256,6 @@ impl Proxy { .await .map_err(|error| eyre::eyre!(error))?; - (router_shutdown)(shutdown_rx); - Ok(()) } - - pub async fn spawn_packet_router( - &mut self, - config: Arc, - ) -> eyre::Result> { - #[cfg(target_os = "linux")] - { - match self.spawn_xdp(config.clone(), self.xdp.force_xdp) { - Ok(xdp) => { - return Ok(xdp); - } - Err(err) => { - if self.xdp.force_xdp { - return Err(err); - } - - tracing::warn!( - ?err, - "failed to spawn XDP I/O loop, falling back to io-uring" - ); - } - } - } - - self.spawn_user_space_router(config).await - } - - /// Launches the user space implementation of the packet router using - /// sockets. This implementation uses a pool of buffers and sockets to - /// manage UDP sessions and sockets. On Linux this will use io-uring, where - /// as it will use epoll interfaces on non-Linux platforms. - pub async fn spawn_user_space_router( - &mut self, - config: Arc, - ) -> eyre::Result> { - let workers = self.num_workers.get(); - let buffer_pool = Arc::new(crate::collections::BufferPool::new(workers, 2 * 1024)); - - let mut worker_sends = Vec::with_capacity(workers); - let mut session_sends = Vec::with_capacity(workers); - for _ in 0..workers { - let queue = crate::net::queue(15)?; - session_sends.push(queue.0.clone()); - worker_sends.push(queue); - } - - let sessions = SessionPool::new(config.clone(), session_sends, buffer_pool.clone()); - - packet_router::spawn_receivers( - config, - self.socket.take().unwrap(), - worker_sends, - &sessions, - buffer_pool, - ) - .await?; - - Ok(Box::new(move |shutdown_rx: crate::ShutdownRx| { - sessions.shutdown(*shutdown_rx.borrow() == crate::ShutdownKind::Normal); - })) - } - - #[cfg(target_os = "linux")] - fn spawn_xdp( - &mut self, - config: Arc, - force_xdp: bool, - ) -> eyre::Result> { - use crate::net::xdp; - use eyre::Context as _; - - // TODO: remove this once it's been more stabilized - if !force_xdp { - eyre::bail!("XDP currently disabled by default"); - } - - let Some(external_port) = self.socket.as_ref().and_then(|s| { - s.local_addr() - .ok() - .and_then(|la| la.as_socket().map(|sa| sa.port())) - }) else { - eyre::bail!("unable to determine port"); - }; - - let workers = xdp::setup_xdp_io(xdp::XdpConfig { - nic: self - .xdp - .network_interface - .as_deref() - .map_or(xdp::NicConfig::Default, xdp::NicConfig::Name), - external_port, - maximum_packet_memory: self.xdp.maximum_memory, - require_zero_copy: self.xdp.force_zerocopy, - require_tx_checksum: self.xdp.force_tx_checksum_offload, - }) - .context("failed to setup XDP")?; - - let io_loop = xdp::spawn(workers, config).context("failed to spawn XDP I/O loop")?; - Ok(Box::new(move |srx: crate::ShutdownRx| { - io_loop.shutdown(*srx.borrow() == crate::ShutdownKind::Normal); - })) - } } diff --git a/src/components/proxy/packet_router.rs b/src/components/proxy/packet_router.rs index fcf72547e..a10f87d70 100644 --- a/src/components/proxy/packet_router.rs +++ b/src/components/proxy/packet_router.rs @@ -161,7 +161,7 @@ pub async fn spawn_receivers( config: Arc, socket: socket2::Socket, worker_sends: Vec, - sessions: &Arc, + sessions: Arc, buffer_pool: Arc, ) -> crate::Result<()> { let port = crate::net::socket_port(&socket); diff --git a/src/components/relay.rs b/src/components/relay.rs index ff11bbca0..2b0a8a59a 100644 --- a/src/components/relay.rs +++ b/src/components/relay.rs @@ -15,7 +15,7 @@ */ use super::RunArgs; -use crate::{config::Providers, net::TcpListener}; +use crate::config::Providers; use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -44,8 +44,6 @@ impl Ready { } pub struct Relay { - pub xds_listener: TcpListener, - pub mds_listener: TcpListener, pub provider: Option, } @@ -59,15 +57,6 @@ impl Relay { mut shutdown_rx, }: RunArgs, ) -> crate::Result<()> { - use crate::net::xds::server::ControlPlane; - - let xds_server = ControlPlane::from_arc(config.clone(), ready.idle_request_interval) - .management_server(self.xds_listener)?; - let mds_server = tokio::spawn( - ControlPlane::from_arc(config.clone(), ready.idle_request_interval) - .relay_server(self.mds_listener)?, - ); - let _provider_task = self.provider.map(|provider| { let config = config.clone(); let provider_is_healthy = ready.provider_is_healthy.clone(); @@ -135,12 +124,6 @@ impl Relay { }); tokio::select! { - result = xds_server => { - result - } - result = mds_server => { - result? - } result = shutdown_rx.changed() => result.map_err(From::from), } } diff --git a/src/net/phoenix.rs b/src/net/phoenix.rs index 9bbf0053f..eb7e1ca09 100644 --- a/src/net/phoenix.rs +++ b/src/net/phoenix.rs @@ -848,56 +848,59 @@ mod tests { }, ); - let (_tx, rx) = crate::make_shutdown_channel(Default::default()); - let socket = raw_socket_with_reuse(qcmp_port).unwrap(); - crate::codec::qcmp::spawn(socket, rx.clone()).unwrap(); - tokio::time::sleep(Duration::from_millis(150)).await; - - let measurement = - crate::codec::qcmp::QcmpMeasurement::with_artificial_delay(Duration::from_millis(50)) - .unwrap(); - - let phoenix = Phoenix::builder(measurement) - .interval_range(Duration::from_millis(10)..Duration::from_millis(15)) - .build(); - - super::spawn(qcmp_listener, config.clone(), rx, phoenix).unwrap(); - tokio::time::sleep(Duration::from_millis(150)).await; - - let client = - hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new()) - .build_http::>(); - use http_body_util::BodyExt; - for _ in 0..10 { - let resp = tokio::time::timeout( - Duration::from_millis(100), - client - .get(format!("http://localhost:{qcmp_port}/").parse().unwrap()) - .await - .unwrap() - .into_body() - .collect(), + tokio::spawn(async move { + let (_tx, rx) = crate::make_shutdown_channel(Default::default()); + let socket = raw_socket_with_reuse(qcmp_port).unwrap(); + let _qcmp_task = crate::codec::qcmp::spawn(socket, rx.clone()); + tokio::time::sleep(Duration::from_millis(150)).await; + + let measurement = crate::codec::qcmp::QcmpMeasurement::with_artificial_delay( + Duration::from_millis(50), ) - .await - .unwrap() - .unwrap() - .to_bytes(); - - let map = serde_json::from_slice::>(&resp).unwrap(); + .unwrap(); - let coords = Coordinates { - x: std::time::Duration::from_millis(50).as_nanos() as f64 / 2.0, - y: std::time::Duration::from_millis(1).as_nanos() as f64 / 2.0, - }; + let phoenix = Phoenix::builder(measurement) + .interval_range(Duration::from_millis(10)..Duration::from_millis(15)) + .build(); + + super::spawn(qcmp_listener, config.clone(), rx, phoenix).unwrap(); + tokio::time::sleep(Duration::from_millis(150)).await; + + let client = + hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new()) + .build_http::>(); + use http_body_util::BodyExt; + for _ in 0..10 { + let resp = tokio::time::timeout( + Duration::from_millis(100), + client + .get(format!("http://localhost:{qcmp_port}/").parse().unwrap()) + .await + .unwrap() + .into_body() + .collect(), + ) + .await + .unwrap() + .unwrap() + .to_bytes(); + + let map = serde_json::from_slice::>(&resp).unwrap(); + + let coords = Coordinates { + x: std::time::Duration::from_millis(50).as_nanos() as f64 / 2.0, + y: std::time::Duration::from_millis(1).as_nanos() as f64 / 2.0, + }; - let min = Coordinates::ORIGIN.distance_to(&coords); - let max = min * 3.0; - let distance = map[icao_code.as_ref()].as_f64().unwrap(); + let min = Coordinates::ORIGIN.distance_to(&coords); + let max = min * 3.0; + let distance = map[icao_code.as_ref()].as_f64().unwrap(); - assert!( - distance > min && distance < max, - "expected distance {distance} to be > {min} and < {max}", - ); - } + assert!( + distance > min && distance < max, + "expected distance {distance} to be > {min} and < {max}", + ); + } + }); } } diff --git a/src/test.rs b/src/test.rs index 6841831d0..dc5359e61 100644 --- a/src/test.rs +++ b/src/test.rs @@ -296,23 +296,24 @@ impl TestHelper { mode.server(config.clone(), address); } - let server = server.unwrap_or_else(|| { - let qcmp = crate::net::raw_socket_with_reuse(0).unwrap(); - let phoenix = crate::net::TcpListener::bind(None).unwrap(); - - crate::components::proxy::Proxy { - num_workers: std::num::NonZeroUsize::new(1).unwrap(), - socket: Some(crate::net::raw_socket_with_reuse(0).unwrap()), - qcmp, - phoenix, - ..Default::default() - } + let port = { + let socket = crate::net::raw_socket_with_reuse(0).unwrap(); + crate::net::socket_port(&socket) + }; + + let publisher = crate::cli::Publish::default().udp().udp_port(port); + tokio::spawn(publisher.spawn_publishers(&config, &shutdown_rx).unwrap()); + + let server = server.unwrap_or_else(|| crate::components::proxy::Proxy { + mmdb: None, + management_servers: Vec::new(), + to: Vec::new(), + to_tokens: None, + notifier: None, }); let (prox_tx, prox_rx) = tokio::sync::oneshot::channel(); - let port = crate::net::socket_port(server.socket.as_ref().unwrap()); - tokio::spawn(async move { server .run( diff --git a/tests/capture.rs b/tests/capture.rs index df3f13284..96532d48f 100644 --- a/tests/capture.rs +++ b/tests/capture.rs @@ -31,68 +31,70 @@ use quilkin::{ #[cfg_attr(target_os = "macos", ignore)] async fn token_router() { let mut t = TestHelper::default(); - let mut echo = t.run_echo_server(AddressType::Random).await; - quilkin::test::map_to_localhost(&mut echo); + tokio::spawn(async move { + let mut echo = t.run_echo_server(AddressType::Random).await; + quilkin::test::map_to_localhost(&mut echo); - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); - server_config.filters.store( - quilkin::filters::FilterChain::try_create([ - Filter { - name: Capture::factory().name().into(), - label: None, - config: serde_json::from_value(serde_json::json!({ - "regex": { - "pattern": ".{3}$" - } - })) - .unwrap(), - }, - Filter { - name: TokenRouter::factory().name().into(), - label: None, - config: None, - }, - ]) - .map(std::sync::Arc::new) - .unwrap(), - ); + let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + server_config.filters.store( + quilkin::filters::FilterChain::try_create([ + Filter { + name: Capture::factory().name().into(), + label: None, + config: serde_json::from_value(serde_json::json!({ + "regex": { + "pattern": ".{3}$" + } + })) + .unwrap(), + }, + Filter { + name: TokenRouter::factory().name().into(), + label: None, + config: None, + }, + ]) + .map(std::sync::Arc::new) + .unwrap(), + ); - server_config.clusters.modify(|clusters| { - clusters.insert_default( - [Endpoint::with_metadata( - echo.clone(), - serde_json::from_value::>(serde_json::json!({ - "quilkin.dev": { - "tokens": ["YWJj"] - } - })) - .unwrap(), - )] - .into(), - ) - }); + server_config.clusters.modify(|clusters| { + clusters.insert_default( + [Endpoint::with_metadata( + echo.clone(), + serde_json::from_value::>(serde_json::json!({ + "quilkin.dev": { + "tokens": ["YWJj"] + } + })) + .unwrap(), + )] + .into(), + ) + }); - let server_port = t.run_server(server_config, None, None).await; + let server_port = t.run_server(server_config, None, None).await; - // valid packet - let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; + // valid packet + let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; - let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port); - let msg = b"helloabc"; - socket.send_to(msg, &local_addr).await.unwrap(); + let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port); + let msg = b"helloabc"; + socket.send_to(msg, &local_addr).await.unwrap(); - assert_eq!( - "helloabc", - timeout(Duration::from_millis(500), recv_chan.recv()) - .await - .expect("should have received a packet") - .unwrap() - ); + assert_eq!( + "helloabc", + timeout(Duration::from_millis(500), recv_chan.recv()) + .await + .expect("should have received a packet") + .unwrap() + ); - // send an invalid packet - let msg = b"helloxyz"; - socket.send_to(msg, &local_addr).await.unwrap(); + // send an invalid packet + let msg = b"helloxyz"; + socket.send_to(msg, &local_addr).await.unwrap(); - let result = timeout(Duration::from_millis(500), recv_chan.recv()).await; - assert!(result.is_err(), "should not have received a packet"); + let result = timeout(Duration::from_millis(500), recv_chan.recv()).await; + assert!(result.is_err(), "should not have received a packet"); + }); } diff --git a/tests/compress.rs b/tests/compress.rs index 0d0d20160..a7751d279 100644 --- a/tests/compress.rs +++ b/tests/compress.rs @@ -27,59 +27,61 @@ use quilkin::{ #[cfg_attr(target_os = "macos", ignore)] async fn client_and_server() { let mut t = TestHelper::default(); - let echo = t.run_echo_server(AddressType::Random).await; + tokio::spawn(async move { + let echo = t.run_echo_server(AddressType::Random).await; - // create server configuration as - let yaml = " + // create server configuration as + let yaml = " on_read: DECOMPRESS on_write: COMPRESS "; - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); - server_config - .clusters - .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); - server_config.filters.store( - quilkin::filters::FilterChain::try_create([Filter { - name: Compress::factory().name().into(), - label: None, - config: serde_yaml::from_str(yaml).unwrap(), - }]) - .map(std::sync::Arc::new) - .unwrap(), - ); - // Run server proxy. - let server_port = t.run_server(server_config, None, None).await; + let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + server_config + .clusters + .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); + server_config.filters.store( + quilkin::filters::FilterChain::try_create([Filter { + name: Compress::factory().name().into(), + label: None, + config: serde_yaml::from_str(yaml).unwrap(), + }]) + .map(std::sync::Arc::new) + .unwrap(), + ); + // Run server proxy. + let server_port = t.run_server(server_config, None, None).await; - // create a local client - let yaml = " + // create a local client + let yaml = " on_read: COMPRESS on_write: DECOMPRESS "; - let client_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); - client_config.clusters.modify(|clusters| { - clusters.insert_default([(std::net::Ipv6Addr::LOCALHOST, server_port).into()].into()) - }); - client_config.filters.store( - quilkin::filters::FilterChain::try_create([Filter { - name: Compress::factory().name().into(), - label: None, - config: serde_yaml::from_str(yaml).unwrap(), - }]) - .map(std::sync::Arc::new) - .unwrap(), - ); - // Run client proxy. - let client_port = t.run_server(client_config, None, None).await; + let client_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + client_config.clusters.modify(|clusters| { + clusters.insert_default([(std::net::Ipv6Addr::LOCALHOST, server_port).into()].into()) + }); + client_config.filters.store( + quilkin::filters::FilterChain::try_create([Filter { + name: Compress::factory().name().into(), + label: None, + config: serde_yaml::from_str(yaml).unwrap(), + }]) + .map(std::sync::Arc::new) + .unwrap(), + ); + // Run client proxy. + let client_port = t.run_server(client_config, None, None).await; - // let's send the packet - let (mut rx, tx) = t.open_socket_and_recv_multiple_packets().await; + // let's send the packet + let (mut rx, tx) = t.open_socket_and_recv_multiple_packets().await; - tx.send_to(b"hello", (std::net::Ipv6Addr::LOCALHOST, client_port)) - .await - .unwrap(); - let expected = timeout(Duration::from_millis(500), rx.recv()) - .await - .expect("should have received a packet") - .unwrap(); - assert_eq!("hello", expected); + tx.send_to(b"hello", (std::net::Ipv6Addr::LOCALHOST, client_port)) + .await + .unwrap(); + let expected = timeout(Duration::from_millis(500), rx.recv()) + .await + .expect("should have received a packet") + .unwrap(); + assert_eq!("hello", expected); + }); } diff --git a/tests/concatenate.rs b/tests/concatenate.rs index 258ab18e2..51e4e7378 100644 --- a/tests/concatenate.rs +++ b/tests/concatenate.rs @@ -33,34 +33,37 @@ async fn concatenate() { on_read: APPEND bytes: YWJj #abc "; - let echo = t.run_echo_server(AddressType::Random).await; - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); - server_config - .clusters - .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); - server_config.filters.store( - quilkin::filters::FilterChain::try_create([Filter { - name: Concatenate::factory().name().into(), - label: None, - config: serde_yaml::from_str(yaml).unwrap(), - }]) - .map(std::sync::Arc::new) - .unwrap(), - ); - let server_port = t.run_server(server_config, None, None).await; + tokio::spawn(async move { + let echo = t.run_echo_server(AddressType::Random).await; - // let's send the packet - let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; + let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + server_config + .clusters + .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); + server_config.filters.store( + quilkin::filters::FilterChain::try_create([Filter { + name: Concatenate::factory().name().into(), + label: None, + config: serde_yaml::from_str(yaml).unwrap(), + }]) + .map(std::sync::Arc::new) + .unwrap(), + ); + let server_port = t.run_server(server_config, None, None).await; - let local_addr = (Ipv4Addr::LOCALHOST, server_port); - socket.send_to(b"hello", &local_addr).await.unwrap(); + // let's send the packet + let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; - assert_eq!( - "helloabc", - timeout(Duration::from_millis(250), recv_chan.recv()) - .await - .expect("should have received a packet") - .unwrap() - ); + let local_addr = (Ipv4Addr::LOCALHOST, server_port); + socket.send_to(b"hello", &local_addr).await.unwrap(); + + assert_eq!( + "helloabc", + timeout(Duration::from_millis(250), recv_chan.recv()) + .await + .expect("should have received a packet") + .unwrap() + ); + }); } diff --git a/tests/filter_order.rs b/tests/filter_order.rs index b56fec61f..616b16db3 100644 --- a/tests/filter_order.rs +++ b/tests/filter_order.rs @@ -45,62 +45,64 @@ on_read: COMPRESS on_write: DECOMPRESS "; - let mut echo = t - .run_echo_server_with_tap(AddressType::Random, move |_, bytes, _| { - assert!( - from_utf8(bytes).is_err(), - "Should be compressed, and therefore unable to be turned into a string" - ); - }) - .await; - - quilkin::test::map_to_localhost(&mut echo); - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); - server_config - .clusters - .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); - server_config.filters.store( - quilkin::filters::FilterChain::try_create([ - Filter { - name: Concatenate::factory().name().into(), - label: None, - config: serde_yaml::from_str(yaml_concat_read).unwrap(), - }, - Filter { - name: Concatenate::factory().name().into(), - label: None, - config: serde_yaml::from_str(yaml_concat_write).unwrap(), - }, - Filter { - name: Compress::factory().name().into(), - label: None, - config: serde_yaml::from_str(yaml_compress).unwrap(), - }, - ]) - .map(std::sync::Arc::new) - .unwrap(), - ); - - let server_port = t.run_server(server_config, None, None).await; - - // let's send the packet - let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; - - let buf = b"hello".repeat(98); - - let local_addr = (Ipv4Addr::LOCALHOST, server_port); - socket.send_to(&buf, &local_addr).await.unwrap(); - - let received = timeout(Duration::from_millis(500), recv_chan.recv()) - .await - .expect("should have received a packet") - .unwrap(); - - let hellos = received - .strip_suffix("xyzabc") - .expect("expected appended data"); - - assert_eq!(&buf, hellos.as_bytes()); + tokio::spawn(async move { + let mut echo = t + .run_echo_server_with_tap(AddressType::Random, move |_, bytes, _| { + assert!( + from_utf8(bytes).is_err(), + "Should be compressed, and therefore unable to be turned into a string" + ); + }) + .await; + + quilkin::test::map_to_localhost(&mut echo); + let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + server_config + .clusters + .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); + server_config.filters.store( + quilkin::filters::FilterChain::try_create([ + Filter { + name: Concatenate::factory().name().into(), + label: None, + config: serde_yaml::from_str(yaml_concat_read).unwrap(), + }, + Filter { + name: Concatenate::factory().name().into(), + label: None, + config: serde_yaml::from_str(yaml_concat_write).unwrap(), + }, + Filter { + name: Compress::factory().name().into(), + label: None, + config: serde_yaml::from_str(yaml_compress).unwrap(), + }, + ]) + .map(std::sync::Arc::new) + .unwrap(), + ); + + let server_port = t.run_server(server_config, None, None).await; + + // let's send the packet + let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; + + let buf = b"hello".repeat(98); + + let local_addr = (Ipv4Addr::LOCALHOST, server_port); + socket.send_to(&buf, &local_addr).await.unwrap(); + + let received = timeout(Duration::from_millis(500), recv_chan.recv()) + .await + .expect("should have received a packet") + .unwrap(); + + let hellos = received + .strip_suffix("xyzabc") + .expect("expected appended data"); + + assert_eq!(&buf, hellos.as_bytes()); + }); } #[tokio::test] @@ -123,40 +125,42 @@ async fn multiple_mutations() { let filters: Vec = serde_yaml::from_str(filters).unwrap(); - let mut t = TestHelper::default(); - let mut echo = t - .run_echo_server_with_tap(AddressType::Random, move |_, bytes, _| { - assert_eq!(b"hello", bytes); - }) - .await; - - quilkin::test::map_to_localhost(&mut echo); - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); - server_config - .clusters - .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); - server_config.filters.store( - quilkin::filters::FilterChain::try_create(filters) - .map(std::sync::Arc::new) - .unwrap(), - ); - - let server_port = t.run_server(server_config, None, None).await; - - // let's send the packet - let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; - - let local_addr = (Ipv4Addr::LOCALHOST, server_port); - - socket - .send_to(b"helloxxxxxxxxxxxxxxxx6", &local_addr) - .await - .unwrap(); - - let received = timeout(Duration::from_millis(500), recv_chan.recv()) - .await - .expect("should have received a packet") - .unwrap(); - - assert_eq!(b"hello", received.as_bytes()); + tokio::spawn(async move { + let mut t = TestHelper::default(); + let mut echo = t + .run_echo_server_with_tap(AddressType::Random, move |_, bytes, _| { + assert_eq!(b"hello", bytes); + }) + .await; + + quilkin::test::map_to_localhost(&mut echo); + let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + server_config + .clusters + .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); + server_config.filters.store( + quilkin::filters::FilterChain::try_create(filters) + .map(std::sync::Arc::new) + .unwrap(), + ); + + let server_port = t.run_server(server_config, None, None).await; + + // let's send the packet + let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; + + let local_addr = (Ipv4Addr::LOCALHOST, server_port); + + socket + .send_to(b"helloxxxxxxxxxxxxxxxx6", &local_addr) + .await + .unwrap(); + + let received = timeout(Duration::from_millis(500), recv_chan.recv()) + .await + .expect("should have received a packet") + .unwrap(); + + assert_eq!(b"hello", received.as_bytes()); + }); } diff --git a/tests/filters.rs b/tests/filters.rs index 26bd9a2ed..bd85effec 100644 --- a/tests/filters.rs +++ b/tests/filters.rs @@ -33,77 +33,79 @@ async fn test_filter() { let mut t = TestHelper::default(); load_test_filters(); - // create an echo server as an endpoint. - let echo = t.run_echo_server(AddressType::Random).await; - - // create server configuration - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); - server_config.filters.store( - quilkin::filters::FilterChain::try_create([Filter { - name: "TestFilter".to_string(), - label: None, - config: None, - }]) - .map(std::sync::Arc::new) - .unwrap(), - ); - - server_config - .clusters - .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); - - let server_port = t.run_server(server_config, None, None).await; - - // create a local client - let client_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); - client_config.clusters.modify(|clusters| { - clusters.insert_default( - [Endpoint::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port).into(), - )] - .into(), - ) + tokio::spawn(async move { + // create an echo server as an endpoint. + let echo = t.run_echo_server(AddressType::Random).await; + + // create server configuration + let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + server_config.filters.store( + quilkin::filters::FilterChain::try_create([Filter { + name: "TestFilter".to_string(), + label: None, + config: None, + }]) + .map(std::sync::Arc::new) + .unwrap(), + ); + + server_config + .clusters + .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); + + let server_port = t.run_server(server_config, None, None).await; + + // create a local client + let client_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + client_config.clusters.modify(|clusters| { + clusters.insert_default( + [Endpoint::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port).into(), + )] + .into(), + ) + }); + client_config.filters.store( + quilkin::filters::FilterChain::try_create([Filter { + name: "TestFilter".to_string(), + label: None, + config: None, + }]) + .map(std::sync::Arc::new) + .unwrap(), + ); + + // Run client proxy. + let client_port = t.run_server(client_config, None, None).await; + + // let's send the packet + let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; + + // game_client + let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), client_port); + tracing::info!(address = %local_addr, "Sending hello"); + socket.send_to(b"hello", &local_addr).await.unwrap(); + + let result = timeout(Duration::from_secs(5), recv_chan.recv()) + .await + .unwrap() + .unwrap(); + // since we don't know the ephemeral ip addresses in use, we'll search for + // substrings for the results we expect that the TestFilter will inject in + // the round-tripped packets. + assert_eq!( + 2, + result.matches("odr").count(), + "Should be 2 read calls in {}", + result + ); + assert_eq!( + 2, + result.matches("our").count(), + "Should be 2 write calls in {}", + result + ); }); - client_config.filters.store( - quilkin::filters::FilterChain::try_create([Filter { - name: "TestFilter".to_string(), - label: None, - config: None, - }]) - .map(std::sync::Arc::new) - .unwrap(), - ); - - // Run client proxy. - let client_port = t.run_server(client_config, None, None).await; - - // let's send the packet - let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; - - // game_client - let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), client_port); - tracing::info!(address = %local_addr, "Sending hello"); - socket.send_to(b"hello", &local_addr).await.unwrap(); - - let result = timeout(Duration::from_secs(5), recv_chan.recv()) - .await - .unwrap() - .unwrap(); - // since we don't know the ephemeral ip addresses in use, we'll search for - // substrings for the results we expect that the TestFilter will inject in - // the round-tripped packets. - assert_eq!( - 2, - result.matches("odr").count(), - "Should be 2 read calls in {}", - result - ); - assert_eq!( - 2, - result.matches("our").count(), - "Should be 2 write calls in {}", - result - ); } #[tokio::test] @@ -114,60 +116,62 @@ async fn debug_filter() { // handy for grabbing the configuration name let factory = Debug::factory(); - // create an echo server as an endpoint. - let echo = t.run_echo_server(AddressType::Random).await; - - tracing::trace!(%echo, "running echo server"); - // create server configuration - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); - server_config - .clusters - .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); - server_config.filters.store( - quilkin::filters::FilterChain::try_create([Filter { - name: factory.name().into(), - label: None, - config: Some(serde_json::json!({ "id": "server", })), - }]) - .map(std::sync::Arc::new) - .unwrap(), - ); - - let server_port = t.run_server(server_config, None, None).await; - - // create a local client - let client_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); - client_config.clusters.modify(|clusters| { - clusters.insert_default( - [Endpoint::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port).into(), - )] - .into(), - ) + tokio::spawn(async move { + // create an echo server as an endpoint. + let echo = t.run_echo_server(AddressType::Random).await; + + tracing::trace!(%echo, "running echo server"); + // create server configuration + let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + server_config + .clusters + .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); + server_config.filters.store( + quilkin::filters::FilterChain::try_create([Filter { + name: factory.name().into(), + label: None, + config: Some(serde_json::json!({ "id": "server", })), + }]) + .map(std::sync::Arc::new) + .unwrap(), + ); + + let server_port = t.run_server(server_config, None, None).await; + + // create a local client + let client_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + client_config.clusters.modify(|clusters| { + clusters.insert_default( + [Endpoint::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port).into(), + )] + .into(), + ) + }); + client_config.filters.store( + quilkin::filters::FilterChain::try_create([Filter { + name: factory.name().into(), + label: None, + config: Some(serde_json::json!({ "id": "client" })), + }]) + .map(std::sync::Arc::new) + .unwrap(), + ); + let client_port = t.run_server(client_config, None, None).await; + + // let's send the packet + let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; + + // game client + let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), client_port); + tracing::info!(address = %local_addr, "Sending hello"); + socket.send_to(b"hello", &local_addr).await.unwrap(); + + // since the debug filter doesn't change the data, it should be exactly the same + let value = timeout(Duration::from_millis(500), recv_chan.recv()) + .await + .unwrap() + .unwrap(); + assert_eq!("hello", value); }); - client_config.filters.store( - quilkin::filters::FilterChain::try_create([Filter { - name: factory.name().into(), - label: None, - config: Some(serde_json::json!({ "id": "client" })), - }]) - .map(std::sync::Arc::new) - .unwrap(), - ); - let client_port = t.run_server(client_config, None, None).await; - - // let's send the packet - let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; - - // game client - let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), client_port); - tracing::info!(address = %local_addr, "Sending hello"); - socket.send_to(b"hello", &local_addr).await.unwrap(); - - // since the debug filter doesn't change the data, it should be exactly the same - let value = timeout(Duration::from_millis(500), recv_chan.recv()) - .await - .unwrap() - .unwrap(); - assert_eq!("hello", value); } diff --git a/tests/firewall.rs b/tests/firewall.rs deleted file mode 100644 index 62f2edca0..000000000 --- a/tests/firewall.rs +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Copyright 2021 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use tokio::{ - sync::mpsc, - time::{timeout, Duration}, -}; - -use quilkin::{ - config::Filter, - filters::{Firewall, StaticFilter}, - net::endpoint::Endpoint, - test::{AddressType, TestHelper}, -}; - -#[tokio::test] -#[cfg_attr(target_os = "macos", ignore)] -async fn ipv4_firewall_allow() { - let mut t = TestHelper::default(); - let yaml = " -on_read: - - action: ALLOW - sources: - - 127.0.0.1/32 - ports: - - %1 -on_write: - - action: ALLOW - sources: - - 127.0.0.0/24 - ports: - - %2 -"; - let mut rx = test(&mut t, yaml, AddressType::Ipv4).await; - - assert_eq!( - "hello", - timeout(Duration::from_millis(500), rx.recv()) - .await - .expect("should have received a packet") - .unwrap() - ); -} - -#[tokio::test] -#[cfg_attr(target_os = "macos", ignore)] -async fn ipv6_firewall_allow() { - let mut t = TestHelper::default(); - let yaml = " -on_read: - - action: ALLOW - sources: - - ::1/128 - ports: - - %1 -on_write: - - action: ALLOW - sources: - - ::1/64 - ports: - - %2 -"; - let mut rx = test(&mut t, yaml, AddressType::Ipv6).await; - - assert_eq!( - "hello", - timeout(Duration::from_millis(500), rx.recv()) - .await - .expect("should have received a packet") - .unwrap() - ); -} - -#[tokio::test] -async fn ipv4_firewall_read_deny() { - let mut t = TestHelper::default(); - let yaml = " -on_read: - - action: DENY - sources: - - 127.0.0.1/32 - ports: - - %1 -on_write: - - action: ALLOW - sources: - - 127.0.0.0/24 - ports: - - %2 -"; - let mut rx = test(&mut t, yaml, AddressType::Ipv4).await; - - let result = timeout(Duration::from_millis(500), rx.recv()).await; - assert!(result.is_err(), "should not have received a packet"); -} - -#[tokio::test] -async fn ipv6_firewall_read_deny() { - let mut t = TestHelper::default(); - let yaml = " -on_read: - - action: DENY - sources: - - ::1/128 - ports: - - %1 -on_write: - - action: ALLOW - sources: - - ::1/64 - ports: - - %2 -"; - let mut rx = test(&mut t, yaml, AddressType::Ipv6).await; - - let result = timeout(Duration::from_millis(500), rx.recv()).await; - assert!(result.is_err(), "should not have received a packet"); -} - -#[tokio::test] -async fn ipv4_firewall_write_deny() { - let mut t = TestHelper::default(); - let yaml = " -on_read: - - action: ALLOW - sources: - - 127.0.0.1/32 - ports: - - %1 -on_write: - - action: DENY - sources: - - 127.0.0.0/24 - ports: - - %2 -"; - let mut rx = test(&mut t, yaml, AddressType::Ipv4).await; - - let result = timeout(Duration::from_millis(500), rx.recv()).await; - assert!(result.is_err(), "should not have received a packet"); -} - -#[tokio::test] -async fn ipv6_firewall_write_deny() { - let mut t = TestHelper::default(); - - let yaml = " -on_read: - - action: ALLOW - sources: - - ::1/128 - ports: - - %1 -on_write: - - action: DENY - sources: - - ::1/64 - ports: - - %2 -"; - let mut rx = test(&mut t, yaml, AddressType::Ipv6).await; - - let result = timeout(Duration::from_millis(500), rx.recv()).await; - assert!(result.is_err(), "should not have received a packet"); -} - -async fn test(t: &mut TestHelper, yaml: &str, address_type: AddressType) -> mpsc::Receiver { - let echo = t.run_echo_server(address_type).await; - - let (rx, socket) = match address_type { - AddressType::Ipv4 => t.open_ipv4_socket_and_recv_multiple_packets().await, - AddressType::Ipv6 => t.open_socket_and_recv_multiple_packets().await, - AddressType::Random => unreachable!(), - }; - - let client_addr = match address_type { - AddressType::Ipv4 => socket.local_addr().unwrap(), - AddressType::Ipv6 => socket.local_addr().unwrap(), - AddressType::Random => unreachable!(), - }; - - let yaml = yaml - .replace("%1", client_addr.port().to_string().as_str()) - .replace("%2", echo.port().to_string().as_str()); - tracing::info!(config = yaml.as_str(), "Config"); - - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); - server_config.filters.store( - quilkin::filters::FilterChain::try_create([Filter { - name: Firewall::factory().name().into(), - label: None, - config: serde_yaml::from_str(yaml.as_str()).unwrap(), - }]) - .map(std::sync::Arc::new) - .unwrap(), - ); - - server_config - .clusters - .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); - - let proxy_socket = - quilkin::net::raw_socket_with_reuse_and_address(address_type.into()).unwrap(); - let local_addr = proxy_socket.local_addr().unwrap().as_socket().unwrap(); - - let proxy = quilkin::components::proxy::Proxy { - socket: Some(proxy_socket), - ..Default::default() - }; - - t.run_server(server_config, Some(proxy), None).await; - - tracing::info!(source = %client_addr, address = %local_addr, "Sending hello"); - socket.send_to(b"hello", local_addr).await.unwrap(); - rx -} diff --git a/tests/health.rs b/tests/health.rs index 1ce9ac1f8..9a91afb99 100644 --- a/tests/health.rs +++ b/tests/health.rs @@ -30,34 +30,38 @@ async fn health_server() { server_config.clusters.modify(|clusters| { clusters.insert_default(["127.0.0.1:0".parse::().unwrap()].into()) }); - t.run_server( - server_config, - None, - Some(Some((std::net::Ipv6Addr::UNSPECIFIED, 9093).into())), - ) - .await; - tokio::time::sleep(std::time::Duration::from_millis(250)).await; - - let client = hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new()) - .build_http::>(); - use http_body_util::BodyExt; - let resp = client - .get(Uri::from_static(LIVE_ADDRESS)) - .await - .unwrap() - .into_body() - .collect() - .await - .unwrap() - .to_bytes() - .to_vec(); - - assert_eq!("ok", String::from_utf8(resp).unwrap()); - - let _ = panic::catch_unwind(|| { - panic!("oh no!"); - }); - let resp = client.get(Uri::from_static(LIVE_ADDRESS)).await.unwrap(); - assert!(resp.status().is_server_error(), "Should be unhealthy"); + tokio::spawn(async move { + t.run_server( + server_config, + None, + Some(Some((std::net::Ipv6Addr::UNSPECIFIED, 9093).into())), + ) + .await; + tokio::time::sleep(std::time::Duration::from_millis(250)).await; + + let client = + hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new()) + .build_http::>(); + use http_body_util::BodyExt; + let resp = client + .get(Uri::from_static(LIVE_ADDRESS)) + .await + .unwrap() + .into_body() + .collect() + .await + .unwrap() + .to_bytes() + .to_vec(); + + assert_eq!("ok", String::from_utf8(resp).unwrap()); + + let _ = panic::catch_unwind(|| { + panic!("oh no!"); + }); + + let resp = client.get(Uri::from_static(LIVE_ADDRESS)).await.unwrap(); + assert!(resp.status().is_server_error(), "Should be unhealthy"); + }); } diff --git a/tests/load_balancer.rs b/tests/load_balancer.rs index 4f1c3755a..8ecdedf4f 100644 --- a/tests/load_balancer.rs +++ b/tests/load_balancer.rs @@ -36,47 +36,49 @@ policy: ROUND_ROBIN "; let selected_endpoint = Arc::new(Mutex::new(None::)); - let mut echo_addresses = std::collections::BTreeSet::new(); - for _ in 0..2 { - let selected_endpoint = selected_endpoint.clone(); - echo_addresses.insert( - t.run_echo_server_with_tap(AddressType::Random, move |_, _, echo_addr| { - let _ = selected_endpoint.lock().unwrap().replace(echo_addr); - }) - .await, - ); - } + tokio::spawn(async move { + let mut echo_addresses = std::collections::BTreeSet::new(); + for _ in 0..2 { + let selected_endpoint = selected_endpoint.clone(); + echo_addresses.insert( + t.run_echo_server_with_tap(AddressType::Random, move |_, _, echo_addr| { + let _ = selected_endpoint.lock().unwrap().replace(echo_addr); + }) + .await, + ); + } - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); - server_config.clusters.modify(|clusters| { - clusters.insert_default(echo_addresses.iter().cloned().map(Endpoint::new).collect()) - }); - server_config.filters.store( - quilkin::filters::FilterChain::try_create([Filter { - name: LoadBalancer::factory().name().into(), - label: None, - config: serde_yaml::from_str(yaml).unwrap(), - }]) - .map(std::sync::Arc::new) - .unwrap(), - ); + let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + server_config.clusters.modify(|clusters| { + clusters.insert_default(echo_addresses.iter().cloned().map(Endpoint::new).collect()) + }); + server_config.filters.store( + quilkin::filters::FilterChain::try_create([Filter { + name: LoadBalancer::factory().name().into(), + label: None, + config: serde_yaml::from_str(yaml).unwrap(), + }]) + .map(std::sync::Arc::new) + .unwrap(), + ); - let server_port = t.run_server(server_config, None, None).await; - let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port); + let server_port = t.run_server(server_config, None, None).await; + let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port); - let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; + let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; - for addr in echo_addresses { - socket.send_to(b"hello", &server_addr).await.unwrap(); - let value = timeout(Duration::from_secs(5), recv_chan.recv()) - .await - .unwrap() - .unwrap(); - assert_eq!("hello", value); + for addr in echo_addresses { + socket.send_to(b"hello", &server_addr).await.unwrap(); + let value = timeout(Duration::from_secs(5), recv_chan.recv()) + .await + .unwrap() + .unwrap(); + assert_eq!("hello", value); - assert_eq!( - addr, - selected_endpoint.lock().unwrap().take().unwrap().into() - ); - } + assert_eq!( + addr, + selected_endpoint.lock().unwrap().take().unwrap().into() + ); + } + }); } diff --git a/tests/local_rate_limit.rs b/tests/local_rate_limit.rs index 205b3a423..a7ce7e125 100644 --- a/tests/local_rate_limit.rs +++ b/tests/local_rate_limit.rs @@ -34,47 +34,50 @@ async fn local_rate_limit_filter() { max_packets: 2 period: 1 "; - let echo = t.run_echo_server(AddressType::Random).await; - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); - server_config - .clusters - .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); - server_config.filters.store( - quilkin::filters::FilterChain::try_create([Filter { - name: LocalRateLimit::factory().name().into(), - label: None, - config: serde_yaml::from_str(yaml).unwrap(), - }]) - .map(std::sync::Arc::new) - .unwrap(), - ); - tracing::trace!("spawning server"); - let server_port = t.run_server(server_config, None, None).await; - let server_addr = std::net::SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, server_port)); + tokio::spawn(async move { + let echo = t.run_echo_server(AddressType::Random).await; - let msg = "hello"; - let (mut rx, socket) = t.open_socket_and_recv_multiple_packets().await; + let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + server_config + .clusters + .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); + server_config.filters.store( + quilkin::filters::FilterChain::try_create([Filter { + name: LocalRateLimit::factory().name().into(), + label: None, + config: serde_yaml::from_str(yaml).unwrap(), + }]) + .map(std::sync::Arc::new) + .unwrap(), + ); + tracing::trace!("spawning server"); + let server_port = t.run_server(server_config, None, None).await; + let server_addr = std::net::SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, server_port)); - for _ in 0..3 { - tracing::trace!(%server_addr, %msg, "sending"); - socket.send_to(msg.as_bytes(), &server_addr).await.unwrap(); - } + let msg = "hello"; + let (mut rx, socket) = t.open_socket_and_recv_multiple_packets().await; - for _ in 0..2 { - assert_eq!( - msg, - timeout(Duration::from_millis(500), rx.recv()) - .await - .unwrap() - .unwrap() - ); - } + for _ in 0..3 { + tracing::trace!(%server_addr, %msg, "sending"); + socket.send_to(msg.as_bytes(), &server_addr).await.unwrap(); + } + + for _ in 0..2 { + assert_eq!( + msg, + timeout(Duration::from_millis(500), rx.recv()) + .await + .unwrap() + .unwrap() + ); + } - // Allow enough time to have received any response. - tokio::time::sleep(Duration::from_millis(100)).await; - // Check that we do not get any response. - assert!(timeout(Duration::from_millis(500), rx.recv()) - .await - .is_err()); + // Allow enough time to have received any response. + tokio::time::sleep(Duration::from_millis(100)).await; + // Check that we do not get any response. + assert!(timeout(Duration::from_millis(500), rx.recv()) + .await + .is_err()); + }); } diff --git a/tests/match.rs b/tests/match.rs index d131f97c1..196fbd16e 100644 --- a/tests/match.rs +++ b/tests/match.rs @@ -29,15 +29,16 @@ use quilkin::{ #[cfg_attr(target_os = "macos", ignore)] async fn r#match() { let mut t = TestHelper::default(); - let echo = t.run_echo_server(AddressType::Random).await; + tokio::spawn(async move { + let echo = t.run_echo_server(AddressType::Random).await; - let capture_yaml = " + let capture_yaml = " suffix: size: 3 remove: true -"; + "; - let matches_yaml = " + let matches_yaml = " on_read: metadataKey: quilkin.dev/capture fallthrough: @@ -56,80 +57,81 @@ on_read: config: on_read: APPEND bytes: YWJj # abc -"; - - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); - server_config - .clusters - .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); - server_config.filters.store( - quilkin::filters::FilterChain::try_create([ - Filter { - name: Capture::NAME.into(), - label: None, - config: serde_yaml::from_str(capture_yaml).unwrap(), - }, - Filter { - name: Match::NAME.into(), - label: None, - config: serde_yaml::from_str(matches_yaml).unwrap(), - }, - ]) - .map(std::sync::Arc::new) - .unwrap(), - ); - - let server_port = t.run_server(server_config, None, None).await; - - let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; - - let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port); - - // abc packet - let msg = b"helloabc"; - socket.send_to(msg, &local_addr).await.unwrap(); - - assert_eq!( - "helloxyz", - timeout(Duration::from_millis(500), recv_chan.recv()) - .await - .expect("should have received a packet") - .unwrap() - ); - - // send an xyz packet - let msg = b"helloxyz"; - socket.send_to(msg, &local_addr).await.unwrap(); - - assert_eq!( - "helloabc", - timeout(Duration::from_millis(500), recv_chan.recv()) - .await - .expect("should have received a packet") - .unwrap() - ); - - // fallthrough packet - let msg = b"hellodef"; - socket.send_to(msg, &local_addr).await.unwrap(); - - assert_eq!( - "hellodef", - timeout(Duration::from_millis(500), recv_chan.recv()) - .await - .expect("should have received a packet") - .unwrap() - ); - - // second fallthrough packet - let msg = b"hellofgh"; - socket.send_to(msg, &local_addr).await.unwrap(); - - assert_eq!( - "hellodef", - timeout(Duration::from_millis(500), recv_chan.recv()) - .await - .expect("should have received a packet") - .unwrap() - ); + "; + + let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + server_config + .clusters + .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); + server_config.filters.store( + quilkin::filters::FilterChain::try_create([ + Filter { + name: Capture::NAME.into(), + label: None, + config: serde_yaml::from_str(capture_yaml).unwrap(), + }, + Filter { + name: Match::NAME.into(), + label: None, + config: serde_yaml::from_str(matches_yaml).unwrap(), + }, + ]) + .map(std::sync::Arc::new) + .unwrap(), + ); + + let server_port = t.run_server(server_config, None, None).await; + + let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; + + let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), server_port); + + // abc packet + let msg = b"helloabc"; + socket.send_to(msg, &local_addr).await.unwrap(); + + assert_eq!( + "helloxyz", + timeout(Duration::from_millis(500), recv_chan.recv()) + .await + .expect("should have received a packet") + .unwrap() + ); + + // send an xyz packet + let msg = b"helloxyz"; + socket.send_to(msg, &local_addr).await.unwrap(); + + assert_eq!( + "helloabc", + timeout(Duration::from_millis(500), recv_chan.recv()) + .await + .expect("should have received a packet") + .unwrap() + ); + + // fallthrough packet + let msg = b"hellodef"; + socket.send_to(msg, &local_addr).await.unwrap(); + + assert_eq!( + "hellodef", + timeout(Duration::from_millis(500), recv_chan.recv()) + .await + .expect("should have received a packet") + .unwrap() + ); + + // second fallthrough packet + let msg = b"hellofgh"; + socket.send_to(msg, &local_addr).await.unwrap(); + + assert_eq!( + "hellodef", + timeout(Duration::from_millis(500), recv_chan.recv()) + .await + .expect("should have received a packet") + .unwrap() + ); + }); } diff --git a/tests/metrics.rs b/tests/metrics.rs index 9a3b3eb91..85e4c9701 100644 --- a/tests/metrics.rs +++ b/tests/metrics.rs @@ -26,70 +26,75 @@ use quilkin::{ async fn metrics_server() { let mut t = TestHelper::default(); - // create an echo server as an endpoint. - let echo = t.run_echo_server(AddressType::Random).await; - let metrics_port = quilkin::test::available_addr(AddressType::Random) - .await - .port(); + tokio::spawn(async move { + // create an echo server as an endpoint. + let echo = t.run_echo_server(AddressType::Random).await; + let metrics_port = quilkin::test::available_addr(AddressType::Random) + .await + .port(); - // create server configuration - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); - server_config - .clusters - .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); - let server_port = t - .run_server( - server_config, - None, - Some(Some((std::net::Ipv4Addr::UNSPECIFIED, metrics_port).into())), - ) - .await; + // create server configuration + let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + server_config + .clusters + .modify(|clusters| clusters.insert_default([Endpoint::new(echo.clone())].into())); + let server_port = t + .run_server( + server_config, + None, + Some(Some((std::net::Ipv4Addr::UNSPECIFIED, metrics_port).into())), + ) + .await; - // create a local client - let client_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); - client_config.clusters.modify(|clusters| { - clusters.insert_default( - [Endpoint::new( - (std::net::Ipv6Addr::LOCALHOST, server_port).into(), - )] - .into(), - ) - }); - let client_port = t.run_server(client_config, None, None).await; + // create a local client + let client_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + client_config.clusters.modify(|clusters| { + clusters.insert_default( + [Endpoint::new( + (std::net::Ipv6Addr::LOCALHOST, server_port).into(), + )] + .into(), + ) + }); + let client_port = t.run_server(client_config, None, None).await; - // let's send the packet - let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; + // let's send the packet + let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; - // game_client - let local_addr = SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, client_port)); - tracing::info!(address = %local_addr, "Sending hello"); - socket.send_to(b"hello", &local_addr).await.unwrap(); + // game_client + let local_addr = SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, client_port)); + tracing::info!(address = %local_addr, "Sending hello"); + socket.send_to(b"hello", &local_addr).await.unwrap(); - let _ = tokio::time::timeout(std::time::Duration::from_millis(100), recv_chan.recv()) - .await - .unwrap() - .unwrap(); + let _ = tokio::time::timeout(std::time::Duration::from_millis(100), recv_chan.recv()) + .await + .unwrap() + .unwrap(); - let client = hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new()) - .build_http::>(); - use http_body_util::BodyExt; - let resp = client - .get( - format!("http://localhost:{metrics_port}/metrics") - .parse() - .unwrap(), - ) - .await - .unwrap() - .into_body() - .collect() - .await - .unwrap() - .to_bytes(); + let client = + hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new()) + .build_http::>(); + use http_body_util::BodyExt; + let resp = client + .get( + format!("http://localhost:{metrics_port}/metrics") + .parse() + .unwrap(), + ) + .await + .unwrap() + .into_body() + .collect() + .await + .unwrap() + .to_bytes(); - let response = String::from_utf8(resp.to_vec()).unwrap(); - let read_regex = regex::Regex::new(r#"quilkin_packets_total\{.*event="read".*\} 2"#).unwrap(); - let write_regex = regex::Regex::new(r#"quilkin_packets_total\{.*event="write".*\} 2"#).unwrap(); - assert!(read_regex.is_match(&response)); - assert!(write_regex.is_match(&response)); + let response = String::from_utf8(resp.to_vec()).unwrap(); + let read_regex = + regex::Regex::new(r#"quilkin_packets_total\{.*event="read".*\} 2"#).unwrap(); + let write_regex = + regex::Regex::new(r#"quilkin_packets_total\{.*event="write".*\} 2"#).unwrap(); + assert!(read_regex.is_match(&response)); + assert!(write_regex.is_match(&response)); + }); } diff --git a/tests/no_filter.rs b/tests/no_filter.rs index e159c890a..2cadc2b69 100644 --- a/tests/no_filter.rs +++ b/tests/no_filter.rs @@ -27,42 +27,44 @@ use quilkin::{ async fn echo() { let mut t = TestHelper::default(); - // create two echo servers as endpoints - let server1 = t.run_echo_server(AddressType::Random).await; - let server2 = t.run_echo_server(AddressType::Random).await; + tokio::spawn(async move { + // create two echo servers as endpoints + let server1 = t.run_echo_server(AddressType::Random).await; + let server2 = t.run_echo_server(AddressType::Random).await; - // create server configuration - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); - server_config.clusters.modify(|clusters| { - clusters.insert_default( - [ - Endpoint::new(server1.clone()), - Endpoint::new(server2.clone()), - ] - .into(), - ) - }); + // create server configuration + let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); + server_config.clusters.modify(|clusters| { + clusters.insert_default( + [ + Endpoint::new(server1.clone()), + Endpoint::new(server2.clone()), + ] + .into(), + ) + }); - let local_port = t.run_server(server_config, None, None).await; - let local_addr = std::net::SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, local_port)); + let local_port = t.run_server(server_config, None, None).await; + let local_addr = std::net::SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, local_port)); - // let's send the packet - let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; + // let's send the packet + let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; - socket.send_to(b"hello", &local_addr).await.unwrap(); - let value = timeout(Duration::from_millis(500), recv_chan.recv()) - .await - .unwrap() - .unwrap(); - assert_eq!("hello", value); - let value = timeout(Duration::from_millis(500), recv_chan.recv()) - .await - .unwrap() - .unwrap(); - assert_eq!("hello", value); + socket.send_to(b"hello", &local_addr).await.unwrap(); + let value = timeout(Duration::from_millis(500), recv_chan.recv()) + .await + .unwrap() + .unwrap(); + assert_eq!("hello", value); + let value = timeout(Duration::from_millis(500), recv_chan.recv()) + .await + .unwrap() + .unwrap(); + assert_eq!("hello", value); - // should only be two returned items - assert!(timeout(Duration::from_millis(500), recv_chan.recv()) - .await - .is_err()); + // should only be two returned items + assert!(timeout(Duration::from_millis(500), recv_chan.recv()) + .await + .is_err()); + }); } diff --git a/tests/qcmp.rs b/tests/qcmp.rs index ca089e4ab..9ad1fff38 100644 --- a/tests/qcmp.rs +++ b/tests/qcmp.rs @@ -18,24 +18,15 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use tokio::time::Duration; -use quilkin::{ - codec::qcmp::Protocol, - test::{AddressType, TestHelper}, -}; +use quilkin::{codec::qcmp::Protocol, test::AddressType}; #[tokio::test] #[cfg_attr(target_os = "macos", ignore)] async fn proxy_ping() { - let mut t = TestHelper::default(); let qcmp = quilkin::net::raw_socket_with_reuse(0).unwrap(); let qcmp_port = quilkin::net::socket_port(&qcmp); - let server_proxy = quilkin::components::proxy::Proxy { - qcmp, - to: vec![(Ipv4Addr::UNSPECIFIED, 0).into()], - ..<_>::default() - }; - let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent()); - t.run_server(server_config, Some(server_proxy), None).await; + let (_tx, rx) = quilkin::make_shutdown_channel(quilkin::ShutdownKind::Testing); + let _task = quilkin::codec::qcmp::spawn(qcmp, rx); ping(qcmp_port).await; } @@ -45,18 +36,15 @@ async fn agent_ping() { let qcmp_port = quilkin::test::available_addr(AddressType::Random) .await .port(); - let agent = quilkin::cli::Agent { - qcmp_port, - ..<_>::default() - }; let server_config = std::sync::Arc::new(quilkin::Config::default_agent()); let (_tx, rx) = quilkin::make_shutdown_channel(quilkin::ShutdownKind::Testing); - tokio::spawn(async move { - agent - .run(None, server_config, Default::default(), rx) - .await - .expect("Agent should run") - }); + tokio::spawn( + quilkin::cli::Publish::default() + .qcmp() + .qcmp_port(qcmp_port) + .spawn_publishers(&server_config, &rx) + .unwrap(), + ); ping(qcmp_port).await; } diff --git a/tests/token_router.rs b/tests/token_router.rs index a4a91a1dc..4e2b1139b 100644 --- a/tests/token_router.rs +++ b/tests/token_router.rs @@ -31,30 +31,32 @@ use tokio::time::{timeout, Duration}; async fn token_router() { let mut t = TestHelper::default(); - let local_addr = echo_server(&mut t).await; - - // valid packet - let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; - - let msg = b"helloabc"; - tracing::trace!(%local_addr, "sending echo packet"); - socket.send_to(msg, &local_addr).await.unwrap(); - - tracing::trace!("awaiting echo packet"); - assert_eq!( - "hello", - timeout(Duration::from_millis(500), recv_chan.recv()) - .await - .expect("should have received a packet") - .unwrap() - ); - - // send an invalid packet - let msg = b"helloxyz"; - socket.send_to(msg, &local_addr).await.unwrap(); - - let result = timeout(Duration::from_millis(500), recv_chan.recv()).await; - assert!(result.is_err(), "should not have received a packet"); + tokio::spawn(async move { + let local_addr = echo_server(&mut t).await; + + // valid packet + let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await; + + let msg = b"helloabc"; + tracing::trace!(%local_addr, "sending echo packet"); + socket.send_to(msg, &local_addr).await.unwrap(); + + tracing::trace!("awaiting echo packet"); + assert_eq!( + "hello", + timeout(Duration::from_millis(500), recv_chan.recv()) + .await + .expect("should have received a packet") + .unwrap() + ); + + // send an invalid packet + let msg = b"helloxyz"; + socket.send_to(msg, &local_addr).await.unwrap(); + + let result = timeout(Duration::from_millis(500), recv_chan.recv()).await; + assert!(result.is_err(), "should not have received a packet"); + }); } // This test covers the scenario in https://github.com/googleforgames/quilkin/issues/988 @@ -64,59 +66,61 @@ async fn token_router() { async fn multiple_clients() { let limit = 10_000; let mut t = TestHelper::default(); - let local_addr = echo_server(&mut t).await; - - let (mut a_rx, a_socket) = t.open_socket_and_recv_multiple_packets().await; - let (mut b_rx, b_socket) = t.open_socket_and_recv_multiple_packets().await; - - tokio::spawn(async move { - // some room to breath - tokio::time::sleep(Duration::from_millis(50)).await; - for _ in 0..limit { - a_socket.send_to(b"Aabc", &local_addr).await.unwrap(); - tokio::time::sleep(Duration::from_nanos(5)).await; - } - }); tokio::spawn(async move { - // some room to breath - tokio::time::sleep(Duration::from_millis(50)).await; - for _ in 0..limit { - b_socket.send_to(b"Babc", &local_addr).await.unwrap(); - tokio::time::sleep(Duration::from_nanos(5)).await; - } - }); + let local_addr = echo_server(&mut t).await; - let mut success = 0; - let mut failed = 0; - for _ in 0..limit { - match timeout(Duration::from_millis(60), a_rx.recv()).await { - Ok(packet) => { - assert_eq!("A", packet.unwrap()); - success += 1; + let (mut a_rx, a_socket) = t.open_socket_and_recv_multiple_packets().await; + let (mut b_rx, b_socket) = t.open_socket_and_recv_multiple_packets().await; + + tokio::spawn(async move { + // some room to breath + tokio::time::sleep(Duration::from_millis(50)).await; + for _ in 0..limit { + a_socket.send_to(b"Aabc", &local_addr).await.unwrap(); + tokio::time::sleep(Duration::from_nanos(5)).await; } - Err(_) => { - failed += 1; + }); + tokio::spawn(async move { + // some room to breath + tokio::time::sleep(Duration::from_millis(50)).await; + for _ in 0..limit { + b_socket.send_to(b"Babc", &local_addr).await.unwrap(); + tokio::time::sleep(Duration::from_nanos(5)).await; } - } - match timeout(Duration::from_millis(60), b_rx.recv()).await { - Ok(packet) => { - assert_eq!("B", packet.unwrap()); - success += 1; + }); + + let mut success = 0; + let mut failed = 0; + for _ in 0..limit { + match timeout(Duration::from_millis(60), a_rx.recv()).await { + Ok(packet) => { + assert_eq!("A", packet.unwrap()); + success += 1; + } + Err(_) => { + failed += 1; + } } - Err(_) => { - failed += 1; + match timeout(Duration::from_millis(60), b_rx.recv()).await { + Ok(packet) => { + assert_eq!("B", packet.unwrap()); + success += 1; + } + Err(_) => { + failed += 1; + } } } - } - - // allow for some dropped packets, since UDP. - let threshold = 0.95 * (2 * limit) as f64; - assert!( - success as f64 > threshold, - "Success: {}, Failed: {}", - success, - failed - ); + + // allow for some dropped packets, since UDP. + let threshold = 0.95 * (2 * limit) as f64; + assert!( + success as f64 > threshold, + "Success: {}, Failed: {}", + success, + failed + ); + }); } // start an echo server and return what port it's on.