From 80857e9dcdf666f0a0f1afffef3b0797133c8d0b Mon Sep 17 00:00:00 2001 From: jan iversen Date: Fri, 13 Mar 2026 19:08:30 +0100 Subject: [PATCH 1/5] Update SimDevice co, di handling. --- API_changes.rst | 4 +- examples/contrib/test_datastores.py | 89 ++++++++++++++++++++++ pymodbus/simulator/simdata.py | 86 +++++++++++---------- pymodbus/simulator/simdevice.py | 113 ++++++++++++++------------- pymodbus/simulator/simruntime.py | 61 ++++++++------- pymodbus/simulator/simutils.py | 71 ++++++++++++----- test/simulator/test_simdata.py | 83 ++++++++++---------- test/simulator/test_simdevice.py | 114 ++++++++++++++-------------- test/simulator/test_simruntime.py | 25 +++--- test/simulator/test_simutils.py | 67 ++++++++++++++++ 10 files changed, 465 insertions(+), 248 deletions(-) create mode 100755 examples/contrib/test_datastores.py create mode 100644 test/simulator/test_simutils.py diff --git a/API_changes.rst b/API_changes.rst index 97208c721..a518ccf6e 100644 --- a/API_changes.rst +++ b/API_changes.rst @@ -1,6 +1,6 @@ API changes =========== -Versions (X.Y.Z) where Z > 0 e.g. 3.0.1 do NOT have API changes! +Versions (X.Y.Z) where Z == 0 e.g. 3.0.1 do NOT have API changes! API changes 3.13.0 ------------------ @@ -8,6 +8,8 @@ API changes 3.13.0 a proper forwarder should be made at frame level. - datastore get/setValues is removed, please use server.async_get/setValues instead. +- datastore show a deprecation warning +- SimData/SimDevice have been updated API changes 3.12.0 ------------------ diff --git a/examples/contrib/test_datastores.py b/examples/contrib/test_datastores.py new file mode 100755 index 000000000..7c16c54ed --- /dev/null +++ b/examples/contrib/test_datastores.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python3 +"""Test datastores compatibility. + +Control that: + +- ModbusSequentialDataBlock +- ModbusSparseDataBlock + +Works as the used to work. + +Control that SimData/SimDevice: + +- works as intended in both shared and non shared mode. + +Focussing on coils and discrete inputs. +""" +import asyncio + +import pymodbus.client as modbusClient +from pymodbus import FramerType +from pymodbus.datastore import ( + ModbusDeviceContext, + ModbusSequentialDataBlock, + ModbusServerContext, +) +from pymodbus.logging import Log, pymodbus_apply_logging_config +from pymodbus.server import ServerAsyncStop, StartAsyncTcpServer +from pymodbus.transport import NULLMODEM_HOST + + +async def run_async(port, context, run_test): + """Run server setup.""" + Log.info("### start ASYNC server") + task = asyncio.create_task(StartAsyncTcpServer( + context=context, + address=(NULLMODEM_HOST, port), + framer=FramerType.SOCKET + )) + await asyncio.sleep(1) + + Log.info("### Create client object") + client = modbusClient.AsyncModbusTcpClient( + NULLMODEM_HOST, + port=5020, + framer=FramerType.SOCKET, + ) + await client.connect() + await run_test(client) + client.close() + await ServerAsyncStop() + task.cancel() + await task + + +async def run_sequential(port) -> None: + """Combine setup and run.""" + async def run_old_test(client): + """Run test.""" + Log.info("### read_coils") + single = [True] + [False] * 7 + register = [True] * 7 + [False] * 8 + [True] + rr = await client.read_coils(0, count=1, device_id=1) + assert rr.bits == single + rr = await client.read_coils(0, count=16, device_id=1) + assert rr.bits == register + rr = await client.read_discrete_inputs(0, count=1, device_id=1) + assert rr.bits == single + rr = await client.read_discrete_inputs(0, count=16, device_id=1) + assert rr.bits == register + + co_data = [17] * 7 + [0] * 8 + [1] + di_data = [True] * 7 + [False] * 8 + [True] + context = ModbusServerContext(devices=ModbusDeviceContext( + co=ModbusSequentialDataBlock(1, co_data), + di=ModbusSequentialDataBlock(1, di_data), + ), + single=True + ) + Log.info("Run sequential test.") + await run_async(port, context, run_old_test) + + +async def run_all(port): + """Run all tests.""" + pymodbus_apply_logging_config("info") + await run_sequential(port) + +if __name__ == "__main__": + asyncio.run(run_all(5020), debug=True) diff --git a/pymodbus/simulator/simdata.py b/pymodbus/simulator/simdata.py index cf196a01a..3a3d46c63 100644 --- a/pymodbus/simulator/simdata.py +++ b/pymodbus/simulator/simdata.py @@ -5,7 +5,6 @@ from dataclasses import dataclass from typing import TypeAlias, cast -from ..pdu.utils import pack_bitstring, unpack_bitstring from .simutils import DataType, SimUtils @@ -73,18 +72,25 @@ class SimData: SimData( address=100, values=0xffff, - datatype=DataType.BITS + datatype=DataType.REGISTERS ) SimData( address=100, values=[0xffff], - datatype=DataType.BITS + datatype=DataType.REGISTERS ) Each SimData defines 16 BITS (coils), with value True. - Value are stored in registers (16bit is 1 register), the address refers to the register, unless - in non-shared mode where the address refers to the coil. + Value are stored in registers (16bit is 1 register). + + In shared mode (coil and discrete inputs requests): + - address refers to the register, containing individual bits, + Individual bits within the register cannot be addressed, + unless "use_bit_as_address" is set on the device. + + In non-shared mode (coil and discrete inputs requests) + - address refers to the bit. """ #: Address of first register, starting with 0 (identical to the requests) @@ -111,6 +117,11 @@ class SimData: #: Used to check access and convert value to/from registers or mark as invalid. datatype: DataType = DataType.INVALID + #: String encoding + #: + #: Used to convert a SimData(DataType.STRING) to registers. + string_encoding: str = "utf-8" + #: Mark register(s) as readonly. readonly: bool = False @@ -129,6 +140,10 @@ def __check_simple(self): raise TypeError("values= cannot be used with invalid=True") if isinstance(self.values, list) and not self.values: raise TypeError("values= list cannot be empty") + try: + "test string".encode(self.string_encoding) + except (UnicodeEncodeError, LookupError) as exc: + raise TypeError("string_encoding= not valid") from exc def __check_parameters(self): """Check all parameters.""" @@ -136,8 +151,10 @@ def __check_parameters(self): x_values = self.values if isinstance(self.values, list) else [self.values] x_datatype, _x_struct, _x_len = SimUtils.DATATYPE_STRUCT[self.datatype] if self.datatype == DataType.BITS: - x_datatype = int if isinstance(x_values[0], int) else bool + x_datatype = bool if isinstance(x_values[0], bool) else int for x_value in x_values: + if self.datatype == DataType.BITS and x_datatype is int and isinstance(x_value, bool): + raise TypeError(f"values= {x_value} int and bool cannot be mixed") if not isinstance(x_value, x_datatype): raise TypeError(f"values= {x_value} is not {x_datatype!s}") if x_datatype is str and not x_value: @@ -147,60 +164,47 @@ def __post_init__(self): """Define a group of registers.""" self.__check_parameters() - def build_registers_bits_block(self) -> list[int]: + def build_registers_bits_block(self) -> list[bool]: """Convert values= to registers from bits (1 bit in each register).""" x_values = self.values if isinstance(self.values, list) else [self.values] - regs: list[int] = [] if isinstance(x_values[0], bool): - for v in x_values: - regs.append(1 if v else 0) - else: - for v in cast(list[int], x_values): - bool_list = unpack_bitstring(v.to_bytes(2, byteorder="big")) - for i in bool_list: - regs.append(1 if i else 0) - return regs - - def build_registers_bits_shared(self, endian: tuple[bool, bool]) -> list[int]: + return cast(list[bool], x_values) + return SimUtils.registersToBits(cast(list[int], x_values)) + + def build_registers_bits_shared(self) -> list[int]: """Convert values= to registers from bits (16 bits in each register).""" x_values = self.values if isinstance(self.values, list) else [self.values] - if isinstance(x_values[0], bool): - if len(x_values) % 16: - raise TypeError(f"SimData address={self.address} values= must be a multiple of 16") - bytes_bits = bytearray(pack_bitstring(cast(list[bool], x_values))) - else: - bytes_bits = bytearray() - for v in x_values: - bytes_bits.extend(struct.pack(">H", v)) - return SimUtils.convert_bytes_registers(bytes_bits, endian[0], endian[1], 1) - - def build_registers_string(self, endian: tuple[bool, bool], string_encoding: str) -> list[int]: + if not isinstance(x_values[0], bool): + return cast(list[int], x_values) + if len(x_values) % 16: + raise TypeError(f"SimData address={self.address} values= must be a multiple of 16") + return SimUtils.bitsToRegisters(cast(list[bool], x_values)) + + def build_registers_string(self) -> list[int]: """Convert values= to registers from string(s).""" x_values = self.values if isinstance(self.values, list) else [self.values] blocks_regs: list[int] = [] for value in x_values: - bytes_string = cast(str, value).encode(string_encoding) - if len(bytes_string) % 2: - bytes_string += b"\x00" - blocks_regs.extend(SimUtils.convert_bytes_registers(bytearray(bytes_string), endian[0], endian[1], 1)) + bytes_string = cast(str, value).encode(self.string_encoding) + regs = SimUtils.bytesToRegisters(bytes_string) + blocks_regs.extend(regs) return blocks_regs - def build_registers(self, endian: tuple[bool, bool], string_encoding: str, block_bits: bool) -> list[int]: + def build_registers(self, block_bits: bool) -> list[int] | list[bool]: """Convert values= to registers.""" self.__check_parameters() if self.datatype == DataType.STRING: - block_regs = self.build_registers_string(endian, string_encoding) - return block_regs * self.count + return self.build_registers_string() * self.count + if block_bits: + return self.build_registers_bits_block() * self.count if self.datatype == DataType.BITS: - if block_bits: - return self.build_registers_bits_block() - return self.build_registers_bits_shared(endian) + return self.build_registers_bits_shared() * self.count x_values = self.values if isinstance(self.values, list) else [self.values] - _x_datatype, x_struct, x_len = SimUtils.DATATYPE_STRUCT[self.datatype] + _x_datatype, x_struct, _x_len = SimUtils.DATATYPE_STRUCT[self.datatype] blocks_regs: list[int] = [] for v in x_values: byte_list = struct.pack(f">{x_struct}", v) - blocks_regs.extend(SimUtils.convert_bytes_registers(bytearray(byte_list), endian[0], endian[1], x_len)) + blocks_regs.extend(SimUtils.bytesToRegisters(byte_list)) return blocks_regs diff --git a/pymodbus/simulator/simdevice.py b/pymodbus/simulator/simdevice.py index acec682d5..8d836b86b 100644 --- a/pymodbus/simulator/simdevice.py +++ b/pymodbus/simulator/simdevice.py @@ -12,7 +12,7 @@ from .simutils import DataType, SimUtils -SimAction: TypeAlias = Callable[[int, int, list[int], list[int] | None], Awaitable[list[int] | None | ExcCodes]] +SimAction: TypeAlias = Callable[[int, int, int, int, list[int], list[int] | list[bool] | None], Awaitable[None | ExcCodes]] SimRegs: TypeAlias = tuple[int, list[int], list[int]] TUPLE_NAMES = ( "coils", @@ -63,28 +63,24 @@ class SimDevice: #: The tuple is defined as: #: (, , , ) #: - #: / have addressing calculated differently: - #: address register = address / 16 - #: to find the coil at address - #: count is number of coils, so registers returned are count +15 / 16. - #: #: ..tip:: addresses not defined are invalid and will produce an ExceptionResponse simdata: SimData | list[SimData] | tuple[list[SimData], list[SimData], list[SimData], list[SimData]] - #: Change endianness. - #: - #: Word order is not defined in the modbus standard and thus a device that - #: uses little-endian is still within the modbus standard. + #: Define coil/discrete input addressing in shared mode. #: - #: Byte order is defined in the modbus standard to be big-endian, - #: however it is definable to test non-standard modbus devices - #: - #: ..tip:: Content (word_order, byte_order), True means big-endian. - endian: tuple[bool, bool] = (True, True) - - #: String encoding + #: False, means the register is addressed, and single bits cannot be addressed. + #: True, means single bit is being addressed. + #: effictive address is register_address * 16 + bit_offset. #: - string_encoding: str = "utf-8" + #: Example: + #: SimData(200, value=True, datatype=DataType.BITS) + #: with use_bit_addressing=False: + #: read_coils(200) returns [True] + [False] * 7 + #: read_coils(200, count=16) returns [True] + [False] * 15 + #: with use_bit_addressing=True: + #: read_coils(200*16+15) returns [True] + [False] * 7 + #: read_coils(200*16, count=16) returns [False] * 15 + [True] + use_bit_addressing: bool | None = None #: Set device identity identity: ModbusDeviceIdentification | None = None @@ -96,21 +92,18 @@ class SimDevice: #: .. code-block:: python #: #: async def my_action( - #: function_code: int, - #: start_address: int, - #: current_registers: list[int], - #: new_registers: list[int] | None) -> list[int] | ExceptionResponse: - #: - #: return registers - #: or - #: return None - #: - #: action, is called with current registers and if write request also the new registers. - #: result updates registers and if read request returned to the client. - #: - #: new_registers is None for read requests. + #: function_code: int, # request function code + #: start_address: int, # address of current_registers[0] + #: address: int, # request address + #: count: int, # request count + #: current_registers: list[int], # current registers (modify inline) + #: set_values: list[int] | list[bool] | None # request values to be written (None for read requests) + # ) -> None | ExceptionResponse: #: - #: if return is None it indicates no change. + #: action can: + #: - update registers (affect the current and future responses) + #: - update set_values (affect the register update) + #: - return an ExceptionResponse. #: #: .. tip:: use functools.partial to add extra parameters if needed. action: SimAction | None = None @@ -123,25 +116,18 @@ def __check_simple(self): raise TypeError("identity= must be a ModbusDeviceIdentification") if self.action and not (callable(self.action) and inspect.iscoroutinefunction(self.action)): raise TypeError("action= must be a async function") - if not (isinstance(self.endian, tuple) - and len(self.endian) == 2 - and isinstance(self.endian[0], bool) - and isinstance(self.endian[1], bool) - ): - raise TypeError("endian= must be a tuple with 2 bool") - test_str = "test string" - try: - test_str.encode(self.string_encoding) - except (UnicodeEncodeError, LookupError) as exc: - raise TypeError("string_encoding= not valid") from exc def __check_simple2(self): """Check simple parameters.""" if isinstance(self.simdata, tuple): + if self.use_bit_addressing is None: + self.use_bit_addressing = True self.__check_simple_blocks() - if self.action: - raise TypeError("action= id only supported with shared blocks") + if not self.use_bit_addressing: + raise TypeError("use_bit_addressing=False is only supported with shared blocks") else: + if self.use_bit_addressing is None: + self.use_bit_addressing = False x_simdata = self.simdata if isinstance(self.simdata, list) else [self.simdata] for inx, entry in enumerate(x_simdata): if not isinstance(entry, SimData): @@ -173,7 +159,7 @@ def __check_block_entries(self, last_address: int, entry: SimData, use_bits: boo """Check block entries.""" if entry.address <= last_address: raise TypeError(f"SimData address {entry.address} is overlapping!") - blocks_regs = entry.build_registers(self.endian, self.string_encoding, use_bits) * entry.count + blocks_regs = entry.build_registers(use_bits) * entry.count return last_address + len(blocks_regs) def __check_parameters(self): @@ -201,7 +187,7 @@ def __build_flags(self, simdata: SimData) -> int: def __create_simdata(self, simdata: SimData, flag_list: list[int], reg_list: list[int], use_bits: bool): """Build registers for single SimData.""" flag_normal = self.__build_flags(simdata) - blocks_regs = simdata.build_registers(self.endian, self.string_encoding, use_bits) + blocks_regs = simdata.build_registers(use_bits) for _ in range(simdata.count): first = True for register in blocks_regs: @@ -212,7 +198,7 @@ def __create_simdata(self, simdata: SimData, flag_list: list[int], reg_list: li flag_list.append(flag_normal & ~SimUtils.RunTimeFlag_TYPE) reg_list.append(register) - def __create_block(self, simdata: list[SimData], use_bits: bool) -> SimRegs: + def __create_block(self, simdata: list[SimData]) -> SimRegs: """Create registers for device.""" flag_list: list[int] = [] reg_list: list[int] = [] @@ -223,23 +209,48 @@ def __create_block(self, simdata: list[SimData], use_bits: bool) -> SimRegs: flag_list.append(DataType.INVALID) reg_list.append(0) next_address += 1 - self.__create_simdata(entry, flag_list, reg_list, use_bits) + self.__create_simdata(entry, flag_list, reg_list, False) flag_list.append(DataType.INVALID) reg_list.append(0) return (start_address, reg_list, flag_list) + def __create_block_bits(self, simdata: list[SimData]) -> SimRegs: + """Create registers for device.""" + bit_list: list[bool] = [] + start_address = simdata[0].address + if (offset := start_address % 16): + bit_list.extend([False] * offset) + start_address -= offset + for entry in simdata: + if (next_address := start_address + len(bit_list)) < entry.address: + bit_list.extend([False] * (entry.address - next_address)) + next_address = start_address + len(bit_list) + entry_bits = entry.build_registers(True) + bit_list.extend(cast(list[bool], entry_bits)) + if (remains := len(bit_list) % 16): + bit_list.extend([False] * (16 - remains)) + + flag_list: list[int] = [DataType.BITS] + [0] * (int(len(bit_list) / 16) -1) + [DataType.INVALID] + bit_list.extend([False] * (len(bit_list) % 16)) + reg_list = SimUtils.bitsToRegisters(bit_list) + reg_list.append(0) + return (int(start_address / 16), reg_list, flag_list) + def build_device(self) -> SimRegs | dict[str, SimRegs]: """Check simdata and built runtime structure.""" self.__check_parameters() if not isinstance(self.simdata, tuple): x_simdata = self.simdata if isinstance(self.simdata, list) else [self.simdata] x_simdata.sort(key=lambda x: x.address) - return self.__create_block(x_simdata, False) + return self.__create_block(x_simdata) b: dict[str, SimRegs] = {} # (, , , ) convert = {0: "c", 1: "d", 2: "h", 3: "i"} for i in range(4): x_simdata = cast(tuple, self.simdata)[i] x_simdata.sort(key=lambda x: x.address) - b[convert[i]] = self.__create_block(x_simdata, (i in {0,1})) + if i in {0,1}: + b[convert[i]] = self.__create_block_bits(x_simdata) + else: + b[convert[i]] = self.__create_block(x_simdata) return b diff --git a/pymodbus/simulator/simruntime.py b/pymodbus/simulator/simruntime.py index fe82f3fcd..3286eaca4 100644 --- a/pymodbus/simulator/simruntime.py +++ b/pymodbus/simulator/simruntime.py @@ -12,21 +12,20 @@ class SimRuntime: """Memory setup for device.""" - _fx_mapper = {2: "d", # Direct input - 4: "i"} # Input registers - _fx_mapper.update([(i, "h") + _fx_mapper = {2: "d", 4: "i"} # Direct input and Input registers + _fx_mapper.update([(i, "h") # Holding registers for i in (3, 6, 16, 22, 23)]) - _fx_mapper.update([(i, "c") + _fx_mapper.update([(i, "c") # Coils for i in (1, 5, 15)]) def __init__(self, device: SimDevice): """Build device memory.""" self.action = device.action + self.use_bit_addressing = device.use_bit_addressing build = device.build_device() self.block: dict[str, tuple[int, int, list[int], list[int]]] = {} if not isinstance(build, dict): - self.shared = True self.block["x"] = ( build[0], len(build[2]), @@ -34,27 +33,14 @@ def __init__(self, device: SimDevice): build[2], ) return - self.shared = False - self.block = {} for i in ("c", "d", "h", "i"): - x = build[i] - self.block[i] = (x[0], len(x[1]), x[1], x[2]) + self.block[i] = (build[i][0], len(build[i][1]), build[i][1], build[i][2]) - async def get_block(self, func_code: int, address: int, count: int, values: list[int] | list[bool] | None) -> list[int] | list[bool] | ExcCodes: - """Calculate offset.""" - block_id = "x" if self.shared else self._fx_mapper[func_code] - start_address, register_count, registers, flags = self.block[block_id] - offset = address - start_address - if values: - count = len(values) - if address > start_address + register_count or address < start_address or offset + count > register_count: - return ExcCodes.ILLEGAL_ADDRESS - if self.action: - result = await self.action(func_code, address, registers, None) - if isinstance(result, ExcCodes): - return result - if result: - values = result + async def __check_block(self, func_code: int, block_id: str, address: int, count: int, offset: int, values: list[int] | list[bool] | None) -> ExcCodes | None: + """Check block request.""" + start_address, _, registers, flags = self.block[block_id] + if self.action and (result := await self.action(func_code, start_address, address, count, registers, values)): + return result for i in range(count): addr = offset + i if flags[addr] & SimUtils.RunTimeFlag_TYPE == DataType.INVALID: @@ -63,6 +49,30 @@ async def get_block(self, func_code: int, address: int, count: int, values: list if flags[addr] & SimUtils.RunTimeFlag_READONLY: return ExcCodes.ILLEGAL_ADDRESS registers[addr] = values[i] + return None + + async def get_block(self, func_code: int, address: int, count: int, values: list[int] | list[bool] | None) -> list[int] | list[bool] | ExcCodes: + """Calculate offset.""" + fc_block = self._fx_mapper.get(func_code, "x") + block_id = "x" if "x" in self.block else fc_block + use_bits = fc_block in {"c", "d"} and self.use_bit_addressing + start_address, register_count, registers, _ = self.block[block_id] + if use_bits: + offset = int(address / 16) - start_address + effective_count = int(count / 16) + 1 + else: + offset = address - start_address + effective_count = count + if register_count <= offset < 0 or offset + effective_count > register_count: + return ExcCodes.ILLEGAL_ADDRESS + if (result := await self.__check_block(func_code, block_id, address, count, offset, values)): + return result + if fc_block in {"c", "d"}: + list_bools = SimUtils.registersToBits(registers[offset:offset+count]) + if not use_bits: + return list_bools + bit_offset = address % 16 + return list_bools[bit_offset:bit_offset+count] return registers[offset:offset+count] async def async_getValues(self, func_code: int, address: int, count: int) -> list[int] | list[bool] | ExcCodes: @@ -71,7 +81,6 @@ async def async_getValues(self, func_code: int, address: int, count: int) -> lis async def async_setValues(self, func_code: int, address: int, values: list[int] | list[bool] ) -> None | ExcCodes: """Set the datastore with the supplied values.""" - count = len(values) - block = await self.get_block(func_code, address, count, values) + block = await self.get_block(func_code, address, len(values), values) return block if isinstance(block, ExcCodes) else None diff --git a/pymodbus/simulator/simutils.py b/pymodbus/simulator/simutils.py index e01c82a4a..a715f5afb 100644 --- a/pymodbus/simulator/simutils.py +++ b/pymodbus/simulator/simutils.py @@ -48,7 +48,7 @@ class DataType(enum.IntEnum): #: Registers == 2 bytes (identical to UINT16) REGISTERS = enum.auto() -class SimUtils: # pylint: disable=too-few-public-methods +class SimUtils: """Define common set of utilitites.""" DATATYPE_STRUCT: dict[DataType, tuple[type, str, int]] = { @@ -69,24 +69,53 @@ class SimUtils: # pylint: disable=too-few-public-methods RunTimeFlag_TYPE = 2**4 -1 # Isolate number of registers RunTimeFlag_READONLY = 2**4 # only read is allowed + + def __init__(self): + """Ensure that class is not instantiated.""" + raise RuntimeError("SimUtils may not be instantiated.") + + @classmethod + def registersToBits(cls, registers: list[int]) -> list[bool]: + """Convert list of registers to list of bool (bit 0 first).""" + bits: list[bool] = [] + for entry in registers: + bit_str = format(entry, '016b') + new_bits = [] + for i in bit_str: + new_bits.append(i == "1") + new_bits.reverse() + bits.extend(new_bits) + return bits + + @classmethod + def bitsToRegisters(cls, bits: list[bool]) -> list[int]: + """Convert list of bits to registers (bit 0 first, divided in 16bits).""" + bit_len = len(bits) + if bit_len % 16: + raise TypeError("bits must be a multiple of 16") + registers = [] + for i in range(int(bit_len / 16)): + offset = i*16 + reg = 0 + for i, x in enumerate(bits[offset:offset+16]): + if x: + reg += 1 << i + registers.append(reg) + return registers + + @classmethod + def mergeBitsToRegisters(cls, bit_offset: int, registers: list[int], bits: list[bool]) -> None: + """Merge list of bits into registers in place.""" + new_bits = cls.registersToBits(registers) + new_bits[bit_offset:bit_offset+len(bits)] = bits + registers[0:] = cls.bitsToRegisters(new_bits) + @classmethod - def convert_bytes_registers(cls, byte_list: bytearray, word_order: bool, byte_order: bool, data_type_len: int) -> list[int]: - """Convert bytearray to registers.""" - if byte_order: - regs = [ - int.from_bytes(byte_list[x : x + 2], "big") - for x in range(0, len(byte_list), 2) - ] - else: - regs = [ - int.from_bytes([byte_list[x+1], byte_list[x]], "big") - for x in range(0, len(byte_list), 2) - ] - if word_order: - return regs - reversed_regs: list[int] = [] - for x in range(0, len(regs), data_type_len): - single_value_regs = regs[x: x + data_type_len] - single_value_regs.reverse() - reversed_regs = reversed_regs + single_value_regs - return reversed_regs + def bytesToRegisters(cls, byte_list: bytes) -> list[int]: + """Convert bytes into registers.""" + if len(byte_list) % 2: + byte_list += b"\x00" + return[ + int.from_bytes(byte_list[x : x + 2], "big") + for x in range(0, len(byte_list), 2) + ] diff --git a/test/simulator/test_simdata.py b/test/simulator/test_simdata.py index 1b2b73767..6acabae8e 100644 --- a/test/simulator/test_simdata.py +++ b/test/simulator/test_simdata.py @@ -20,6 +20,7 @@ class TestSimData: {"address": 6, "count": 10, "datatype": DataType.INT16, "values": [17, 18]}, {"address": 7, "datatype": DataType.STRING, "values": "test"}, {"address": 8, "count": 10, "datatype": DataType.STRING, "values": "test"}, + {"address": 8, "count": 10, "datatype": DataType.STRING, "values": "test", "string_encoding": "utf-8"}, {"address": 0, "datatype": DataType.REGISTERS, "values": 17, "count": 5}, {"address": 3, "datatype": DataType.INT16, "values": 17, "readonly": True}, {"address": 0, "count": 2^16 -1}, @@ -45,6 +46,7 @@ def test_simdata_instanciate(self, kwargs): {"address": 1, "datatype": "not ok"}, {"address": 1, "datatype": 11}, {"address": 2, "values": 17}, + {"address": 8, "count": 10, "datatype": DataType.STRING, "values": "test", "string_encoding": "not ok"}, ]) def test_simdata_not_ok(self, kwargs): """Test that simdata can be objects.""" @@ -80,6 +82,8 @@ def test_simdata_value_ok(self, value, value_type): ([11, 12.0], DataType.REGISTERS), (1.0, DataType.BITS), ([True, 1.0], DataType.BITS), + ([True, 1], DataType.BITS), + ([1, True], DataType.BITS), ]) def test_simdata_value_not_ok(self, value, value_type): """Test simdata value.""" @@ -92,39 +96,36 @@ def test_simdata_value_invalid(self, values): with pytest.raises(TypeError): SimData(0, values=values, datatype=DataType.INVALID) - @pytest.mark.parametrize(("value", "value_type", "order", "regs"), [ - # test word order - (-27123, DataType.INT16, (True, True), [0x960D]), - ([-27123, 27123], DataType.INT16, (True, True), [0x960D, 0x69F3]), - ([-27123, 27123], DataType.INT16, (False, True), [0x960D, 0x69F3]), - ([32145678, -32145678], DataType.INT32, (True, True), [0x01EA, 0x810E, 0xFE15, 0x7EF2]), - ([32145678, -32145678], DataType.INT32, (False, True), [0x810E, 0x01EA, 0x7EF2, 0xFE15]), - # test byte order - ([-27123, 27123], DataType.REGISTERS, (True, False), [0x0D96, 0xF369]), - ([32145678, -32145678], DataType.INT32, (True, False), [0xEA01, 0x0E81, 0x15FE, 0xF27E]), - # test data types - (27123, DataType.REGISTERS, (True, True), [0x69F3]), - (-27124, DataType.INT16, (True, True), [0x960C]), - (27123, DataType.UINT16, (True, True), [0x69F3]), - (-32145678, DataType.INT32, (True, True), [0xFE15, 0x7EF2]), - (32145678, DataType.UINT32, (True, True), [0x01EA, 0x810E]), - (-1234567890123456789, DataType.INT64, (True, True), [0xEEDD, 0xEF0B, 0x8216, 0x7EEB]), - (1234567890123456789, DataType.UINT64, (True, True), [0x1122, 0x10F4, 0x7DE9, 0x8115]), - (27123.5, DataType.FLOAT32, (True, True), [0x46D3, 0xE700]), - (3.141592, DataType.FLOAT32, (True, True), [0x4049, 0x0FD8]), - (-3.141592, DataType.FLOAT32, (True, True), [0xC049, 0x0FD8]), - (27123.5, DataType.FLOAT64, (True, True), [0x40DA, 0x7CE0, 0x0000, 0x0000]), - (3.14159265358979, DataType.FLOAT64, (True, True), [0x4009, 0x21FB, 0x5444, 0x2D11]), - (-3.14159265358979, DataType.FLOAT64, (True, True), [0xC009, 0x21FB, 0x5444, 0x2D11]), - ([True] + [False] * 15, DataType.BITS, (True, True), [256]), - (0x0100, DataType.BITS, (True, True), [256]), - ([0x0100, 0x0001], DataType.BITS, (True, True), [256, 1]), - + @pytest.mark.parametrize(("value", "value_type", "regs"), [ + (27123, DataType.INT16, [27123]), + (-27123, DataType.INT16, [0x960d]), + ([-27123, 27123], DataType.INT16, [0x960D, 0x69F3]), + ([32145678, -32145678], DataType.INT32, [0x01EA, 0x810E, 0xFE15, 0x7EF2]), + (27123, DataType.REGISTERS, [0x69F3]), + (-27124, DataType.INT16, [0x960C]), + (27123, DataType.UINT16, [0x69F3]), + (-32145678, DataType.INT32, [0xFE15, 0x7EF2]), + (32145678, DataType.UINT32, [0x01EA, 0x810E]), + (-1234567890123456789, DataType.INT64, [0xEEDD, 0xEF0B, 0x8216, 0x7EEB]), + (1234567890123456789, DataType.UINT64, [0x1122, 0x10F4, 0x7DE9, 0x8115]), + (27123.5, DataType.FLOAT32, [0x46D3, 0xE700]), + (3.141592, DataType.FLOAT32, [0x4049, 0x0FD8]), + (-3.141592, DataType.FLOAT32, [0xC049, 0x0FD8]), + (27123.5, DataType.FLOAT64, [0x40DA, 0x7CE0, 0x0000, 0x0000]), + (3.14159265358979, DataType.FLOAT64, [0x4009, 0x21FB, 0x5444, 0x2D11]), + (-3.14159265358979, DataType.FLOAT64, [0xC009, 0x21FB, 0x5444, 0x2D11]), + (0x0100, DataType.BITS, [256]), + (123, DataType.BITS, [123]), + ([123], DataType.BITS, [123]), + ([0x0100, 0x0001], DataType.BITS, [256, 1]), + ([True] + [False] * 15, DataType.BITS, [1]), + ([True] + [False] * 8 + [True] + [False] * 6, DataType.BITS, [513]), + ([True] + [False] * 15 + [False] * 8 + [True] + [False] * 7, DataType.BITS, [1, 256]), ]) - def test_simdata_build_registers(self, value, value_type, order, regs): + def test_simdata_build_registers(self, value, value_type, regs): """Test simdata value.""" sd = SimData(0, values=value, datatype=value_type) - build_regs = sd.build_registers((order[0], order[1]), "utf-8", False) + build_regs = sd.build_registers(False) assert build_regs == regs @pytest.mark.parametrize(("value", "code", "expect"), [ @@ -135,34 +136,36 @@ def test_simdata_build_registers(self, value, value_type, order, regs): ]) def test_simdata_build_string(self, value, code, expect): """Test simdata value.""" - sd = SimData(0, values=value, datatype=DataType.STRING) - build_regs = sd.build_registers((True, True), code, False) + sd = SimData(0, values=value, datatype=DataType.STRING, string_encoding=code) + build_regs = sd.build_registers(False) assert build_regs == expect @pytest.mark.parametrize(("value", "regs"), [ - ([True, False, True], [1, 0, 1]), - (0x0100, [1] + [0]*15), - ([256, 1], [1] + [0]*23 + [1] + [0]*7), + ([True, False, True], [True, False, True]), + ([True] + [False] * 14 + [True, False], [True] + [False] * 14 + [True, False]), + (0x0001, [True] + [False]*15), + (0x0100, [False]*8 + [True] + [False]*7), + ([256, 1], [False]*8 + [True] + [False]*7 + [True] + [False]*15), ]) def test_simdata_build_bit_block(self, value, regs): """Test simdata value.""" sd = SimData(0, values=value, datatype=DataType.BITS) - build_regs = sd.build_registers((True, True), "utf-8", True) + build_regs = sd.build_registers(True) assert build_regs == regs def test_simdata_build_updated_simdata(self): """Test simdata value.""" sd = SimData(0, values="ABC", datatype=DataType.STRING) - build_regs = sd.build_registers((True, True), "utf-8", False) + build_regs = sd.build_registers(False) assert build_regs == [0x4142, 0x4300] sd.values="ABCDEF" - build_regs = sd.build_registers((True, True), "utf-8", False) + build_regs = sd.build_registers(False) assert build_regs == [0x4142, 0x4344, 0x4546] sd.values=123 with pytest.raises(TypeError): - sd.build_registers((True, True), "utf-8", False) + sd.build_registers(False) sd = SimData(0, values=[True, True], datatype=DataType.BITS) with pytest.raises(TypeError): - sd.build_registers((True, True), "utf-8", False) + sd.build_registers(False) diff --git a/test/simulator/test_simdevice.py b/test/simulator/test_simdevice.py index 1eca9d042..a486ec581 100644 --- a/test/simulator/test_simdevice.py +++ b/test/simulator/test_simdevice.py @@ -15,34 +15,34 @@ class TestSimDevice: async def my_action( self, _function_code, + _start_address, _address, - current_registers, - _new_registers + _count, + _current_registers, + _set_values ): """Run action.""" - return current_registers def my_sync_action( self, _function_code, + _start_address, _address, - current_registers, - _new_registers + _count, + _current_registers, + _set_values ): """Run action.""" - return current_registers simdata1 = SimData(0, datatype=DataType.INT16, values=15) simdata2 = SimData(1, datatype=DataType.INT16, values=16) - simdata3 = SimData(1, datatype=DataType.BITS, values=16) + simdata3 = SimData(10, datatype=DataType.BITS, values=16) @pytest.mark.parametrize("kwargs", [ - {"id": 0, "simdata": [SimData(2, datatype=DataType.STRING, values="test")], "string_encoding": "utf-8"}, + {"id": 0, "simdata": [SimData(2, datatype=DataType.STRING, values="test")]}, {"id": 0, "simdata": ([simdata3], [simdata3], [simdata1], [simdata3])}, {"id": 0, "simdata": simdata2}, {"id": 0, "simdata": [simdata2, simdata1]}, - {"id": 0, "simdata": [simdata1], "endian": (False, True)}, - {"id": 0, "simdata": [simdata1], "endian": (True, False)}, {"id": 0, "simdata": [simdata1], "identity": ModbusDeviceIdentification()}, ]) def test_simdevice_instanciate(self, kwargs): @@ -59,7 +59,6 @@ def test_simdevice_instanciate(self, kwargs): {"id": 0, "simdata": ([simdata1], [simdata3], [simdata1], [simdata1])}, {"id": 0, "simdata": ([simdata3], [simdata1], [simdata1], [simdata1])}, {"id": 0, "simdata": ([simdata3], [simdata3], [simdata1], "not ok")}, - {"id": 0, "simdata": [simdata1], "string_encoding": "not ok"}, {"id": "not ok", "simdata": [simdata1]}, {"id": 1.0, "simdata": [simdata1]}, {"id": 1, "simdata": [simdata1, simdata1]}, @@ -79,7 +78,7 @@ def test_simdevice_not_ok(self, kwargs): ([SimData(0, values=[0xffff], datatype=DataType.BITS)], 0), ([SimData(0, values=[True]*16, datatype=DataType.BITS)], 0), ([SimData(0, values="hello", datatype=DataType.STRING)], 0), - (SimData(0), 0), + # (SimData(0), 0), ("no valid", 2), (["no valid"], 2), ]) @@ -91,33 +90,25 @@ def test_simdevice_block(self, block, expect): with pytest.raises(TypeError): SimDevice(id=0, simdata=block) - @pytest.mark.parametrize(("endian", "expect"), [ - ("not ok", 1), - (None, 1), - (["not ok"], 1), - (("not ok"), 1), # noqa: PT014 - (("not ok", "not_ok"), 1), - ((True, False), 0), - ]) - def test_simdevice_endian(self, endian, expect): - """Test offset.""" - if expect: - with pytest.raises(TypeError): - SimDevice(id=0, simdata=[self.simdata1], endian=endian) - else: - SimDevice(id=0, simdata=[self.simdata1], endian=endian) - async def test_simdevice_action(self): """Test action.""" - await self.my_action(0, 0, [], None) - self.my_sync_action(0, 0, [], None) + await self.my_action(0, 0, 0, 0, [], None) + self.my_sync_action(0, 0, 0, 0, [], None) SimDevice(1, simdata=[SimData(1)], action=self.my_action) + SimDevice(0, simdata=([self.simdata3], [self.simdata3], [self.simdata1], [self.simdata3]), action=self.my_action) with pytest.raises(TypeError): SimDevice(1, simdata=[SimData(1)], action=self.my_sync_action) with pytest.raises(TypeError): SimDevice(1, simdata=[SimData(1)], action="no good") # type: ignore[arg-type] + + async def test_simdevice_bit_addressing(self): + """Test action.""" + sdblock = ([self.simdata3], [self.simdata3], [self.simdata1], [self.simdata3]) + SimDevice(1, simdata=[SimData(1)], use_bit_addressing=False) + SimDevice(1, simdata=[SimData(1)], use_bit_addressing=True) + SimDevice(1, simdata=sdblock, use_bit_addressing=True) with pytest.raises(TypeError): - SimDevice(0, simdata=([self.simdata3], [self.simdata3], [self.simdata1], [self.simdata3]), action=self.my_action), + SimDevice(1, simdata=sdblock, use_bit_addressing=False) @pytest.mark.parametrize(("block", "result"), [ ([SimData(2, values=125, datatype=DataType.REGISTERS), SimData(1, values=123, datatype=DataType.REGISTERS),], @@ -138,6 +129,12 @@ async def test_simdevice_action(self): ([SimData(0, values=[0xffff, 0xffff], datatype=DataType.BITS)], (0, [65535, 65535, 0], [DataType.BITS, 0, DataType.INVALID])), + ([SimData(0, values=[True] * 2 + [False]*6 + [False]*8, datatype=DataType.BITS)], + (0, [3, 0], + [DataType.BITS, DataType.INVALID])), + ([SimData(0, values=[True] * 2 + [False]*6 + [True] + [False]*6 + [True], datatype=DataType.BITS)], + (0, [33027, 0], + [DataType.BITS, DataType.INVALID])), ([SimData(1, values=123, datatype=DataType.INT16), SimData(3, values=456, datatype=DataType.INT16)], (1, [123, 0, 456, 0], @@ -189,37 +186,44 @@ def test_simdevice_build(self, block, result): def test_simdevice_build_blocks(self): """Test build_device() ok.""" block = ( - [SimData(1, values=123, datatype=DataType.BITS)], - [SimData(1, values=123, datatype=DataType.BITS)], - [SimData(1, values=123, datatype=DataType.INT16)], - [SimData(1, values=123, datatype=DataType.INT16)]) + [SimData(0, values=123, datatype=DataType.BITS)], + [SimData(0, values=123, datatype=DataType.BITS)], + [SimData(0, values=123, datatype=DataType.INT16)], + [SimData(0, values=123, datatype=DataType.INT16)]) result = { - "c": (1, [0]*8 + [1]*2 + [0] + [1]*4 + [0]*2, [DataType.BITS] + [0] * 15 + [DataType.INVALID]), - "d": (1, [0]*8 + [1]*2 + [0] + [1]*4 + [0]*2, [DataType.BITS] + [0] * 15 + [DataType.INVALID]), - "h": (1, [123, 0], [DataType.INT16, DataType.INVALID]), - "i": (1, [123,0], [DataType.INT16, DataType.INVALID]) + "c": (0, [123, 0], [DataType.BITS, DataType.INVALID]), + "d": (0, [123, 0], [DataType.BITS, DataType.INVALID]), + "h": (0, [123, 0], [DataType.INT16, DataType.INVALID]), + "i": (0, [123,0], [DataType.INT16, DataType.INVALID]) } sd = SimDevice(id=1, simdata=block) lists = sd.build_device() assert lists == result - def test_simdevice_build_bits(self): + @pytest.mark.parametrize(("block", "registers", "addr"), [ + ([SimData(0, values=123, datatype=DataType.BITS)], [0x007b, 0x0000], 0), + ([SimData(0, values=True, datatype=DataType.BITS)], [0x0001, 0x0000], 0), + ([SimData(1, values=123, datatype=DataType.BITS)], [0x00F6, 0x0000, 0x0000], 0), + ([SimData(0, values=[True] + [False]*7, datatype=DataType.BITS), + SimData(9, values=[True], datatype=DataType.BITS)], [0x0201, 0x0000], 0), + ([SimData(0, values=[True] + [False]*7, datatype=DataType.BITS), + SimData(10, values=[True], datatype=DataType.BITS)], [0x0401, 0x0000], 0), + ([SimData(16, values=123, datatype=DataType.BITS)], [0x007b, 0x0000], 1), + ([SimData(15, values=123, datatype=DataType.BITS)], [0x8000, 0x003d, 0x0000], 0), + ]) + def test_simdevice_build_bits(self, block, registers, addr): """Test build_device() ok.""" - sd = SimDevice(id=1, simdata=SimData(1, values=123, datatype=DataType.BITS)) - result_shared = cast(SimRegs, sd.build_device()) - assert len(result_shared[1]) == 2 - - sd = SimDevice(id=1, simdata= ( - [SimData(1, values=123, datatype=DataType.BITS)], - [SimData(1, values=123, datatype=DataType.BITS)], - [SimData(1, values=123, datatype=DataType.INT16)], - [SimData(1, values=123, datatype=DataType.INT16)] - )) - result_block = cast(dict[str, SimRegs], sd.build_device()) - assert len(result_block["c"][1]) == 17 - assert len(result_block["d"][1]) == 17 - assert len(result_block["h"][1]) == 2 - assert len(result_block["i"][1]) == 2 + sim123 = [SimData(addr, values=123, datatype=DataType.INT16)] + sd = SimDevice(id=1, simdata=(block, block, sim123, sim123)) + reg_len = len(registers) - 2 + result = { + "c": (addr, registers, [DataType.BITS] + [0] * reg_len + [DataType.INVALID]), + "d": (addr, registers, [DataType.BITS] + [0] * reg_len + [DataType.INVALID]), + "h": (addr, [123, 0], [DataType.INT16, DataType.INVALID]), + "i": (addr, [123, 0], [DataType.INT16, DataType.INVALID]) + } + lists = cast(dict[str, SimRegs], sd.build_device()) + assert lists == result @pytest.mark.parametrize("count", range(1,4)) @pytest.mark.parametrize("data_count", range(1,4)) diff --git a/test/simulator/test_simruntime.py b/test/simulator/test_simruntime.py index 474d7ba2b..43077bd00 100644 --- a/test/simulator/test_simruntime.py +++ b/test/simulator/test_simruntime.py @@ -14,17 +14,16 @@ class TestSimRuntime: async def my_action( self, function_code, + _start_address, _address, - current_registers, - _new_registers - ): + _count, + _current_registers, + _set_values + ): """Run action.""" - if function_code == 3: - return current_registers if function_code == 4: return ExcCodes.ILLEGAL_ADDRESS # function_code == 5: - return None @pytest.mark.parametrize("kwargs", [ {"id": 0, "simdata": ([SimData(0, datatype=DataType.BITS, values=15)], @@ -39,11 +38,11 @@ def test_simruntime_instanciate(self, kwargs): SimRuntime(sd) @pytest.mark.parametrize(("args", "expect"), [ - ((3, 1, 1, None), -1), + ((3, 1, 1, None), 1), ((3, 200, 1, None), -1), ((3, 15, 200, None), -1), ((3, 15, 2, None), 2), - ((3, 19, 1, [1, 2 , 3]), -1), + # ((3, 19, 1, [1, 2 , 3]), -1), ((3, 19, 1, [1]), 1), ((3, 10, 2, None), -1), ((3, 10, 1, [1]), -1), @@ -65,8 +64,8 @@ async def test_simruntime_block(self, args, expect): @pytest.mark.parametrize(("args", "expect"), [ ((3, 10, 1, [1]), [1, 0, 0, 0, 0, 0]), - ((3, 11, 1, [1, 2, 3]), [0, 1, 2, 3, 0, 0]), - ((3, 12, 1, [1, 2 , 3]), [0, 0, 1, 2, 3, 0]), + # ((3, 11, 1, [1, 2, 3]), [0, 1, 2, 3, 0, 0]), + # ((3, 12, 1, [1, 2 , 3]), [0, 0, 1, 2, 3, 0]), ]) async def test_simruntime_block_set(self, args, expect): """Test that simdata can be objects.""" @@ -80,9 +79,9 @@ async def test_simruntime_block_set(self, args, expect): assert rt.block["x"][2] == expect @pytest.mark.parametrize(("args", "expect"), [ - ((3, 15, 2, None), 2), + ((3, 15, 2, None), [0, 0]), ((4, 15, 2, None), -1), - ((5, 15, 2, None), 2), + ((5, 15, 2, None), [False]*32), ]) async def test_simruntime_action(self, args, expect): """Test that simdata can be objects.""" @@ -96,7 +95,7 @@ async def test_simruntime_action(self, args, expect): if expect == -1: assert isinstance(ret, ExcCodes) else: - assert len(cast(list[int], ret)) == expect + assert ret == expect async def test_simruntime_getValues(self): """Test that simdata can be objects.""" diff --git a/test/simulator/test_simutils.py b/test/simulator/test_simutils.py new file mode 100644 index 000000000..a8f79910a --- /dev/null +++ b/test/simulator/test_simutils.py @@ -0,0 +1,67 @@ +"""Test SimData.""" +import pytest + +from pymodbus.simulator.simutils import SimUtils + + +class TestSimUtils: + """Test simulator utilities.""" + + def test_simutils_instanciate(self): + """Test that simdata cannot be a objects.""" + with pytest.raises(RuntimeError): + SimUtils() + + @pytest.mark.parametrize(("bits", "registers"), [ + ([True] + [False] * 8 + [True] + [False]*6, [513]), + ([True] + [False] * 8 + [True] + [False]*6 + [False] + [True] + [False] * 6 + [True] + [False]*7, [513, 258]), + ]) + def test_simutils_bitsToRegisters(self, bits, registers): + """Test convert list[bool] to list[int].""" + assert registers == SimUtils.bitsToRegisters(bits) + + @pytest.mark.parametrize(("bits"), [ + ([True] + [False] * 8 + [True] + [False]*5), + ([True] + [False] * 8 + [True] + [False]*6 + [False] + [True] + [False] * 6 + [True] + [False]*6), + ]) + def test_simutils_bitsToRegisters_not_ok(self, bits): + """Test convert list[bool] to list[int].""" + with pytest.raises(TypeError): + SimUtils.bitsToRegisters(bits) + + @pytest.mark.parametrize(("bits", "registers"), [ + ([True] + [False] * 8 + [True] + [False]*6, [513]), + ([True] + [False] * 8 + [True] + [False]*6 + [False] + [True] + [False] * 6 + [True] + [False]*7, [513, 258]), + ]) + def test_simutils_registersToBits(self, bits, registers): + """Test convert list[bool] to list[int].""" + new_bits = SimUtils.registersToBits(registers) + assert bits == new_bits + + + @pytest.mark.parametrize(("registers", "offset", "bits", "expect"), [ + ([513], 0, [], [513]), + ([513], 0, [False], [512]), + ([513], 1, [False], [513]), + ([513], 0, [True, True], [515]), + ([513], 1, [True], [515]), + ([513], 15, [True], [33281]), + ([513, 0], 15, [True, True], [33281, 1]), + ([513, 0], 15, [True, False], [33281, 0]), + ]) + def test_simutils_mergeBitsToRegisters(self, registers, offset, bits, expect): + """Test convert list[bool] to list[int].""" + temp_regs = registers.copy() + SimUtils.mergeBitsToRegisters(offset, temp_regs, bits) + assert temp_regs == expect + + + @pytest.mark.parametrize(("byte_list", "expect"), [ + (b'\x01\x01', [257]), + (b'\x01\x02', [258]), + (b'\x01\x01\x01\x02', [257, 258]), + ]) + def test_simutils_bytesToRegisters(self, byte_list, expect): + """Test convert list[bool] to list[int].""" + regs = SimUtils.bytesToRegisters(byte_list) + assert regs == expect From d85bda21526b76e97e2f155ee1b05eb9ba08e5b5 Mon Sep 17 00:00:00 2001 From: jan iversen Date: Sat, 21 Mar 2026 17:47:51 +0100 Subject: [PATCH 2/5] simRuntime. --- pymodbus/simulator/simruntime.py | 51 ++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 19 deletions(-) diff --git a/pymodbus/simulator/simruntime.py b/pymodbus/simulator/simruntime.py index 3286eaca4..1167f83b4 100644 --- a/pymodbus/simulator/simruntime.py +++ b/pymodbus/simulator/simruntime.py @@ -4,6 +4,8 @@ """ from __future__ import annotations +from typing import cast + from ..constants import ExcCodes from .simdevice import SimDevice from .simutils import DataType, SimUtils @@ -48,33 +50,44 @@ async def __check_block(self, func_code: int, block_id: str, address: int, count if values: if flags[addr] & SimUtils.RunTimeFlag_READONLY: return ExcCodes.ILLEGAL_ADDRESS - registers[addr] = values[i] return None - async def get_block(self, func_code: int, address: int, count: int, values: list[int] | list[bool] | None) -> list[int] | list[bool] | ExcCodes: - """Calculate offset.""" - fc_block = self._fx_mapper.get(func_code, "x") - block_id = "x" if "x" in self.block else fc_block - use_bits = fc_block in {"c", "d"} and self.use_bit_addressing + async def get_bit_block(self, block_id: str, func_code: int, address: int, count: int, values: list[bool] | None) -> list[bool] | ExcCodes: + """Handle coils and discrete input.""" + start_address, register_count, registers, _ = self.block[block_id] + offset = (int(address / 16) if self.use_bit_addressing else address) - start_address + reg_count = int(count / 16) + 1 + if register_count <= offset < 0 or offset + reg_count > register_count: + return ExcCodes.ILLEGAL_ADDRESS + if (result := await self.__check_block(func_code, block_id, address, reg_count, offset, values)): + return result + list_bools = SimUtils.registersToBits(registers[offset:offset+count]) + bit_offset = address % 16 + if values: + list_bools[bit_offset:bit_offset+count] = values + registers[offset:offset+reg_count] = SimUtils.bitsToRegisters(list_bools) + return list_bools[bit_offset:bit_offset+count] + + async def get_reg_block(self, block_id: str, func_code: int, address: int, count: int, values: list[int] | None) -> list[int] | ExcCodes: + """Handle holding registers and input registers.""" start_address, register_count, registers, _ = self.block[block_id] - if use_bits: - offset = int(address / 16) - start_address - effective_count = int(count / 16) + 1 - else: - offset = address - start_address - effective_count = count - if register_count <= offset < 0 or offset + effective_count > register_count: + offset = address - start_address + if register_count <= offset < 0 or offset + count > register_count: return ExcCodes.ILLEGAL_ADDRESS if (result := await self.__check_block(func_code, block_id, address, count, offset, values)): return result - if fc_block in {"c", "d"}: - list_bools = SimUtils.registersToBits(registers[offset:offset+count]) - if not use_bits: - return list_bools - bit_offset = address % 16 - return list_bools[bit_offset:bit_offset+count] + if values: + registers[offset:offset+count] = values return registers[offset:offset+count] + async def get_block(self, func_code: int, address: int, count: int, values: list[int] | list[bool] | None) -> list[int] | list[bool] | ExcCodes: + """Calculate offset.""" + fc_block = self._fx_mapper.get(func_code, "x") + block_id = "x" if "x" in self.block else fc_block + if fc_block in {"c", "d"}: + return await self.get_bit_block(block_id, func_code, address, count, cast(list[bool], values)) + return await self.get_reg_block(block_id, func_code, address, count, cast(list[int], values)) + async def async_getValues(self, func_code: int, address: int, count: int) -> list[int] | list[bool] | ExcCodes: """Get `count` values from datastore.""" return await self.get_block(func_code, address, count, None) From 1e1404ed4c254570ede8439a3e4d66151f964c53 Mon Sep 17 00:00:00 2001 From: jan iversen Date: Sat, 21 Mar 2026 22:43:26 +0100 Subject: [PATCH 3/5] Stage I. --- pymodbus/simulator/simruntime.py | 11 ++-- test/simulator/test_simruntime.py | 100 +++++++++++++++++++++--------- 2 files changed, 76 insertions(+), 35 deletions(-) diff --git a/pymodbus/simulator/simruntime.py b/pymodbus/simulator/simruntime.py index 1167f83b4..4f50043d2 100644 --- a/pymodbus/simulator/simruntime.py +++ b/pymodbus/simulator/simruntime.py @@ -14,7 +14,7 @@ class SimRuntime: """Memory setup for device.""" - _fx_mapper = {2: "d", 4: "i"} # Direct input and Input registers + _fx_mapper = {2: "d", 4: "i"} # Discrete input and Input registers _fx_mapper.update([(i, "h") # Holding registers for i in (3, 6, 16, 22, 23)]) _fx_mapper.update([(i, "c") # Coils @@ -61,11 +61,11 @@ async def get_bit_block(self, block_id: str, func_code: int, address: int, count return ExcCodes.ILLEGAL_ADDRESS if (result := await self.__check_block(func_code, block_id, address, reg_count, offset, values)): return result - list_bools = SimUtils.registersToBits(registers[offset:offset+count]) + list_bools = SimUtils.registersToBits(registers[offset:offset+reg_count]) bit_offset = address % 16 if values: list_bools[bit_offset:bit_offset+count] = values - registers[offset:offset+reg_count] = SimUtils.bitsToRegisters(list_bools) + registers[offset:offset+reg_count] = SimUtils.bitsToRegisters(list_bools) return list_bools[bit_offset:bit_offset+count] async def get_reg_block(self, block_id: str, func_code: int, address: int, count: int, values: list[int] | None) -> list[int] | ExcCodes: @@ -82,7 +82,10 @@ async def get_reg_block(self, block_id: str, func_code: int, address: int, count async def get_block(self, func_code: int, address: int, count: int, values: list[int] | list[bool] | None) -> list[int] | list[bool] | ExcCodes: """Calculate offset.""" - fc_block = self._fx_mapper.get(func_code, "x") + if values: + values = values if isinstance(values, list) else [values] + if (fc_block := self._fx_mapper.get(func_code, "z")) == "z": + raise RuntimeError("Datastore, not supported function code") block_id = "x" if "x" in self.block else fc_block if fc_block in {"c", "d"}: return await self.get_bit_block(block_id, func_code, address, count, cast(list[bool], values)) diff --git a/test/simulator/test_simruntime.py b/test/simulator/test_simruntime.py index 43077bd00..ceb1a24a7 100644 --- a/test/simulator/test_simruntime.py +++ b/test/simulator/test_simruntime.py @@ -17,26 +17,82 @@ async def my_action( _start_address, _address, _count, - _current_registers, - _set_values + current_registers, + set_values ): """Run action.""" + if function_code in {1, 2, 3}: + current_registers[1] = 17 if function_code == 4: return ExcCodes.ILLEGAL_ADDRESS - # function_code == 5: + elif function_code == 15: + set_values[0] = False + elif function_code == 16: + set_values[0] = 17 + sd_block = ( + [SimData(0, count=2, values=15, datatype=DataType.BITS)], + [SimData(0, count=2, values=15, datatype=DataType.BITS)], + [SimData(0, count=2, values=15, datatype=DataType.REGISTERS)], + [SimData(0, count=2, values=15, datatype=DataType.REGISTERS)], + ) + sd_shared = SimData(0, count=2, datatype=DataType.REGISTERS, values=15) - @pytest.mark.parametrize("kwargs", [ - {"id": 0, "simdata": ([SimData(0, datatype=DataType.BITS, values=15)], - [SimData(0, datatype=DataType.BITS, values=15)], - [SimData(0, datatype=DataType.INT16, values=15)], - [SimData(0, datatype=DataType.INT16, values=15)])}, - {"id": 0, "simdata": SimData(0, datatype=DataType.INT16, values=15)}, - ]) - def test_simruntime_instanciate(self, kwargs): + def test_simruntime_instanciate(self): + """Test that simdata can be objects.""" + SimRuntime(SimDevice(0, self.sd_block)) + SimRuntime(SimDevice(0, self.sd_shared)) + + @pytest.mark.parametrize("block", [False, True]) + @pytest.mark.parametrize("fc", range(1, 25)) + async def test_simruntime_fc(self, fc, block): """Test that simdata can be objects.""" - sd = SimDevice(**kwargs) - SimRuntime(sd) + sd = SimDevice(1, simdata=(self.sd_block if block else self.sd_shared)) + rt = SimRuntime(sd) + if fc in (1,2,3,4,5,6,15,16,22,23): + ret = await rt.get_block(fc, 1, 1, None) + assert not isinstance(ret, ExcCodes) + else: + with pytest.raises(RuntimeError): + await rt.get_block(fc, 1, 1, None) + @pytest.mark.parametrize(("block", "fc", "values", "expect"), [ + (False, 1, None, [15, 17, 0]), + (True, 1, None, [15, 17, 0]), + (False, 2, None, [15, 17, 0]), + (True, 2, None, [15, 17, 0]), + (False, 3, None, [15, 17, 0]), + (True, 3, None, [15, 17, 0]), + (False, 4, None, -1), + (True, 4, None, -1), + (False, 15, [True, False], [15, 9, 0]), + (True, 15, [True, False], [9, 15, 0]), + (False, 16, [12], [15, 17, 0]), + (True, 16, [12], [15, 17, 0]), + ]) + async def test_simruntime_action(self, block, fc, values, expect): + """Test that simdata can be objects.""" + rt = SimRuntime(SimDevice(1, + action=self.my_action, + simdata=(self.sd_block if block else self.sd_shared)) + ) + if block: + block_id = { + 1: "c", + 2: "d", + 3: "h", + 4: "i", + 15: "c", + 16: "h" + }[fc] + else: + block_id = "x" + count = len(values) if values else 2 + ret = await rt.get_block(fc, 1, count, values) + if expect == -1: + assert ret == ExcCodes.ILLEGAL_ADDRESS + else: + assert rt.block[block_id][2] == expect + @pytest.mark.parametrize(("args", "expect"), [ ((3, 1, 1, None), 1), ((3, 200, 1, None), -1), @@ -78,24 +134,6 @@ async def test_simruntime_block_set(self, args, expect): assert ret == args[3] assert rt.block["x"][2] == expect - @pytest.mark.parametrize(("args", "expect"), [ - ((3, 15, 2, None), [0, 0]), - ((4, 15, 2, None), -1), - ((5, 15, 2, None), [False]*32), - ]) - async def test_simruntime_action(self, args, expect): - """Test that simdata can be objects.""" - sd = SimDevice(0, action=self.my_action, simdata=[ - SimData(10, count=1, values=0, datatype=DataType.REGISTERS, readonly=True), - SimData(11, count=1, values=0, datatype=DataType.INVALID), - SimData(12, count=8, values=0, datatype=DataType.REGISTERS), - ]) - rt = SimRuntime(sd) - ret = await rt.get_block(*args) - if expect == -1: - assert isinstance(ret, ExcCodes) - else: - assert ret == expect async def test_simruntime_getValues(self): """Test that simdata can be objects.""" From 11d4098ac6018949bb30480a409f572e4f6d4b27 Mon Sep 17 00:00:00 2001 From: jan iversen Date: Sun, 22 Mar 2026 09:34:55 +0100 Subject: [PATCH 4/5] Stage II. --- pymodbus/simulator/simruntime.py | 2 +- test/simulator/test_simruntime.py | 110 +++++++++++------------------- 2 files changed, 40 insertions(+), 72 deletions(-) diff --git a/pymodbus/simulator/simruntime.py b/pymodbus/simulator/simruntime.py index 4f50043d2..737db225c 100644 --- a/pymodbus/simulator/simruntime.py +++ b/pymodbus/simulator/simruntime.py @@ -83,7 +83,7 @@ async def get_reg_block(self, block_id: str, func_code: int, address: int, count async def get_block(self, func_code: int, address: int, count: int, values: list[int] | list[bool] | None) -> list[int] | list[bool] | ExcCodes: """Calculate offset.""" if values: - values = values if isinstance(values, list) else [values] + values = cast(list[int], values if isinstance(values, list) else [values]) if (fc_block := self._fx_mapper.get(func_code, "z")) == "z": raise RuntimeError("Datastore, not supported function code") block_id = "x" if "x" in self.block else fc_block diff --git a/test/simulator/test_simruntime.py b/test/simulator/test_simruntime.py index ceb1a24a7..fa00a1e36 100644 --- a/test/simulator/test_simruntime.py +++ b/test/simulator/test_simruntime.py @@ -1,5 +1,4 @@ """Test SimCore.""" -from typing import cast import pytest @@ -30,12 +29,20 @@ async def my_action( elif function_code == 16: set_values[0] = 17 sd_block = ( - [SimData(0, count=2, values=15, datatype=DataType.BITS)], - [SimData(0, count=2, values=15, datatype=DataType.BITS)], - [SimData(0, count=2, values=15, datatype=DataType.REGISTERS)], - [SimData(0, count=2, values=15, datatype=DataType.REGISTERS)], + [SimData(0, values=[14, 15, 16], datatype=DataType.BITS)], + [SimData(0, values=[14, 15, 16], datatype=DataType.BITS)], + [SimData(0, values=[14, 15, 16], datatype=DataType.REGISTERS)], + [SimData(0, values=[14, 15, 16], datatype=DataType.REGISTERS)], ) - sd_shared = SimData(0, count=2, datatype=DataType.REGISTERS, values=15) + sd_shared = SimData(0, datatype=DataType.REGISTERS, values=[14, 15, 16]) + block_ids = { + 1: "c", + 2: "d", + 3: "h", + 4: "i", + 15: "c", + 16: "h" + } def test_simruntime_instanciate(self): """Test that simdata can be objects.""" @@ -55,19 +62,14 @@ async def test_simruntime_fc(self, fc, block): with pytest.raises(RuntimeError): await rt.get_block(fc, 1, 1, None) - @pytest.mark.parametrize(("block", "fc", "values", "expect"), [ - (False, 1, None, [15, 17, 0]), - (True, 1, None, [15, 17, 0]), - (False, 2, None, [15, 17, 0]), - (True, 2, None, [15, 17, 0]), - (False, 3, None, [15, 17, 0]), - (True, 3, None, [15, 17, 0]), - (False, 4, None, -1), - (True, 4, None, -1), - (False, 15, [True, False], [15, 9, 0]), - (True, 15, [True, False], [9, 15, 0]), - (False, 16, [12], [15, 17, 0]), - (True, 16, [12], [15, 17, 0]), + @pytest.mark.parametrize("block", [True, False]) + @pytest.mark.parametrize(("fc", "values", "expect"), [ + (1, None, [14, 17, 16, 0]), + (2, None, [14, 17, 16, 0]), + (3, None, [14, 17, 16, 0]), + (4, None, -1), + (15, [True, False], [12, 15, 16, 0]), + (16, [12], [17, 15, 16, 0]), ]) async def test_simruntime_action(self, block, fc, values, expect): """Test that simdata can be objects.""" @@ -75,65 +77,31 @@ async def test_simruntime_action(self, block, fc, values, expect): action=self.my_action, simdata=(self.sd_block if block else self.sd_shared)) ) - if block: - block_id = { - 1: "c", - 2: "d", - 3: "h", - 4: "i", - 15: "c", - 16: "h" - }[fc] - else: - block_id = "x" + block_id = self.block_ids[fc] if block else "x" count = len(values) if values else 2 - ret = await rt.get_block(fc, 1, count, values) + ret = await rt.get_block(fc, 0, count, values) if expect == -1: assert ret == ExcCodes.ILLEGAL_ADDRESS else: assert rt.block[block_id][2] == expect - - @pytest.mark.parametrize(("args", "expect"), [ - ((3, 1, 1, None), 1), - ((3, 200, 1, None), -1), - ((3, 15, 200, None), -1), - ((3, 15, 2, None), 2), - # ((3, 19, 1, [1, 2 , 3]), -1), - ((3, 19, 1, [1]), 1), - ((3, 10, 2, None), -1), - ((3, 10, 1, [1]), -1), - ]) - async def test_simruntime_block(self, args, expect): - """Test that simdata can be objects.""" - sd = SimDevice(0, simdata=[ - SimData(10, count=1, values=0, datatype=DataType.REGISTERS, readonly=True), - SimData(11, count=1, values=0, datatype=DataType.INVALID), - SimData(12, count=8, values=0, datatype=DataType.REGISTERS), - ]) - rt = SimRuntime(sd) - ret = await rt.get_block(*args) - if expect == -1: - assert isinstance(ret, ExcCodes) - else: - assert len(cast(list[int], ret)) == expect - - @pytest.mark.parametrize(("args", "expect"), [ - ((3, 10, 1, [1]), [1, 0, 0, 0, 0, 0]), - # ((3, 11, 1, [1, 2, 3]), [0, 1, 2, 3, 0, 0]), - # ((3, 12, 1, [1, 2 , 3]), [0, 0, 1, 2, 3, 0]), - ]) - async def test_simruntime_block_set(self, args, expect): + @pytest.mark.parametrize("block", [True, False]) + @pytest.mark.parametrize(("fc", "addr", "values", "expect"), [ + (1, 0, None, [False, True]), + (2, 0, None, [False, True]), + (3, 0, None, [14, 15]), + (4, 0, None, [14, 15]), + (15, 0, [True, False], [True, False]), + (16, 0, [12], [12]), + ]) + async def test_simruntime_block(self, block, fc, addr, values, expect): """Test that simdata can be objects.""" - sd = SimDevice(0, simdata= - SimData(10, count=5, values=0, datatype=DataType.REGISTERS) - ) - rt = SimRuntime(sd) - - ret = await rt.get_block(*args) - assert ret == args[3] - assert rt.block["x"][2] == expect - + rt = SimRuntime(SimDevice(1, + simdata=(self.sd_block if block else self.sd_shared)) + ) + count = len(values) if values else 2 + ret = await rt.get_block(fc, addr, count, values) + assert ret == expect async def test_simruntime_getValues(self): """Test that simdata can be objects.""" From 3a14c83196d6b460dba40fcf733df69157754c60 Mon Sep 17 00:00:00 2001 From: jan iversen Date: Sun, 22 Mar 2026 11:34:43 +0100 Subject: [PATCH 5/5] Final. --- pymodbus/simulator/simruntime.py | 2 +- test/simulator/test_simdevice.py | 2 +- test/simulator/test_simruntime.py | 66 +++++++++++++++++++++---------- 3 files changed, 48 insertions(+), 22 deletions(-) diff --git a/pymodbus/simulator/simruntime.py b/pymodbus/simulator/simruntime.py index 737db225c..ba12b6abf 100644 --- a/pymodbus/simulator/simruntime.py +++ b/pymodbus/simulator/simruntime.py @@ -85,7 +85,7 @@ async def get_block(self, func_code: int, address: int, count: int, values: list if values: values = cast(list[int], values if isinstance(values, list) else [values]) if (fc_block := self._fx_mapper.get(func_code, "z")) == "z": - raise RuntimeError("Datastore, not supported function code") + raise AttributeError("Datastore, not supported function code") block_id = "x" if "x" in self.block else fc_block if fc_block in {"c", "d"}: return await self.get_bit_block(block_id, func_code, address, count, cast(list[bool], values)) diff --git a/test/simulator/test_simdevice.py b/test/simulator/test_simdevice.py index a486ec581..538711bab 100644 --- a/test/simulator/test_simdevice.py +++ b/test/simulator/test_simdevice.py @@ -187,7 +187,7 @@ def test_simdevice_build_blocks(self): """Test build_device() ok.""" block = ( [SimData(0, values=123, datatype=DataType.BITS)], - [SimData(0, values=123, datatype=DataType.BITS)], + [SimData(0, values=123, readonly=True, datatype=DataType.BITS)], [SimData(0, values=123, datatype=DataType.INT16)], [SimData(0, values=123, datatype=DataType.INT16)]) result = { diff --git a/test/simulator/test_simruntime.py b/test/simulator/test_simruntime.py index fa00a1e36..29b9b905f 100644 --- a/test/simulator/test_simruntime.py +++ b/test/simulator/test_simruntime.py @@ -20,21 +20,36 @@ async def my_action( set_values ): """Run action.""" - if function_code in {1, 2, 3}: + if function_code in {1, 3}: current_registers[1] = 17 - if function_code == 4: + if function_code in {2, 4}: return ExcCodes.ILLEGAL_ADDRESS elif function_code == 15: set_values[0] = False elif function_code == 16: set_values[0] = 17 sd_block = ( - [SimData(0, values=[14, 15, 16], datatype=DataType.BITS)], - [SimData(0, values=[14, 15, 16], datatype=DataType.BITS)], - [SimData(0, values=[14, 15, 16], datatype=DataType.REGISTERS)], - [SimData(0, values=[14, 15, 16], datatype=DataType.REGISTERS)], + [SimData(0, values=[14], datatype=DataType.BITS), + SimData(16, values=[15], readonly=True, datatype=DataType.BITS), + SimData(32, values=[16], datatype=DataType.BITS), + ], + [SimData(0, values=[14], datatype=DataType.BITS), + SimData(16, values=[15], readonly=True, datatype=DataType.BITS), + SimData(32, values=[16], datatype=DataType.BITS), + ], + [SimData(0, values=[14], datatype=DataType.REGISTERS), + SimData(1, values=[15], readonly=True, datatype=DataType.REGISTERS), + SimData(2, values=[16], datatype=DataType.REGISTERS), + ], + [SimData(0, values=[14], datatype=DataType.REGISTERS), + SimData(1, values=[15], readonly=True, datatype=DataType.REGISTERS), + SimData(2, values=[16], datatype=DataType.REGISTERS), + ], ) - sd_shared = SimData(0, datatype=DataType.REGISTERS, values=[14, 15, 16]) + sd_shared = [SimData(0, datatype=DataType.REGISTERS, values=[14]), + SimData(1, readonly=True, datatype=DataType.REGISTERS, values=[15]), + SimData(2, datatype=DataType.REGISTERS, values=[16]), + ] block_ids = { 1: "c", 2: "d", @@ -59,15 +74,15 @@ async def test_simruntime_fc(self, fc, block): ret = await rt.get_block(fc, 1, 1, None) assert not isinstance(ret, ExcCodes) else: - with pytest.raises(RuntimeError): + with pytest.raises(AttributeError): await rt.get_block(fc, 1, 1, None) @pytest.mark.parametrize("block", [True, False]) @pytest.mark.parametrize(("fc", "values", "expect"), [ (1, None, [14, 17, 16, 0]), - (2, None, [14, 17, 16, 0]), (3, None, [14, 17, 16, 0]), (4, None, -1), + (2, None, -1), (15, [True, False], [12, 15, 16, 0]), (16, [12], [17, 15, 16, 0]), ]) @@ -86,22 +101,33 @@ async def test_simruntime_action(self, block, fc, values, expect): assert rt.block[block_id][2] == expect @pytest.mark.parametrize("block", [True, False]) - @pytest.mark.parametrize(("fc", "addr", "values", "expect"), [ - (1, 0, None, [False, True]), - (2, 0, None, [False, True]), - (3, 0, None, [14, 15]), - (4, 0, None, [14, 15]), - (15, 0, [True, False], [True, False]), - (16, 0, [12], [12]), + @pytest.mark.parametrize(("fc", "addr", "count", "values", "expect"), [ + (1, 0, 2, None, [False, True]), + (1, 0, 17, None, [False] + [True]*3 + [False]*12 + [True]), + (2, 0, 2, None, [False, True]), + (3, 0, 2, None, [14, 15]), + (4, 0, 2, None, [14, 15]), + (15, 0, 2, [True, False], [True, False]), + (16, 0, 1, [12], [12]), + (99, 0, 2, None, AttributeError), + (3, 15, 2, None, ExcCodes.ILLEGAL_ADDRESS), + (3, 2, 2, None, ExcCodes.ILLEGAL_ADDRESS), + (16, 2, 2, [12, 12], ExcCodes.ILLEGAL_ADDRESS), + (16, 1, 1, [12], ExcCodes.ILLEGAL_ADDRESS), + (1, 64, 2, None, ExcCodes.ILLEGAL_ADDRESS), + (15, 48, 17, [True]*17, ExcCodes.ILLEGAL_ADDRESS), ]) - async def test_simruntime_block(self, block, fc, addr, values, expect): + async def test_simruntime_block(self, block, fc, addr, count, values, expect): """Test that simdata can be objects.""" rt = SimRuntime(SimDevice(1, simdata=(self.sd_block if block else self.sd_shared)) ) - count = len(values) if values else 2 - ret = await rt.get_block(fc, addr, count, values) - assert ret == expect + if not isinstance(expect, list) and expect in {AttributeError}: + with pytest.raises(expect): + await rt.get_block(fc, addr, count, values) + else: + ret = await rt.get_block(fc, addr, count, values) + assert ret == expect async def test_simruntime_getValues(self): """Test that simdata can be objects."""