Skip to content

USB Device Control - Rust Native Implementation (Option B) #29

@shipdocs

Description

@shipdocs

Overview

Implement USB device monitoring and control directly in the Rust daemon (bastion-rs). This is Option B from the architecture analysis - a native Rust implementation for best performance and unified architecture.

Reference: PR #6 contains a complete Python implementation that was never merged to master (it targets the legacy Python daemon). The design patterns, data structures, and security model from that PR should be used as the specification for this Rust implementation.


Goals

  • Default-deny USB policy: New USB devices are blocked until explicitly approved
  • User prompts: Show popup dialog when new USB device is inserted
  • Scoped rules: Allow/block by device (exact), model (vendor:product), or vendor
  • Risk classification: Warn users about high-risk devices (HID, wireless)
  • Persistent rules: Store decisions in /etc/bastion/usb_rules.json
  • Sysfs authorization: Control device access via /sys/bus/usb/devices/{bus_id}/authorized

Architecture

flowchart LR
  subgraph Kernel
    A[USB Device Inserted] --> B[udev event]
  end
  
  subgraph bastion-daemon Rust
    B -->|udev crate| C[UsbMonitor]
    C --> D[UsbDeviceInfo]
    D --> E{Check Rules}
    E -->|known| F[Apply Verdict]
    E -->|unknown| G[Send Prompt to GUI]
    F --> H[sysfs authorize/deauthorize]
  end
  
  subgraph GUI
    G -->|IPC| I[USBPromptDialog]
    I -->|user decision| J[Response via IPC]
    J --> K[UsbRuleManager]
    K --> F
  end
Loading

USB Device Classification

From Python implementation (usb_device.py):

/// USB Device Class codes (bDeviceClass / bInterfaceClass)
#[repr(u8)]
pub enum UsbClass {
    PerInterface = 0x00,  // Class defined at interface level
    Audio = 0x01,
    CdcComm = 0x02,       // Communications
    Hid = 0x03,           // Human Interface Device (keyboard, mouse)
    Physical = 0x05,
    Image = 0x06,         // Still image capture
    Printer = 0x07,
    MassStorage = 0x08,   // USB drives
    Hub = 0x09,
    CdcData = 0x0A,
    SmartCard = 0x0B,
    Video = 0x0E,
    AudioVideo = 0x10,
    Billboard = 0x11,
    UsbCBridge = 0x12,
    Wireless = 0xE0,      // Bluetooth, WiFi adapters
    Misc = 0xEF,
    Application = 0xFE,
    VendorSpec = 0xFF,
}

/// High-risk classes that can execute commands or create network interfaces
const HIGH_RISK_CLASSES: &[UsbClass] = &[
    UsbClass::Hid,       // Can type commands (BadUSB attack vector)
    UsbClass::Wireless,  // Can create network interfaces
];

/// Low-risk classes that are generally safe
const LOW_RISK_CLASSES: &[UsbClass] = &[
    UsbClass::Hub,     // Just provides more ports
    UsbClass::Audio,   // Speakers, microphones
    UsbClass::Video,   // Webcams
    UsbClass::Printer, // Printers
];

USB Device Info Structure

From Python implementation (usb_device.py):

pub struct UsbDeviceInfo {
    // Core identification
    pub vendor_id: String,      // e.g., "046d" (Logitech)
    pub product_id: String,     // e.g., "c52b"
    pub vendor_name: String,    // e.g., "Logitech, Inc."
    pub product_name: String,   // e.g., "Unifying Receiver"
    
    // Classification
    pub device_class: u8,           // USB class code (e.g., 0x03 for HID)
    pub interface_classes: Vec<u8>, // Classes at interface level
    
    // Unique identification
    pub serial: Option<String>,     // Unique serial number (if available)
    pub bus_id: String,              // e.g., "1-2.3" (sysfs path component)
    
    // Computed at creation
    pub bus_num: u32,                // USB bus number
    pub dev_num: u32,                // Device number on bus
}

impl UsbDeviceInfo {
    /// Unique identifier for this exact device (includes serial)
    pub fn unique_id(&self) -> String {
        let serial_part = self.serial.as_deref().unwrap_or("no-serial");
        format!("{}:{}:{}", self.vendor_id, self.product_id, serial_part)
    }
    
    /// Identifier for this model (ignores serial)
    pub fn model_id(&self) -> String {
        format!("{}:{}", self.vendor_id, self.product_id)
    }
    
    pub fn is_high_risk(&self) -> bool {
        HIGH_RISK_CLASSES.contains(&UsbClass::from(self.device_class))
            || self.interface_classes.iter().any(|c| HIGH_RISK_CLASSES.contains(&UsbClass::from(*c)))
    }
}

USB Monitoring with udev Crate

The Rust udev crate provides native access to udev events:

use udev::{MonitorBuilder, MonitorSocket};

pub struct UsbMonitor {
    socket: MonitorSocket,
}

impl UsbMonitor {
    pub fn new() -> Result<Self, Error> {
        let socket = MonitorBuilder::new()?
            .match_subsystem("usb")?
            .listen()?;
        Ok(Self { socket })
    }
    
    /// Process events (call in a loop or thread)
    pub fn poll_event(&mut self) -> Option<(UsbDeviceInfo, UsbAction)> {
        if let Some(event) = self.socket.iter().next() {
            let action = match event.action() {
                Some(a) if a == "add" => UsbAction::Add,
                Some(a) if a == "remove" => UsbAction::Remove,
                _ => return None,
            };
            
            // Skip interfaces (e.g., "3-4:1.0") - only handle devices
            if event.sysname().to_string_lossy().contains(':') {
                return None;
            }
            
            let device_info = self.extract_device_info(&event)?;
            return Some((device_info, action));
        }
        None
    }
    
    fn extract_device_info(&self, device: &udev::Device) -> Option<UsbDeviceInfo> {
        let vendor_id = device.property_value("ID_VENDOR_ID")?.to_string_lossy().to_string();
        let product_id = device.property_value("ID_MODEL_ID")?.to_string_lossy().to_string();
        
        let vendor_name = device.property_value("ID_VENDOR_FROM_DATABASE")
            .or_else(|| device.property_value("ID_VENDOR"))
            .map(|v| v.to_string_lossy().replace('_', " "))
            .unwrap_or_else(|| "Unknown Vendor".to_string());
        
        let product_name = device.property_value("ID_MODEL_FROM_DATABASE")
            .or_else(|| device.property_value("ID_MODEL"))
            .map(|v| v.to_string_lossy().replace('_', " "))
            .unwrap_or_else(|| "Unknown Device".to_string());
        
        // Parse device class from sysfs attribute
        let device_class = device.attribute_value("bDeviceClass")
            .and_then(|v| u8::from_str_radix(&v.to_string_lossy(), 16).ok())
            .unwrap_or(0);
        
        // Serial number (sanitize!)
        let serial = device.property_value("ID_SERIAL")
            .or_else(|| device.property_value("ID_SERIAL_SHORT"))
            .map(|v| sanitize_serial(&v.to_string_lossy()));
        
        Some(UsbDeviceInfo {
            vendor_id: sanitize_hex_id(&vendor_id),
            product_id: sanitize_hex_id(&product_id),
            vendor_name,
            product_name,
            device_class,
            interface_classes: vec![], // Populate by iterating children
            serial,
            bus_id: device.sysname().to_string_lossy().to_string(),
            bus_num: device.property_value("BUSNUM").and_then(|v| v.to_string_lossy().parse().ok()).unwrap_or(0),
            dev_num: device.property_value("DEVNUM").and_then(|v| v.to_string_lossy().parse().ok()).unwrap_or(0),
        })
    }
}

pub enum UsbAction {
    Add,
    Remove,
}

USB Rule Storage

From Python implementation (usb_rules.py):

Rule Structure

#[derive(Serialize, Deserialize, Clone)]
pub struct UsbRule {
    pub verdict: Verdict,           // "allow" or "block"
    pub vendor_id: String,          // e.g., "046d"
    pub product_id: String,         // e.g., "c52b"
    pub vendor_name: String,        // e.g., "Logitech, Inc."
    pub product_name: String,       // e.g., "Unifying Receiver"
    pub scope: Scope,               // "device", "model", or "vendor"
    pub added: String,              // ISO timestamp
    pub last_seen: Option<String>,  // ISO timestamp
    pub serial: Option<String>,     // Only for device scope
}

#[derive(Serialize, Deserialize, Clone, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Verdict {
    Allow,
    Block,
}

#[derive(Serialize, Deserialize, Clone, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Scope {
    Device,  // Exact device (by serial)
    Model,   // All devices of this model (vendor:product)
    Vendor,  // All devices from this vendor
}

Rule Key Format

Scope Key Format Example
Device {vendor_id}:{product_id}:{serial} 046d:c52b:ABC123
Model {vendor_id}:{product_id}:* 046d:c52b:*
Vendor {vendor_id}:*:* 046d:*:*

Rule Lookup Priority

  1. Device (most specific) - exact match by serial
  2. Model - match by vendor:product
  3. Vendor (least specific) - match by vendor only

Storage File

  • Path: /etc/bastion/usb_rules.json
  • Permissions: 0640 (owner rw, group r)
  • Atomic writes: Write to temp file, then rename

USB Authorization via sysfs

From Python implementation (usb_rules.py - USBAuthorizer class):

const SYSFS_USB_PATH: &str = "/sys/bus/usb/devices";

pub struct UsbAuthorizer;

impl UsbAuthorizer {
    /// Get authorization file path for device
    fn get_auth_path(bus_id: &str) -> Result<PathBuf, Error> {
        // Validate bus_id format (e.g., "1-2.3")
        if !bus_id.chars().all(|c| c.is_ascii_digit() || c == '-' || c == '.') {
            return Err(Error::InvalidBusId);
        }
        if bus_id.contains("..") {
            return Err(Error::PathTraversal);
        }
        Ok(PathBuf::from(SYSFS_USB_PATH).join(bus_id).join("authorized"))
    }
    
    /// Authorize (enable) a USB device
    pub fn authorize(bus_id: &str) -> Result<(), Error> {
        let path = Self::get_auth_path(bus_id)?;
        std::fs::write(&path, "1")?;
        info!("Authorized USB device: {}", bus_id);
        Ok(())
    }
    
    /// Deauthorize (disable) a USB device
    /// WARNING: This immediately disconnects the device!
    pub fn deauthorize(bus_id: &str) -> Result<(), Error> {
        let path = Self::get_auth_path(bus_id)?;
        std::fs::write(&path, "0")?;
        info!("Deauthorized USB device: {}", bus_id);
        Ok(())
    }
    
    /// Set default policy for new USB devices
    pub fn set_default_policy(authorize: bool) -> Result<(), Error> {
        let value = if authorize { "1" } else { "0" };
        let sysfs_path = PathBuf::from(SYSFS_USB_PATH);
        
        for entry in std::fs::read_dir(&sysfs_path)? {
            let entry = entry?;
            let name = entry.file_name();
            if name.to_string_lossy().starts_with("usb") {
                let auth_default = entry.path().join("authorized_default");
                if auth_default.exists() {
                    std::fs::write(&auth_default, value)?;
                }
            }
        }
        Ok(())
    }
}

Input Validation

From Python implementation (usb_validation.py):

Field Max Length Allowed Characters Example
Vendor ID 4 [0-9a-f] 046d
Product ID 4 [0-9a-f] c52b
Serial 128 [A-Za-z0-9._-] ABC123-456
Rule Key 256 Pattern validated 046d:c52b:*
Vendor/Product Name 256 Printable (no control chars) Logitech, Inc.
use regex::Regex;
use lazy_static::lazy_static;

lazy_static! {
    static ref SERIAL_SAFE_CHARS: Regex = Regex::new(r"[^0-9A-Za-z._-]").unwrap();
    static ref HEX_CHARS: Regex = Regex::new(r"[^0-9a-fA-F]").unwrap();
    static ref KEY_PATTERN: Regex = Regex::new(
        r"^[0-9a-f]{4}:[0-9a-f]{4}:[A-Za-z0-9._-]+$|^[0-9a-f]{4}:[0-9a-f]{4}:\*$|^[0-9a-f]{4}:\*:\*$"
    ).unwrap();
}

pub fn sanitize_hex_id(hex_id: &str) -> String {
    let clean: String = HEX_CHARS.replace_all(hex_id, "").to_lowercase();
    let truncated = &clean[..clean.len().min(4)];
    format!("{:0>4}", truncated)
}

pub fn sanitize_serial(serial: &str) -> String {
    let clean: String = SERIAL_SAFE_CHARS.replace_all(serial, "").to_string();
    if clean.is_empty() {
        "no-serial".to_string()
    } else {
        clean[..clean.len().min(128)].to_string()
    }
}

pub fn validate_key(key: &str) -> bool {
    key.len() <= 256 && KEY_PATTERN.is_match(key)
}

IPC Protocol Extension

Extend the existing daemon-GUI IPC to support USB messages:

USB Prompt Request (Daemon → GUI)

{
  "type": "usb_request",
  "nonce": "abc123",
  "device": {
    "vendor_id": "046d",
    "product_id": "c52b",
    "vendor_name": "Logitech, Inc.",
    "product_name": "Unifying Receiver",
    "device_class": 3,
    "is_high_risk": true,
    "serial": "ABC123",
    "bus_id": "1-2.3"
  }
}

USB Prompt Response (GUI → Daemon)

{
  "type": "usb_response",
  "nonce": "abc123",
  "verdict": "allow",
  "scope": "device",
  "permanent": true
}

GUI Integration

Reuse the PyQt6 dialog from PR #6 (usb_gui.py):

  • USBPromptDialog: Popup for new device allow/block decision

    • Shows device name, vendor, type, ID
    • High-risk warning for HID/wireless devices
    • Scope selection (device/model/vendor)
    • Allow Once / Allow Always / Block Once / Block Always buttons
    • 30-second timeout → auto-block (configurable)
  • USBControlWidget: Control panel page for USB management

    • Protection status toggle (enabled/disabled)
    • Allowed devices table
    • Blocked devices table
    • Delete rule functionality

Security Model

Threat Model

Threat Mitigation
BadUSB (malicious keyboard) HID devices flagged as high-risk, require explicit approval
Unauthorized USB access Default-deny policy, all devices blocked until approved
Symlink attacks on sysfs Path validation, reject .. and symlinks
Injection in rule keys Strict regex validation, sanitize all inputs
TOCTOU on rule files Atomic writes (temp file + rename)
Privilege escalation Daemon runs as root; GUI as user

Privilege Separation

┌─────────────────────────────────┐
│ USER SPACE (normal user)        │
│  bastion-gui (PyQt6)            │
│    └── Shows prompts            │
│    └── Sends decisions via IPC  │
└─────────────────────────────────┘
         │ Unix socket (0666)
         ▼
┌─────────────────────────────────┐
│ ROOT SPACE                       │
│  bastion-daemon (Rust)          │
│    └── Monitors USB (udev)      │
│    └── Manages rules (/etc/)    │
│    └── Controls sysfs           │
└─────────────────────────────────┘

Implementation Plan

Phase 1: Core USB Module (~4 hours)

  • Create bastion-rs/src/usb/mod.rs module
  • Implement UsbDeviceInfo struct with classification
  • Implement UsbClass enum with risk levels
  • Implement UsbMonitor using udev crate
  • Add unit tests for device classification

Phase 2: Rule Management (~3 hours)

  • Implement UsbRule and UsbRuleManager
  • Implement key generation and validation
  • Implement atomic file storage with proper permissions
  • Add rule lookup with scope priority
  • Add unit tests for rule matching and persistence

Phase 3: sysfs Authorization (~2 hours)

  • Implement UsbAuthorizer with path validation
  • Implement authorize/deauthorize device functions
  • Implement set_default_policy for boot-time config
  • Add integration tests

Phase 4: Daemon Integration (~3 hours)

  • Add USB monitor thread to daemon
  • Extend IPC protocol for USB messages
  • Handle USB events: check rules → prompt GUI if unknown
  • Apply verdict via sysfs authorization
  • Add SIGHUP handler to reload USB rules

Phase 5: GUI Integration (~3 hours)

Phase 6: Packaging & Testing (~2 hours)

  • Add libudev-dev to build dependencies
  • Update systemd service for USB access
  • Add integration tests
  • Update documentation

Dependencies

Rust Crates

[dependencies]
udev = "0.8"           # Native udev bindings
regex = "1"            # Input validation
lazy_static = "1"      # Static regex compilation
chrono = "0.4"         # Timestamps

System Dependencies

# For building
sudo apt install libudev-dev

# Runtime (already present on most systems)
# - udev
# - sysfs mounted at /sys

Estimated Effort

Phase Hours
Core USB Module 4
Rule Management 3
sysfs Authorization 2
Daemon Integration 3
GUI Integration 3
Packaging & Testing 2
Total ~17 hours

References

Metadata

Metadata

Assignees

No one assigned

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions