Skip to content

Commit 1efba4c

Browse files
committed
Add multi-reader FUSE support via FUSE_DEV_IOC_CLONE
Adds support for multi-threaded FUSE request processing using cloned file descriptors. This enables parallel request handling for improved throughput. API additions: - Session::clone_fd() - clones FUSE fd using FUSE_DEV_IOC_CLONE ioctl - Session::from_fd_initialized() - creates session from cloned fd, skips INIT handshake since primary session already completed it - Session::run() made public for external multi-reader use - Conditional handshake skip when proto_version is pre-set Usage pattern: let session = Session::new(fs, mountpoint, &Config::default())?; let cloned_fd = session.clone_fd()?; let reader = Session::from_fd_initialized(fs2, cloned_fd, SessionACL::All); thread::spawn(move || reader.run()); session.run()?; Platform: Linux only (#[cfg(target_os = "linux")]) Tests: Added clone_fd_multi_reader and from_fd_initialized_works integration tests verifying multi-reader request distribution.
1 parent 6840c83 commit 1efba4c

3 files changed

Lines changed: 351 additions & 2 deletions

File tree

src/channel.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,23 @@
11
use std::io;
22
use std::os::fd::AsFd;
3+
use std::os::fd::AsRawFd;
34
use std::os::fd::BorrowedFd;
5+
use std::os::fd::FromRawFd;
6+
use std::os::fd::OwnedFd;
47
use std::sync::Arc;
58

69
use nix::errno::Errno;
710

811
use crate::dev_fuse::DevFuse;
912
use crate::passthrough::BackingId;
1013

14+
/// FUSE_DEV_IOC_CLONE ioctl number for cloning /dev/fuse file descriptors.
15+
/// This is _IOR(FUSE_DEV_IOC_MAGIC = 229, 0, uint32_t) = 0x8004e500 with the generic Linux ioctl encoding.
16+
#[cfg(all(target_os = "linux", target_env = "musl"))]
17+
const FUSE_DEV_IOC_CLONE: libc::c_int = 0x8004e500u32 as libc::c_int;
18+
#[cfg(all(target_os = "linux", not(target_env = "musl")))]
19+
const FUSE_DEV_IOC_CLONE: libc::c_ulong = 0x8004e500;
20+
1121
/// A raw communication channel to the FUSE kernel driver
1222
#[derive(Debug)]
1323
pub(crate) struct Channel(Arc<DevFuse>);
@@ -55,6 +65,32 @@ impl Channel {
5565
// a sender by using the same file and use it in other threads.
5666
ChannelSender(self.0.clone())
5767
}
68+
69+
/// Clone the FUSE file descriptor for multi-threaded request processing.
70+
///
71+
/// Uses the `FUSE_DEV_IOC_CLONE` ioctl to create a new fd that shares
72+
/// the same FUSE connection. Each cloned fd can independently read
73+
/// requests and must send responses on the same fd that read the request.
74+
#[cfg(target_os = "linux")]
75+
pub(crate) fn clone_fd(&self) -> io::Result<OwnedFd> {
76+
// Open a new /dev/fuse fd
77+
let fd = unsafe { libc::open(c"/dev/fuse".as_ptr(), libc::O_RDWR) };
78+
if fd < 0 {
79+
return Err(io::Error::last_os_error());
80+
}
81+
// SAFETY: fd is valid, we just opened it successfully
82+
let new_fd = unsafe { OwnedFd::from_raw_fd(fd) };
83+
84+
// Clone the session onto the new fd
85+
let original_fd = self.0.as_raw_fd() as u32;
86+
// SAFETY: ioctl with FUSE_DEV_IOC_CLONE expects a pointer to u32 containing the source fd
87+
let ret = unsafe { libc::ioctl(new_fd.as_raw_fd(), FUSE_DEV_IOC_CLONE, &original_fd) };
88+
if ret < 0 {
89+
return Err(io::Error::last_os_error());
90+
}
91+
92+
Ok(new_fd)
93+
}
5894
}
5995

6096
#[derive(Clone, Debug)]

src/session.rs

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,42 @@ impl<FS: Filesystem> Session<FS> {
197197
}
198198
}
199199

200+
/// Create a session from a cloned FUSE file descriptor for multi-reader setups.
201+
///
202+
/// Use this with fds obtained from [`Session::clone_fd()`] when the primary
203+
/// session has already completed the FUSE INIT handshake.
204+
///
205+
/// # Arguments
206+
/// * `filesystem` - The filesystem implementation to handle requests
207+
/// * `fd` - A cloned fd from [`Session::clone_fd()`]
208+
/// * `acl` - Access control settings for the session
209+
///
210+
/// # Important
211+
/// This session skips the FUSE INIT protocol. Using this with an uninitialized
212+
/// fd will cause all requests to fail with EIO.
213+
///
214+
/// Each cloned fd handles its own request/response pairs - the FUSE kernel
215+
/// requires that the fd which reads a request is the same fd that sends the response.
216+
pub fn from_fd_initialized(filesystem: FS, fd: OwnedFd, acl: SessionACL) -> Self {
217+
let ch = Channel::new(Arc::new(DevFuse(File::from(fd))));
218+
Session {
219+
filesystem: FilesystemHolder {
220+
fs: Some(filesystem),
221+
},
222+
ch,
223+
mount: UmountOnDrop {
224+
mount: Arc::new(Mutex::new(None)),
225+
},
226+
allowed: acl,
227+
session_owner: geteuid(),
228+
// Skip INIT - caller guarantees mount is initialized
229+
proto_version: Some(Version(
230+
abi::FUSE_KERNEL_VERSION,
231+
abi::FUSE_KERNEL_MINOR_VERSION,
232+
)),
233+
}
234+
}
235+
200236
/// Run the session loop in a background thread. If the returned handle is dropped,
201237
/// the filesystem is unmounted and the given session ends.
202238
pub fn spawn(self) -> io::Result<BackgroundSession> {
@@ -217,8 +253,11 @@ impl<FS: Filesystem> Session<FS> {
217253
/// may run concurrent by spawning threads.
218254
/// # Errors
219255
/// Returns any final error when the session comes to an end.
220-
pub(crate) fn run(mut self) -> io::Result<()> {
221-
self.handshake()?;
256+
pub fn run(mut self) -> io::Result<()> {
257+
// Skip handshake if session is already initialized (e.g., from_fd_initialized)
258+
if self.proto_version.is_none() {
259+
self.handshake()?;
260+
}
222261

223262
let ret = self.event_loop();
224263

@@ -429,6 +468,35 @@ impl<FS: Filesystem> Session<FS> {
429468
pub fn notifier(&self) -> Notifier {
430469
Notifier::new(self.ch.sender())
431470
}
471+
472+
/// Clone the FUSE file descriptor for multi-threaded request processing.
473+
///
474+
/// Creates a new fd that can independently read FUSE requests, enabling
475+
/// multi-threaded request handling. The cloned fd shares the same FUSE
476+
/// connection but can be used by a separate thread to read requests in parallel.
477+
///
478+
/// # Usage
479+
/// ```ignore
480+
/// // Primary session handles INIT and runs in main thread
481+
/// let session = Session::new(fs, mountpoint, options)?;
482+
///
483+
/// // Clone fd for additional reader threads
484+
/// let cloned_fd = session.clone_fd()?;
485+
/// let reader_session = Session::from_fd_initialized(fs_clone, cloned_fd, acl);
486+
/// std::thread::spawn(move || reader_session.run());
487+
///
488+
/// session.run()?;
489+
/// ```
490+
///
491+
/// # Platform Support
492+
/// This is only available on Linux.
493+
///
494+
/// # Errors
495+
/// Returns an error if `/dev/fuse` cannot be opened or the ioctl fails.
496+
#[cfg(target_os = "linux")]
497+
pub fn clone_fd(&self) -> io::Result<OwnedFd> {
498+
self.ch.clone_fd()
499+
}
432500
}
433501

434502
#[derive(Debug)]

tests/integration_tests.rs

Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
use std::os::unix::fs::PermissionsExt;
2+
use std::sync::Arc;
3+
use std::sync::atomic::AtomicUsize;
4+
use std::sync::atomic::Ordering;
5+
use std::thread;
6+
use std::time::Duration;
7+
8+
use fuser::Config;
9+
use fuser::Errno;
10+
use fuser::FileHandle;
11+
use fuser::Filesystem;
12+
use fuser::INodeNo;
13+
use fuser::Session;
14+
use fuser::SessionACL;
15+
use tempfile::TempDir;
16+
17+
/// Smoke test: `Session::clone_fd` succeeds on a freshly created session and
/// hands back an owned file descriptor.
#[cfg(target_os = "linux")]
#[test]
fn clone_fd_multi_reader() {
    use std::os::fd::AsRawFd;

    // Minimal filesystem: counts `getattr` calls and only knows the root directory.
    struct CountingFS {
        count: Arc<AtomicUsize>,
    }

    impl Filesystem for CountingFS {
        fn getattr(
            &self,
            _req: &fuser::Request,
            ino: INodeNo,
            _fh: Option<FileHandle>,
            reply: fuser::ReplyAttr,
        ) {
            self.count.fetch_add(1, Ordering::SeqCst);

            // Anything other than the root does not exist.
            if ino != INodeNo::ROOT {
                reply.error(Errno::ENOENT);
                return;
            }

            let root_attr = fuser::FileAttr {
                ino: INodeNo::ROOT,
                size: 0,
                blocks: 0,
                atime: std::time::UNIX_EPOCH,
                mtime: std::time::UNIX_EPOCH,
                ctime: std::time::UNIX_EPOCH,
                crtime: std::time::UNIX_EPOCH,
                kind: fuser::FileType::Directory,
                perm: 0o755,
                nlink: 2,
                uid: 0,
                gid: 0,
                rdev: 0,
                blksize: 4096,
                flags: 0,
            };
            reply.attr(&Duration::from_secs(1), &root_attr);
        }
    }

    let mount_dir: TempDir = tempfile::tempdir().unwrap();
    let fs = CountingFS {
        count: Arc::new(AtomicUsize::new(0)),
    };
    let session = Session::new(fs, mount_dir.path(), &Config::default()).unwrap();

    // Cloning must succeed on a freshly created session.
    let cloned_fd = session.clone_fd().expect("clone_fd should succeed");

    // Sanity-check the raw descriptor value; it is not compared with the session's own fd.
    assert!(cloned_fd.as_raw_fd() >= 0);

    // Release the clone before the session it was cloned from.
    drop(cloned_fd);
    drop(session);
}
87+
88+
/// Test that a session built with `from_fd_initialized` processes requests
/// alongside the primary session.
///
/// Verifies both readers receive requests and metadata returns expected values.
/// A failing `metadata` call or a panic in the reader thread fails the test
/// instead of being silently ignored.
#[cfg(target_os = "linux")]
#[test]
fn from_fd_initialized_works() {
    // Filesystem that counts requests per instance and delays every reply, so
    // further requests are pending while one reader is still busy.
    struct SlowCountingFS {
        count: Arc<AtomicUsize>,
    }

    impl Filesystem for SlowCountingFS {
        fn getattr(
            &self,
            _req: &fuser::Request,
            ino: INodeNo,
            _fh: Option<FileHandle>,
            reply: fuser::ReplyAttr,
        ) {
            self.count.fetch_add(1, Ordering::SeqCst);

            // Add delay so while one reader is processing, the kernel
            // will dispatch concurrent requests to the other reader
            thread::sleep(Duration::from_millis(50));

            if ino == INodeNo::ROOT {
                reply.attr(
                    &Duration::from_secs(0), // No caching to ensure requests reach FUSE
                    &fuser::FileAttr {
                        ino: INodeNo::ROOT,
                        size: 0,
                        blocks: 0,
                        atime: std::time::UNIX_EPOCH,
                        mtime: std::time::UNIX_EPOCH,
                        ctime: std::time::UNIX_EPOCH,
                        crtime: std::time::UNIX_EPOCH,
                        kind: fuser::FileType::Directory,
                        perm: 0o755,
                        nlink: 2,
                        uid: 0,
                        gid: 0,
                        rdev: 0,
                        blksize: 4096,
                        flags: 0,
                    },
                );
            } else {
                reply.error(Errno::ENOENT);
            }
        }
    }

    let tmpdir: TempDir = tempfile::tempdir().unwrap();

    // Separate counters to track which reader handled requests
    let primary_count = Arc::new(AtomicUsize::new(0));
    let reader_count = Arc::new(AtomicUsize::new(0));

    let session = Session::new(
        SlowCountingFS {
            count: primary_count.clone(),
        },
        tmpdir.path(),
        &Config::default(),
    )
    .unwrap();

    // Clone fd for second reader BEFORE spawning the primary (spawn takes ownership)
    let cloned_fd = session.clone_fd().expect("clone_fd should succeed");

    // Save path for concurrent access (before session is moved)
    let path = tmpdir.path().to_path_buf();

    // Spawn primary session in background
    let primary_bg = session.spawn().unwrap();

    // Start second reader in a thread
    let reader_count_clone = reader_count.clone();
    let reader_handle = thread::spawn(move || {
        let reader_session = Session::from_fd_initialized(
            SlowCountingFS {
                count: reader_count_clone,
            },
            cloned_fd,
            SessionACL::All,
        );
        // Spawn in background - the thread will run until ENODEV when primary unmounts
        let bg = reader_session.spawn().unwrap();
        // The handle is dropped right away. Per the original author's note, the
        // drop waits for the background thread, which exits on ENODEV once the
        // primary unmounts.
        // NOTE(review): confirm BackgroundSession's drop semantics.
        drop(bg);
    });

    // Give readers time to start processing
    thread::sleep(Duration::from_millis(100));

    // Generate concurrent requests from multiple threads
    // With 50ms delay per request and concurrent threads, the kernel should
    // dispatch to both readers
    let request_threads: Vec<_> = (0..4)
        .map(|_| {
            let p = path.clone();
            thread::spawn(move || {
                for _ in 0..5 {
                    // A failed stat must fail the test rather than skip the checks.
                    let m = std::fs::metadata(&p)
                        .expect("metadata on the mountpoint should succeed");
                    // Verify metadata returns expected values
                    assert!(m.is_dir(), "root should be a directory");
                    assert_eq!(
                        m.permissions().mode() & 0o777,
                        0o755,
                        "permissions should be 0o755"
                    );
                }
            })
        })
        .collect();

    // Wait for all request threads (propagates any assertion failure)
    for t in request_threads {
        t.join().unwrap();
    }

    // Let any in-flight requests complete
    thread::sleep(Duration::from_millis(200));

    // Unmount by dropping the primary BackgroundSession
    // This will cause the secondary to exit with ENODEV
    drop(primary_bg);

    // Wait for reader thread to finish; a panic there is a test failure.
    reader_handle.join().expect("reader thread panicked");

    // Verify both readers processed requests
    let primary = primary_count.load(Ordering::SeqCst);
    let reader = reader_count.load(Ordering::SeqCst);
    let total = primary + reader;

    eprintln!(
        "Request distribution: primary={}, reader={}, total={}",
        primary, reader, total
    );

    // Total should be > 0 (requests were processed)
    assert!(total > 0, "expected some requests to be processed");

    // With 50ms delay per request and 4 concurrent threads, both readers
    // should handle some requests.
    // NOTE(review): how requests are spread over readers is decided by the
    // kernel, so this assertion may be timing-sensitive.
    assert!(
        primary > 0 && reader > 0,
        "expected both readers to process requests: primary={}, reader={}. \
         This verifies multi-threaded request handling works.",
        primary,
        reader
    );
}

0 commit comments

Comments
 (0)