Skip to content

Commit

Permalink
Add XDP CLI options and initialization code (#1069)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jake-Shadle authored Jan 21, 2025
1 parent e3075a0 commit 7bfb3a6
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 76 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions crates/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,14 +499,12 @@ impl Pail {
let task = tokio::spawn(async move {
components::proxy::Proxy {
num_workers: NonZeroUsize::new(1).unwrap(),
mmdb: None,
to: Vec::new(),
to_tokens: None,
management_servers,
socket: Some(socket),
qcmp,
phoenix,
notifier: Some(rttx),
..Default::default()
}
.run(
RunArgs {
Expand Down
26 changes: 23 additions & 3 deletions crates/xdp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,24 @@
pub use aya;
pub use xdp::{self, nic::NicIndex};

const PROGRAM: &[u8] = include_bytes!("../bin/packet-router.bin");
// object unfortunately has alignment requirements, so we need to make sure
// the raw bytes are aligned for a 64-bit ELF (8 bytes)

// https://users.rust-lang.org/t/can-i-conveniently-compile-bytes-into-a-rust-program-with-a-specific-alignment/24049/2
// This struct is generic in Bytes to admit unsizing coercions.
#[repr(C)] // guarantee 'bytes' comes after '_align'
struct AlignedTo<Align, Bytes: ?Sized> {
_align: [Align; 0],
bytes: Bytes,
}

// dummy static used to create aligned data
static ALIGNED: &AlignedTo<u64, [u8]> = &AlignedTo {
_align: [],
bytes: *include_bytes!("../bin/packet-router.bin"),
};

static PROGRAM: &[u8] = &ALIGNED.bytes;

#[derive(thiserror::Error, Debug)]
pub enum BindError {
Expand Down Expand Up @@ -90,6 +107,7 @@ impl EbpfProgram {
let port_range = std::fs::read_to_string("/proc/sys/net/ipv4/ip_local_port_range")?;
let (start, end) =
port_range
.trim()
.split_once(char::is_whitespace)
.ok_or(std::io::Error::new(
std::io::ErrorKind::InvalidData,
Expand Down Expand Up @@ -163,8 +181,10 @@ impl EbpfProgram {
nic: NicIndex,
flags: aya::programs::XdpFlags,
) -> Result<aya::programs::xdp::XdpLinkId, aya::programs::ProgramError> {
if let Err(error) = aya_log::EbpfLogger::init(&mut self.bpf) {
tracing::warn!(%error, "failed to initialize eBPF logging");
// Would be good to enable this if we do end up adding log messages to
// the eBPF program
if let Err(_error) = aya_log::EbpfLogger::init(&mut self.bpf) {
//tracing::warn!(%error, "failed to initialize eBPF logging");
}

// We use this entrypoint for now, but in the future we could also use
Expand Down
59 changes: 57 additions & 2 deletions src/cli/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,63 @@ pub struct Proxy {
/// to an management server after receiving no updates.
#[clap(long, env = "QUILKIN_IDLE_REQUEST_INTERVAL_SECS")]
pub idle_request_interval_secs: Option<u64>,
/// Number of worker threads used to process packets. If not specified defaults
/// to number of cpus.
/// Number of worker threads used to process packets.
///
/// If not specified defaults to number of cpus. Has no effect if XDP is used,
/// as the number of workers is always the same as the NIC queue size.
#[clap(short, long, env = "QUILKIN_WORKERS")]
pub workers: Option<std::num::NonZeroUsize>,
#[clap(flatten)]
pub xdp_opts: XdpOptions,
}

/// XDP (eXpress Data Path) options
#[derive(clap::Args, Clone, Debug)]
pub struct XdpOptions {
/// The name of the network interface to bind the XDP socket(s) to.
///
/// If not specified quilkin will attempt to determine the most appropriate
/// network interface to use. Quilkin will exit with an error if the network
/// interface does not exist, or a suitable default cannot be determined.
#[clap(long = "publish.udp.xdp.network-interface")]
pub network_interface: Option<String>,
/// Forces the use of XDP.
///
/// If XDP is not available on the chosen NIC, Quilkin exits with an error.
/// If false, io-uring will be used as the fallback implementation.
#[clap(long = "publish.udp.xdp")]
pub force_xdp: bool,
/// Forces the use of [`XDP_ZEROCOPY`](https://www.kernel.org/doc/html/latest/networking/af_xdp.html#xdp-copy-and-xdp-zerocopy-bind-flags)
///
/// If zero copy is not available on the chosen NIC, Quilkin exits with an error
#[clap(long = "publish.udp.xdp.zerocopy")]
pub force_zerocopy: bool,
/// Forces the use of [TX checksum offload](https://docs.kernel.org/6.8/networking/xsk-tx-metadata.html)
///
/// TX checksum offload is an optional feature allowing the data portion of
/// a packet to have its internet checksum calculation offloaded to the NIC,
/// as otherwise this is done in software
#[clap(long = "publish.udp.xdp.tco")]
pub force_tx_checksum_offload: bool,
/// The maximum amount of memory mapped for packet buffers, in bytes
///
/// If not specified, this defaults to 4MiB (2k allocated packets of 2k each at a time)
/// per NIC queue, ie 128MiB on a 32 queue NIC
#[clap(long = "publish.udp.xdp.memory-limit")]
pub maximum_memory: Option<u64>,
}

#[allow(clippy::derivable_impls)]
impl Default for XdpOptions {
fn default() -> Self {
Self {
network_interface: None,
force_xdp: false,
force_zerocopy: false,
force_tx_checksum_offload: false,
maximum_memory: None,
}
}
}

impl Default for Proxy {
Expand All @@ -72,6 +125,7 @@ impl Default for Proxy {
to_tokens: None,
idle_request_interval_secs: None,
workers: None,
xdp_opts: Default::default(),
}
}
}
Expand Down Expand Up @@ -127,6 +181,7 @@ impl Proxy {
qcmp,
phoenix,
notifier: None,
xdp: self.xdp_opts,
}
.run(
crate::components::RunArgs {
Expand Down
67 changes: 64 additions & 3 deletions src/components/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub struct Proxy {
pub qcmp: socket2::Socket,
pub phoenix: crate::net::TcpListener,
pub notifier: Option<tokio::sync::mpsc::UnboundedSender<String>>,
pub xdp: crate::cli::proxy::XdpOptions,
}

impl Default for Proxy {
Expand All @@ -89,6 +90,7 @@ impl Default for Proxy {
qcmp,
phoenix,
notifier: None,
xdp: Default::default(),
}
}
}
Expand Down Expand Up @@ -295,7 +297,26 @@ impl Proxy {
pub async fn spawn_packet_router(
&mut self,
config: Arc<crate::config::Config>,
) -> eyre::Result<impl FnOnce(crate::ShutdownRx)> {
) -> eyre::Result<Box<dyn FnOnce(crate::ShutdownRx) + Send>> {
#[cfg(target_os = "linux")]
{
match self.spawn_xdp(config.clone()) {
Ok(xdp) => {
return Ok(xdp);
}
Err(err) => {
if self.xdp.force_xdp {
return Err(err);
}

tracing::warn!(
?err,
"failed to spawn XDP I/O loop, falling back to io-uring"
);
}
}
}

self.spawn_user_space_router(config).await
}

Expand All @@ -306,7 +327,7 @@ impl Proxy {
pub async fn spawn_user_space_router(
&mut self,
config: Arc<crate::config::Config>,
) -> eyre::Result<impl FnOnce(crate::ShutdownRx)> {
) -> eyre::Result<Box<dyn FnOnce(crate::ShutdownRx) + Send>> {
let workers = self.num_workers.get();
let buffer_pool = Arc::new(crate::collections::BufferPool::new(workers, 2 * 1024));

Expand All @@ -329,8 +350,48 @@ impl Proxy {
)
.await?;

Ok(move |shutdown_rx: crate::ShutdownRx| {
Ok(Box::new(move |shutdown_rx: crate::ShutdownRx| {
sessions.shutdown(*shutdown_rx.borrow() == crate::ShutdownKind::Normal);
}))
}

#[cfg(target_os = "linux")]
fn spawn_xdp(
&mut self,
config: Arc<crate::config::Config>,
) -> eyre::Result<Box<dyn FnOnce(crate::ShutdownRx) + Send>> {
use crate::net::xdp;
use eyre::Context as _;

// TODO: remove this once it's been more stabilized
if true {
eyre::bail!("temporarily disabled");
}

let Some(external_port) = self.socket.as_ref().and_then(|s| {
s.local_addr()
.ok()
.and_then(|la| la.as_socket().map(|sa| sa.port()))
}) else {
eyre::bail!("unable to determine port");
};

let workers = xdp::setup_xdp_io(xdp::XdpConfig {
nic: self
.xdp
.network_interface
.as_deref()
.map_or(xdp::NicConfig::Default, xdp::NicConfig::Name),
external_port,
maximum_packet_memory: self.xdp.maximum_memory,
require_zero_copy: self.xdp.force_zerocopy,
require_tx_checksum: self.xdp.force_tx_checksum_offload,
})
.context("failed to setup XDP")?;

let io_loop = xdp::spawn(workers, config).context("failed to spawn XDP I/O loop")?;
Ok(Box::new(move |srx: crate::ShutdownRx| {
io_loop.shutdown(*srx.borrow() == crate::ShutdownKind::Normal);
}))
}
}
Loading

0 comments on commit 7bfb3a6

Please sign in to comment.