Skip to content

Commit 8edf551

Browse files
committed
Add --max-storage quota enforcement and statfs() support
Signed-off-by: Cong Wang <cwang@multikernel.io>
1 parent aaa7ed7 commit 8edf551

6 files changed

Lines changed: 417 additions & 27 deletions

File tree

src/branch.rs

Lines changed: 85 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet};
22
use std::fs::{self, File};
33
use std::io::{BufRead, BufReader, Write};
44
use std::path::{Path, PathBuf};
5-
use std::sync::atomic::{AtomicU64, Ordering};
5+
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
66
use std::sync::Arc;
77
use std::time::Instant;
88

@@ -13,6 +13,79 @@ use crate::error::{BranchError, Result};
1313
use crate::inode::ROOT_INO;
1414
use crate::storage;
1515

16+
/// Tracks storage usage across all branches and enforces an optional quota.
17+
/// Only counts delta files (the actual disk cost of branching).
18+
pub struct StorageQuota {
19+
/// Current total bytes used in the storage directory (all branches' deltas).
20+
used_bytes: AtomicI64,
21+
/// Maximum allowed bytes. None = unlimited.
22+
max_bytes: Option<u64>,
23+
}
24+
25+
impl StorageQuota {
26+
pub fn new(max_bytes: Option<u64>) -> Self {
27+
Self {
28+
used_bytes: AtomicI64::new(0),
29+
max_bytes,
30+
}
31+
}
32+
33+
/// Walk the storage directory to compute initial usage.
34+
pub fn scan_usage(storage_path: &Path, max_bytes: Option<u64>) -> Self {
35+
let quota = Self::new(max_bytes);
36+
let branches_dir = storage_path.join("branches");
37+
if branches_dir.exists() {
38+
let bytes = Self::dir_size(&branches_dir);
39+
quota.used_bytes.store(bytes as i64, Ordering::Relaxed);
40+
}
41+
quota
42+
}
43+
44+
fn dir_size(path: &Path) -> u64 {
45+
let mut total = 0u64;
46+
if let Ok(entries) = fs::read_dir(path) {
47+
for entry in entries.flatten() {
48+
let p = entry.path();
49+
if p.is_dir() {
50+
total += Self::dir_size(&p);
51+
} else if let Ok(meta) = p.symlink_metadata() {
52+
total += meta.len();
53+
}
54+
}
55+
}
56+
total
57+
}
58+
59+
/// Check if `additional` bytes can be allocated. Returns Err(ENOSPC) if not.
60+
pub fn check(&self, additional: u64) -> std::result::Result<(), i32> {
61+
if let Some(max) = self.max_bytes {
62+
let current = self.used_bytes.load(Ordering::Relaxed);
63+
if current as u64 + additional > max {
64+
return Err(libc::ENOSPC);
65+
}
66+
}
67+
Ok(())
68+
}
69+
70+
/// Record that `bytes` were added to storage.
71+
pub fn add(&self, bytes: u64) {
72+
self.used_bytes.fetch_add(bytes as i64, Ordering::Relaxed);
73+
}
74+
75+
/// Record that `bytes` were removed from storage.
76+
pub fn sub(&self, bytes: u64) {
77+
self.used_bytes.fetch_sub(bytes as i64, Ordering::Relaxed);
78+
}
79+
80+
pub fn used(&self) -> u64 {
81+
self.used_bytes.load(Ordering::Relaxed).max(0) as u64
82+
}
83+
84+
pub fn max(&self) -> Option<u64> {
85+
self.max_bytes
86+
}
87+
}
88+
1689
/// Remove a file or directory at `path`, following symlinks for the type check.
1790
/// Returns `Ok(())` even if the path doesn't exist; propagates real I/O errors.
1891
fn remove_entry(path: &Path) -> std::io::Result<()> {
@@ -162,12 +235,21 @@ pub struct BranchManager {
162235
opened_inodes: Mutex<HashMap<String, HashSet<u64>>>,
163236
/// Current branch per mount — single source of truth
164237
mount_branches: RwLock<HashMap<PathBuf, String>>,
238+
/// Storage quota enforcement
239+
pub quota: StorageQuota,
165240
}
166241

167242
impl BranchManager {
168-
pub fn new(storage_path: PathBuf, base_path: PathBuf, workspace_path: PathBuf) -> Result<Self> {
243+
pub fn new(
244+
storage_path: PathBuf,
245+
base_path: PathBuf,
246+
workspace_path: PathBuf,
247+
max_storage: Option<u64>,
248+
) -> Result<Self> {
169249
fs::create_dir_all(&storage_path)?;
170250

251+
let quota = StorageQuota::scan_usage(&storage_path, max_storage);
252+
171253
// Always start fresh with just the "main" branch
172254
let mut branches = HashMap::new();
173255
let main_branch = Branch::new("main", None, &storage_path, 0)?;
@@ -182,6 +264,7 @@ impl BranchManager {
182264
notifiers: Mutex::new(HashMap::new()),
183265
opened_inodes: Mutex::new(HashMap::new()),
184266
mount_branches: RwLock::new(HashMap::new()),
267+
quota,
185268
})
186269
}
187270

src/daemon.rs

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ impl Daemon {
9292
base_path: PathBuf,
9393
storage_path: PathBuf,
9494
_workspace_path: PathBuf,
95+
max_storage: Option<u64>,
9596
) -> Result<Self> {
9697
let socket_path = storage_path.join("daemon.sock");
9798

@@ -121,6 +122,7 @@ impl Daemon {
121122
storage_path.clone(),
122123
base_path.clone(),
123124
base_path.clone(),
125+
max_storage,
124126
)?);
125127

126128
Ok(Self {
@@ -379,7 +381,11 @@ pub fn is_daemon_running(socket_path: &Path) -> bool {
379381
UnixStream::connect(socket_path).is_ok()
380382
}
381383

382-
pub fn start_daemon_background(base_path: &Path, storage_path: &Path) -> std::io::Result<()> {
384+
pub fn start_daemon_background(
385+
base_path: &Path,
386+
storage_path: &Path,
387+
max_storage: Option<u64>,
388+
) -> std::io::Result<()> {
383389
let socket_path = storage_path.join("daemon.sock");
384390

385391
// Spawn the daemon as a detached child process. The env var tells
@@ -389,15 +395,18 @@ pub fn start_daemon_background(base_path: &Path, storage_path: &Path) -> std::io
389395
// so callers that capture output won't block.
390396
let exe = std::env::current_exe()?;
391397

392-
Command::new(exe)
393-
.args([
394-
"run-daemon",
395-
"--base",
396-
&base_path.to_string_lossy(),
397-
"--storage",
398-
&storage_path.to_string_lossy(),
399-
])
400-
.stdin(Stdio::null())
398+
let mut cmd = Command::new(exe);
399+
cmd.args([
400+
"run-daemon",
401+
"--base",
402+
&base_path.to_string_lossy(),
403+
"--storage",
404+
&storage_path.to_string_lossy(),
405+
]);
406+
if let Some(max) = max_storage {
407+
cmd.args(["--max-storage", &max.to_string()]);
408+
}
409+
cmd.stdin(Stdio::null())
401410
.stdout(Stdio::null())
402411
.stderr(Stdio::null())
403412
.spawn()?;
@@ -416,7 +425,11 @@ pub fn start_daemon_background(base_path: &Path, storage_path: &Path) -> std::io
416425
))
417426
}
418427

419-
pub fn ensure_daemon(base_path: Option<&Path>, storage_path: &Path) -> std::io::Result<()> {
428+
pub fn ensure_daemon(
429+
base_path: Option<&Path>,
430+
storage_path: &Path,
431+
max_storage: Option<u64>,
432+
) -> std::io::Result<()> {
420433
let socket_path = storage_path.join("daemon.sock");
421434

422435
if is_daemon_running(&socket_path) {
@@ -440,5 +453,5 @@ pub fn ensure_daemon(base_path: Option<&Path>, storage_path: &Path) -> std::io::
440453
}
441454
};
442455

443-
start_daemon_background(&base_path, storage_path)
456+
start_daemon_background(&base_path, storage_path, max_storage)
444457
}

src/fs.rs

Lines changed: 122 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
99

1010
use fuser::{
1111
BackingId, FileType, Filesystem, ReplyAttr, ReplyData, ReplyDirectory, ReplyEmpty, ReplyEntry,
12-
ReplyIoctl, ReplyOpen, ReplyWrite, Request, TimeOrNow,
12+
ReplyIoctl, ReplyOpen, ReplyStatfs, ReplyWrite, Request, TimeOrNow,
1313
};
1414
use parking_lot::RwLock;
1515

@@ -714,12 +714,27 @@ impl Filesystem for BranchFs {
714714
// to the same inode (after COW is already done).
715715
if let Some(file) = self.write_cache.get(ino, epoch) {
716716
use std::io::{Seek, SeekFrom, Write};
717+
// Check quota for writes that extend the file
718+
let old_size = file.metadata().map(|m| m.len()).unwrap_or(0);
719+
let write_end = offset as u64 + data.len() as u64;
720+
if write_end > old_size {
721+
if self.manager.quota.check(write_end - old_size).is_err() {
722+
reply.error(libc::ENOSPC);
723+
return;
724+
}
725+
}
717726
if file.seek(SeekFrom::Start(offset as u64)).is_err() {
718727
reply.error(libc::EIO);
719728
return;
720729
}
721730
match file.write(data) {
722-
Ok(n) => reply.written(n as u32),
731+
Ok(n) => {
732+
let new_end = offset as u64 + n as u64;
733+
if new_end > old_size {
734+
self.manager.quota.add(new_end - old_size);
735+
}
736+
reply.written(n as u32)
737+
}
723738
Err(_) => reply.error(libc::EIO),
724739
}
725740
return;
@@ -780,6 +795,14 @@ impl Filesystem for BranchFs {
780795
// Serve from the just-cached write fd
781796
if let Some(file) = self.write_cache.get(ino, epoch) {
782797
use std::io::{Seek, SeekFrom, Write};
798+
let old_size = file.metadata().map(|m| m.len()).unwrap_or(0);
799+
let write_end = offset as u64 + data.len() as u64;
800+
if write_end > old_size {
801+
if self.manager.quota.check(write_end - old_size).is_err() {
802+
reply.error(libc::ENOSPC);
803+
return;
804+
}
805+
}
783806
if file.seek(SeekFrom::Start(offset as u64)).is_err() {
784807
reply.error(libc::EIO);
785808
return;
@@ -790,6 +813,10 @@ impl Filesystem for BranchFs {
790813
reply.error(libc::ESTALE);
791814
return;
792815
}
816+
let new_end = offset as u64 + n as u64;
817+
if new_end > old_size {
818+
self.manager.quota.add(new_end - old_size);
819+
}
793820
reply.written(n as u32)
794821
}
795822
Err(_) => reply.error(libc::EIO),
@@ -1076,7 +1103,9 @@ impl Filesystem for BranchFs {
10761103
b.add_tombstone(&rel_path)?;
10771104
let delta = b.delta_path(&rel_path);
10781105
if delta.exists() {
1106+
let freed = delta.symlink_metadata().map(|m| m.len()).unwrap_or(0);
10791107
std::fs::remove_file(&delta)?;
1108+
self.manager.quota.sub(freed);
10801109
}
10811110
Ok(())
10821111
});
@@ -1106,7 +1135,9 @@ impl Filesystem for BranchFs {
11061135
b.add_tombstone(&path)?;
11071136
let delta = b.delta_path(&path);
11081137
if delta.exists() {
1138+
let freed = delta.symlink_metadata().map(|m| m.len()).unwrap_or(0);
11091139
std::fs::remove_file(&delta)?;
1140+
self.manager.quota.sub(freed);
11101141
}
11111142
Ok(())
11121143
});
@@ -1450,11 +1481,30 @@ impl Filesystem for BranchFs {
14501481
return;
14511482
}
14521483
if let Some(new_size) = size {
1453-
if let Ok(delta) = self.ensure_cow_for_branch(&branch, &rel_path) {
1454-
let file = std::fs::OpenOptions::new().write(true).open(&delta);
1455-
if let Ok(f) = file {
1456-
let _ = f.set_len(new_size);
1484+
match self.ensure_cow_for_branch(&branch, &rel_path) {
1485+
Ok(delta) => {
1486+
let file = std::fs::OpenOptions::new().write(true).open(&delta);
1487+
if let Ok(f) = file {
1488+
let old_size = f.metadata().map(|m| m.len()).unwrap_or(0);
1489+
if new_size > old_size {
1490+
if self.manager.quota.check(new_size - old_size).is_err() {
1491+
reply.error(libc::ENOSPC);
1492+
return;
1493+
}
1494+
}
1495+
let _ = f.set_len(new_size);
1496+
if new_size > old_size {
1497+
self.manager.quota.add(new_size - old_size);
1498+
} else if old_size > new_size {
1499+
self.manager.quota.sub(old_size - new_size);
1500+
}
1501+
}
1502+
}
1503+
Err(e) if e.raw_os_error() == Some(libc::ENOSPC) => {
1504+
reply.error(libc::ENOSPC);
1505+
return;
14571506
}
1507+
Err(_) => {}
14581508
}
14591509
}
14601510
if mode.is_some()
@@ -1478,11 +1528,30 @@ impl Filesystem for BranchFs {
14781528
_ => {
14791529
// Root path (existing logic)
14801530
if let Some(new_size) = size {
1481-
if let Ok(delta) = self.ensure_cow(&path) {
1482-
let file = std::fs::OpenOptions::new().write(true).open(&delta);
1483-
if let Ok(f) = file {
1484-
let _ = f.set_len(new_size);
1531+
match self.ensure_cow(&path) {
1532+
Ok(delta) => {
1533+
let file = std::fs::OpenOptions::new().write(true).open(&delta);
1534+
if let Ok(f) = file {
1535+
let old_size = f.metadata().map(|m| m.len()).unwrap_or(0);
1536+
if new_size > old_size {
1537+
if self.manager.quota.check(new_size - old_size).is_err() {
1538+
reply.error(libc::ENOSPC);
1539+
return;
1540+
}
1541+
}
1542+
let _ = f.set_len(new_size);
1543+
if new_size > old_size {
1544+
self.manager.quota.add(new_size - old_size);
1545+
} else if old_size > new_size {
1546+
self.manager.quota.sub(old_size - new_size);
1547+
}
1548+
}
1549+
}
1550+
Err(e) if e.raw_os_error() == Some(libc::ENOSPC) => {
1551+
reply.error(libc::ENOSPC);
1552+
return;
14851553
}
1554+
Err(_) => {}
14861555
}
14871556
}
14881557
if mode.is_some()
@@ -1817,4 +1886,47 @@ impl Filesystem for BranchFs {
18171886
}
18181887
}
18191888
}
1889+
1890+
fn statfs(&mut self, _req: &Request, _ino: u64, reply: ReplyStatfs) {
1891+
let block_size = 4096u32;
1892+
let quota = &self.manager.quota;
1893+
1894+
if let Some(max_bytes) = quota.max() {
1895+
let used = quota.used();
1896+
let total_blocks = max_bytes / block_size as u64;
1897+
let used_blocks = used.min(max_bytes) / block_size as u64;
1898+
let avail_blocks = total_blocks.saturating_sub(used_blocks);
1899+
1900+
reply.statfs(
1901+
total_blocks, // total blocks
1902+
avail_blocks, // free blocks
1903+
avail_blocks, // available blocks (to unprivileged users)
1904+
0, // total inodes (0 = unspecified)
1905+
0, // free inodes
1906+
block_size, // block size
1907+
255, // max name length
1908+
block_size, // fragment size
1909+
);
1910+
} else {
1911+
// No quota — report from the storage filesystem
1912+
let storage_path = &self.manager.storage_path;
1913+
match nix::sys::statvfs::statvfs(storage_path) {
1914+
Ok(stat) => {
1915+
reply.statfs(
1916+
stat.blocks(),
1917+
stat.blocks_free(),
1918+
stat.blocks_available(),
1919+
stat.files(),
1920+
stat.files_free(),
1921+
stat.block_size() as u32,
1922+
stat.name_max() as u32,
1923+
stat.fragment_size() as u32,
1924+
);
1925+
}
1926+
Err(_) => {
1927+
reply.error(libc::EIO);
1928+
}
1929+
}
1930+
}
1931+
}
18201932
}

0 commit comments

Comments
 (0)