|
4 | 4 | import gc |
5 | 5 | import importlib |
6 | 6 | import sys |
| 7 | +from typing import Optional |
| 8 | + |
| 9 | +class UsbError(Exception): |
| 10 | + """Base class for USB-related errors.""" |
| 11 | + pass |
| 12 | + |
| 13 | +class UsbDevice: |
| 14 | + """USB device wrapper class.""" |
| 15 | + |
| 16 | + def __init__(self, device: usb.core.Device): |
| 17 | + """Initialize USB device. |
| 18 | + |
| 19 | + Args: |
| 20 | + device: PyUSB device object |
| 21 | + """ |
| 22 | + self.device = device |
| 23 | + |
| 24 | + # Set default configuration |
| 25 | + try: |
| 26 | + self.device.set_configuration() |
| 27 | + except usb.core.USBError as e: |
| 28 | + raise UsbError(f"Failed to set configuration: {e}") |
| 29 | + |
| 30 | + def write(self, data: bytes) -> None: |
| 31 | + """Write data to device. |
| 32 | + |
| 33 | + Args: |
| 34 | + data: Data to write |
| 35 | + |
| 36 | + Raises: |
| 37 | + UsbError: On write error |
| 38 | + """ |
| 39 | + try: |
| 40 | + self.device.write(0x1, data) # EP1 OUT |
| 41 | + except usb.core.USBError as e: |
| 42 | + raise UsbError(f"Write failed: {e}") |
| 43 | + |
| 44 | + def read(self, buffer: bytearray) -> None: |
| 45 | + """Read data from device into buffer. |
| 46 | + |
| 47 | + Args: |
| 48 | + buffer: Buffer to read into |
| 49 | + |
| 50 | + Raises: |
| 51 | + UsbError: On read error |
| 52 | + """ |
| 53 | + try: |
| 54 | + read_data = self.device.read(0x81, len(buffer)) # EP1 IN |
| 55 | + buffer[:len(read_data)] = read_data |
| 56 | + except usb.core.USBError as e: |
| 57 | + raise UsbError(f"Read failed: {e}") |
7 | 58 |
|
8 | 59 | class SnagbootUSBContext(): |
9 | 60 | """ |
@@ -67,4 +118,3 @@ def find(**args): |
67 | 118 | tests = (hasattr(dev, key) and val == getattr(dev, key) for key, val in args.items()) |
68 | 119 | if all(tests): |
69 | 120 | yield weakref.proxy(dev) |
70 | | - |
|
0 commit comments