diff --git a/hardwarelibrary/communication/__init__.py b/hardwarelibrary/communication/__init__.py index bc0034e..69ef712 100644 --- a/hardwarelibrary/communication/__init__.py +++ b/hardwarelibrary/communication/__init__.py @@ -2,7 +2,7 @@ from .serialport import SerialPort from .usbport import USBPort from .diagnostics import USBParameters, DeviceCommand, USBDeviceDescription -from .debugport import DebugPort +from .debugport import DebugPort, TableDrivenDebugPort from .echoport import DebugEchoPort import usb.backend.libusb1 import platform diff --git a/hardwarelibrary/communication/commands.py b/hardwarelibrary/communication/commands.py index 99cd729..f39071d 100644 --- a/hardwarelibrary/communication/commands.py +++ b/hardwarelibrary/communication/commands.py @@ -1,6 +1,50 @@ +"""Command classes that describe a device's communication protocol. + +Every PhysicalDevice can define a ``commands`` dictionary — a dict of +Command objects that fully describes its protocol. For example, a +Sutter micromanipulator defines MOVE, GET_POSITION, and HOME commands, +while an Intellidrive rotation stage defines SET_REGISTER, GET_REGISTER, +and TRAJECTORY commands. + +This dictionary is useful in two complementary ways: + +1. **Talking to a real device**: each Command knows how to build a + payload, send it through a port, and parse the reply. The device + code calls ``send()`` and reads back ``reply`` / ``matchGroups``. + +2. **Creating a mock debug port**: the same Command objects can be + passed to a ``TableDrivenDebugPort``, which reverses the roles — + it *receives* those commands instead of sending them, extracts the + parameters from the incoming bytes, and replies in the correct + format. This makes it trivial to write a mock port for any device: + just pass its ``commands`` dict and implement ``process_command()`` + with the device-specific logic. + +Because both sides share the same Command definitions, the protocol is +defined once and cannot drift between real and mock implementations. + +Subclasses: + TextCommand — text-based protocols (e.g. "s r0x24 31\\r" → "ok\\r") + DataCommand — binary protocols (e.g. struct-packed position commands) + MultilineTextCommand — text protocols that return multiple lines +""" + import re +import struct class Command: + """Base class for all device commands. + + A Command has a name and tracks send/reply state. Subclasses override + `send()` for the client side, and `matches()`/`extractParams()`/ + `formatResponse()` for the mock port side. + + Args: + name: identifier for this command (e.g. "GET_POSITION", "SET_REGISTER") + endPoints: tuple of (writeEndPoint, readEndPoint) for USB devices + that use separate endpoints for sending and receiving + """ + def __init__(self, name:str, endPoints = (None, None)): self.name = name self.reply = None @@ -26,22 +70,84 @@ def matchAsFloat(self, index=0): if self.matchGroups is not None: return float(self.matchGroups[index]) return None - + @property def hasError(self): return len(self.exceptions) != 0 def send(self, port) -> bool: + """Send this command through a port. Subclasses must override.""" raise NotImplementedError("Subclasses must implement the send() command") + def matches(self, inputBytes): + """Return True if inputBytes represents this command. + Base implementation always returns False.""" + return False + + def extractParams(self, inputBytes): + """Extract parameters from recognized input bytes. + Returns a dict (named params) or tuple (positional params). + Base implementation returns an empty tuple.""" + return () + + def formatResponse(self, result): + """Convert a process_command() return value into bytes for the output buffer. + + Args: + result: the value returned by process_command(): + - None → returns None (no response sent) + - bytes/bytearray → returned as-is + - str → encoded to UTF-8 + - tuple or dict → handled by subclass (template/struct formatting) + + Returns: + bytearray to write to the output buffer, or None. + """ + if result is None: + return None + elif isinstance(result, (bytes, bytearray)): + return bytearray(result) + elif isinstance(result, str): + return bytearray(result.encode('utf-8')) + return None + class TextCommand(Command): - def __init__(self, name, text, replyPattern = None, - alternatePattern = None, - endPoints = (None, None)): + """A command that communicates via text strings (UTF-8). + + Used for devices with text-based protocols like "s r0x24 31\\r" → "ok\\r". + + Send-side parameters (used by send()): + text: format string sent to the device, e.g. "g r{register}\\n" + replyPattern: regex to validate the device's reply + alternatePattern: secondary regex if replyPattern doesn't match + + Recognition-side parameters (used by matches/extractParams/formatResponse): + matchPattern: regex to recognize incoming bytes in a mock port. + Use named groups for readable code: + r'g r(?P0x[0-9a-fA-F]+)[\\r\\n]' + If None, an auto-derived pattern from `text` is used. + responseTemplate: format string for the mock response, e.g. "v {value}\\r". + Filled with named (dict) or positional (tuple) params + returned by process_command(). + + Example: + TextCommand(name="GET_REGISTER", text="g r{register}\\n", + matchPattern=r'g r(?P0x[0-9a-fA-F]+)[\\r\\n]', + replyPattern=r'v\\s(-?\\d+)', + responseTemplate="v {value}\\r") + """ + + def __init__(self, name, text, replyPattern = None, + alternatePattern = None, + endPoints = (None, None), + matchPattern = None, + responseTemplate = None): Command.__init__(self, name, endPoints=endPoints) self.text : str = text self.replyPattern: str = replyPattern self.alternatePattern: str = alternatePattern + self.matchPattern: str = matchPattern + self.responseTemplate: str = responseTemplate @property def payload(self): @@ -54,6 +160,60 @@ def numberOfArguments(self): return 0 return len(match.groups()) + @property + def _autoMatchPattern(self): + """Derive a regex from the text format string by replacing {…} + placeholders with (.+?) capture groups.""" + parts = re.split(r'\{[^}]*\}', self.text) + escaped = [re.escape(p) for p in parts] + return '(.+?)'.join(escaped) + + @property + def effectiveMatchPattern(self): + """Return the explicit matchPattern if set, otherwise the auto-derived one.""" + if self.matchPattern is not None: + return self.matchPattern + return self._autoMatchPattern + + def matches(self, inputBytes): + """Return True if inputBytes matches this text command's pattern.""" + try: + inputStr = inputBytes.decode('utf-8', errors='replace') + except Exception: + return False + return re.match(self.effectiveMatchPattern, inputStr) is not None + + def extractParams(self, inputBytes): + """Extract parameters from the input bytes using the match pattern. + + Returns a dict if the pattern uses named groups (?P...), + otherwise a tuple of positional groups.""" + try: + inputStr = inputBytes.decode('utf-8', errors='replace') + except Exception: + return () + match = re.match(self.effectiveMatchPattern, inputStr) + if match: + if match.groupdict(): + return match.groupdict() + return match.groups() + return () + + def formatResponse(self, result): + """Format a mock response using the responseTemplate. + + - dict result → template.format(**result), e.g. {"value": "42"} → "v 42\\r" + - tuple result → template.format(*result), e.g. ("42",) → "v 42\\r" + - other types → delegated to base class (bytes/str as-is, None → None) + """ + if isinstance(result, dict) and self.responseTemplate is not None: + formatted = self.responseTemplate.format(**result) + return bytearray(formatted.encode('utf-8')) + if isinstance(result, tuple) and self.responseTemplate is not None: + formatted = self.responseTemplate.format(*result) + return bytearray(formatted.encode('utf-8')) + return super().formatResponse(result) + def send(self, port, params=None) -> bool: try: if params is not None: @@ -85,6 +245,17 @@ def send(self, port, params=None) -> bool: class MultilineTextCommand(Command): + """A text command that expects a multi-line reply. + + The reply is read either a fixed number of times (lineCount > 1) or + until a line matches lastLinePattern. Each line is matched against + replyPattern and the results are collected into lists. + + Args: + lineCount: number of reply lines to read (if > 1) + lastLinePattern: regex that signals the final line (alternative to lineCount) + """ + def __init__(self, name, text, replyPattern=None, alternatePattern=None, @@ -154,17 +325,107 @@ def send(self, port, params=None) -> bool: class DataCommand(Command): - def __init__(self, name, data, replyHexRegex = None, replyDataLength = 0, unpackingMask = None, endPoints = (None, None)): + """A command that communicates via binary data (struct-packed bytes). + + Used for devices with binary protocols like the Sutter micromanipulator + where commands are single-byte prefixes followed by packed integers. + + Send-side parameters (used by send()): + data: raw bytes to send to the device + replyDataLength: number of bytes to read back + unpackingMask: struct format to unpack the reply + + Recognition-side parameters (used by matches/extractParams/formatResponse): + prefix: byte(s) that identify this command in incoming data. + Matched case-insensitively. If None, derived from data[0:1]. + requestFormat: struct format to unpack incoming request parameters. + Padding bytes (x) are skipped automatically by struct. + requestFields: tuple of field names for the unpacked values. + When set, extractParams() returns a dict: + e.g. requestFields=('x','y','z') → {'x': 10, 'y': 20, 'z': 30} + When None, extractParams() returns a positional tuple. + responseFormat: struct format to pack the mock response. + responseFields: tuple of field names expected in the response dict. + Defines the order for struct.pack when process_command() + returns a dict. + + Example: + DataCommand(name="MOVE", prefix=b'M', + requestFormat=' 0: + return self.data[0:1] + return None + + def matches(self, inputBytes): + """Return True if inputBytes starts with this command's prefix (case-insensitive).""" + prefix = self.effectivePrefix + if prefix is None: + return False + prefixLen = len(prefix) + if len(inputBytes) < prefixLen: + return False + return inputBytes[:prefixLen].upper() == prefix.upper() + + def extractParams(self, inputBytes): + """Unpack parameters from the input bytes using requestFormat. + + Returns a dict if requestFields is set (e.g. {'x': 10, 'y': 20}), + otherwise a positional tuple from struct.unpack.""" + if self.requestFormat is not None: + unpackLen = struct.calcsize(self.requestFormat) + if len(inputBytes) >= unpackLen: + values = struct.unpack(self.requestFormat, bytes(inputBytes[:unpackLen])) + if self.requestFields is not None: + return dict(zip(self.requestFields, values)) + return values + return () + + def formatResponse(self, result): + """Pack a mock response using responseFormat. + + - dict result + responseFields → values ordered by responseFields, then packed + - tuple result → packed directly with struct.pack + - other types → delegated to base class (bytes/str as-is, None → None) + """ + if isinstance(result, dict) and self.responseFormat is not None and self.responseFields is not None: + values = tuple(result[f] for f in self.responseFields) + data = struct.pack(self.responseFormat, *values) + return bytearray(data) + if isinstance(result, tuple) and self.responseFormat is not None: + data = struct.pack(self.responseFormat, *result) + return bytearray(data) + return super().formatResponse(result) + def send(self, port) -> bool: try: nBytes = port.writeData(data=self.data, endPoint=self.endPoints[0]) diff --git a/hardwarelibrary/communication/debugport.py b/hardwarelibrary/communication/debugport.py index b7ebe45..401fd7c 100644 --- a/hardwarelibrary/communication/debugport.py +++ b/hardwarelibrary/communication/debugport.py @@ -1,82 +1,62 @@ import time import random +import struct +import re from hardwarelibrary.communication import * from threading import Thread, Lock -class DebugDataCommand(Command): - def __init__(self, name, dataHexRegex = None, unpackingMask = None, endPoints = (None, None)): - Command.__init__(self, name, endPoints=endPoints) - self.data : bytearray = data - self.dataHexRegex: str = dataHexRegex - self.unpackingMask:str = unpackingMask - - def send(self, port) -> bool: - try: - self.isSent = True - nBytes = port.writeData(data=self.data, endPoint=self.endPoints[0]) - if self.replyDataLength > 0: - self.reply = port.readData(length=self.replyDataLength) - elif self.replyHexRegex is not None: - raise NotImplementedError("DataCommand reply pattern not implemented") - # self.reply = port.readData(length=self.replyDataLength) - self.isSentSuccessfully = True - except Exception as err: - - self.exceptions.append(err) - self.isSentSuccessfully = False - raise(err) - - return False - class DebugPort(CommunicationPort): - def __init__(self, delay=0): - self.inputBuffers = [bytearray()] - self.outputBuffers = [bytearray()] + def __init__(self, delay=0, numberOfEndPoints=1): + self.inputBuffers = [bytearray() for _ in range(numberOfEndPoints)] + self.outputBuffers = [bytearray() for _ in range(numberOfEndPoints)] self.delay = delay + self.defaultTimeout = 500 self._isOpen = False super(DebugPort, self).__init__() @property def isOpen(self): - return self._isOpen + return self._isOpen def open(self): if self._isOpen: - raise Exception() + raise Exception("Port already open") self._isOpen = True - return def close(self): self._isOpen = False - return def bytesAvailable(self, endPoint=0): - return len(self.outputBuffers[endPoint]) + endPointIndex = endPoint if endPoint is not None else 0 + return len(self.outputBuffers[endPointIndex]) def flush(self): - self.buffers = [bytearray(),bytearray()] + for i in range(len(self.inputBuffers)): + self.inputBuffers[i] = bytearray() + for i in range(len(self.outputBuffers)): + self.outputBuffers[i] = bytearray() def readData(self, length, endPoint=None): - if endPoint is None: - endPointIndex = 0 + endPointIndex = endPoint if endPoint is not None else 0 with self.portLock: - time.sleep(self.delay*random.random()) + if self.delay > 0: + time.sleep(self.delay * random.random()) + data = bytearray() - for i in range(0, length): + for i in range(length): if len(self.outputBuffers[endPointIndex]) > 0: byte = self.outputBuffers[endPointIndex].pop(0) data.append(byte) else: - raise CommunicationReadTimeout("Unable to read data") + raise CommunicationReadTimeout("Unable to read {0} bytes, only {1} available".format(length, len(data))) return data def writeData(self, data, endPoint=None): - if endPoint is None: - endPointIndex = 0 + endPointIndex = endPoint if endPoint is not None else 0 with self.portLock: self.inputBuffers[endPointIndex].extend(data) @@ -85,14 +65,99 @@ def writeData(self, data, endPoint=None): return len(data) - def writeToOutputBuffer(self, data, endPointIndex): + def writeToOutputBuffer(self, data, endPointIndex=0): self.outputBuffers[endPointIndex].extend(data) def processInputBuffers(self, endPointIndex): # We default to ECHO for simplicity - inputBytes = self.inputBuffers[endPointIndex] # Do something, here we do an Echo self.writeToOutputBuffer(inputBytes, endPointIndex) - self.inputBuffers[endPointIndex] = bytearray() \ No newline at end of file + self.inputBuffers[endPointIndex] = bytearray() + + +class TableDrivenDebugPort(DebugPort): + """A mock port that recognizes incoming commands and produces responses, + driven by a dict of Command objects (TextCommand or DataCommand). + + Each Command object knows how to recognize itself in raw input bytes + (matches/extractParams) and how to format a response (formatResponse). + This means the same Command objects used by a real device to *send* + commands can also be reused here to *recognize* them, eliminating + protocol duplication between device code and mock code. + + To create a mock port for a device: + + 1. Pass the device's commands dict to __init__: + + commands = { + "GET": TextCommand(name="GET", text="GET {key}\\r", + matchPattern=r'GET (?P\\w+)\\r', + responseTemplate="VAL {value}\\r"), + "SET": DataCommand(name="SET", prefix=b'S', + requestFormat='...) + patterns, a positional tuple for anonymous groups or struct.unpack. + The return value is passed to formatResponse: dicts use named template + parameters, tuples use positional parameters, raw bytes/strings are + returned directly, None sends nothing. + """ + + def __init__(self, delay=0, numberOfEndPoints=1, commands=None): + super().__init__(delay=delay, numberOfEndPoints=numberOfEndPoints) + self.commands = commands if commands is not None else {} + + def processInputBuffers(self, endPointIndex): + inputBytes = self.inputBuffers[endPointIndex] + if len(inputBytes) == 0: + return + + for cmd in self.commands.values(): + if cmd.matches(inputBytes): + params = cmd.extractParams(inputBytes) + result = self.process_command(cmd.name, params, endPointIndex) + response = cmd.formatResponse(result) + if response is not None: + self.writeToOutputBuffer(response, endPointIndex) + self.inputBuffers[endPointIndex] = bytearray() + return + + print("Unrecognized command: {0}".format(inputBytes)) + self.inputBuffers[endPointIndex] = bytearray() + + def process_command(self, name, params, endPointIndex): + """Process a recognized command and return a result. + + Subclasses must override this to implement device behavior. + + Args: + name: the Command's name (matches a key in self.commands) + params: extracted parameters — a dict when using named regex + groups (?P...), a tuple for anonymous groups or + struct.unpack fields + endPointIndex: the endpoint the command arrived on + + Returns: + The return value is passed to the command's formatResponse(): + - dict: formatted using named responseTemplate parameters + - tuple: formatted using positional responseTemplate/responseFormat + - bytes/str: written to the output buffer as-is + - None: no response is sent + """ + raise NotImplementedError("Subclasses must implement process_command") \ No newline at end of file diff --git a/hardwarelibrary/motion/intellidrivedevice.py b/hardwarelibrary/motion/intellidrivedevice.py index d94a5b9..c276e00 100644 --- a/hardwarelibrary/motion/intellidrivedevice.py +++ b/hardwarelibrary/motion/intellidrivedevice.py @@ -1,7 +1,7 @@ from hardwarelibrary.motion.rotationdevice import * from hardwarelibrary.communication.serialport import SerialPort -from hardwarelibrary.communication.commands import DataCommand -from hardwarelibrary.communication.debugport import DebugPort +from hardwarelibrary.communication.commands import TextCommand +from hardwarelibrary.communication.debugport import TableDrivenDebugPort import time from struct import * @@ -21,7 +21,19 @@ class State(Enum): class IntellidriveDevice(RotationDevice): classIdVendor = 0x0403 - classIdProduct = 0x6001 + classIdProduct = 0x6001 + + commands = { + "SET_REGISTER": TextCommand(name="SET_REGISTER", text="s r{register} {value}\r", + matchPattern=r's r(?P0x[0-9a-fA-F]+) (?P-?\d+)[\r\n]', + replyPattern="ok", responseTemplate="ok\r"), + "GET_REGISTER": TextCommand(name="GET_REGISTER", text="g r{register}\n", + matchPattern=r'g r(?P0x[0-9a-fA-F]+)[\r\n]', + replyPattern=r'v\s(-?\d+)', responseTemplate="v {value}\r"), + "TRAJECTORY": TextCommand(name="TRAJECTORY", text="t {mode}\n", + matchPattern=r't (?P\d+)[\r\n]', + replyPattern="ok", responseTemplate="ok\r"), + } def __init__(self, serialNumber): super().__init__(serialNumber=serialNumber, idVendor=self.classIdVendor, idProduct=self.classIdProduct) @@ -117,3 +129,23 @@ def doHome(self): if not self.isReferenced(): raise Exception("Homing failed") + + class DebugSerialPort(TableDrivenDebugPort): + def __init__(self): + super().__init__(commands=IntellidriveDevice.commands) + self.terminator = b'\r' + self.registers = {'0xc9': 0} + + def process_command(self, name, params, endPointIndex): + if name == 'SET_REGISTER': + self.registers[params["register"]] = int(params["value"]) + return "ok\r" + elif name == 'GET_REGISTER': + value = self.registers.get(params["register"], 0) + return {"value": str(value)} + elif name == 'TRAJECTORY': + mode = int(params["mode"]) + if mode == 2: # home + self.registers['0xca'] = 0 + self.registers['0xc9'] = (1 << 12) # referenced bit + return "ok\r" diff --git a/hardwarelibrary/motion/sutterdevice.py b/hardwarelibrary/motion/sutterdevice.py index 5625bf1..254cee8 100644 --- a/hardwarelibrary/motion/sutterdevice.py +++ b/hardwarelibrary/motion/sutterdevice.py @@ -4,7 +4,7 @@ from hardwarelibrary.communication.usbport import USBPort from hardwarelibrary.communication.serialport import SerialPort from hardwarelibrary.communication.commands import DataCommand -from hardwarelibrary.communication.debugport import DebugPort +from hardwarelibrary.communication.debugport import TableDrivenDebugPort import re import time @@ -16,6 +16,18 @@ class SutterDevice(LinearMotionDevice): classIdVendor = 4930 classIdProduct = 1 + commands = { + "MOVE": DataCommand(name="MOVE", prefix=b'M', requestFormat='\w+) (?P-?\d+)\r') + params = cmd.extractParams(b'SET foo 42\r') + self.assertIsInstance(params, dict) + self.assertEqual(params["key"], "foo") + self.assertEqual(params["value"], "42") + + def testNamedGroupFormatResponse(self): + cmd = TextCommand(name="test", text="GET {key}\r", + matchPattern=r'GET (?P\w+)\r', + responseTemplate="VAL {value}\r") + result = cmd.formatResponse({"value": "99"}) + self.assertEqual(result, bytearray(b'VAL 99\r')) + + def testEffectiveMatchPatternUsesExplicit(self): + cmd = TextCommand(name="test", text="SET {0}\r", + matchPattern=r'SET (\w+)\r') + self.assertEqual(cmd.effectiveMatchPattern, r'SET (\w+)\r') + + def testEffectiveMatchPatternUsesAuto(self): + cmd = TextCommand(name="test", text="SET {0}\r") + self.assertEqual(cmd.effectiveMatchPattern, cmd._autoMatchPattern) + + +class TestDataCommandRecognition(unittest.TestCase): + def testMatchesByPrefix(self): + cmd = DataCommand(name="test", prefix=b'M') + self.assertTrue(cmd.matches(b'M\x01\x02')) + + def testMatchesCaseInsensitive(self): + cmd = DataCommand(name="test", prefix=b'M') + self.assertTrue(cmd.matches(b'm\x01\x02')) + + def testPrefixDerivedFromData(self): + cmd = DataCommand(name="test", data=b'C\r') + self.assertTrue(cmd.matches(b'C')) + self.assertTrue(cmd.matches(b'c')) + + def testNoMatchWrongPrefix(self): + cmd = DataCommand(name="test", prefix=b'M') + self.assertFalse(cmd.matches(b'G\x01')) + + def testExtractParamsWithFormat(self): + cmd = DataCommand(name="test", prefix=b'S', requestFormat='