You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Implement USB device monitoring and control directly in the Rust daemon (bastion-rs). This is Option B from the architecture analysis - a native Rust implementation for best performance and unified architecture.
Reference: PR #6 contains a complete Python implementation that was never merged to master (it targets the legacy Python daemon). The design patterns, data structures, and security model from that PR should be used as the specification for this Rust implementation.
Goals
Default-deny USB policy: New USB devices are blocked until explicitly approved
User prompts: Show popup dialog when new USB device is inserted
Scoped rules: Allow/block by device (exact), model (vendor:product), or vendor
Risk classification: Warn users about high-risk devices (HID, wireless)
Persistent rules: Store decisions in /etc/bastion/usb_rules.json
Sysfs authorization: Control device access via /sys/bus/usb/devices/{bus_id}/authorized
Architecture
flowchart LR
subgraph Kernel
A[USB Device Inserted] --> B[udev event]
end
subgraph bastion-daemon Rust
B -->|udev crate| C[UsbMonitor]
C --> D[UsbDeviceInfo]
D --> E{Check Rules}
E -->|known| F[Apply Verdict]
E -->|unknown| G[Send Prompt to GUI]
F --> H[sysfs authorize/deauthorize]
end
subgraph GUI
G -->|IPC| I[USBPromptDialog]
I -->|user decision| J[Response via IPC]
J --> K[UsbRuleManager]
K --> F
end
Loading
USB Device Classification
From Python implementation (usb_device.py):
/// USB Device Class codes (bDeviceClass / bInterfaceClass)#[repr(u8)]pubenumUsbClass{PerInterface = 0x00,// Class defined at interface levelAudio = 0x01,CdcComm = 0x02,// CommunicationsHid = 0x03,// Human Interface Device (keyboard, mouse)Physical = 0x05,Image = 0x06,// Still image capturePrinter = 0x07,MassStorage = 0x08,// USB drivesHub = 0x09,CdcData = 0x0A,SmartCard = 0x0B,Video = 0x0E,AudioVideo = 0x10,Billboard = 0x11,UsbCBridge = 0x12,Wireless = 0xE0,// Bluetooth, WiFi adaptersMisc = 0xEF,Application = 0xFE,VendorSpec = 0xFF,}/// High-risk classes that can execute commands or create network interfacesconstHIGH_RISK_CLASSES:&[UsbClass] = &[UsbClass::Hid,// Can type commands (BadUSB attack vector)UsbClass::Wireless,// Can create network interfaces];/// Low-risk classes that are generally safeconstLOW_RISK_CLASSES:&[UsbClass] = &[UsbClass::Hub,// Just provides more portsUsbClass::Audio,// Speakers, microphonesUsbClass::Video,// WebcamsUsbClass::Printer,// Printers];
USB Device Info Structure
From Python implementation (usb_device.py):
pubstructUsbDeviceInfo{// Core identificationpubvendor_id:String,// e.g., "046d" (Logitech)pubproduct_id:String,// e.g., "c52b"pubvendor_name:String,// e.g., "Logitech, Inc."pubproduct_name:String,// e.g., "Unifying Receiver"// Classificationpubdevice_class:u8,// USB class code (e.g., 0x03 for HID)pubinterface_classes:Vec<u8>,// Classes at interface level// Unique identificationpubserial:Option<String>,// Unique serial number (if available)pubbus_id:String,// e.g., "1-2.3" (sysfs path component)// Computed at creationpubbus_num:u32,// USB bus numberpubdev_num:u32,// Device number on bus}implUsbDeviceInfo{/// Unique identifier for this exact device (includes serial)pubfnunique_id(&self) -> String{let serial_part = self.serial.as_deref().unwrap_or("no-serial");format!("{}:{}:{}",self.vendor_id,self.product_id, serial_part)}/// Identifier for this model (ignores serial)pubfnmodel_id(&self) -> String{format!("{}:{}",self.vendor_id,self.product_id)}pubfnis_high_risk(&self) -> bool{HIGH_RISK_CLASSES.contains(&UsbClass::from(self.device_class))
|| self.interface_classes.iter().any(|c| HIGH_RISK_CLASSES.contains(&UsbClass::from(*c)))}}
USB Monitoring with udev Crate
The Rust udev crate provides native access to udev events:
use udev::{MonitorBuilder,MonitorSocket};pubstructUsbMonitor{socket:MonitorSocket,}implUsbMonitor{pubfnnew() -> Result<Self,Error>{let socket = MonitorBuilder::new()?
.match_subsystem("usb")?
.listen()?;Ok(Self{ socket })}/// Process events (call in a loop or thread)pubfnpoll_event(&mutself) -> Option<(UsbDeviceInfo,UsbAction)>{ifletSome(event) = self.socket.iter().next(){let action = match event.action(){Some(a)if a == "add" => UsbAction::Add,Some(a)if a == "remove" => UsbAction::Remove,
_ => returnNone,};// Skip interfaces (e.g., "3-4:1.0") - only handle devicesif event.sysname().to_string_lossy().contains(':'){returnNone;}let device_info = self.extract_device_info(&event)?;returnSome((device_info, action));}None}fnextract_device_info(&self,device:&udev::Device) -> Option<UsbDeviceInfo>{let vendor_id = device.property_value("ID_VENDOR_ID")?.to_string_lossy().to_string();let product_id = device.property_value("ID_MODEL_ID")?.to_string_lossy().to_string();let vendor_name = device.property_value("ID_VENDOR_FROM_DATABASE").or_else(|| device.property_value("ID_VENDOR")).map(|v| v.to_string_lossy().replace('_'," ")).unwrap_or_else(|| "Unknown Vendor".to_string());let product_name = device.property_value("ID_MODEL_FROM_DATABASE").or_else(|| device.property_value("ID_MODEL")).map(|v| v.to_string_lossy().replace('_'," ")).unwrap_or_else(|| "Unknown Device".to_string());// Parse device class from sysfs attributelet device_class = device.attribute_value("bDeviceClass").and_then(|v| u8::from_str_radix(&v.to_string_lossy(),16).ok()).unwrap_or(0);// Serial number (sanitize!)let serial = device.property_value("ID_SERIAL").or_else(|| device.property_value("ID_SERIAL_SHORT")).map(|v| sanitize_serial(&v.to_string_lossy()));Some(UsbDeviceInfo{vendor_id:sanitize_hex_id(&vendor_id),product_id:sanitize_hex_id(&product_id),
vendor_name,
product_name,
device_class,interface_classes:vec![],// Populate by iterating children
serial,bus_id: device.sysname().to_string_lossy().to_string(),bus_num: device.property_value("BUSNUM").and_then(|v| v.to_string_lossy().parse().ok()).unwrap_or(0),dev_num: device.property_value("DEVNUM").and_then(|v| v.to_string_lossy().parse().ok()).unwrap_or(0),})}}pubenumUsbAction{Add,Remove,}
USB Rule Storage
From Python implementation (usb_rules.py):
Rule Structure
#[derive(Serialize,Deserialize,Clone)]pubstructUsbRule{pubverdict:Verdict,// "allow" or "block"pubvendor_id:String,// e.g., "046d"pubproduct_id:String,// e.g., "c52b"pubvendor_name:String,// e.g., "Logitech, Inc."pubproduct_name:String,// e.g., "Unifying Receiver"pubscope:Scope,// "device", "model", or "vendor"pubadded:String,// ISO timestamppublast_seen:Option<String>,// ISO timestamppubserial:Option<String>,// Only for device scope}#[derive(Serialize,Deserialize,Clone,PartialEq)]#[serde(rename_all = "lowercase")]pubenumVerdict{Allow,Block,}#[derive(Serialize,Deserialize,Clone,PartialEq)]#[serde(rename_all = "lowercase")]pubenumScope{Device,// Exact device (by serial)Model,// All devices of this model (vendor:product)Vendor,// All devices from this vendor}
Rule Key Format
Scope
Key Format
Example
Device
{vendor_id}:{product_id}:{serial}
046d:c52b:ABC123
Model
{vendor_id}:{product_id}:*
046d:c52b:*
Vendor
{vendor_id}:*:*
046d:*:*
Rule Lookup Priority
Device (most specific) - exact match by serial
Model - match by vendor:product
Vendor (least specific) - match by vendor only
Storage File
Path: /etc/bastion/usb_rules.json
Permissions: 0640 (owner rw, group r)
Atomic writes: Write to temp file, then rename
USB Authorization via sysfs
From Python implementation (usb_rules.py - USBAuthorizer class):
constSYSFS_USB_PATH:&str = "/sys/bus/usb/devices";pubstructUsbAuthorizer;implUsbAuthorizer{/// Get authorization file path for devicefnget_auth_path(bus_id:&str) -> Result<PathBuf,Error>{// Validate bus_id format (e.g., "1-2.3")if !bus_id.chars().all(|c| c.is_ascii_digit() || c == '-' || c == '.'){returnErr(Error::InvalidBusId);}if bus_id.contains(".."){returnErr(Error::PathTraversal);}Ok(PathBuf::from(SYSFS_USB_PATH).join(bus_id).join("authorized"))}/// Authorize (enable) a USB devicepubfnauthorize(bus_id:&str) -> Result<(),Error>{let path = Self::get_auth_path(bus_id)?;
std::fs::write(&path,"1")?;info!("Authorized USB device: {}", bus_id);Ok(())}/// Deauthorize (disable) a USB device/// WARNING: This immediately disconnects the device!pubfndeauthorize(bus_id:&str) -> Result<(),Error>{let path = Self::get_auth_path(bus_id)?;
std::fs::write(&path,"0")?;info!("Deauthorized USB device: {}", bus_id);Ok(())}/// Set default policy for new USB devicespubfnset_default_policy(authorize:bool) -> Result<(),Error>{let value = if authorize {"1"}else{"0"};let sysfs_path = PathBuf::from(SYSFS_USB_PATH);for entry in std::fs::read_dir(&sysfs_path)? {let entry = entry?;let name = entry.file_name();if name.to_string_lossy().starts_with("usb"){let auth_default = entry.path().join("authorized_default");if auth_default.exists(){
std::fs::write(&auth_default, value)?;}}}Ok(())}}
Overview
Implement USB device monitoring and control directly in the Rust daemon (
bastion-rs). This is Option B from the architecture analysis - a native Rust implementation for best performance and unified architecture.Reference: PR #6 contains a complete Python implementation that was never merged to master (it targets the legacy Python daemon). The design patterns, data structures, and security model from that PR should be used as the specification for this Rust implementation.
Goals
/etc/bastion/usb_rules.json/sys/bus/usb/devices/{bus_id}/authorizedArchitecture
flowchart LR subgraph Kernel A[USB Device Inserted] --> B[udev event] end subgraph bastion-daemon Rust B -->|udev crate| C[UsbMonitor] C --> D[UsbDeviceInfo] D --> E{Check Rules} E -->|known| F[Apply Verdict] E -->|unknown| G[Send Prompt to GUI] F --> H[sysfs authorize/deauthorize] end subgraph GUI G -->|IPC| I[USBPromptDialog] I -->|user decision| J[Response via IPC] J --> K[UsbRuleManager] K --> F endUSB Device Classification
From Python implementation (
usb_device.py):USB Device Info Structure
From Python implementation (
usb_device.py):USB Monitoring with udev Crate
The Rust
udevcrate provides native access to udev events:USB Rule Storage
From Python implementation (
usb_rules.py):Rule Structure
Rule Key Format
{vendor_id}:{product_id}:{serial}046d:c52b:ABC123{vendor_id}:{product_id}:*046d:c52b:*{vendor_id}:*:*046d:*:*Rule Lookup Priority
Storage File
/etc/bastion/usb_rules.json0640(owner rw, group r)USB Authorization via sysfs
From Python implementation (
usb_rules.py-USBAuthorizerclass):Input Validation
From Python implementation (
usb_validation.py):[0-9a-f]046d[0-9a-f]c52b[A-Za-z0-9._-]ABC123-456046d:c52b:*Logitech, Inc.IPC Protocol Extension
Extend the existing daemon-GUI IPC to support USB messages:
USB Prompt Request (Daemon → GUI)
{ "type": "usb_request", "nonce": "abc123", "device": { "vendor_id": "046d", "product_id": "c52b", "vendor_name": "Logitech, Inc.", "product_name": "Unifying Receiver", "device_class": 3, "is_high_risk": true, "serial": "ABC123", "bus_id": "1-2.3" } }USB Prompt Response (GUI → Daemon)
{ "type": "usb_response", "nonce": "abc123", "verdict": "allow", "scope": "device", "permanent": true }GUI Integration
Reuse the PyQt6 dialog from PR #6 (
usb_gui.py):USBPromptDialog: Popup for new device allow/block decision
USBControlWidget: Control panel page for USB management
Security Model
Threat Model
..and symlinksPrivilege Separation
Implementation Plan
Phase 1: Core USB Module (~4 hours)
bastion-rs/src/usb/mod.rsmoduleUsbDeviceInfostruct with classificationUsbClassenum with risk levelsUsbMonitorusingudevcratePhase 2: Rule Management (~3 hours)
UsbRuleandUsbRuleManagerPhase 3: sysfs Authorization (~2 hours)
UsbAuthorizerwith path validationPhase 4: Daemon Integration (~3 hours)
Phase 5: GUI Integration (~3 hours)
USBPromptDialogfrom PR USB Device Control - Fresh Implementation #6 (already Qt6)USBControlWidgetfrom PR USB Device Control - Fresh Implementation #6Phase 6: Packaging & Testing (~2 hours)
libudev-devto build dependenciesDependencies
Rust Crates
System Dependencies
Estimated Effort
References
feature/usb-device-control-v2- Contains all Python source code