-
Notifications
You must be signed in to change notification settings - Fork 177
Expand file tree
/
Copy pathprocess_tx.rs
More file actions
107 lines (99 loc) · 3.71 KB
/
process_tx.rs
File metadata and controls
107 lines (99 loc) · 3.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::{io, thread};
use vm_memory::{GuestMemory, GuestMemoryError, GuestMemoryMmap, GuestMemoryRegion};
use crate::virtio::console::port_io::PortOutput;
use crate::virtio::{DescriptorChain, InterruptTransport, Queue};
pub(crate) fn process_tx(
mem: GuestMemoryMmap,
mut queue: Queue,
interrupt: InterruptTransport,
output: Arc<Mutex<Box<dyn PortOutput + Send>>>,
stop: Arc<AtomicBool>,
) {
loop {
let Some(head) = pop_head_blocking(&mut queue, &mem, &interrupt, &stop) else {
return;
};
let head_index = head.index;
let mut bytes_written = 0;
for desc in head.into_iter().readable() {
let desc_len = desc.len as usize;
match write_desc_to_output(desc, output.lock().unwrap().as_mut(), &interrupt) {
Ok(0) => {
break;
}
Ok(n) => {
assert_eq!(n, desc_len);
bytes_written += n;
}
Err(e) => {
log::error!("Failed to write output: {e}");
if matches!(e, GuestMemoryError::IOError(e) if e.kind() == io::ErrorKind::BrokenPipe)
{
// Errors could conceivably be spurious. Broken
// pipe is not and there is no point in attempting
// to write more.
return;
}
}
}
}
if bytes_written == 0 {
log::trace!("Tx Add used {bytes_written}");
queue.undo_pop();
} else {
log::trace!("Tx add used {bytes_written}");
if let Err(e) = queue.add_used(&mem, head_index, bytes_written as u32) {
error!("failed to add used elements to the queue: {e:?}");
}
}
}
}
fn pop_head_blocking<'mem>(
queue: &mut Queue,
mem: &'mem GuestMemoryMmap,
interrupt: &InterruptTransport,
stop: &AtomicBool,
) -> Option<DescriptorChain<'mem>> {
loop {
match queue.pop(mem) {
Some(descriptor) => break Some(descriptor),
None => {
interrupt.signal_used_queue();
if stop.load(Ordering::Acquire) {
break None;
}
thread::park();
log::trace!("tx unparked, queue len {}", queue.len(mem))
}
}
}
}
fn write_desc_to_output(
desc: DescriptorChain,
output: &mut (dyn PortOutput + Send),
interrupt: &InterruptTransport,
) -> Result<usize, GuestMemoryError> {
// TODO: Switch to using `get_slices()` with the next vm-memory
// bump.
#[allow(deprecated)]
desc.mem
.try_access(desc.len as usize, desc.addr, |_, len, addr, region| {
let src = region.get_slice(addr, len).unwrap();
loop {
log::trace!("Tx {src:?}, write_volatile {len} bytes");
match output.write_volatile(&src) {
// try_access seem to handle partial write for us (we will be invoked again with an offset)
Ok(n) => break Ok(n),
// We can't return an error otherwise we would not know how many bytes were processed before WouldBlock
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
log::trace!("Tx wait for output (would block)");
interrupt.signal_used_queue();
output.wait_until_writable();
}
Err(e) => break Err(GuestMemoryError::IOError(e)),
}
}
})
}