Skip to content

Fixing net_mana handling of Gdma Errors #1154

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4292,6 +4292,7 @@ dependencies = [
"net_backend_resources",
"pal_async",
"parking_lot",
"thiserror 2.0.12",
"tracing",
"vm_resource",
"vm_topology",
Expand Down
1 change: 1 addition & 0 deletions vm/devices/net/gdma_defs/src/bnic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ pub const CQE_TX_VF_DISABLED: u8 = 38;
pub const CQE_TX_VPORT_IDX_OUT_OF_RANGE: u8 = 39;
pub const CQE_TX_VPORT_DISABLED: u8 = 40;
pub const CQE_TX_VLAN_TAGGING_VIOLATION: u8 = 41;
pub const CQE_TX_GDMA_ERR: u8 = 42;

pub const MANA_CQE_COMPLETION: u8 = 1;

Expand Down
1 change: 1 addition & 0 deletions vm/devices/net/net_backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ futures.workspace = true
futures-concurrency.workspace = true
parking_lot.workspace = true
tracing.workspace = true
thiserror.workspace = true

[lints]
workspace = true
11 changes: 10 additions & 1 deletion vm/devices/net/net_backend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::future::pending;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use thiserror::Error;

/// Per-queue configuration.
pub struct QueueConfig<'a> {
Expand Down Expand Up @@ -135,6 +136,14 @@ pub struct RssConfig<'a> {
pub flags: u32, // TODO
}

#[derive(Error, Debug)]
pub enum TxError {
#[error("error requiring queue restart. {0}")]
TryRestart(#[source] anyhow::Error),
#[error("unrecoverable error. {0}")]
Fatal(#[source] anyhow::Error),
}

/// A trait for sending and receiving network packets.
#[async_trait]
pub trait Queue: Send + InspectMut {
Expand All @@ -158,7 +167,7 @@ pub trait Queue: Send + InspectMut {
fn tx_avail(&mut self, segments: &[TxSegment]) -> anyhow::Result<(bool, usize)>;

/// Polls the device for transmit completions.
fn tx_poll(&mut self, done: &mut [TxId]) -> anyhow::Result<usize>;
fn tx_poll(&mut self, done: &mut [TxId]) -> Result<usize, TxError>;

/// Get the buffer access.
fn buffer_access(&mut self) -> Option<&mut dyn BufferAccess>;
Expand Down
3 changes: 2 additions & 1 deletion vm/devices/net/net_backend/src/loopback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::QueueConfig;
use crate::RssConfig;
use crate::RxId;
use crate::RxMetadata;
use crate::TxError;
use crate::TxId;
use crate::TxSegment;
use crate::linearize;
Expand Down Expand Up @@ -125,7 +126,7 @@ impl Queue for LoopbackQueue {
Ok((true, sent))
}

fn tx_poll(&mut self, _done: &mut [TxId]) -> anyhow::Result<usize> {
fn tx_poll(&mut self, _done: &mut [TxId]) -> Result<usize, TxError> {
Ok(0)
}

Expand Down
3 changes: 2 additions & 1 deletion vm/devices/net/net_backend/src/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::Queue;
use crate::QueueConfig;
use crate::RssConfig;
use crate::RxId;
use crate::TxError;
use crate::TxId;
use crate::TxOffloadSupport;
use crate::TxSegment;
Expand Down Expand Up @@ -114,7 +115,7 @@ impl Queue for NullQueue {
Ok((true, packets.len()))
}

fn tx_poll(&mut self, _done: &mut [TxId]) -> anyhow::Result<usize> {
fn tx_poll(&mut self, _done: &mut [TxId]) -> Result<usize, TxError> {
Ok(0)
}

Expand Down
3 changes: 2 additions & 1 deletion vm/devices/net/net_consomme/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use net_backend::RssConfig;
use net_backend::RxChecksumState;
use net_backend::RxId;
use net_backend::RxMetadata;
use net_backend::TxError;
use net_backend::TxId;
use net_backend::TxOffloadSupport;
use net_backend::TxSegment;
Expand Down Expand Up @@ -231,7 +232,7 @@ impl net_backend::Queue for ConsommeQueue {
Ok((false, segments.len()))
}

fn tx_poll(&mut self, done: &mut [TxId]) -> anyhow::Result<usize> {
fn tx_poll(&mut self, done: &mut [TxId]) -> Result<usize, TxError> {
let n = done.len().min(self.state.tx_ready.len());
for (x, y) in done.iter_mut().zip(self.state.tx_ready.drain(..n)) {
*x = y;
Expand Down
3 changes: 2 additions & 1 deletion vm/devices/net/net_dio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use net_backend::QueueConfig;
use net_backend::RssConfig;
use net_backend::RxId;
use net_backend::RxMetadata;
use net_backend::TxError;
use net_backend::TxId;
use net_backend::TxSegment;
use net_backend::next_packet;
Expand Down Expand Up @@ -191,7 +192,7 @@ impl Queue for DioQueue {
Ok((true, n))
}

fn tx_poll(&mut self, _done: &mut [TxId]) -> anyhow::Result<usize> {
fn tx_poll(&mut self, _done: &mut [TxId]) -> Result<usize, TxError> {
Ok(0)
}

Expand Down
45 changes: 44 additions & 1 deletion vm/devices/net/net_mana/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use gdma_defs::Cqe;
use gdma_defs::GDMA_EQE_COMPLETION;
use gdma_defs::Sge;
use gdma_defs::bnic::CQE_RX_OKAY;
use gdma_defs::bnic::CQE_TX_GDMA_ERR;
use gdma_defs::bnic::CQE_TX_OKAY;
use gdma_defs::bnic::MANA_LONG_PKT_FMT;
use gdma_defs::bnic::MANA_SHORT_PKT_FMT;
Expand Down Expand Up @@ -43,6 +44,7 @@ use net_backend::RssConfig;
use net_backend::RxChecksumState;
use net_backend::RxId;
use net_backend::RxMetadata;
use net_backend::TxError;
use net_backend::TxId;
use net_backend::TxOffloadSupport;
use net_backend::TxSegment;
Expand Down Expand Up @@ -607,6 +609,7 @@ struct QueueStats {
tx_packets: u64,
tx_errors: u64,
tx_dropped: u64,
tx_stuck: u64,

rx_events: u64,
rx_packets: u64,
Expand All @@ -622,6 +625,7 @@ impl Inspect for QueueStats {
.counter("tx_packets", self.tx_packets)
.counter("tx_errors", self.tx_errors)
.counter("tx_dropped", self.tx_dropped)
.counter("tx_stuck", self.tx_stuck)
.counter("rx_events", self.rx_events)
.counter("rx_packets", self.rx_packets)
.counter("rx_errors", self.rx_errors)
Expand Down Expand Up @@ -718,6 +722,32 @@ impl<T: DeviceBacking> ManaQueue<T> {
false
}
}

fn trace_tx_wqe(&mut self, tx_oob: ManaTxCompOob, done_length: usize) {
tracelimit::error_ratelimited!(
cqe_hdr_type = tx_oob.cqe_hdr.cqe_type(),
cqe_hdr_vendor_err = tx_oob.cqe_hdr.vendor_err(),
tx_oob_data_offset = tx_oob.tx_data_offset,
tx_oob_sgl_offset = tx_oob.offsets.tx_sgl_offset(),
tx_oob_wqe_offset = tx_oob.offsets.tx_wqe_offset(),
done_length,
posted_tx_len = self.posted_tx.len(),
"tx completion error"
);

// TODO: Use tx_wqe_offset to read the Wqe.
// Use Wqe.ClientOob to read the ManaTxOob.s_oob.
// Log properties of s_oob like checksum, etc.

if let Some(packet) = self.posted_tx.front() {
tracelimit::error_ratelimited!(
id = packet.id.0,
wqe_len = packet.wqe_len,
bounced_len_with_padding = packet.bounced_len_with_padding,
"posted tx"
);
}
}
}

#[async_trait]
Expand Down Expand Up @@ -901,20 +931,33 @@ impl<T: DeviceBacking + Send> Queue for ManaQueue<T> {
Ok((false, i))
}

fn tx_poll(&mut self, done: &mut [TxId]) -> anyhow::Result<usize> {
fn tx_poll(&mut self, done: &mut [TxId]) -> Result<usize, TxError> {
let mut i = 0;
let mut queue_stuck = false;
while i < done.len() {
let id = if let Some(cqe) = self.tx_cq.pop() {
let tx_oob = ManaTxCompOob::read_from_prefix(&cqe.data[..]).unwrap().0; // TODO: zerocopy: use-rest-of-range (https://github.com/microsoft/openvmm/issues/759)
match tx_oob.cqe_hdr.cqe_type() {
CQE_TX_OKAY => {
self.stats.tx_packets += 1;
}
CQE_TX_GDMA_ERR => {
queue_stuck = true;
}
ty => {
tracelimit::error_ratelimited!(ty, "tx completion error");
self.stats.tx_errors += 1;
}
}
if queue_stuck {
// Hardware hit an error with the packet coming from the Guest.
// CQE_TX_GDMA_ERR is how the Hardware indicates that it has disabled the queue.
self.stats.tx_errors += 1;
self.stats.tx_stuck += 1;
self.trace_tx_wqe(tx_oob, done.len());
// Return a TryRestart error to indicate that the queue needs to be restarted.
return Err(TxError::TryRestart(anyhow::anyhow!("GDMA error")));
}
let packet = self.posted_tx.pop_front().unwrap();
self.tx_wq.advance_head(packet.wqe_len);
if packet.bounced_len_with_padding > 0 {
Expand Down
3 changes: 2 additions & 1 deletion vm/devices/net/net_packet_capture/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use net_backend::Queue;
use net_backend::QueueConfig;
use net_backend::RssConfig;
use net_backend::RxId;
use net_backend::TxError;
use net_backend::TxId;
use net_backend::TxOffloadSupport;
use net_backend::TxSegment;
Expand Down Expand Up @@ -543,7 +544,7 @@ impl Queue for PacketCaptureQueue {
self.current_mut().tx_avail(segments)
}

fn tx_poll(&mut self, done: &mut [TxId]) -> anyhow::Result<usize> {
fn tx_poll(&mut self, done: &mut [TxId]) -> Result<usize, TxError> {
self.current_mut().tx_poll(done)
}

Expand Down
3 changes: 2 additions & 1 deletion vm/devices/net/net_tap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use net_backend::QueueConfig;
use net_backend::RssConfig;
use net_backend::RxId;
use net_backend::RxMetadata;
use net_backend::TxError;
use net_backend::TxId;
use net_backend::TxSegment;
use net_backend::linearize;
Expand Down Expand Up @@ -234,7 +235,7 @@ impl Queue for TapQueue {
Ok((completed_synchronously, n))
}

fn tx_poll(&mut self, _done: &mut [TxId]) -> anyhow::Result<usize> {
fn tx_poll(&mut self, _done: &mut [TxId]) -> Result<usize, TxError> {
// Packets are sent synchronously so there is no no need to check here if
// sending has been completed.
Ok(0)
Expand Down
82 changes: 65 additions & 17 deletions vm/devices/net/netvsp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use net_backend::EndpointAction;
use net_backend::L3Protocol;
use net_backend::QueueConfig;
use net_backend::RxId;
use net_backend::TxError;
use net_backend::TxId;
use net_backend::TxSegment;
use net_backend_resources::mac_address::MacAddress;
Expand Down Expand Up @@ -1850,6 +1851,8 @@ enum WorkerError {
Cancelled(task_control::Cancelled),
#[error("tearing down because send/receive buffer is revoked")]
BufferRevoked,
#[error("endpoint requires queue restart: {0}")]
EndpointRequiresQueueRestart(#[source] anyhow::Error),
}

impl From<task_control::Cancelled> for WorkerError {
Expand Down Expand Up @@ -4367,10 +4370,30 @@ impl<T: RingMem + 'static> Worker<T> {
stop.until_stopped(pending()).await?
};

let restart = self.channel.main_loop(stop, state, queue_state).await?;
let result = self.channel.main_loop(stop, state, queue_state).await;
match result {
Ok(restart) => {
assert_eq!(self.channel_idx, 0);
let _ = self.coordinator_send.try_send(restart);
}
Err(WorkerError::EndpointRequiresQueueRestart(err)) => {
tracelimit::warn_ratelimited!(
err = %err,
"Endpoint requires queues to restart",
);
if let Err(try_send_err) =
self.coordinator_send.try_send(CoordinatorMessage::Restart)
{
tracing::error!(
try_send_err = %try_send_err,
"failed to restart queues"
);
return Err(WorkerError::Endpoint(err));
}
}
Err(err) => return Err(err),
}

assert_eq!(self.channel_idx, 0);
let _ = self.coordinator_send.try_send(restart);
stop.until_stopped(pending()).await?
}
}
Expand Down Expand Up @@ -4905,23 +4928,48 @@ impl<T: 'static + RingMem> NetChannel<T> {
epqueue: &mut dyn net_backend::Queue,
) -> Result<bool, WorkerError> {
// Drain completed transmits.
let n = epqueue
.tx_poll(&mut data.tx_done)
.map_err(WorkerError::Endpoint)?;
if n == 0 {
return Ok(false);
}
let result = epqueue.tx_poll(&mut data.tx_done);

match result {
Ok(n) => {
if n == 0 {
return Ok(false);
}

for &id in &data.tx_done[..n] {
let tx_packet = &mut state.pending_tx_packets[id.0 as usize];
assert!(tx_packet.pending_packet_count > 0);
tx_packet.pending_packet_count -= 1;
if tx_packet.pending_packet_count == 0 {
self.complete_tx_packet(state, id)?;
for &id in &data.tx_done[..n] {
let tx_packet = &mut state.pending_tx_packets[id.0 as usize];
assert!(tx_packet.pending_packet_count > 0);
tx_packet.pending_packet_count -= 1;
if tx_packet.pending_packet_count == 0 {
self.complete_tx_packet(state, id)?;
}
}

Ok(true)
}
Err(TxError::TryRestart(err)) => {
// Complete any pending tx prior to restarting queues.
let pending_tx = state
.pending_tx_packets
.iter_mut()
.enumerate()
.filter_map(|(id, inflight)| {
if inflight.pending_packet_count > 0 {
inflight.pending_packet_count = 0;
Some(PendingTxCompletion {
transaction_id: inflight.transaction_id,
tx_id: Some(TxId(id as u32)),
})
} else {
None
}
})
.collect::<Vec<_>>();
state.pending_tx_completions.extend(pending_tx);
Err(WorkerError::EndpointRequiresQueueRestart(err))
}
Err(TxError::Fatal(err)) => Err(WorkerError::Endpoint(err)),
}

Ok(true)
}

fn switch_data_path(
Expand Down
3 changes: 2 additions & 1 deletion vm/devices/net/netvsp/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use net_backend::EndpointAction;
use net_backend::MultiQueueSupport;
use net_backend::Queue as NetQueue;
use net_backend::QueueConfig;
use net_backend::TxError;
use net_backend::TxOffloadSupport;
use net_backend::null::NullEndpoint;
use pal_async::DefaultDriver;
Expand Down Expand Up @@ -380,7 +381,7 @@ impl NetQueue for TestNicQueue {
Ok((true, packets.len()))
}

fn tx_poll(&mut self, _done: &mut [TxId]) -> anyhow::Result<usize> {
fn tx_poll(&mut self, _done: &mut [TxId]) -> Result<usize, TxError> {
Ok(0)
}

Expand Down
2 changes: 1 addition & 1 deletion vm/devices/virtio/virtio_net/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ impl Worker {
// Drain completed transmits.
let n = epqueue
.tx_poll(&mut self.active_state.data.tx_done)
.map_err(WorkerError::Endpoint)?;
.map_err(|tx_error| WorkerError::Endpoint(tx_error.into()))?;
if n == 0 {
return Ok(false);
}
Expand Down