Skip to content

Commit 70bf887

Browse files
authored
Merge pull request #1522 from microsoft/enhancement-xdp-refactor
[catpowder] Enhancement: XDP refactor and bug fixes
2 parents e6d0a67 + 60dec43 commit 70bf887

File tree

13 files changed

+930
-558
lines changed

13 files changed

+930
-558
lines changed

scripts/config/azure.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ raw_socket:
99
xdp_interface_index: 0
1010
xdp_cohost_mode: false
1111
xdp_always_poke_tx: false
12+
# If true, will send all packets out on the interface identified by xdp_vf_interface_index;
13+
# otherwise, or if xdp_vf_interface_index is not set, will send packets out on the interface
14+
# identified by xdp_interface_index.
15+
xdp_always_send_on_vf: false
1216
# Enable the following for XDP cohosting mode, or override in environment:
1317
# xdp_tcp_ports: [80, 443]
1418
# xdp_udp_ports: [53]

scripts/config/default.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ raw_socket:
99
xdp_interface_index: 0
1010
xdp_cohost_mode: false
1111
xdp_always_poke_tx: false
12+
# If true, will send all packets out on the interface identified by xdp_vf_interface_index;
13+
# otherwise, or if xdp_vf_interface_index is not set, will send packets out on the interface
14+
# identified by xdp_interface_index.
15+
xdp_always_send_on_vf: false
1216
# Enable the following for XDP cohosting mode, or override in environment:
1317
# xdp_tcp_ports: [80, 443]
1418
# xdp_udp_ports: [53]

src/rust/catpowder/win/cohosting.rs

Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT license.
3+
4+
//======================================================================================================================
5+
// Imports
6+
//======================================================================================================================
7+
8+
use std::rc::Rc;
9+
10+
use windows::Win32::Networking::WinSock::{
11+
closesocket, socket, WSACleanup, WSAIoctl, WSAStartup, AF_INET, INET_PORT_RANGE, INET_PORT_RESERVATION_INSTANCE,
12+
INVALID_SOCKET, IN_ADDR, IPPROTO_TCP, IPPROTO_UDP, SIO_ACQUIRE_PORT_RESERVATION, SOCKET, SOCK_DGRAM, SOCK_STREAM,
13+
WSADATA,
14+
};
15+
16+
use crate::{
17+
catnap::transport::error::expect_last_wsa_error,
18+
catpowder::win::ring::RuleSet,
19+
demikernel::config::Config,
20+
inetstack::protocols::{layer4::ephemeral::EphemeralPorts, Protocol},
21+
runtime::fail::Fail,
22+
};
23+
24+
//======================================================================================================================
25+
// Structures
26+
//======================================================================================================================
27+
28+
/// State to track port sharing state with the kernel for cohosting.
29+
pub struct PortSharingState {
30+
/// The local IP address of the host, from the config.
31+
local_ip: IN_ADDR,
32+
/// All TCP ports to be redirected.
33+
tcp_ports: Vec<u16>,
34+
/// All UDP ports to be redirected.
35+
udp_ports: Vec<u16>,
36+
/// The Winsock socket used to reserve ephemeral ports with the kernel.
37+
reserved_socket: SOCKET,
38+
/// The list of reserved ephemeral ports.
39+
reserved_ports: Vec<u16>,
40+
}
41+
42+
/// The state of cohosting.
43+
pub enum CohostingMode {
44+
/// No cohosting mode is enabled.
45+
None,
46+
/// Port sharing mode is enabled.
47+
PortSharing(PortSharingState),
48+
}
49+
50+
//======================================================================================================================
51+
// Implementations
52+
//======================================================================================================================
53+
54+
impl CohostingMode {
55+
/// Creates a new instance of `CohostingMode` based on the provided configuration.
56+
pub fn new(config: &Config) -> Result<Self, Fail> {
57+
if config.xdp_cohost_mode()? == false {
58+
return Ok(CohostingMode::None);
59+
}
60+
61+
let local_ip: IN_ADDR = IN_ADDR::from(config.local_ipv4_addr()?);
62+
63+
let (mut tcp_ports, mut udp_ports) = config.xdp_cohost_ports()?;
64+
65+
let reserved_protocol: Option<Protocol> = config.xdp_reserved_port_protocol()?;
66+
let reserved_port_count: Option<u16> = config.xdp_reserved_port_count()?;
67+
68+
let (reserved_socket, reserved_ports): (SOCKET, Vec<u16>) =
69+
if reserved_protocol.is_some() && reserved_port_count.is_some() {
70+
let protocol: Protocol = reserved_protocol.unwrap();
71+
let port_count: u16 = reserved_port_count.unwrap();
72+
73+
let mut data: WSADATA = WSADATA::default();
74+
if unsafe { WSAStartup(0x202u16, &mut data as *mut WSADATA) } != 0 {
75+
return Err(expect_last_wsa_error());
76+
}
77+
78+
trace!("reserving {} ports with protocol {:?}", port_count, protocol);
79+
80+
let (socket, ports) = reserve_port_blocks(port_count, protocol).or_else(|f: Fail| {
81+
let _ = unsafe { WSACleanup() };
82+
Err(f)
83+
})?;
84+
85+
match protocol {
86+
Protocol::Tcp => tcp_ports.extend(ports.iter().cloned()),
87+
Protocol::Udp => udp_ports.extend(ports.iter().cloned()),
88+
}
89+
90+
(socket, ports)
91+
} else {
92+
trace!("reserved port options not set; no ports reserved");
93+
(INVALID_SOCKET, vec![])
94+
};
95+
96+
trace!(
97+
"XDP cohost mode enabled. TCP ports: {:?}, UDP ports: {:?}",
98+
tcp_ports,
99+
udp_ports
100+
);
101+
102+
Ok(CohostingMode::PortSharing(PortSharingState {
103+
local_ip,
104+
tcp_ports,
105+
udp_ports,
106+
reserved_socket,
107+
reserved_ports,
108+
}))
109+
}
110+
111+
pub fn create_ruleset(&self) -> Rc<RuleSet> {
112+
match self {
113+
CohostingMode::None => return RuleSet::new_redirect_all(),
114+
CohostingMode::PortSharing(state) => {
115+
RuleSet::new_cohost(state.local_ip, state.tcp_ports.as_slice(), state.udp_ports.as_slice())
116+
},
117+
}
118+
}
119+
120+
pub fn ephemeral_ports(&self) -> EphemeralPorts {
121+
match self {
122+
CohostingMode::None => return EphemeralPorts::default(),
123+
CohostingMode::PortSharing(state) => {
124+
if state.reserved_ports.is_empty() {
125+
EphemeralPorts::default()
126+
} else {
127+
EphemeralPorts::new(state.reserved_ports.as_slice()).unwrap()
128+
}
129+
},
130+
}
131+
}
132+
}
133+
134+
fn reserve_port_blocks(port_count: u16, protocol: Protocol) -> Result<(SOCKET, Vec<u16>), Fail> {
135+
const MAX_HALVINGS: usize = 5;
136+
let mut ports: Vec<u16> = Vec::with_capacity(port_count as usize);
137+
138+
let mut reservation_len: u16 = port_count;
139+
let mut halvings: usize = 0;
140+
141+
let (sock_type, protocol) = match protocol {
142+
Protocol::Tcp => (SOCK_STREAM, IPPROTO_TCP.0),
143+
Protocol::Udp => (SOCK_DGRAM, IPPROTO_UDP.0),
144+
};
145+
146+
let s: SOCKET = unsafe { socket(AF_INET.0.into(), sock_type, protocol) };
147+
if s == INVALID_SOCKET {
148+
return Err(expect_last_wsa_error());
149+
}
150+
151+
while ports.len() < port_count as usize {
152+
trace!("reserve_port_blocks(): trying reservation length: {}", reservation_len);
153+
match reserve_ports(reservation_len, s) {
154+
Ok((start, count, _)) if count > 0 => {
155+
let end: u16 = start + (count - 1);
156+
trace!("reserve_port_blocks(): reserved ports: {}-{}", start, end);
157+
ports.extend(start..=end);
158+
},
159+
Ok(_) => {
160+
panic!("reserve_port_blocks(): reserved zero ports");
161+
},
162+
Err(e) => {
163+
halvings += 1;
164+
if halvings >= MAX_HALVINGS || reservation_len == 1 {
165+
error!("reserve_port_blocks(): failed to reserve ports; giving up: {:?}", e);
166+
let _ = unsafe { closesocket(s) };
167+
return Err(e);
168+
} else {
169+
trace!(
170+
"reserve_port_blocks(): failed to reserve ports; halving reservation size: {:?}",
171+
e
172+
);
173+
reservation_len /= 2;
174+
}
175+
},
176+
}
177+
}
178+
179+
Ok((s, ports))
180+
}
181+
182+
fn reserve_ports(port_count: u16, s: SOCKET) -> Result<(u16, u16, u64), Fail> {
183+
let port_range: INET_PORT_RANGE = INET_PORT_RANGE {
184+
StartPort: 0,
185+
NumberOfPorts: port_count,
186+
};
187+
188+
let mut reservation: INET_PORT_RESERVATION_INSTANCE = INET_PORT_RESERVATION_INSTANCE::default();
189+
let mut bytes_out: u32 = 0;
190+
191+
let result: i32 = unsafe {
192+
WSAIoctl(
193+
s,
194+
SIO_ACQUIRE_PORT_RESERVATION,
195+
Some(&port_range as *const INET_PORT_RANGE as *mut libc::c_void),
196+
std::mem::size_of::<INET_PORT_RANGE>() as u32,
197+
Some(&mut reservation as *mut INET_PORT_RESERVATION_INSTANCE as *mut libc::c_void),
198+
std::mem::size_of::<INET_PORT_RESERVATION_INSTANCE>() as u32,
199+
&mut bytes_out,
200+
None,
201+
None,
202+
)
203+
};
204+
205+
if result != 0 {
206+
return Err(expect_last_wsa_error());
207+
}
208+
209+
Ok((
210+
u16::from_be(reservation.Reservation.StartPort),
211+
reservation.Reservation.NumberOfPorts,
212+
reservation.Token.Token,
213+
))
214+
}
215+
216+
impl Drop for PortSharingState {
217+
fn drop(&mut self) {
218+
if self.reserved_socket != INVALID_SOCKET {
219+
let _ = unsafe { closesocket(self.reserved_socket) };
220+
let _ = unsafe { WSACleanup() };
221+
}
222+
}
223+
}

src/rust/catpowder/win/interface.rs

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT license.
3+
4+
//======================================================================================================================
5+
// Imports
6+
//======================================================================================================================
7+
8+
use std::{num::NonZeroU32, rc::Rc};
9+
10+
use crate::{
11+
catpowder::win::{
12+
api::XdpApi,
13+
ring::{RuleSet, RxRing, TxRing},
14+
socket::XdpSocket,
15+
},
16+
demikernel::config::Config,
17+
runtime::fail::Fail,
18+
};
19+
20+
//======================================================================================================================
21+
// Structures
22+
//======================================================================================================================
23+
24+
/// State for the XDP interface.
25+
pub struct Interface {
26+
/// Currently only one TX is created for all sends on the interface.
27+
pub tx_ring: TxRing,
28+
/// RX rings for the interface, one per RSS queue.
29+
pub rx_rings: Vec<RxRing>,
30+
/// Sockets for the interface, one for each *Ring member above, with a description of the socket.
31+
pub sockets: Vec<(String, XdpSocket)>,
32+
}
33+
34+
//======================================================================================================================
35+
// Implementations
36+
//======================================================================================================================
37+
38+
impl Interface {
39+
/// Creates a new interface for the given configuration. The interface creates [queue_count] RX
40+
/// rings.
41+
pub fn new(
42+
api: &mut XdpApi,
43+
ifindex: u32,
44+
queue_count: NonZeroU32,
45+
ruleset: Rc<RuleSet>,
46+
config: &Config,
47+
) -> Result<Self, Fail> {
48+
let (tx_buffer_count, tx_ring_size) = config.tx_buffer_config()?;
49+
let (rx_buffer_count, rx_ring_size) = config.rx_buffer_config()?;
50+
let mtu: u16 = config.mtu()?;
51+
52+
let (tx_ring_size, tx_buffer_count): (NonZeroU32, NonZeroU32) =
53+
validate_ring_config(tx_ring_size, tx_buffer_count, "tx")?;
54+
let (rx_ring_size, rx_buffer_count): (NonZeroU32, NonZeroU32) =
55+
validate_ring_config(rx_ring_size, rx_buffer_count, "rx")?;
56+
57+
let always_poke: bool = config.xdp_always_poke_tx()?;
58+
59+
let mut rx_rings: Vec<RxRing> = Vec::with_capacity(queue_count.get() as usize);
60+
let mut sockets: Vec<(String, XdpSocket)> = Vec::new();
61+
62+
let tx_ring: TxRing = TxRing::new(
63+
api,
64+
tx_ring_size.get(),
65+
tx_buffer_count.get(),
66+
mtu,
67+
ifindex,
68+
0,
69+
always_poke,
70+
)?;
71+
sockets.push((format!("if {} tx 0", ifindex), tx_ring.socket().clone()));
72+
73+
for queueid in 0..queue_count.get() {
74+
let mut ring: RxRing = RxRing::new(
75+
api,
76+
rx_ring_size.get(),
77+
rx_buffer_count.get(),
78+
mtu,
79+
ifindex,
80+
queueid,
81+
ruleset.clone(),
82+
)?;
83+
ring.provide_buffers();
84+
sockets.push((format!("if {} rx {}", ifindex, queueid), ring.socket().clone()));
85+
rx_rings.push(ring);
86+
}
87+
88+
trace!("Created {} RX rings on interface {}", rx_rings.len(), ifindex);
89+
90+
Ok(Self {
91+
tx_ring,
92+
rx_rings,
93+
sockets,
94+
})
95+
}
96+
97+
pub fn return_tx_buffers(&mut self) {
98+
self.tx_ring.return_buffers();
99+
}
100+
101+
pub fn provide_rx_buffers(&mut self) {
102+
for ring in self.rx_rings.iter_mut() {
103+
ring.provide_buffers();
104+
}
105+
}
106+
}
107+
108+
//======================================================================================================================
109+
// Functions
110+
//======================================================================================================================
111+
112+
/// Validates the ring size and buffer count for the given configuration.
113+
fn validate_ring_config(ring_size: u32, buf_count: u32, config: &str) -> Result<(NonZeroU32, NonZeroU32), Fail> {
114+
let ring_size: NonZeroU32 = NonZeroU32::try_from(ring_size)
115+
.map_err(Fail::from)
116+
.and_then(|v: NonZeroU32| {
117+
if !v.is_power_of_two() {
118+
let cause: String = format!("{}_ring_size must be a power of two: {}", config, v.get());
119+
Err(Fail::new(libc::EINVAL, &cause))
120+
} else {
121+
Ok(v)
122+
}
123+
})?;
124+
125+
let buf_count: NonZeroU32 = if buf_count < ring_size.get() {
126+
let cause: String = format!(
127+
"{}_buffer_count must be greater than or equal to {}_ring_size",
128+
config, config
129+
);
130+
return Err(Fail::new(libc::EINVAL, &cause));
131+
} else {
132+
// Safety: since buffer_count >= ring_size, we can safely create a NonZeroU32.
133+
unsafe { NonZeroU32::new_unchecked(buf_count) }
134+
};
135+
136+
Ok((ring_size, buf_count))
137+
}

src/rust/catpowder/win/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@
66
//======================================================================================================================
77

88
mod api;
9+
mod cohosting;
10+
mod interface;
11+
mod observability;
912
mod ring;
13+
mod rss;
1014
mod socket;
1115

1216
//======================================================================================================================

0 commit comments

Comments
 (0)