Skip to content

Commit 5220656

Browse files
committed
devices/net: Rewrite using new RxQueueProducer/TxQueueConsumer
Rewrite the all of the backend (unixstream, unixgram, tap) in terms of the new RxQueueProducer/TxQueueConsumer abstractions. Signed-off-by: Matej Hrica <mhrica@redhat.com>
1 parent 8aca2a6 commit 5220656

8 files changed

Lines changed: 784 additions & 666 deletions

File tree

src/devices/src/virtio/net/backend.rs

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,29 +18,39 @@ pub enum ConnectError {
1818
#[allow(dead_code)]
1919
#[derive(Debug)]
2020
pub enum ReadError {
21-
/// Nothing was written
22-
NothingRead,
23-
/// Another internal error occurred
21+
/// Backend process not running (EPIPE)
22+
ProcessNotRunning,
23+
/// Internal I/O error
2424
Internal(nix::Error),
2525
}
2626

2727
#[allow(dead_code)]
2828
#[derive(Debug)]
2929
pub enum WriteError {
30-
/// Nothing was written, you can drop the frame or try to resend it later
31-
NothingWritten,
32-
/// Part of the buffer was written, the write has to be finished using try_finish_write
33-
PartialWrite,
34-
/// Passt doesnt seem to be running (received EPIPE)
30+
/// Backend process not running (EPIPE)
3531
ProcessNotRunning,
36-
/// Another internal error occurred
32+
/// Internal I/O error
3733
Internal(nix::Error),
3834
}
3935

36+
/// Network backend trait.
37+
///
38+
/// Backends own both the socket and the queue consumers. The send/recv methods
39+
/// operate on internal queues. EAGAIN is not an error - it just means nothing
40+
/// happened this call.
4041
pub trait NetBackend {
41-
fn read_frame(&mut self, buf: &mut [u8]) -> Result<usize, ReadError>;
42-
fn write_frame(&mut self, hdr_len: usize, buf: &mut [u8]) -> Result<(), WriteError>;
43-
fn has_unfinished_write(&self) -> bool;
44-
fn try_finish_write(&mut self, hdr_len: usize, buf: &[u8]) -> Result<(), WriteError>;
42+
/// Send pending frames from the TX queue to the network.
43+
///
44+
/// Pulls frames from internal TxQueueConsumer and sends using batched I/O.
45+
/// EAGAIN returns Ok(()) - pending frames kept for retry.
46+
fn send(&mut self) -> Result<(), WriteError>;
47+
48+
/// Receive frames from the network into the RX queue.
49+
///
50+
/// Reads from socket into internal RxQueueProvider.
51+
/// EAGAIN returns Ok(()).
52+
fn recv(&mut self) -> Result<(), ReadError>;
53+
54+
/// Returns the raw socket fd for epoll registration.
4555
fn raw_socket_fd(&self) -> RawFd;
4656
}

src/devices/src/virtio/net/device.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,17 @@ use std::path::PathBuf;
2222
use utils::eventfd::{EventFd, EFD_NONBLOCK};
2323
use virtio_bindings::virtio_net::VIRTIO_NET_F_MAC;
2424
use virtio_bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
25-
use vm_memory::{ByteValued, GuestMemoryError, GuestMemoryMmap};
25+
use vm_memory::{ByteValued, GuestMemoryMmap};
2626

2727
const VIRTIO_F_VERSION_1: u32 = 32;
2828

29-
#[derive(Debug)]
30-
pub enum FrontendError {
31-
DescriptorChainTooSmall,
32-
EmptyQueue,
33-
GuestMemory(GuestMemoryError),
34-
QueueError(QueueError),
35-
ReadOnlyDescriptor,
36-
}
29+
// FrontendError removed - no longer used with vectored I/O
3730

3831
#[derive(Debug)]
3932
pub enum RxError {
4033
Backend(ReadError),
4134
DeviceError(DeviceError),
35+
QueueError(QueueError),
4236
}
4337

4438
#[derive(Debug)]
@@ -96,7 +90,7 @@ impl Net {
9690
) -> Result<Self> {
9791
let avail_features = features as u64
9892
| (1 << VIRTIO_NET_F_MAC)
99-
| (1 << VIRTIO_RING_F_EVENT_IDX)
93+
// | (1 << VIRTIO_RING_F_EVENT_IDX) // TODO: re-enable after debugging
10094
| (1 << VIRTIO_F_VERSION_1);
10195

10296
let mut queue_evts = Vec::new();

src/devices/src/virtio/net/mod.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ pub const TX_INDEX: usize = 1;
2020

2121
mod backend;
2222
pub mod device;
23+
#[cfg(target_os = "macos")]
24+
mod socket_x;
2325
#[cfg(target_os = "linux")]
2426
mod tap;
2527
mod unixgram;
@@ -30,13 +32,10 @@ fn vnet_hdr_len() -> usize {
3032
mem::size_of::<virtio_net_hdr_v1>()
3133
}
3234

33-
// This initializes to all 0 the virtio_net_hdr part of a buf and return the length of the header
34-
// https://docs.oasis-open.org/virtio/virtio/v1.1/csprd01/virtio-v1.1-csprd01.html#x1-2050006
35-
fn write_virtio_net_hdr(buf: &mut [u8]) -> usize {
36-
let len = vnet_hdr_len();
37-
buf[0..len].fill(0);
38-
len
39-
}
35+
/// Default zeroed virtio_net_hdr_v1 (12 bytes) - used as prefix when receiving from backends
36+
/// that don't include vnet headers (e.g., passt/unixstream)
37+
/// https://docs.oasis-open.org/virtio/virtio/v1.1/csprd01/virtio-v1.1-csprd01.html#x1-2050006
38+
static DEFAULT_VNET_HDR: [u8; 12] = [0u8; 12];
4039

4140
pub use self::device::Net;
4241
#[derive(Debug)]
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// macOS-specific batch message syscalls (sendmsg_x/recvmsg_x)
2+
//
3+
// These are private Apple APIs that allow sending/receiving multiple messages
4+
// in a single syscall, similar to Linux's sendmmsg/recvmmsg.
5+
//
6+
// Reference: https://github.com/nirs/vmnet-helper/blob/main/socket_x.h
7+
8+
#![allow(dead_code)]
9+
#![allow(non_camel_case_types)]
10+
11+
#[cfg(target_os = "macos")]
12+
pub mod macos {
13+
use libc::{c_int, c_uint, c_void, iovec, socklen_t};
14+
15+
/// Extended message header for batch operations.
16+
/// Similar to msghdr but includes msg_datalen for output.
17+
#[repr(C)]
18+
pub struct msghdr_x {
19+
pub msg_name: *mut c_void,
20+
pub msg_namelen: socklen_t,
21+
pub msg_iov: *mut iovec,
22+
pub msg_iovlen: c_int,
23+
pub msg_control: *mut c_void,
24+
pub msg_controllen: socklen_t,
25+
pub msg_flags: c_int,
26+
pub msg_datalen: usize, // out: bytes transferred for this message
27+
}
28+
29+
impl Default for msghdr_x {
30+
fn default() -> Self {
31+
Self {
32+
msg_name: std::ptr::null_mut(),
33+
msg_namelen: 0,
34+
msg_iov: std::ptr::null_mut(),
35+
msg_iovlen: 0,
36+
msg_control: std::ptr::null_mut(),
37+
msg_controllen: 0,
38+
msg_flags: 0,
39+
msg_datalen: 0,
40+
}
41+
}
42+
}
43+
44+
extern "C" {
45+
/// Send multiple datagrams in a single syscall.
46+
///
47+
/// # Arguments
48+
/// * `s` - Socket file descriptor
49+
/// * `msgp` - Pointer to array of msghdr_x structures
50+
/// * `cnt` - Number of messages to send
51+
/// * `flags` - Only MSG_DONTWAIT is supported
52+
///
53+
/// # Constraints
54+
/// For each msghdr_x: msg_name, msg_namelen, msg_control, msg_controllen,
55+
/// msg_flags, and msg_datalen must all be zero on input.
56+
///
57+
/// # Returns
58+
/// Number of datagrams sent, or -1 on error.
59+
/// Each msghdr_x.msg_datalen is set to bytes sent for that message.
60+
pub fn sendmsg_x(s: c_int, msgp: *const msghdr_x, cnt: c_uint, flags: c_int) -> isize;
61+
62+
/// Receive multiple datagrams in a single syscall.
63+
///
64+
/// # Arguments
65+
/// * `s` - Socket file descriptor
66+
/// * `msgp` - Pointer to array of msghdr_x structures
67+
/// * `cnt` - Maximum number of messages to receive
68+
/// * `flags` - Only MSG_DONTWAIT is supported
69+
///
70+
/// # Constraints
71+
/// For each msghdr_x: msg_flags must be zero on input.
72+
///
73+
/// # Returns
74+
/// Number of datagrams received (may be less than cnt), or -1 on error.
75+
/// Each msghdr_x.msg_datalen is set to bytes received for that message.
76+
pub fn recvmsg_x(s: c_int, msgp: *mut msghdr_x, cnt: c_uint, flags: c_int) -> isize;
77+
}
78+
}
79+
80+
#[cfg(target_os = "macos")]
81+
pub use macos::*;

src/devices/src/virtio/net/tap.rs

Lines changed: 72 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,46 @@ use libc::{
22
c_char, c_int, ifreq, IFF_NO_PI, IFF_TAP, IFF_VNET_HDR, TUN_F_CSUM, TUN_F_TSO4, TUN_F_TSO6,
33
TUN_F_UFO,
44
};
5-
use nix::fcntl::{fcntl, open, FcntlArg, OFlag};
5+
use nix::fcntl::{open, OFlag};
66
use nix::sys::stat::Mode;
7-
use nix::unistd::{read, write};
7+
use nix::sys::uio::{readv, writev};
88
use nix::{ioctl_write_int, ioctl_write_ptr};
9-
use std::os::fd::{AsRawFd, OwnedFd, RawFd};
9+
use std::os::fd::{AsFd, AsRawFd, OwnedFd, RawFd};
1010
use std::{io, mem, ptr};
11+
use utils::fd::SetNonblockingExt;
1112
use virtio_bindings::virtio_net::{
1213
VIRTIO_NET_F_GUEST_CSUM, VIRTIO_NET_F_GUEST_TSO4, VIRTIO_NET_F_GUEST_TSO6,
1314
VIRTIO_NET_F_GUEST_UFO,
1415
};
16+
use vm_memory::GuestMemoryMmap;
1517

1618
use super::backend::{ConnectError, NetBackend, ReadError, WriteError};
17-
use super::{write_virtio_net_hdr, FRAME_HEADER_LEN};
19+
use crate::virtio::queue::Queue;
20+
use crate::virtio::rx_queue_producer::RxQueueProducer;
21+
use crate::virtio::tx_queue_consumer::{Consumed, TxQueueConsumer};
22+
use crate::virtio::InterruptTransport;
1823

1924
ioctl_write_ptr!(tunsetiff, b'T', 202, c_int);
2025
ioctl_write_int!(tunsetoffload, b'T', 208);
2126
ioctl_write_ptr!(tunsetvnethdrsz, b'T', 216, c_int);
2227

28+
const MAX_BATCH: usize = 256;
29+
2330
pub struct Tap {
2431
fd: OwnedFd,
25-
include_vnet_header: bool,
32+
tx_consumer: TxQueueConsumer,
33+
rx_producer: RxQueueProducer,
2634
}
2735

2836
impl Tap {
2937
/// Create an endpoint using the file descriptor of a tap device
3038
pub fn new(
3139
tap_name: String,
3240
vnet_features: u64,
33-
include_vnet_header: bool,
41+
tx_queue: Queue,
42+
rx_queue: Queue,
43+
mem: GuestMemoryMmap,
44+
interrupt: InterruptTransport,
3445
) -> Result<Self, ConnectError> {
3546
let fd = match open("/dev/net/tun", OFlag::O_RDWR, Mode::empty()) {
3647
Ok(fd) => fd,
@@ -47,10 +58,9 @@ impl Tap {
4758
);
4859
}
4960

50-
req.ifr_ifru.ifru_flags = IFF_TAP as i16 | IFF_NO_PI as i16;
51-
if include_vnet_header {
52-
req.ifr_ifru.ifru_flags |= IFF_VNET_HDR as i16;
53-
}
61+
req.ifr_ifru.ifru_flags = IFF_TAP as i16 | IFF_NO_PI as i16 | IFF_VNET_HDR as i16;
62+
63+
log::info!("Tap::new() fd={} tap={}", fd.as_raw_fd(), tap_name);
5464

5565
let mut offload_flags: u64 = 0;
5666
if (vnet_features & (1 << VIRTIO_NET_F_GUEST_CSUM)) != 0 {
@@ -71,7 +81,7 @@ impl Tap {
7181
return Err(ConnectError::TunSetIff(io::Error::from(err)));
7282
}
7383

74-
// TODO(slp): replace hardcoded vnet size with cons
84+
// TODO(slp): replace hardcoded vnet size with const
7585
if let Err(err) = tunsetvnethdrsz(fd.as_raw_fd(), &12) {
7686
return Err(ConnectError::TunSetVnetHdrSz(io::Error::from(err)));
7787
}
@@ -81,67 +91,71 @@ impl Tap {
8191
}
8292
}
8393

84-
match fcntl(&fd, FcntlArg::F_GETFL) {
85-
Ok(flags) => {
86-
if let Err(e) = fcntl(
87-
&fd,
88-
FcntlArg::F_SETFL(OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK),
89-
) {
90-
warn!("error switching to non-blocking: id={fd:?}, err={e}");
91-
}
92-
}
93-
Err(e) => error!("couldn't obtain fd flags id={fd:?}, err={e}"),
94-
};
94+
if let Err(e) = fd.set_nonblocking(true) {
95+
log::warn!("Failed to set O_NONBLOCK on tap: {e}");
96+
}
97+
98+
let tx_consumer = TxQueueConsumer::new(tx_queue, mem.clone(), interrupt.clone());
99+
let rx_provider = RxQueueProducer::new(rx_queue, mem, interrupt);
95100

96101
Ok(Self {
97102
fd,
98-
include_vnet_header,
103+
tx_consumer,
104+
rx_producer: rx_provider,
99105
})
100106
}
101107
}
102108

103109
impl NetBackend for Tap {
104-
/// Try to read a frame from the tap devie. If no bytes are available reports
105-
/// ReadError::NothingRead.
106-
fn read_frame(&mut self, buf: &mut [u8]) -> Result<usize, ReadError> {
107-
let buf_offset = if !self.include_vnet_header {
108-
write_virtio_net_hdr(buf)
109-
} else {
110-
0
111-
};
112-
113-
let frame_length = match read(&self.fd, &mut buf[buf_offset..]) {
114-
Ok(f) => f,
115-
#[allow(unreachable_patterns)]
116-
Err(nix::Error::EAGAIN | nix::Error::EWOULDBLOCK) => {
117-
return Err(ReadError::NothingRead)
118-
}
119-
Err(e) => {
120-
return Err(ReadError::Internal(e));
110+
fn send(&mut self) -> Result<(), WriteError> {
111+
let fd = self.fd.as_fd();
112+
113+
self.tx_consumer.feed(MAX_BATCH);
114+
115+
// Each descriptor chain is one packet. TAP's writev combines iovecs into
116+
// a single packet (scatter-gather), so we can use it directly without
117+
// flattening. One writev call per packet.
118+
let _ = self.tx_consumer.consume(|frames| {
119+
let mut total = 0usize;
120+
for frame in frames.iter() {
121+
if frame.is_empty() {
122+
continue;
123+
}
124+
match writev(fd, frame) {
125+
Ok(n) => total += n,
126+
Err(nix::errno::Errno::EAGAIN) => break,
127+
Err(nix::errno::Errno::EPIPE) => return Err(WriteError::ProcessNotRunning),
128+
Err(e) => {
129+
log::error!("Tap TX failed: {:?}", e);
130+
return Err(WriteError::Internal(e));
131+
}
132+
}
121133
}
122-
};
123-
debug!("Read eth frame from tap: {frame_length} bytes");
124-
Ok(buf_offset + frame_length)
125-
}
134+
Ok(Consumed::Bytes(total))
135+
})?;
126136

127-
/// Try to write a frame to the tap device.
128-
fn write_frame(&mut self, hdr_len: usize, buf: &mut [u8]) -> Result<(), WriteError> {
129-
let buf_offset = if !self.include_vnet_header {
130-
hdr_len
131-
} else {
132-
FRAME_HEADER_LEN
133-
};
134-
let ret = write(&self.fd, buf[buf_offset..]).map_err(WriteError::Internal)?;
135-
debug!("Written frame size={}, written={}", buf.len(), ret);
136137
Ok(())
137138
}
138139

139-
fn has_unfinished_write(&self) -> bool {
140-
false
141-
}
140+
fn recv(&mut self) -> Result<(), ReadError> {
141+
let fd = self.fd.as_fd();
142+
143+
self.rx_producer.feed(MAX_BATCH);
144+
145+
self.rx_producer.produce(|chains, completer| {
146+
for (i, chain) in chains.iter_mut().enumerate() {
147+
if chain.is_empty() {
148+
warn!("Chain {i} was empty");
149+
break;
150+
}
151+
152+
match readv(fd, chain) {
153+
Ok(n) => completer.complete(chain, i, n),
154+
Err(_) => break, // EAGAIN or error, stop receiving
155+
}
156+
}
157+
});
142158

143-
fn try_finish_write(&mut self, _hdr_len: usize, _buf: &[u8]) -> Result<(), WriteError> {
144-
// The tap backend doesn't do partial writes.
145159
Ok(())
146160
}
147161

0 commit comments

Comments
 (0)