From e9e4faff2e33ff0d72ae5fe267bf01e4fd1a6bc1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20C=C3=B4t=C3=A9?= Date: Sun, 1 Mar 2026 16:15:35 -0500 Subject: [PATCH 1/9] Fix DebugPort bugs and add multi-endpoint support Fix flush() writing to nonexistent attribute, fix endPointIndex unbound when endPoint is not None, add numberOfEndPoints parameter for USB devices, add defaultTimeout attribute, remove broken DebugDataCommand. Co-Authored-By: Claude Opus 4.6 --- hardwarelibrary/communication/debugport.py | 63 +++++++--------------- 1 file changed, 20 insertions(+), 43 deletions(-) diff --git a/hardwarelibrary/communication/debugport.py b/hardwarelibrary/communication/debugport.py index b7ebe45..e13545e 100644 --- a/hardwarelibrary/communication/debugport.py +++ b/hardwarelibrary/communication/debugport.py @@ -4,79 +4,57 @@ from hardwarelibrary.communication import * from threading import Thread, Lock -class DebugDataCommand(Command): - def __init__(self, name, dataHexRegex = None, unpackingMask = None, endPoints = (None, None)): - Command.__init__(self, name, endPoints=endPoints) - self.data : bytearray = data - self.dataHexRegex: str = dataHexRegex - self.unpackingMask:str = unpackingMask - - def send(self, port) -> bool: - try: - self.isSent = True - nBytes = port.writeData(data=self.data, endPoint=self.endPoints[0]) - if self.replyDataLength > 0: - self.reply = port.readData(length=self.replyDataLength) - elif self.replyHexRegex is not None: - raise NotImplementedError("DataCommand reply pattern not implemented") - # self.reply = port.readData(length=self.replyDataLength) - self.isSentSuccessfully = True - except Exception as err: - - self.exceptions.append(err) - self.isSentSuccessfully = False - raise(err) - - return False - class DebugPort(CommunicationPort): - def __init__(self, delay=0): - self.inputBuffers = [bytearray()] - self.outputBuffers = [bytearray()] + def __init__(self, delay=0, numberOfEndPoints=1): + self.inputBuffers = [bytearray() for _ in range(numberOfEndPoints)] + self.outputBuffers = [bytearray() for _ in range(numberOfEndPoints)] self.delay = delay + self.defaultTimeout = 500 self._isOpen = False super(DebugPort, self).__init__() @property def isOpen(self): - return self._isOpen + return self._isOpen def open(self): if self._isOpen: - raise Exception() + raise Exception("Port already open") self._isOpen = True - return def close(self): self._isOpen = False - return def bytesAvailable(self, endPoint=0): - return len(self.outputBuffers[endPoint]) + endPointIndex = endPoint if endPoint is not None else 0 + return len(self.outputBuffers[endPointIndex]) def flush(self): - self.buffers = [bytearray(),bytearray()] + for i in range(len(self.inputBuffers)): + self.inputBuffers[i] = bytearray() + for i in range(len(self.outputBuffers)): + self.outputBuffers[i] = bytearray() def readData(self, length, endPoint=None): - if endPoint is None: - endPointIndex = 0 + endPointIndex = endPoint if endPoint is not None else 0 with self.portLock: - time.sleep(self.delay*random.random()) + if self.delay > 0: + time.sleep(self.delay * random.random()) + data = bytearray() - for i in range(0, length): + for i in range(length): if len(self.outputBuffers[endPointIndex]) > 0: byte = self.outputBuffers[endPointIndex].pop(0) data.append(byte) else: - raise CommunicationReadTimeout("Unable to read data") + raise CommunicationReadTimeout("Unable to read {0} bytes, only {1} available".format(length, len(data))) return data def writeData(self, data, endPoint=None): - if endPoint is None: - endPointIndex = 0 + endPointIndex = endPoint if endPoint is not None else 0 with self.portLock: self.inputBuffers[endPointIndex].extend(data) @@ -85,12 +63,11 @@ def writeData(self, data, endPoint=None): return len(data) - def writeToOutputBuffer(self, data, endPointIndex): + def writeToOutputBuffer(self, data, endPointIndex=0): self.outputBuffers[endPointIndex].extend(data) def processInputBuffers(self, endPointIndex): # We default to ECHO for simplicity - inputBytes = self.inputBuffers[endPointIndex] # Do something, here we do an Echo From e6e88adb1fa6a526cb3beddbbd8f2b0c2b7c1545 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20C=C3=B4t=C3=A9?= Date: Sun, 1 Mar 2026 16:40:00 -0500 Subject: [PATCH 2/9] Add TableDrivenDebugPort for declarative mock device creation Replace manual if/elif byte-parsing in debug ports with a table-driven approach: declare command entries (binary or text) and implement only device logic in process_command(). Migrate Sutter's DebugSerialPort, create Intellidrive's missing DebugSerialPort, and add 12 new tests. Co-Authored-By: Claude Opus 4.6 --- hardwarelibrary/communication/__init__.py | 2 +- hardwarelibrary/communication/debugport.py | 85 +++++++- hardwarelibrary/motion/intellidrivedevice.py | 28 ++- hardwarelibrary/motion/sutterdevice.py | 41 ++-- .../tests/testTableDrivenDebugPort.py | 198 ++++++++++++++++++ 5 files changed, 327 insertions(+), 27 deletions(-) create mode 100644 hardwarelibrary/tests/testTableDrivenDebugPort.py diff --git a/hardwarelibrary/communication/__init__.py b/hardwarelibrary/communication/__init__.py index bc0034e..72a9f89 100644 --- a/hardwarelibrary/communication/__init__.py +++ b/hardwarelibrary/communication/__init__.py @@ -2,7 +2,7 @@ from .serialport import SerialPort from .usbport import USBPort from .diagnostics import USBParameters, DeviceCommand, USBDeviceDescription -from .debugport import DebugPort +from .debugport import DebugPort, TableDrivenDebugPort, BinaryCommandEntry, TextCommandEntry from .echoport import DebugEchoPort import usb.backend.libusb1 import platform diff --git a/hardwarelibrary/communication/debugport.py b/hardwarelibrary/communication/debugport.py index e13545e..9391645 100644 --- a/hardwarelibrary/communication/debugport.py +++ b/hardwarelibrary/communication/debugport.py @@ -1,9 +1,28 @@ import time import random +import struct +import re +from dataclasses import dataclass from hardwarelibrary.communication import * from threading import Thread, Lock + +@dataclass +class BinaryCommandEntry: + name: str + prefix: bytes + request_format: str = None + request_length: int = None + response_format: str = None + + +@dataclass +class TextCommandEntry: + name: str + pattern: str + response: str = None + class DebugPort(CommunicationPort): def __init__(self, delay=0, numberOfEndPoints=1): self.inputBuffers = [bytearray() for _ in range(numberOfEndPoints)] @@ -72,4 +91,68 @@ def processInputBuffers(self, endPointIndex): # Do something, here we do an Echo self.writeToOutputBuffer(inputBytes, endPointIndex) - self.inputBuffers[endPointIndex] = bytearray() \ No newline at end of file + self.inputBuffers[endPointIndex] = bytearray() + + +class TableDrivenDebugPort(DebugPort): + def __init__(self, delay=0, numberOfEndPoints=1): + super().__init__(delay=delay, numberOfEndPoints=numberOfEndPoints) + self.binary_commands = [] + self.text_commands = [] + + def processInputBuffers(self, endPointIndex): + inputBytes = self.inputBuffers[endPointIndex] + if len(inputBytes) == 0: + return + + for cmd in self.binary_commands: + prefix_len = len(cmd.prefix) + if inputBytes[:prefix_len].upper() == cmd.prefix.upper(): + if cmd.request_format is not None: + unpack_len = struct.calcsize(cmd.request_format) + params = struct.unpack(cmd.request_format, bytes(inputBytes[:unpack_len])) + else: + params = () + + result = self.process_command(cmd.name, params, endPointIndex) + self._write_binary_response(result, cmd, endPointIndex) + self.inputBuffers[endPointIndex] = bytearray() + return + + inputStr = inputBytes.decode('utf-8', errors='replace') + for cmd in self.text_commands: + match = re.match(cmd.pattern, inputStr) + if match: + params = match.groups() + result = self.process_command(cmd.name, params, endPointIndex) + self._write_text_response(result, cmd, endPointIndex) + self.inputBuffers[endPointIndex] = bytearray() + return + + print("Unrecognized command: {0}".format(inputBytes)) + self.inputBuffers[endPointIndex] = bytearray() + + def _write_binary_response(self, result, cmd, endPointIndex): + if result is None: + return + elif isinstance(result, (bytes, bytearray)): + self.writeToOutputBuffer(bytearray(result), endPointIndex) + elif isinstance(result, str): + self.writeToOutputBuffer(bytearray(result.encode('utf-8')), endPointIndex) + elif isinstance(result, tuple) and cmd.response_format is not None: + data = struct.pack(cmd.response_format, *result) + self.writeToOutputBuffer(bytearray(data), endPointIndex) + + def _write_text_response(self, result, cmd, endPointIndex): + if result is None: + return + elif isinstance(result, (bytes, bytearray)): + self.writeToOutputBuffer(bytearray(result), endPointIndex) + elif isinstance(result, str): + self.writeToOutputBuffer(bytearray(result.encode('utf-8')), endPointIndex) + elif isinstance(result, tuple) and cmd.response is not None: + formatted = cmd.response.format(*result) + self.writeToOutputBuffer(bytearray(formatted.encode('utf-8')), endPointIndex) + + def process_command(self, name, params, endPointIndex): + raise NotImplementedError("Subclasses must implement process_command") \ No newline at end of file diff --git a/hardwarelibrary/motion/intellidrivedevice.py b/hardwarelibrary/motion/intellidrivedevice.py index d94a5b9..aa76f72 100644 --- a/hardwarelibrary/motion/intellidrivedevice.py +++ b/hardwarelibrary/motion/intellidrivedevice.py @@ -1,7 +1,7 @@ from hardwarelibrary.motion.rotationdevice import * from hardwarelibrary.communication.serialport import SerialPort from hardwarelibrary.communication.commands import DataCommand -from hardwarelibrary.communication.debugport import DebugPort +from hardwarelibrary.communication.debugport import TableDrivenDebugPort, TextCommandEntry import time from struct import * @@ -117,3 +117,29 @@ def doHome(self): if not self.isReferenced(): raise Exception("Homing failed") + + class DebugSerialPort(TableDrivenDebugPort): + def __init__(self): + super().__init__() + self.registers = {'0xc9': 0} + self.text_commands = [ + TextCommandEntry(name='set_register', pattern=r's r(0x[0-9a-fA-F]+) (-?\d+)[\r\n]'), + TextCommandEntry(name='get_register', pattern=r'g r(0x[0-9a-fA-F]+)[\r\n]'), + TextCommandEntry(name='trajectory', pattern=r't (\d+)[\r\n]'), + ] + + def process_command(self, name, params, endPointIndex): + if name == 'set_register': + register, value = params + self.registers[register] = int(value) + return "ok\r" + elif name == 'get_register': + register = params[0] + value = self.registers.get(register, 0) + return "v {}\r".format(value) + elif name == 'trajectory': + mode = int(params[0]) + if mode == 2: # home + self.registers['0xca'] = 0 + self.registers['0xc9'] = (1 << 12) # referenced bit + return "ok\r" diff --git a/hardwarelibrary/motion/sutterdevice.py b/hardwarelibrary/motion/sutterdevice.py index 5625bf1..556ad12 100644 --- a/hardwarelibrary/motion/sutterdevice.py +++ b/hardwarelibrary/motion/sutterdevice.py @@ -4,7 +4,7 @@ from hardwarelibrary.communication.usbport import USBPort from hardwarelibrary.communication.serialport import SerialPort from hardwarelibrary.communication.commands import DataCommand -from hardwarelibrary.communication.debugport import DebugPort +from hardwarelibrary.communication.debugport import TableDrivenDebugPort, BinaryCommandEntry import re import time @@ -143,31 +143,24 @@ def work(self): raise Exception(f"Expected carriage return, but got {replyBytes} instead.") - class DebugSerialPort(DebugPort): + class DebugSerialPort(TableDrivenDebugPort): def __init__(self): super().__init__() self.xSteps = 0 self.ySteps = 0 self.zSteps = 0 - - def processInputBuffers(self, endPointIndex): - inputBytes = self.inputBuffers[endPointIndex] - - if inputBytes[0] == b'm'[0] or inputBytes[0] == b'M'[0]: - x,y,z = unpack(" Date: Sun, 1 Mar 2026 17:18:01 -0500 Subject: [PATCH 3/9] Unify command recognition: Command objects now serve both send and mock sides Add matches(), extractParams(), formatResponse() to Command/TextCommand/DataCommand so the same objects define both the client protocol and debug port recognition. Remove BinaryCommandEntry/TextCommandEntry dataclasses from debugport.py, replacing them with a single polymorphic loop over Command objects in TableDrivenDebugPort. Update SutterDevice and IntellidriveDevice to use class-level commands dicts. Co-Authored-By: Claude Opus 4.6 --- hardwarelibrary/communication/__init__.py | 2 +- hardwarelibrary/communication/commands.py | 99 ++++++++++++- hardwarelibrary/communication/debugport.py | 70 +-------- hardwarelibrary/motion/intellidrivedevice.py | 31 ++-- hardwarelibrary/motion/sutterdevice.py | 25 ++-- .../tests/testCommandRecognition.py | 139 ++++++++++++++++++ .../tests/testTableDrivenDebugPort.py | 46 +++--- 7 files changed, 297 insertions(+), 115 deletions(-) create mode 100644 hardwarelibrary/tests/testCommandRecognition.py diff --git a/hardwarelibrary/communication/__init__.py b/hardwarelibrary/communication/__init__.py index 72a9f89..69ef712 100644 --- a/hardwarelibrary/communication/__init__.py +++ b/hardwarelibrary/communication/__init__.py @@ -2,7 +2,7 @@ from .serialport import SerialPort from .usbport import USBPort from .diagnostics import USBParameters, DeviceCommand, USBDeviceDescription -from .debugport import DebugPort, TableDrivenDebugPort, BinaryCommandEntry, TextCommandEntry +from .debugport import DebugPort, TableDrivenDebugPort from .echoport import DebugEchoPort import usb.backend.libusb1 import platform diff --git a/hardwarelibrary/communication/commands.py b/hardwarelibrary/communication/commands.py index 99cd729..3d6d257 100644 --- a/hardwarelibrary/communication/commands.py +++ b/hardwarelibrary/communication/commands.py @@ -1,4 +1,5 @@ import re +import struct class Command: def __init__(self, name:str, endPoints = (None, None)): @@ -26,7 +27,7 @@ def matchAsFloat(self, index=0): if self.matchGroups is not None: return float(self.matchGroups[index]) return None - + @property def hasError(self): return len(self.exceptions) != 0 @@ -34,14 +35,33 @@ def hasError(self): def send(self, port) -> bool: raise NotImplementedError("Subclasses must implement the send() command") + def matches(self, inputBytes): + return False + + def extractParams(self, inputBytes): + return () + + def formatResponse(self, result): + if result is None: + return None + elif isinstance(result, (bytes, bytearray)): + return bytearray(result) + elif isinstance(result, str): + return bytearray(result.encode('utf-8')) + return None + class TextCommand(Command): - def __init__(self, name, text, replyPattern = None, - alternatePattern = None, - endPoints = (None, None)): + def __init__(self, name, text, replyPattern = None, + alternatePattern = None, + endPoints = (None, None), + matchPattern = None, + responseTemplate = None): Command.__init__(self, name, endPoints=endPoints) self.text : str = text self.replyPattern: str = replyPattern self.alternatePattern: str = alternatePattern + self.matchPattern: str = matchPattern + self.responseTemplate: str = responseTemplate @property def payload(self): @@ -54,6 +74,41 @@ def numberOfArguments(self): return 0 return len(match.groups()) + @property + def _autoMatchPattern(self): + parts = re.split(r'\{[^}]*\}', self.text) + escaped = [re.escape(p) for p in parts] + return '(.+?)'.join(escaped) + + @property + def effectiveMatchPattern(self): + if self.matchPattern is not None: + return self.matchPattern + return self._autoMatchPattern + + def matches(self, inputBytes): + try: + inputStr = inputBytes.decode('utf-8', errors='replace') + except Exception: + return False + return re.match(self.effectiveMatchPattern, inputStr) is not None + + def extractParams(self, inputBytes): + try: + inputStr = inputBytes.decode('utf-8', errors='replace') + except Exception: + return () + match = re.match(self.effectiveMatchPattern, inputStr) + if match: + return match.groups() + return () + + def formatResponse(self, result): + if isinstance(result, tuple) and self.responseTemplate is not None: + formatted = self.responseTemplate.format(*result) + return bytearray(formatted.encode('utf-8')) + return super().formatResponse(result) + def send(self, port, params=None) -> bool: try: if params is not None: @@ -154,17 +209,51 @@ def send(self, port, params=None) -> bool: class DataCommand(Command): - def __init__(self, name, data, replyHexRegex = None, replyDataLength = 0, unpackingMask = None, endPoints = (None, None)): + def __init__(self, name, data=None, replyHexRegex = None, replyDataLength = 0, unpackingMask = None, endPoints = (None, None), + prefix = None, requestFormat = None, responseFormat = None): Command.__init__(self, name, endPoints=endPoints) self.data : bytearray = data self.replyHexRegex: str = replyHexRegex self.replyDataLength: int = replyDataLength self.unpackingMask:str = unpackingMask + self._prefix: bytes = prefix + self.requestFormat: str = requestFormat + self.responseFormat: str = responseFormat @property def payload(self): return self.data + @property + def effectivePrefix(self): + if self._prefix is not None: + return self._prefix + if self.data is not None and len(self.data) > 0: + return self.data[0:1] + return None + + def matches(self, inputBytes): + prefix = self.effectivePrefix + if prefix is None: + return False + prefixLen = len(prefix) + if len(inputBytes) < prefixLen: + return False + return inputBytes[:prefixLen].upper() == prefix.upper() + + def extractParams(self, inputBytes): + if self.requestFormat is not None: + unpackLen = struct.calcsize(self.requestFormat) + if len(inputBytes) >= unpackLen: + return struct.unpack(self.requestFormat, bytes(inputBytes[:unpackLen])) + return () + + def formatResponse(self, result): + if isinstance(result, tuple) and self.responseFormat is not None: + data = struct.pack(self.responseFormat, *result) + return bytearray(data) + return super().formatResponse(result) + def send(self, port) -> bool: try: nBytes = port.writeData(data=self.data, endPoint=self.endPoints[0]) diff --git a/hardwarelibrary/communication/debugport.py b/hardwarelibrary/communication/debugport.py index 9391645..6036132 100644 --- a/hardwarelibrary/communication/debugport.py +++ b/hardwarelibrary/communication/debugport.py @@ -2,27 +2,10 @@ import random import struct import re -from dataclasses import dataclass from hardwarelibrary.communication import * from threading import Thread, Lock - -@dataclass -class BinaryCommandEntry: - name: str - prefix: bytes - request_format: str = None - request_length: int = None - response_format: str = None - - -@dataclass -class TextCommandEntry: - name: str - pattern: str - response: str = None - class DebugPort(CommunicationPort): def __init__(self, delay=0, numberOfEndPoints=1): self.inputBuffers = [bytearray() for _ in range(numberOfEndPoints)] @@ -95,64 +78,27 @@ def processInputBuffers(self, endPointIndex): class TableDrivenDebugPort(DebugPort): - def __init__(self, delay=0, numberOfEndPoints=1): + def __init__(self, delay=0, numberOfEndPoints=1, commands=None): super().__init__(delay=delay, numberOfEndPoints=numberOfEndPoints) - self.binary_commands = [] - self.text_commands = [] + self.commands = commands if commands is not None else {} def processInputBuffers(self, endPointIndex): inputBytes = self.inputBuffers[endPointIndex] if len(inputBytes) == 0: return - for cmd in self.binary_commands: - prefix_len = len(cmd.prefix) - if inputBytes[:prefix_len].upper() == cmd.prefix.upper(): - if cmd.request_format is not None: - unpack_len = struct.calcsize(cmd.request_format) - params = struct.unpack(cmd.request_format, bytes(inputBytes[:unpack_len])) - else: - params = () - + for cmd in self.commands.values(): + if cmd.matches(inputBytes): + params = cmd.extractParams(inputBytes) result = self.process_command(cmd.name, params, endPointIndex) - self._write_binary_response(result, cmd, endPointIndex) - self.inputBuffers[endPointIndex] = bytearray() - return - - inputStr = inputBytes.decode('utf-8', errors='replace') - for cmd in self.text_commands: - match = re.match(cmd.pattern, inputStr) - if match: - params = match.groups() - result = self.process_command(cmd.name, params, endPointIndex) - self._write_text_response(result, cmd, endPointIndex) + response = cmd.formatResponse(result) + if response is not None: + self.writeToOutputBuffer(response, endPointIndex) self.inputBuffers[endPointIndex] = bytearray() return print("Unrecognized command: {0}".format(inputBytes)) self.inputBuffers[endPointIndex] = bytearray() - def _write_binary_response(self, result, cmd, endPointIndex): - if result is None: - return - elif isinstance(result, (bytes, bytearray)): - self.writeToOutputBuffer(bytearray(result), endPointIndex) - elif isinstance(result, str): - self.writeToOutputBuffer(bytearray(result.encode('utf-8')), endPointIndex) - elif isinstance(result, tuple) and cmd.response_format is not None: - data = struct.pack(cmd.response_format, *result) - self.writeToOutputBuffer(bytearray(data), endPointIndex) - - def _write_text_response(self, result, cmd, endPointIndex): - if result is None: - return - elif isinstance(result, (bytes, bytearray)): - self.writeToOutputBuffer(bytearray(result), endPointIndex) - elif isinstance(result, str): - self.writeToOutputBuffer(bytearray(result.encode('utf-8')), endPointIndex) - elif isinstance(result, tuple) and cmd.response is not None: - formatted = cmd.response.format(*result) - self.writeToOutputBuffer(bytearray(formatted.encode('utf-8')), endPointIndex) - def process_command(self, name, params, endPointIndex): raise NotImplementedError("Subclasses must implement process_command") \ No newline at end of file diff --git a/hardwarelibrary/motion/intellidrivedevice.py b/hardwarelibrary/motion/intellidrivedevice.py index aa76f72..328bf3d 100644 --- a/hardwarelibrary/motion/intellidrivedevice.py +++ b/hardwarelibrary/motion/intellidrivedevice.py @@ -1,7 +1,7 @@ from hardwarelibrary.motion.rotationdevice import * from hardwarelibrary.communication.serialport import SerialPort -from hardwarelibrary.communication.commands import DataCommand -from hardwarelibrary.communication.debugport import TableDrivenDebugPort, TextCommandEntry +from hardwarelibrary.communication.commands import TextCommand +from hardwarelibrary.communication.debugport import TableDrivenDebugPort import time from struct import * @@ -21,7 +21,19 @@ class State(Enum): class IntellidriveDevice(RotationDevice): classIdVendor = 0x0403 - classIdProduct = 0x6001 + classIdProduct = 0x6001 + + commands = { + "SET_REGISTER": TextCommand(name="SET_REGISTER", text="s r{0} {1}\r", + matchPattern=r's r(0x[0-9a-fA-F]+) (-?\d+)[\r\n]', + replyPattern="ok", responseTemplate="ok\r"), + "GET_REGISTER": TextCommand(name="GET_REGISTER", text="g r{0}\n", + matchPattern=r'g r(0x[0-9a-fA-F]+)[\r\n]', + replyPattern=r'v\s(-?\d+)', responseTemplate="v {0}\r"), + "TRAJECTORY": TextCommand(name="TRAJECTORY", text="t {0}\n", + matchPattern=r't (\d+)[\r\n]', + replyPattern="ok", responseTemplate="ok\r"), + } def __init__(self, serialNumber): super().__init__(serialNumber=serialNumber, idVendor=self.classIdVendor, idProduct=self.classIdProduct) @@ -120,24 +132,19 @@ def doHome(self): class DebugSerialPort(TableDrivenDebugPort): def __init__(self): - super().__init__() + super().__init__(commands=IntellidriveDevice.commands) self.registers = {'0xc9': 0} - self.text_commands = [ - TextCommandEntry(name='set_register', pattern=r's r(0x[0-9a-fA-F]+) (-?\d+)[\r\n]'), - TextCommandEntry(name='get_register', pattern=r'g r(0x[0-9a-fA-F]+)[\r\n]'), - TextCommandEntry(name='trajectory', pattern=r't (\d+)[\r\n]'), - ] def process_command(self, name, params, endPointIndex): - if name == 'set_register': + if name == 'SET_REGISTER': register, value = params self.registers[register] = int(value) return "ok\r" - elif name == 'get_register': + elif name == 'GET_REGISTER': register = params[0] value = self.registers.get(register, 0) return "v {}\r".format(value) - elif name == 'trajectory': + elif name == 'TRAJECTORY': mode = int(params[0]) if mode == 2: # home self.registers['0xca'] = 0 diff --git a/hardwarelibrary/motion/sutterdevice.py b/hardwarelibrary/motion/sutterdevice.py index 556ad12..0ae312e 100644 --- a/hardwarelibrary/motion/sutterdevice.py +++ b/hardwarelibrary/motion/sutterdevice.py @@ -4,7 +4,7 @@ from hardwarelibrary.communication.usbport import USBPort from hardwarelibrary.communication.serialport import SerialPort from hardwarelibrary.communication.commands import DataCommand -from hardwarelibrary.communication.debugport import TableDrivenDebugPort, BinaryCommandEntry +from hardwarelibrary.communication.debugport import TableDrivenDebugPort import re import time @@ -16,6 +16,16 @@ class SutterDevice(LinearMotionDevice): classIdVendor = 4930 classIdProduct = 1 + commands = { + "MOVE": DataCommand(name="MOVE", prefix=b'M', requestFormat=' Date: Sun, 1 Mar 2026 17:37:16 -0500 Subject: [PATCH 4/9] Add debug-mode tests for IntellidriveDevice and fix terminator Add TestIntellidriveDebugDevice with 9 tests exercising init, home, move, orientation, register setup, and status flags without hardware connected. Set terminator to b'\r' on DebugSerialPort so readString stops correctly on carriage-return-terminated responses. Co-Authored-By: Claude Opus 4.6 --- hardwarelibrary/motion/intellidrivedevice.py | 1 + hardwarelibrary/tests/testIntellidrive.py | 50 ++++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/hardwarelibrary/motion/intellidrivedevice.py b/hardwarelibrary/motion/intellidrivedevice.py index 328bf3d..2cf1cde 100644 --- a/hardwarelibrary/motion/intellidrivedevice.py +++ b/hardwarelibrary/motion/intellidrivedevice.py @@ -133,6 +133,7 @@ def doHome(self): class DebugSerialPort(TableDrivenDebugPort): def __init__(self): super().__init__(commands=IntellidriveDevice.commands) + self.terminator = b'\r' self.registers = {'0xc9': 0} def process_command(self, name, params, endPointIndex): diff --git a/hardwarelibrary/tests/testIntellidrive.py b/hardwarelibrary/tests/testIntellidrive.py index 00eeeb4..d06c101 100644 --- a/hardwarelibrary/tests/testIntellidrive.py +++ b/hardwarelibrary/tests/testIntellidrive.py @@ -129,6 +129,56 @@ def testHomeWithCommands(self): self.assertFalse( status & (1 << 14) ) port.close() +class TestIntellidriveDebugDevice(unittest.TestCase): + def setUp(self): + self.device = IntellidriveDevice(serialNumber="debug") + self.device.initializeDevice() + + def tearDown(self): + self.device.shutdownDevice() + self.device = None + + def testDeviceInitializes(self): + self.assertIsNotNone(self.device) + self.assertEqual(self.device.internalState.name, 'ready') + + def testDeviceHome(self): + self.device.home() + orientation = self.device.orientation() + self.assertAlmostEqual(orientation, 0, places=2) + + def testDeviceMove(self): + angle = 90 + self.device.moveTo(angle) + orientation = self.device.orientation() + self.assertAlmostEqual(orientation, angle, places=0) + + def testDeviceMoveToZero(self): + self.device.moveTo(180) + self.device.moveTo(0) + orientation = self.device.orientation() + self.assertAlmostEqual(orientation, 0, places=2) + + def testDeviceOrientation(self): + orientation = self.device.orientation() + self.assertIsNotNone(orientation) + self.assertAlmostEqual(orientation, 0, places=2) + + def testRegistersSetDuringInit(self): + port = self.device.port + self.assertEqual(port.registers.get('0x24'), 31) + self.assertEqual(port.registers.get('0xc2'), 514) + + def testIsReferenced(self): + self.assertTrue(self.device.isReferenced()) + + def testIsNotMoving(self): + self.assertFalse(self.device.isMoving()) + + def testIsNotHoming(self): + self.assertFalse(self.device.isHoming()) + + class TestIntellidrivePhysicalDevice(unittest.TestCase): def setUp(self): try: From 4ca4aa17c1915ef5be64bb6c028662b94b3e955c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20C=C3=B4t=C3=A9?= Date: Sun, 1 Mar 2026 17:57:12 -0500 Subject: [PATCH 5/9] Support named regex groups and add documentation to TableDrivenDebugPort TextCommand.extractParams() now returns a dict when the matchPattern uses named groups (?P...), and formatResponse() accepts dicts for named template parameters. Anonymous groups and positional tuples still work. Add class and method docstrings to TableDrivenDebugPort with examples showing named parameters for self-documenting command definitions. Co-Authored-By: Claude Opus 4.6 --- hardwarelibrary/communication/commands.py | 5 ++ hardwarelibrary/communication/debugport.py | 59 +++++++++++++++++++ .../tests/testCommandRecognition.py | 15 +++++ 3 files changed, 79 insertions(+) diff --git a/hardwarelibrary/communication/commands.py b/hardwarelibrary/communication/commands.py index 3d6d257..d7091a1 100644 --- a/hardwarelibrary/communication/commands.py +++ b/hardwarelibrary/communication/commands.py @@ -100,10 +100,15 @@ def extractParams(self, inputBytes): return () match = re.match(self.effectiveMatchPattern, inputStr) if match: + if match.groupdict(): + return match.groupdict() return match.groups() return () def formatResponse(self, result): + if isinstance(result, dict) and self.responseTemplate is not None: + formatted = self.responseTemplate.format(**result) + return bytearray(formatted.encode('utf-8')) if isinstance(result, tuple) and self.responseTemplate is not None: formatted = self.responseTemplate.format(*result) return bytearray(formatted.encode('utf-8')) diff --git a/hardwarelibrary/communication/debugport.py b/hardwarelibrary/communication/debugport.py index 6036132..401fd7c 100644 --- a/hardwarelibrary/communication/debugport.py +++ b/hardwarelibrary/communication/debugport.py @@ -78,6 +78,47 @@ def processInputBuffers(self, endPointIndex): class TableDrivenDebugPort(DebugPort): + """A mock port that recognizes incoming commands and produces responses, + driven by a dict of Command objects (TextCommand or DataCommand). + + Each Command object knows how to recognize itself in raw input bytes + (matches/extractParams) and how to format a response (formatResponse). + This means the same Command objects used by a real device to *send* + commands can also be reused here to *recognize* them, eliminating + protocol duplication between device code and mock code. + + To create a mock port for a device: + + 1. Pass the device's commands dict to __init__: + + commands = { + "GET": TextCommand(name="GET", text="GET {key}\\r", + matchPattern=r'GET (?P\\w+)\\r', + responseTemplate="VAL {value}\\r"), + "SET": DataCommand(name="SET", prefix=b'S', + requestFormat='...) + patterns, a positional tuple for anonymous groups or struct.unpack. + The return value is passed to formatResponse: dicts use named template + parameters, tuples use positional parameters, raw bytes/strings are + returned directly, None sends nothing. + """ + def __init__(self, delay=0, numberOfEndPoints=1, commands=None): super().__init__(delay=delay, numberOfEndPoints=numberOfEndPoints) self.commands = commands if commands is not None else {} @@ -101,4 +142,22 @@ def processInputBuffers(self, endPointIndex): self.inputBuffers[endPointIndex] = bytearray() def process_command(self, name, params, endPointIndex): + """Process a recognized command and return a result. + + Subclasses must override this to implement device behavior. + + Args: + name: the Command's name (matches a key in self.commands) + params: extracted parameters — a dict when using named regex + groups (?P...), a tuple for anonymous groups or + struct.unpack fields + endPointIndex: the endpoint the command arrived on + + Returns: + The return value is passed to the command's formatResponse(): + - dict: formatted using named responseTemplate parameters + - tuple: formatted using positional responseTemplate/responseFormat + - bytes/str: written to the output buffer as-is + - None: no response is sent + """ raise NotImplementedError("Subclasses must implement process_command") \ No newline at end of file diff --git a/hardwarelibrary/tests/testCommandRecognition.py b/hardwarelibrary/tests/testCommandRecognition.py index b37786f..237e5d0 100644 --- a/hardwarelibrary/tests/testCommandRecognition.py +++ b/hardwarelibrary/tests/testCommandRecognition.py @@ -68,6 +68,21 @@ def testFormatResponseNone(self): cmd = TextCommand(name="test", text="cmd\r") self.assertIsNone(cmd.formatResponse(None)) + def testNamedGroupExtractParams(self): + cmd = TextCommand(name="test", text="SET {key} {value}\r", + matchPattern=r'SET (?P\w+) (?P-?\d+)\r') + params = cmd.extractParams(b'SET foo 42\r') + self.assertIsInstance(params, dict) + self.assertEqual(params["key"], "foo") + self.assertEqual(params["value"], "42") + + def testNamedGroupFormatResponse(self): + cmd = TextCommand(name="test", text="GET {key}\r", + matchPattern=r'GET (?P\w+)\r', + responseTemplate="VAL {value}\r") + result = cmd.formatResponse({"value": "99"}) + self.assertEqual(result, bytearray(b'VAL 99\r')) + def testEffectiveMatchPatternUsesExplicit(self): cmd = TextCommand(name="test", text="SET {0}\r", matchPattern=r'SET (\w+)\r') From d68f613eef24cd4a79c61bacb0de5d2c353d883e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20C=C3=B4t=C3=A9?= Date: Sun, 1 Mar 2026 17:58:00 -0500 Subject: [PATCH 6/9] Convert Intellidrive commands to named regex parameters Use (?P...), (?P...), (?P...) in matchPatterns and {register}, {value}, {mode} in text/responseTemplate fields. process_command now uses params["register"] instead of params[0]. Co-Authored-By: Claude Opus 4.6 --- hardwarelibrary/motion/intellidrivedevice.py | 24 +++++++++----------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/hardwarelibrary/motion/intellidrivedevice.py b/hardwarelibrary/motion/intellidrivedevice.py index 2cf1cde..c276e00 100644 --- a/hardwarelibrary/motion/intellidrivedevice.py +++ b/hardwarelibrary/motion/intellidrivedevice.py @@ -24,14 +24,14 @@ class IntellidriveDevice(RotationDevice): classIdProduct = 0x6001 commands = { - "SET_REGISTER": TextCommand(name="SET_REGISTER", text="s r{0} {1}\r", - matchPattern=r's r(0x[0-9a-fA-F]+) (-?\d+)[\r\n]', + "SET_REGISTER": TextCommand(name="SET_REGISTER", text="s r{register} {value}\r", + matchPattern=r's r(?P0x[0-9a-fA-F]+) (?P-?\d+)[\r\n]', replyPattern="ok", responseTemplate="ok\r"), - "GET_REGISTER": TextCommand(name="GET_REGISTER", text="g r{0}\n", - matchPattern=r'g r(0x[0-9a-fA-F]+)[\r\n]', - replyPattern=r'v\s(-?\d+)', responseTemplate="v {0}\r"), - "TRAJECTORY": TextCommand(name="TRAJECTORY", text="t {0}\n", - matchPattern=r't (\d+)[\r\n]', + "GET_REGISTER": TextCommand(name="GET_REGISTER", text="g r{register}\n", + matchPattern=r'g r(?P0x[0-9a-fA-F]+)[\r\n]', + replyPattern=r'v\s(-?\d+)', responseTemplate="v {value}\r"), + "TRAJECTORY": TextCommand(name="TRAJECTORY", text="t {mode}\n", + matchPattern=r't (?P\d+)[\r\n]', replyPattern="ok", responseTemplate="ok\r"), } @@ -138,15 +138,13 @@ def __init__(self): def process_command(self, name, params, endPointIndex): if name == 'SET_REGISTER': - register, value = params - self.registers[register] = int(value) + self.registers[params["register"]] = int(params["value"]) return "ok\r" elif name == 'GET_REGISTER': - register = params[0] - value = self.registers.get(register, 0) - return "v {}\r".format(value) + value = self.registers.get(params["register"], 0) + return {"value": str(value)} elif name == 'TRAJECTORY': - mode = int(params[0]) + mode = int(params["mode"]) if mode == 2: # home self.registers['0xca'] = 0 self.registers['0xc9'] = (1 << 12) # referenced bit From b021358794cca12683f48970d689fcc037cbb344 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20C=C3=B4t=C3=A9?= Date: Sun, 1 Mar 2026 18:06:55 -0500 Subject: [PATCH 7/9] Convert Sutter commands to named parameters Add requestFields and responseFields to DataCommand, mapping struct fields to names so extractParams returns a dict and formatResponse accepts a dict. Sutter process_command now uses params['x'], params['y'], params['z'] and returns {'header': ..., 'x': ..., 'y': ..., 'z': ..., 'terminator': ...}. Co-Authored-By: Claude Opus 4.6 --- hardwarelibrary/communication/commands.py | 14 ++++++++++++-- hardwarelibrary/motion/sutterdevice.py | 12 +++++++++--- hardwarelibrary/tests/testCommandRecognition.py | 17 +++++++++++++++++ 3 files changed, 38 insertions(+), 5 deletions(-) diff --git a/hardwarelibrary/communication/commands.py b/hardwarelibrary/communication/commands.py index d7091a1..47e26c4 100644 --- a/hardwarelibrary/communication/commands.py +++ b/hardwarelibrary/communication/commands.py @@ -215,7 +215,8 @@ def send(self, port, params=None) -> bool: class DataCommand(Command): def __init__(self, name, data=None, replyHexRegex = None, replyDataLength = 0, unpackingMask = None, endPoints = (None, None), - prefix = None, requestFormat = None, responseFormat = None): + prefix = None, requestFormat = None, responseFormat = None, + requestFields = None, responseFields = None): Command.__init__(self, name, endPoints=endPoints) self.data : bytearray = data self.replyHexRegex: str = replyHexRegex @@ -224,6 +225,8 @@ def __init__(self, name, data=None, replyHexRegex = None, replyDataLength = 0, u self._prefix: bytes = prefix self.requestFormat: str = requestFormat self.responseFormat: str = responseFormat + self.requestFields: tuple = requestFields + self.responseFields: tuple = responseFields @property def payload(self): @@ -250,10 +253,17 @@ def extractParams(self, inputBytes): if self.requestFormat is not None: unpackLen = struct.calcsize(self.requestFormat) if len(inputBytes) >= unpackLen: - return struct.unpack(self.requestFormat, bytes(inputBytes[:unpackLen])) + values = struct.unpack(self.requestFormat, bytes(inputBytes[:unpackLen])) + if self.requestFields is not None: + return dict(zip(self.requestFields, values)) + return values return () def formatResponse(self, result): + if isinstance(result, dict) and self.responseFormat is not None and self.responseFields is not None: + values = tuple(result[f] for f in self.responseFields) + data = struct.pack(self.responseFormat, *values) + return bytearray(data) if isinstance(result, tuple) and self.responseFormat is not None: data = struct.pack(self.responseFormat, *result) return bytearray(data) diff --git a/hardwarelibrary/motion/sutterdevice.py b/hardwarelibrary/motion/sutterdevice.py index 0ae312e..254cee8 100644 --- a/hardwarelibrary/motion/sutterdevice.py +++ b/hardwarelibrary/motion/sutterdevice.py @@ -18,10 +18,12 @@ class SutterDevice(LinearMotionDevice): commands = { "MOVE": DataCommand(name="MOVE", prefix=b'M', requestFormat=' Date: Sun, 1 Mar 2026 18:15:35 -0500 Subject: [PATCH 8/9] Add docstrings to commands.py Document the module, all four classes (Command, TextCommand, MultilineTextCommand, DataCommand), and their key methods. Covers both the send side and recognition side roles, with parameter descriptions and examples. Co-Authored-By: Claude Opus 4.6 --- hardwarelibrary/communication/commands.py | 148 ++++++++++++++++++++++ 1 file changed, 148 insertions(+) diff --git a/hardwarelibrary/communication/commands.py b/hardwarelibrary/communication/commands.py index 47e26c4..a90ccf9 100644 --- a/hardwarelibrary/communication/commands.py +++ b/hardwarelibrary/communication/commands.py @@ -1,7 +1,41 @@ +"""Command classes that describe a device's communication protocol. + +Each Command object serves two roles: + +1. **Send side** (real device): build a payload, send it through a port, + and parse the reply. This is done via the `send()` method. + +2. **Recognition side** (mock/debug port): examine raw incoming bytes, + decide if they match this command, extract parameters, and format + a response. This is done via `matches()`, `extractParams()`, and + `formatResponse()`. + +By defining the protocol once in Command objects, the same definitions +drive both real communication and mock ports (TableDrivenDebugPort), +eliminating protocol duplication. + +Subclasses: + TextCommand — text-based protocols (e.g. "s r0x24 31\\r" → "ok\\r") + DataCommand — binary protocols (e.g. struct-packed position commands) + MultilineTextCommand — text protocols that return multiple lines +""" + import re import struct class Command: + """Base class for all device commands. + + A Command has a name and tracks send/reply state. Subclasses override + `send()` for the client side, and `matches()`/`extractParams()`/ + `formatResponse()` for the mock port side. + + Args: + name: identifier for this command (e.g. "GET_POSITION", "SET_REGISTER") + endPoints: tuple of (writeEndPoint, readEndPoint) for USB devices + that use separate endpoints for sending and receiving + """ + def __init__(self, name:str, endPoints = (None, None)): self.name = name self.reply = None @@ -33,15 +67,33 @@ def hasError(self): return len(self.exceptions) != 0 def send(self, port) -> bool: + """Send this command through a port. Subclasses must override.""" raise NotImplementedError("Subclasses must implement the send() command") def matches(self, inputBytes): + """Return True if inputBytes represents this command. + Base implementation always returns False.""" return False def extractParams(self, inputBytes): + """Extract parameters from recognized input bytes. + Returns a dict (named params) or tuple (positional params). + Base implementation returns an empty tuple.""" return () def formatResponse(self, result): + """Convert a process_command() return value into bytes for the output buffer. + + Args: + result: the value returned by process_command(): + - None → returns None (no response sent) + - bytes/bytearray → returned as-is + - str → encoded to UTF-8 + - tuple or dict → handled by subclass (template/struct formatting) + + Returns: + bytearray to write to the output buffer, or None. + """ if result is None: return None elif isinstance(result, (bytes, bytearray)): @@ -51,6 +103,31 @@ def formatResponse(self, result): return None class TextCommand(Command): + """A command that communicates via text strings (UTF-8). + + Used for devices with text-based protocols like "s r0x24 31\\r" → "ok\\r". + + Send-side parameters (used by send()): + text: format string sent to the device, e.g. "g r{register}\\n" + replyPattern: regex to validate the device's reply + alternatePattern: secondary regex if replyPattern doesn't match + + Recognition-side parameters (used by matches/extractParams/formatResponse): + matchPattern: regex to recognize incoming bytes in a mock port. + Use named groups for readable code: + r'g r(?P0x[0-9a-fA-F]+)[\\r\\n]' + If None, an auto-derived pattern from `text` is used. + responseTemplate: format string for the mock response, e.g. "v {value}\\r". + Filled with named (dict) or positional (tuple) params + returned by process_command(). + + Example: + TextCommand(name="GET_REGISTER", text="g r{register}\\n", + matchPattern=r'g r(?P0x[0-9a-fA-F]+)[\\r\\n]', + replyPattern=r'v\\s(-?\\d+)', + responseTemplate="v {value}\\r") + """ + def __init__(self, name, text, replyPattern = None, alternatePattern = None, endPoints = (None, None), @@ -76,17 +153,21 @@ def numberOfArguments(self): @property def _autoMatchPattern(self): + """Derive a regex from the text format string by replacing {…} + placeholders with (.+?) capture groups.""" parts = re.split(r'\{[^}]*\}', self.text) escaped = [re.escape(p) for p in parts] return '(.+?)'.join(escaped) @property def effectiveMatchPattern(self): + """Return the explicit matchPattern if set, otherwise the auto-derived one.""" if self.matchPattern is not None: return self.matchPattern return self._autoMatchPattern def matches(self, inputBytes): + """Return True if inputBytes matches this text command's pattern.""" try: inputStr = inputBytes.decode('utf-8', errors='replace') except Exception: @@ -94,6 +175,10 @@ def matches(self, inputBytes): return re.match(self.effectiveMatchPattern, inputStr) is not None def extractParams(self, inputBytes): + """Extract parameters from the input bytes using the match pattern. + + Returns a dict if the pattern uses named groups (?P...), + otherwise a tuple of positional groups.""" try: inputStr = inputBytes.decode('utf-8', errors='replace') except Exception: @@ -106,6 +191,12 @@ def extractParams(self, inputBytes): return () def formatResponse(self, result): + """Format a mock response using the responseTemplate. + + - dict result → template.format(**result), e.g. {"value": "42"} → "v 42\\r" + - tuple result → template.format(*result), e.g. ("42",) → "v 42\\r" + - other types → delegated to base class (bytes/str as-is, None → None) + """ if isinstance(result, dict) and self.responseTemplate is not None: formatted = self.responseTemplate.format(**result) return bytearray(formatted.encode('utf-8')) @@ -145,6 +236,17 @@ def send(self, port, params=None) -> bool: class MultilineTextCommand(Command): + """A text command that expects a multi-line reply. + + The reply is read either a fixed number of times (lineCount > 1) or + until a line matches lastLinePattern. Each line is matched against + replyPattern and the results are collected into lists. + + Args: + lineCount: number of reply lines to read (if > 1) + lastLinePattern: regex that signals the final line (alternative to lineCount) + """ + def __init__(self, name, text, replyPattern=None, alternatePattern=None, @@ -214,6 +316,40 @@ def send(self, port, params=None) -> bool: class DataCommand(Command): + """A command that communicates via binary data (struct-packed bytes). + + Used for devices with binary protocols like the Sutter micromanipulator + where commands are single-byte prefixes followed by packed integers. + + Send-side parameters (used by send()): + data: raw bytes to send to the device + replyDataLength: number of bytes to read back + unpackingMask: struct format to unpack the reply + + Recognition-side parameters (used by matches/extractParams/formatResponse): + prefix: byte(s) that identify this command in incoming data. + Matched case-insensitively. If None, derived from data[0:1]. + requestFormat: struct format to unpack incoming request parameters. + Padding bytes (x) are skipped automatically by struct. + requestFields: tuple of field names for the unpacked values. + When set, extractParams() returns a dict: + e.g. requestFields=('x','y','z') → {'x': 10, 'y': 20, 'z': 30} + When None, extractParams() returns a positional tuple. + responseFormat: struct format to pack the mock response. + responseFields: tuple of field names expected in the response dict. + Defines the order for struct.pack when process_command() + returns a dict. + + Example: + DataCommand(name="MOVE", prefix=b'M', + requestFormat=' 0: @@ -241,6 +378,7 @@ def effectivePrefix(self): return None def matches(self, inputBytes): + """Return True if inputBytes starts with this command's prefix (case-insensitive).""" prefix = self.effectivePrefix if prefix is None: return False @@ -250,6 +388,10 @@ def matches(self, inputBytes): return inputBytes[:prefixLen].upper() == prefix.upper() def extractParams(self, inputBytes): + """Unpack parameters from the input bytes using requestFormat. + + Returns a dict if requestFields is set (e.g. {'x': 10, 'y': 20}), + otherwise a positional tuple from struct.unpack.""" if self.requestFormat is not None: unpackLen = struct.calcsize(self.requestFormat) if len(inputBytes) >= unpackLen: @@ -260,6 +402,12 @@ def extractParams(self, inputBytes): return () def formatResponse(self, result): + """Pack a mock response using responseFormat. + + - dict result + responseFields → values ordered by responseFields, then packed + - tuple result → packed directly with struct.pack + - other types → delegated to base class (bytes/str as-is, None → None) + """ if isinstance(result, dict) and self.responseFormat is not None and self.responseFields is not None: values = tuple(result[f] for f in self.responseFields) data = struct.pack(self.responseFormat, *values) From 91e7a8f444e79ef3da2c0c4a11e6c67feec93a4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20C=C3=B4t=C3=A9?= Date: Sun, 1 Mar 2026 18:19:21 -0500 Subject: [PATCH 9/9] Rewrite module docstring to explain the two-way role of commands dicts Every PhysicalDevice defines a commands dict that serves both sides: sending commands to real hardware and creating mock debug ports that receive those same commands and reply in the correct format. Co-Authored-By: Claude Opus 4.6 --- hardwarelibrary/communication/commands.py | 35 ++++++++++++++--------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/hardwarelibrary/communication/commands.py b/hardwarelibrary/communication/commands.py index a90ccf9..f39071d 100644 --- a/hardwarelibrary/communication/commands.py +++ b/hardwarelibrary/communication/commands.py @@ -1,18 +1,27 @@ """Command classes that describe a device's communication protocol. -Each Command object serves two roles: - -1. **Send side** (real device): build a payload, send it through a port, - and parse the reply. This is done via the `send()` method. - -2. **Recognition side** (mock/debug port): examine raw incoming bytes, - decide if they match this command, extract parameters, and format - a response. This is done via `matches()`, `extractParams()`, and - `formatResponse()`. - -By defining the protocol once in Command objects, the same definitions -drive both real communication and mock ports (TableDrivenDebugPort), -eliminating protocol duplication. +Every PhysicalDevice can define a ``commands`` dictionary — a dict of +Command objects that fully describes its protocol. For example, a +Sutter micromanipulator defines MOVE, GET_POSITION, and HOME commands, +while an Intellidrive rotation stage defines SET_REGISTER, GET_REGISTER, +and TRAJECTORY commands. + +This dictionary is useful in two complementary ways: + +1. **Talking to a real device**: each Command knows how to build a + payload, send it through a port, and parse the reply. The device + code calls ``send()`` and reads back ``reply`` / ``matchGroups``. + +2. **Creating a mock debug port**: the same Command objects can be + passed to a ``TableDrivenDebugPort``, which reverses the roles — + it *receives* those commands instead of sending them, extracts the + parameters from the incoming bytes, and replies in the correct + format. This makes it trivial to write a mock port for any device: + just pass its ``commands`` dict and implement ``process_command()`` + with the device-specific logic. + +Because both sides share the same Command definitions, the protocol is +defined once and cannot drift between real and mock implementations. Subclasses: TextCommand — text-based protocols (e.g. "s r0x24 31\\r" → "ok\\r")