Skip to content

Commit 354b8db

Browse files
committed
refactor: Rename PendingSends to PacketQueue, move PacketQueue and io-uring to net module
1 parent 1ace05d commit 354b8db

File tree

15 files changed

+213
-175
lines changed

15 files changed

+213
-175
lines changed

crates/test/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,7 @@ impl Pail {
503503
to: Vec::new(),
504504
to_tokens: None,
505505
management_servers,
506-
socket,
506+
socket: Some(socket),
507507
qcmp,
508508
phoenix,
509509
notifier: Some(rttx),

src/cli/proxy.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ impl Proxy {
123123
to: self.to,
124124
to_tokens,
125125
num_workers,
126-
socket,
126+
socket: Some(socket),
127127
qcmp,
128128
phoenix,
129129
notifier: None,

src/components/proxy.rs

+51-117
Original file line numberDiff line numberDiff line change
@@ -14,84 +14,10 @@
1414
* limitations under the License.
1515
*/
1616

17-
mod error;
17+
pub(crate) mod error;
1818
pub mod packet_router;
1919
mod sessions;
2020

21-
cfg_if::cfg_if! {
22-
if #[cfg(target_os = "linux")] {
23-
pub(crate) mod io_uring_shared;
24-
pub(crate) type PacketSendReceiver = io_uring_shared::EventFd;
25-
pub(crate) type PacketSendSender = io_uring_shared::EventFdWriter;
26-
} else {
27-
pub(crate) type PacketSendReceiver = tokio::sync::watch::Receiver<bool>;
28-
pub(crate) type PacketSendSender = tokio::sync::watch::Sender<bool>;
29-
}
30-
}
31-
32-
/// A simple packet queue that signals when a packet is pushed
33-
///
34-
/// For io_uring this notifies an eventfd that will be processed on the next
35-
/// completion loop
36-
#[derive(Clone)]
37-
pub struct PendingSends {
38-
packets: Arc<parking_lot::Mutex<Vec<SendPacket>>>,
39-
notify: PacketSendSender,
40-
}
41-
42-
impl PendingSends {
43-
pub fn new(capacity: usize) -> std::io::Result<(Self, PacketSendReceiver)> {
44-
#[cfg(target_os = "linux")]
45-
let (notify, rx) = {
46-
let rx = io_uring_shared::EventFd::new()?;
47-
(rx.writer(), rx)
48-
};
49-
#[cfg(not(target_os = "linux"))]
50-
let (notify, rx) = tokio::sync::watch::channel(true);
51-
52-
Ok((
53-
Self {
54-
packets: Arc::new(parking_lot::Mutex::new(Vec::with_capacity(capacity))),
55-
notify,
56-
},
57-
rx,
58-
))
59-
}
60-
61-
#[inline]
62-
pub(crate) fn capacity(&self) -> usize {
63-
self.packets.lock().capacity()
64-
}
65-
66-
/// Pushes a packet onto the queue to be sent, signalling a sender that
67-
/// it's available
68-
#[inline]
69-
pub(crate) fn push(&self, packet: SendPacket) {
70-
self.packets.lock().push(packet);
71-
#[cfg(target_os = "linux")]
72-
self.notify.write(1);
73-
#[cfg(not(target_os = "linux"))]
74-
let _ = self.notify.send(true);
75-
}
76-
77-
/// Called to shutdown the consumer side of the sends (ie the io loop that is
78-
/// actually dequing and sending packets)
79-
#[inline]
80-
pub(crate) fn shutdown_receiver(&self) {
81-
#[cfg(target_os = "linux")]
82-
self.notify.write(0xdeadbeef);
83-
#[cfg(not(target_os = "linux"))]
84-
let _ = self.notify.send(false);
85-
}
86-
87-
/// Swaps the current queue with an empty one so we only lock for a pointer swap
88-
#[inline]
89-
pub fn swap(&self, mut swap: Vec<SendPacket>) -> Vec<SendPacket> {
90-
swap.clear();
91-
std::mem::replace(&mut self.packets.lock(), swap)
92-
}
93-
}
94-
9521
use super::RunArgs;
9622
pub use error::{ErrorMap, PipelineError};
9723
pub use sessions::SessionPool;
@@ -103,20 +29,6 @@ use std::{
10329
},
10430
};
10531

106-
pub struct SendPacket {
107-
/// The destination address of the packet
108-
pub destination: socket2::SockAddr,
109-
/// The packet data being sent
110-
pub data: crate::collections::FrozenPoolBuffer,
111-
/// The asn info for the sender, used for metrics
112-
pub asn_info: Option<crate::net::maxmind_db::MetricsIpNetEntry>,
113-
}
114-
115-
pub struct RecvPacket {
116-
pub source: SocketAddr,
117-
pub data: crate::collections::PoolBuffer,
118-
}
119-
12032
#[derive(Clone, Debug)]
12133
pub struct Ready {
12234
pub idle_request_interval: std::time::Duration,
@@ -156,7 +68,7 @@ pub struct Proxy {
15668
pub management_servers: Vec<tonic::transport::Endpoint>,
15769
pub to: Vec<SocketAddr>,
15870
pub to_tokens: Option<ToTokens>,
159-
pub socket: socket2::Socket,
71+
pub socket: Option<socket2::Socket>,
16072
pub qcmp: socket2::Socket,
16173
pub phoenix: crate::net::TcpListener,
16274
pub notifier: Option<tokio::sync::mpsc::UnboundedSender<String>>,
@@ -173,7 +85,7 @@ impl Default for Proxy {
17385
management_servers: Vec::new(),
17486
to: Vec::new(),
17587
to_tokens: None,
176-
socket: crate::net::raw_socket_with_reuse(0).unwrap(),
88+
socket: Some(crate::net::raw_socket_with_reuse(0).unwrap()),
17789
qcmp,
17890
phoenix,
17991
notifier: None,
@@ -183,15 +95,16 @@ impl Default for Proxy {
18395

18496
impl Proxy {
18597
pub async fn run(
186-
self,
98+
mut self,
18799
RunArgs {
188100
config,
189101
ready,
190102
mut shutdown_rx,
191103
}: RunArgs<Ready>,
192104
initialized: Option<tokio::sync::oneshot::Sender<()>>,
193105
) -> crate::Result<()> {
194-
let _mmdb_task = self.mmdb.map(|source| {
106+
let _mmdb_task = self.mmdb.as_ref().map(|source| {
107+
let source = source.clone();
195108
tokio::spawn(async move {
196109
while let Err(error) =
197110
tryhard::retry_fn(|| crate::MaxmindDb::update(source.clone()))
@@ -205,7 +118,7 @@ impl Proxy {
205118
});
206119

207120
if !self.to.is_empty() {
208-
let endpoints = if let Some(tt) = self.to_tokens {
121+
let endpoints = if let Some(tt) = &self.to_tokens {
209122
let (unique, overflow) = 256u64.overflowing_pow(tt.length as _);
210123
if overflow {
211124
panic!(
@@ -355,28 +268,7 @@ impl Proxy {
355268
.expect("failed to spawn proxy-subscription thread");
356269
}
357270

358-
let num_workers = self.num_workers.get();
359-
let buffer_pool = Arc::new(crate::collections::BufferPool::new(num_workers, 2 * 1024));
360-
361-
let mut worker_sends = Vec::with_capacity(num_workers);
362-
let mut session_sends = Vec::with_capacity(num_workers);
363-
for _ in 0..num_workers {
364-
let psends = PendingSends::new(15)?;
365-
session_sends.push(psends.0.clone());
366-
worker_sends.push(psends);
367-
}
368-
369-
let sessions = SessionPool::new(config.clone(), session_sends, buffer_pool.clone());
370-
371-
packet_router::spawn_receivers(
372-
config.clone(),
373-
self.socket,
374-
worker_sends,
375-
&sessions,
376-
buffer_pool,
377-
)
378-
.await?;
379-
271+
let router_shutdown = self.spawn_packet_router(config.clone()).await?;
380272
crate::codec::qcmp::spawn(self.qcmp, shutdown_rx.clone())?;
381273
crate::net::phoenix::spawn(
382274
self.phoenix,
@@ -395,8 +287,50 @@ impl Proxy {
395287
.await
396288
.map_err(|error| eyre::eyre!(error))?;
397289

398-
sessions.shutdown(*shutdown_rx.borrow() == crate::ShutdownKind::Normal);
290+
(router_shutdown)(shutdown_rx);
399291

400292
Ok(())
401293
}
294+
295+
pub async fn spawn_packet_router(
296+
&mut self,
297+
config: Arc<crate::config::Config>,
298+
) -> eyre::Result<impl FnOnce(crate::ShutdownRx)> {
299+
self.spawn_user_space_router(config).await
300+
}
301+
302+
/// Launches the user space implementation of the packet router using
303+
/// sockets. This implementation uses a pool of buffers and sockets to
304+
/// manage UDP sessions and sockets. On Linux this will use io-uring, where
305+
/// as it will use epoll interfaces on non-Linux platforms.
306+
pub async fn spawn_user_space_router(
307+
&mut self,
308+
config: Arc<crate::config::Config>,
309+
) -> eyre::Result<impl FnOnce(crate::ShutdownRx)> {
310+
let workers = self.num_workers.get();
311+
let buffer_pool = Arc::new(crate::collections::BufferPool::new(workers, 2 * 1024));
312+
313+
let mut worker_sends = Vec::with_capacity(workers);
314+
let mut session_sends = Vec::with_capacity(workers);
315+
for _ in 0..workers {
316+
let queue = crate::net::queue(15)?;
317+
session_sends.push(queue.0.clone());
318+
worker_sends.push(queue);
319+
}
320+
321+
let sessions = SessionPool::new(config.clone(), session_sends, buffer_pool.clone());
322+
323+
packet_router::spawn_receivers(
324+
config,
325+
self.socket.take().unwrap(),
326+
worker_sends,
327+
&sessions,
328+
buffer_pool,
329+
)
330+
.await?;
331+
332+
Ok(move |shutdown_rx: crate::ShutdownRx| {
333+
sessions.shutdown(*shutdown_rx.borrow() == crate::ShutdownKind::Normal);
334+
})
335+
}
402336
}

src/components/proxy/packet_router.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ impl<P: PacketMut> DownstreamPacket<P> {
143143

144144
/// Represents the required arguments to run a worker task that
145145
/// processes packets received downstream.
146-
pub struct DownstreamReceiveWorkerConfig {
146+
pub(crate) struct DownstreamReceiveWorkerConfig {
147147
/// ID of the worker.
148148
pub worker_id: usize,
149149
pub port: u16,
@@ -158,10 +158,10 @@ pub struct DownstreamReceiveWorkerConfig {
158158
/// This function also spawns the set of worker tasks responsible for consuming packets
159159
/// off the aforementioned queue and processing them through the filter chain and session
160160
/// pipeline.
161-
pub async fn spawn_receivers(
161+
pub(crate) async fn spawn_receivers(
162162
config: Arc<Config>,
163163
socket: socket2::Socket,
164-
worker_sends: Vec<(super::PendingSends, super::PacketSendReceiver)>,
164+
worker_sends: Vec<crate::net::PacketQueue>,
165165
sessions: &Arc<SessionPool>,
166166
buffer_pool: Arc<crate::collections::BufferPool>,
167167
) -> crate::Result<()> {

src/components/proxy/packet_router/io_uring.rs

+4-8
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,11 @@
1414
* limitations under the License.
1515
*/
1616

17-
use crate::components::proxy;
1817
use eyre::Context as _;
1918

2019
impl super::DownstreamReceiveWorkerConfig {
21-
pub async fn spawn(
22-
self,
23-
pending_sends: (proxy::PendingSends, proxy::PacketSendReceiver),
24-
) -> eyre::Result<()> {
25-
use crate::components::proxy::io_uring_shared;
20+
pub async fn spawn(self, pending_sends: crate::net::PacketQueue) -> eyre::Result<()> {
21+
use crate::net::io_uring;
2622

2723
let Self {
2824
worker_id,
@@ -36,11 +32,11 @@ impl super::DownstreamReceiveWorkerConfig {
3632
let socket =
3733
crate::net::DualStackLocalSocket::new(port).context("failed to bind socket")?;
3834

39-
let io_loop = io_uring_shared::IoUringLoop::new(2000, socket)?;
35+
let io_loop = io_uring::IoUringLoop::new(2000, socket)?;
4036
io_loop
4137
.spawn(
4238
format!("packet-router-{worker_id}"),
43-
io_uring_shared::PacketProcessorCtx::Router {
39+
io_uring::PacketProcessorCtx::Router {
4440
config,
4541
sessions,
4642
error_acc: super::super::error::ErrorAccumulator::new(error_sender),

src/components/proxy/packet_router/reference.rs

+4-9
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,8 @@
1616

1717
//! The reference implementation is used for non-Linux targets
1818
19-
use crate::components::proxy;
20-
2119
impl super::DownstreamReceiveWorkerConfig {
22-
pub async fn spawn(
23-
self,
24-
pending_sends: (proxy::PendingSends, proxy::PacketSendReceiver),
25-
) -> eyre::Result<()> {
20+
pub async fn spawn(self, packet_queue: crate::net::PacketQueue) -> eyre::Result<()> {
2621
let Self {
2722
worker_id,
2823
port,
@@ -47,16 +42,16 @@ impl super::DownstreamReceiveWorkerConfig {
4742
let send_socket = socket.clone();
4843

4944
let inner_task = async move {
50-
let (pending_sends, mut sends_rx) = pending_sends;
51-
let mut sends_double_buffer = Vec::with_capacity(pending_sends.capacity());
45+
let (packet_queue, mut sends_rx) = packet_queue;
46+
let mut sends_double_buffer = Vec::with_capacity(packet_queue.capacity());
5247

5348
while sends_rx.changed().await.is_ok() {
5449
if !*sends_rx.borrow() {
5550
tracing::trace!("io loop shutdown requested");
5651
break;
5752
}
5853

59-
sends_double_buffer = pending_sends.swap(sends_double_buffer);
54+
sends_double_buffer = packet_queue.swap(sends_double_buffer);
6055

6156
for packet in sends_double_buffer.drain(..sends_double_buffer.len()) {
6257
let (result, _) = send_socket

0 commit comments

Comments
 (0)