diff --git a/API_changes.rst b/API_changes.rst index 97208c721..a518ccf6e 100644 --- a/API_changes.rst +++ b/API_changes.rst @@ -1,6 +1,6 @@ API changes =========== -Versions (X.Y.Z) where Z > 0 e.g. 3.0.1 do NOT have API changes! +Versions (X.Y.Z) where Z == 0 e.g. 3.0.1 do NOT have API changes! API changes 3.13.0 ------------------ @@ -8,6 +8,8 @@ API changes 3.13.0 a proper forwarder should be made at frame level. - datastore get/setValues is removed, please use server.async_get/setValues instead. +- datastore show a deprecation warning +- SimData/SimDevice have been updated API changes 3.12.0 ------------------ diff --git a/examples/contrib/test_datastores.py b/examples/contrib/test_datastores.py new file mode 100755 index 000000000..7c16c54ed --- /dev/null +++ b/examples/contrib/test_datastores.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python3 +"""Test datastores compatibility. + +Control that: + +- ModbusSequentialDataBlock +- ModbusSparseDataBlock + +Works as the used to work. + +Control that SimData/SimDevice: + +- works as intended in both shared and non shared mode. + +Focussing on coils and discrete inputs. +""" +import asyncio + +import pymodbus.client as modbusClient +from pymodbus import FramerType +from pymodbus.datastore import ( + ModbusDeviceContext, + ModbusSequentialDataBlock, + ModbusServerContext, +) +from pymodbus.logging import Log, pymodbus_apply_logging_config +from pymodbus.server import ServerAsyncStop, StartAsyncTcpServer +from pymodbus.transport import NULLMODEM_HOST + + +async def run_async(port, context, run_test): + """Run server setup.""" + Log.info("### start ASYNC server") + task = asyncio.create_task(StartAsyncTcpServer( + context=context, + address=(NULLMODEM_HOST, port), + framer=FramerType.SOCKET + )) + await asyncio.sleep(1) + + Log.info("### Create client object") + client = modbusClient.AsyncModbusTcpClient( + NULLMODEM_HOST, + port=5020, + framer=FramerType.SOCKET, + ) + await client.connect() + await run_test(client) + client.close() + await ServerAsyncStop() + task.cancel() + await task + + +async def run_sequential(port) -> None: + """Combine setup and run.""" + async def run_old_test(client): + """Run test.""" + Log.info("### read_coils") + single = [True] + [False] * 7 + register = [True] * 7 + [False] * 8 + [True] + rr = await client.read_coils(0, count=1, device_id=1) + assert rr.bits == single + rr = await client.read_coils(0, count=16, device_id=1) + assert rr.bits == register + rr = await client.read_discrete_inputs(0, count=1, device_id=1) + assert rr.bits == single + rr = await client.read_discrete_inputs(0, count=16, device_id=1) + assert rr.bits == register + + co_data = [17] * 7 + [0] * 8 + [1] + di_data = [True] * 7 + [False] * 8 + [True] + context = ModbusServerContext(devices=ModbusDeviceContext( + co=ModbusSequentialDataBlock(1, co_data), + di=ModbusSequentialDataBlock(1, di_data), + ), + single=True + ) + Log.info("Run sequential test.") + await run_async(port, context, run_old_test) + + +async def run_all(port): + """Run all tests.""" + pymodbus_apply_logging_config("info") + await run_sequential(port) + +if __name__ == "__main__": + asyncio.run(run_all(5020), debug=True) diff --git a/pymodbus/simulator/simdata.py b/pymodbus/simulator/simdata.py index cf196a01a..3a3d46c63 100644 --- a/pymodbus/simulator/simdata.py +++ b/pymodbus/simulator/simdata.py @@ -5,7 +5,6 @@ from dataclasses import dataclass from typing import TypeAlias, cast -from ..pdu.utils import pack_bitstring, unpack_bitstring from .simutils import DataType, SimUtils @@ -73,18 +72,25 @@ class SimData: SimData( address=100, values=0xffff, - datatype=DataType.BITS + datatype=DataType.REGISTERS ) SimData( address=100, values=[0xffff], - datatype=DataType.BITS + datatype=DataType.REGISTERS ) Each SimData defines 16 BITS (coils), with value True. - Value are stored in registers (16bit is 1 register), the address refers to the register, unless - in non-shared mode where the address refers to the coil. + Value are stored in registers (16bit is 1 register). + + In shared mode (coil and discrete inputs requests): + - address refers to the register, containing individual bits, + Individual bits within the register cannot be addressed, + unless "use_bit_as_address" is set on the device. + + In non-shared mode (coil and discrete inputs requests) + - address refers to the bit. """ #: Address of first register, starting with 0 (identical to the requests) @@ -111,6 +117,11 @@ class SimData: #: Used to check access and convert value to/from registers or mark as invalid. datatype: DataType = DataType.INVALID + #: String encoding + #: + #: Used to convert a SimData(DataType.STRING) to registers. + string_encoding: str = "utf-8" + #: Mark register(s) as readonly. readonly: bool = False @@ -129,6 +140,10 @@ def __check_simple(self): raise TypeError("values= cannot be used with invalid=True") if isinstance(self.values, list) and not self.values: raise TypeError("values= list cannot be empty") + try: + "test string".encode(self.string_encoding) + except (UnicodeEncodeError, LookupError) as exc: + raise TypeError("string_encoding= not valid") from exc def __check_parameters(self): """Check all parameters.""" @@ -136,8 +151,10 @@ def __check_parameters(self): x_values = self.values if isinstance(self.values, list) else [self.values] x_datatype, _x_struct, _x_len = SimUtils.DATATYPE_STRUCT[self.datatype] if self.datatype == DataType.BITS: - x_datatype = int if isinstance(x_values[0], int) else bool + x_datatype = bool if isinstance(x_values[0], bool) else int for x_value in x_values: + if self.datatype == DataType.BITS and x_datatype is int and isinstance(x_value, bool): + raise TypeError(f"values= {x_value} int and bool cannot be mixed") if not isinstance(x_value, x_datatype): raise TypeError(f"values= {x_value} is not {x_datatype!s}") if x_datatype is str and not x_value: @@ -147,60 +164,47 @@ def __post_init__(self): """Define a group of registers.""" self.__check_parameters() - def build_registers_bits_block(self) -> list[int]: + def build_registers_bits_block(self) -> list[bool]: """Convert values= to registers from bits (1 bit in each register).""" x_values = self.values if isinstance(self.values, list) else [self.values] - regs: list[int] = [] if isinstance(x_values[0], bool): - for v in x_values: - regs.append(1 if v else 0) - else: - for v in cast(list[int], x_values): - bool_list = unpack_bitstring(v.to_bytes(2, byteorder="big")) - for i in bool_list: - regs.append(1 if i else 0) - return regs - - def build_registers_bits_shared(self, endian: tuple[bool, bool]) -> list[int]: + return cast(list[bool], x_values) + return SimUtils.registersToBits(cast(list[int], x_values)) + + def build_registers_bits_shared(self) -> list[int]: """Convert values= to registers from bits (16 bits in each register).""" x_values = self.values if isinstance(self.values, list) else [self.values] - if isinstance(x_values[0], bool): - if len(x_values) % 16: - raise TypeError(f"SimData address={self.address} values= must be a multiple of 16") - bytes_bits = bytearray(pack_bitstring(cast(list[bool], x_values))) - else: - bytes_bits = bytearray() - for v in x_values: - bytes_bits.extend(struct.pack(">H", v)) - return SimUtils.convert_bytes_registers(bytes_bits, endian[0], endian[1], 1) - - def build_registers_string(self, endian: tuple[bool, bool], string_encoding: str) -> list[int]: + if not isinstance(x_values[0], bool): + return cast(list[int], x_values) + if len(x_values) % 16: + raise TypeError(f"SimData address={self.address} values= must be a multiple of 16") + return SimUtils.bitsToRegisters(cast(list[bool], x_values)) + + def build_registers_string(self) -> list[int]: """Convert values= to registers from string(s).""" x_values = self.values if isinstance(self.values, list) else [self.values] blocks_regs: list[int] = [] for value in x_values: - bytes_string = cast(str, value).encode(string_encoding) - if len(bytes_string) % 2: - bytes_string += b"\x00" - blocks_regs.extend(SimUtils.convert_bytes_registers(bytearray(bytes_string), endian[0], endian[1], 1)) + bytes_string = cast(str, value).encode(self.string_encoding) + regs = SimUtils.bytesToRegisters(bytes_string) + blocks_regs.extend(regs) return blocks_regs - def build_registers(self, endian: tuple[bool, bool], string_encoding: str, block_bits: bool) -> list[int]: + def build_registers(self, block_bits: bool) -> list[int] | list[bool]: """Convert values= to registers.""" self.__check_parameters() if self.datatype == DataType.STRING: - block_regs = self.build_registers_string(endian, string_encoding) - return block_regs * self.count + return self.build_registers_string() * self.count + if block_bits: + return self.build_registers_bits_block() * self.count if self.datatype == DataType.BITS: - if block_bits: - return self.build_registers_bits_block() - return self.build_registers_bits_shared(endian) + return self.build_registers_bits_shared() * self.count x_values = self.values if isinstance(self.values, list) else [self.values] - _x_datatype, x_struct, x_len = SimUtils.DATATYPE_STRUCT[self.datatype] + _x_datatype, x_struct, _x_len = SimUtils.DATATYPE_STRUCT[self.datatype] blocks_regs: list[int] = [] for v in x_values: byte_list = struct.pack(f">{x_struct}", v) - blocks_regs.extend(SimUtils.convert_bytes_registers(bytearray(byte_list), endian[0], endian[1], x_len)) + blocks_regs.extend(SimUtils.bytesToRegisters(byte_list)) return blocks_regs diff --git a/pymodbus/simulator/simdevice.py b/pymodbus/simulator/simdevice.py index acec682d5..8d836b86b 100644 --- a/pymodbus/simulator/simdevice.py +++ b/pymodbus/simulator/simdevice.py @@ -12,7 +12,7 @@ from .simutils import DataType, SimUtils -SimAction: TypeAlias = Callable[[int, int, list[int], list[int] | None], Awaitable[list[int] | None | ExcCodes]] +SimAction: TypeAlias = Callable[[int, int, int, int, list[int], list[int] | list[bool] | None], Awaitable[None | ExcCodes]] SimRegs: TypeAlias = tuple[int, list[int], list[int]] TUPLE_NAMES = ( "coils", @@ -63,28 +63,24 @@ class SimDevice: #: The tuple is defined as: #: (, , , ) #: - #: / have addressing calculated differently: - #: address register = address / 16 - #: to find the coil at address - #: count is number of coils, so registers returned are count +15 / 16. - #: #: ..tip:: addresses not defined are invalid and will produce an ExceptionResponse simdata: SimData | list[SimData] | tuple[list[SimData], list[SimData], list[SimData], list[SimData]] - #: Change endianness. - #: - #: Word order is not defined in the modbus standard and thus a device that - #: uses little-endian is still within the modbus standard. + #: Define coil/discrete input addressing in shared mode. #: - #: Byte order is defined in the modbus standard to be big-endian, - #: however it is definable to test non-standard modbus devices - #: - #: ..tip:: Content (word_order, byte_order), True means big-endian. - endian: tuple[bool, bool] = (True, True) - - #: String encoding + #: False, means the register is addressed, and single bits cannot be addressed. + #: True, means single bit is being addressed. + #: effictive address is register_address * 16 + bit_offset. #: - string_encoding: str = "utf-8" + #: Example: + #: SimData(200, value=True, datatype=DataType.BITS) + #: with use_bit_addressing=False: + #: read_coils(200) returns [True] + [False] * 7 + #: read_coils(200, count=16) returns [True] + [False] * 15 + #: with use_bit_addressing=True: + #: read_coils(200*16+15) returns [True] + [False] * 7 + #: read_coils(200*16, count=16) returns [False] * 15 + [True] + use_bit_addressing: bool | None = None #: Set device identity identity: ModbusDeviceIdentification | None = None @@ -96,21 +92,18 @@ class SimDevice: #: .. code-block:: python #: #: async def my_action( - #: function_code: int, - #: start_address: int, - #: current_registers: list[int], - #: new_registers: list[int] | None) -> list[int] | ExceptionResponse: - #: - #: return registers - #: or - #: return None - #: - #: action, is called with current registers and if write request also the new registers. - #: result updates registers and if read request returned to the client. - #: - #: new_registers is None for read requests. + #: function_code: int, # request function code + #: start_address: int, # address of current_registers[0] + #: address: int, # request address + #: count: int, # request count + #: current_registers: list[int], # current registers (modify inline) + #: set_values: list[int] | list[bool] | None # request values to be written (None for read requests) + # ) -> None | ExceptionResponse: #: - #: if return is None it indicates no change. + #: action can: + #: - update registers (affect the current and future responses) + #: - update set_values (affect the register update) + #: - return an ExceptionResponse. #: #: .. tip:: use functools.partial to add extra parameters if needed. action: SimAction | None = None @@ -123,25 +116,18 @@ def __check_simple(self): raise TypeError("identity= must be a ModbusDeviceIdentification") if self.action and not (callable(self.action) and inspect.iscoroutinefunction(self.action)): raise TypeError("action= must be a async function") - if not (isinstance(self.endian, tuple) - and len(self.endian) == 2 - and isinstance(self.endian[0], bool) - and isinstance(self.endian[1], bool) - ): - raise TypeError("endian= must be a tuple with 2 bool") - test_str = "test string" - try: - test_str.encode(self.string_encoding) - except (UnicodeEncodeError, LookupError) as exc: - raise TypeError("string_encoding= not valid") from exc def __check_simple2(self): """Check simple parameters.""" if isinstance(self.simdata, tuple): + if self.use_bit_addressing is None: + self.use_bit_addressing = True self.__check_simple_blocks() - if self.action: - raise TypeError("action= id only supported with shared blocks") + if not self.use_bit_addressing: + raise TypeError("use_bit_addressing=False is only supported with shared blocks") else: + if self.use_bit_addressing is None: + self.use_bit_addressing = False x_simdata = self.simdata if isinstance(self.simdata, list) else [self.simdata] for inx, entry in enumerate(x_simdata): if not isinstance(entry, SimData): @@ -173,7 +159,7 @@ def __check_block_entries(self, last_address: int, entry: SimData, use_bits: boo """Check block entries.""" if entry.address <= last_address: raise TypeError(f"SimData address {entry.address} is overlapping!") - blocks_regs = entry.build_registers(self.endian, self.string_encoding, use_bits) * entry.count + blocks_regs = entry.build_registers(use_bits) * entry.count return last_address + len(blocks_regs) def __check_parameters(self): @@ -201,7 +187,7 @@ def __build_flags(self, simdata: SimData) -> int: def __create_simdata(self, simdata: SimData, flag_list: list[int], reg_list: list[int], use_bits: bool): """Build registers for single SimData.""" flag_normal = self.__build_flags(simdata) - blocks_regs = simdata.build_registers(self.endian, self.string_encoding, use_bits) + blocks_regs = simdata.build_registers(use_bits) for _ in range(simdata.count): first = True for register in blocks_regs: @@ -212,7 +198,7 @@ def __create_simdata(self, simdata: SimData, flag_list: list[int], reg_list: li flag_list.append(flag_normal & ~SimUtils.RunTimeFlag_TYPE) reg_list.append(register) - def __create_block(self, simdata: list[SimData], use_bits: bool) -> SimRegs: + def __create_block(self, simdata: list[SimData]) -> SimRegs: """Create registers for device.""" flag_list: list[int] = [] reg_list: list[int] = [] @@ -223,23 +209,48 @@ def __create_block(self, simdata: list[SimData], use_bits: bool) -> SimRegs: flag_list.append(DataType.INVALID) reg_list.append(0) next_address += 1 - self.__create_simdata(entry, flag_list, reg_list, use_bits) + self.__create_simdata(entry, flag_list, reg_list, False) flag_list.append(DataType.INVALID) reg_list.append(0) return (start_address, reg_list, flag_list) + def __create_block_bits(self, simdata: list[SimData]) -> SimRegs: + """Create registers for device.""" + bit_list: list[bool] = [] + start_address = simdata[0].address + if (offset := start_address % 16): + bit_list.extend([False] * offset) + start_address -= offset + for entry in simdata: + if (next_address := start_address + len(bit_list)) < entry.address: + bit_list.extend([False] * (entry.address - next_address)) + next_address = start_address + len(bit_list) + entry_bits = entry.build_registers(True) + bit_list.extend(cast(list[bool], entry_bits)) + if (remains := len(bit_list) % 16): + bit_list.extend([False] * (16 - remains)) + + flag_list: list[int] = [DataType.BITS] + [0] * (int(len(bit_list) / 16) -1) + [DataType.INVALID] + bit_list.extend([False] * (len(bit_list) % 16)) + reg_list = SimUtils.bitsToRegisters(bit_list) + reg_list.append(0) + return (int(start_address / 16), reg_list, flag_list) + def build_device(self) -> SimRegs | dict[str, SimRegs]: """Check simdata and built runtime structure.""" self.__check_parameters() if not isinstance(self.simdata, tuple): x_simdata = self.simdata if isinstance(self.simdata, list) else [self.simdata] x_simdata.sort(key=lambda x: x.address) - return self.__create_block(x_simdata, False) + return self.__create_block(x_simdata) b: dict[str, SimRegs] = {} # (, , , ) convert = {0: "c", 1: "d", 2: "h", 3: "i"} for i in range(4): x_simdata = cast(tuple, self.simdata)[i] x_simdata.sort(key=lambda x: x.address) - b[convert[i]] = self.__create_block(x_simdata, (i in {0,1})) + if i in {0,1}: + b[convert[i]] = self.__create_block_bits(x_simdata) + else: + b[convert[i]] = self.__create_block(x_simdata) return b diff --git a/pymodbus/simulator/simruntime.py b/pymodbus/simulator/simruntime.py index fe82f3fcd..ba12b6abf 100644 --- a/pymodbus/simulator/simruntime.py +++ b/pymodbus/simulator/simruntime.py @@ -4,6 +4,8 @@ """ from __future__ import annotations +from typing import cast + from ..constants import ExcCodes from .simdevice import SimDevice from .simutils import DataType, SimUtils @@ -12,21 +14,20 @@ class SimRuntime: """Memory setup for device.""" - _fx_mapper = {2: "d", # Direct input - 4: "i"} # Input registers - _fx_mapper.update([(i, "h") + _fx_mapper = {2: "d", 4: "i"} # Discrete input and Input registers + _fx_mapper.update([(i, "h") # Holding registers for i in (3, 6, 16, 22, 23)]) - _fx_mapper.update([(i, "c") + _fx_mapper.update([(i, "c") # Coils for i in (1, 5, 15)]) def __init__(self, device: SimDevice): """Build device memory.""" self.action = device.action + self.use_bit_addressing = device.use_bit_addressing build = device.build_device() self.block: dict[str, tuple[int, int, list[int], list[int]]] = {} if not isinstance(build, dict): - self.shared = True self.block["x"] = ( build[0], len(build[2]), @@ -34,27 +35,14 @@ def __init__(self, device: SimDevice): build[2], ) return - self.shared = False - self.block = {} for i in ("c", "d", "h", "i"): - x = build[i] - self.block[i] = (x[0], len(x[1]), x[1], x[2]) + self.block[i] = (build[i][0], len(build[i][1]), build[i][1], build[i][2]) - async def get_block(self, func_code: int, address: int, count: int, values: list[int] | list[bool] | None) -> list[int] | list[bool] | ExcCodes: - """Calculate offset.""" - block_id = "x" if self.shared else self._fx_mapper[func_code] - start_address, register_count, registers, flags = self.block[block_id] - offset = address - start_address - if values: - count = len(values) - if address > start_address + register_count or address < start_address or offset + count > register_count: - return ExcCodes.ILLEGAL_ADDRESS - if self.action: - result = await self.action(func_code, address, registers, None) - if isinstance(result, ExcCodes): - return result - if result: - values = result + async def __check_block(self, func_code: int, block_id: str, address: int, count: int, offset: int, values: list[int] | list[bool] | None) -> ExcCodes | None: + """Check block request.""" + start_address, _, registers, flags = self.block[block_id] + if self.action and (result := await self.action(func_code, start_address, address, count, registers, values)): + return result for i in range(count): addr = offset + i if flags[addr] & SimUtils.RunTimeFlag_TYPE == DataType.INVALID: @@ -62,16 +50,53 @@ async def get_block(self, func_code: int, address: int, count: int, values: list if values: if flags[addr] & SimUtils.RunTimeFlag_READONLY: return ExcCodes.ILLEGAL_ADDRESS - registers[addr] = values[i] + return None + + async def get_bit_block(self, block_id: str, func_code: int, address: int, count: int, values: list[bool] | None) -> list[bool] | ExcCodes: + """Handle coils and discrete input.""" + start_address, register_count, registers, _ = self.block[block_id] + offset = (int(address / 16) if self.use_bit_addressing else address) - start_address + reg_count = int(count / 16) + 1 + if register_count <= offset < 0 or offset + reg_count > register_count: + return ExcCodes.ILLEGAL_ADDRESS + if (result := await self.__check_block(func_code, block_id, address, reg_count, offset, values)): + return result + list_bools = SimUtils.registersToBits(registers[offset:offset+reg_count]) + bit_offset = address % 16 + if values: + list_bools[bit_offset:bit_offset+count] = values + registers[offset:offset+reg_count] = SimUtils.bitsToRegisters(list_bools) + return list_bools[bit_offset:bit_offset+count] + + async def get_reg_block(self, block_id: str, func_code: int, address: int, count: int, values: list[int] | None) -> list[int] | ExcCodes: + """Handle holding registers and input registers.""" + start_address, register_count, registers, _ = self.block[block_id] + offset = address - start_address + if register_count <= offset < 0 or offset + count > register_count: + return ExcCodes.ILLEGAL_ADDRESS + if (result := await self.__check_block(func_code, block_id, address, count, offset, values)): + return result + if values: + registers[offset:offset+count] = values return registers[offset:offset+count] + async def get_block(self, func_code: int, address: int, count: int, values: list[int] | list[bool] | None) -> list[int] | list[bool] | ExcCodes: + """Calculate offset.""" + if values: + values = cast(list[int], values if isinstance(values, list) else [values]) + if (fc_block := self._fx_mapper.get(func_code, "z")) == "z": + raise AttributeError("Datastore, not supported function code") + block_id = "x" if "x" in self.block else fc_block + if fc_block in {"c", "d"}: + return await self.get_bit_block(block_id, func_code, address, count, cast(list[bool], values)) + return await self.get_reg_block(block_id, func_code, address, count, cast(list[int], values)) + async def async_getValues(self, func_code: int, address: int, count: int) -> list[int] | list[bool] | ExcCodes: """Get `count` values from datastore.""" return await self.get_block(func_code, address, count, None) async def async_setValues(self, func_code: int, address: int, values: list[int] | list[bool] ) -> None | ExcCodes: """Set the datastore with the supplied values.""" - count = len(values) - block = await self.get_block(func_code, address, count, values) + block = await self.get_block(func_code, address, len(values), values) return block if isinstance(block, ExcCodes) else None diff --git a/pymodbus/simulator/simutils.py b/pymodbus/simulator/simutils.py index e01c82a4a..a715f5afb 100644 --- a/pymodbus/simulator/simutils.py +++ b/pymodbus/simulator/simutils.py @@ -48,7 +48,7 @@ class DataType(enum.IntEnum): #: Registers == 2 bytes (identical to UINT16) REGISTERS = enum.auto() -class SimUtils: # pylint: disable=too-few-public-methods +class SimUtils: """Define common set of utilitites.""" DATATYPE_STRUCT: dict[DataType, tuple[type, str, int]] = { @@ -69,24 +69,53 @@ class SimUtils: # pylint: disable=too-few-public-methods RunTimeFlag_TYPE = 2**4 -1 # Isolate number of registers RunTimeFlag_READONLY = 2**4 # only read is allowed + + def __init__(self): + """Ensure that class is not instantiated.""" + raise RuntimeError("SimUtils may not be instantiated.") + + @classmethod + def registersToBits(cls, registers: list[int]) -> list[bool]: + """Convert list of registers to list of bool (bit 0 first).""" + bits: list[bool] = [] + for entry in registers: + bit_str = format(entry, '016b') + new_bits = [] + for i in bit_str: + new_bits.append(i == "1") + new_bits.reverse() + bits.extend(new_bits) + return bits + + @classmethod + def bitsToRegisters(cls, bits: list[bool]) -> list[int]: + """Convert list of bits to registers (bit 0 first, divided in 16bits).""" + bit_len = len(bits) + if bit_len % 16: + raise TypeError("bits must be a multiple of 16") + registers = [] + for i in range(int(bit_len / 16)): + offset = i*16 + reg = 0 + for i, x in enumerate(bits[offset:offset+16]): + if x: + reg += 1 << i + registers.append(reg) + return registers + + @classmethod + def mergeBitsToRegisters(cls, bit_offset: int, registers: list[int], bits: list[bool]) -> None: + """Merge list of bits into registers in place.""" + new_bits = cls.registersToBits(registers) + new_bits[bit_offset:bit_offset+len(bits)] = bits + registers[0:] = cls.bitsToRegisters(new_bits) + @classmethod - def convert_bytes_registers(cls, byte_list: bytearray, word_order: bool, byte_order: bool, data_type_len: int) -> list[int]: - """Convert bytearray to registers.""" - if byte_order: - regs = [ - int.from_bytes(byte_list[x : x + 2], "big") - for x in range(0, len(byte_list), 2) - ] - else: - regs = [ - int.from_bytes([byte_list[x+1], byte_list[x]], "big") - for x in range(0, len(byte_list), 2) - ] - if word_order: - return regs - reversed_regs: list[int] = [] - for x in range(0, len(regs), data_type_len): - single_value_regs = regs[x: x + data_type_len] - single_value_regs.reverse() - reversed_regs = reversed_regs + single_value_regs - return reversed_regs + def bytesToRegisters(cls, byte_list: bytes) -> list[int]: + """Convert bytes into registers.""" + if len(byte_list) % 2: + byte_list += b"\x00" + return[ + int.from_bytes(byte_list[x : x + 2], "big") + for x in range(0, len(byte_list), 2) + ] diff --git a/test/simulator/test_simdata.py b/test/simulator/test_simdata.py index 1b2b73767..6acabae8e 100644 --- a/test/simulator/test_simdata.py +++ b/test/simulator/test_simdata.py @@ -20,6 +20,7 @@ class TestSimData: {"address": 6, "count": 10, "datatype": DataType.INT16, "values": [17, 18]}, {"address": 7, "datatype": DataType.STRING, "values": "test"}, {"address": 8, "count": 10, "datatype": DataType.STRING, "values": "test"}, + {"address": 8, "count": 10, "datatype": DataType.STRING, "values": "test", "string_encoding": "utf-8"}, {"address": 0, "datatype": DataType.REGISTERS, "values": 17, "count": 5}, {"address": 3, "datatype": DataType.INT16, "values": 17, "readonly": True}, {"address": 0, "count": 2^16 -1}, @@ -45,6 +46,7 @@ def test_simdata_instanciate(self, kwargs): {"address": 1, "datatype": "not ok"}, {"address": 1, "datatype": 11}, {"address": 2, "values": 17}, + {"address": 8, "count": 10, "datatype": DataType.STRING, "values": "test", "string_encoding": "not ok"}, ]) def test_simdata_not_ok(self, kwargs): """Test that simdata can be objects.""" @@ -80,6 +82,8 @@ def test_simdata_value_ok(self, value, value_type): ([11, 12.0], DataType.REGISTERS), (1.0, DataType.BITS), ([True, 1.0], DataType.BITS), + ([True, 1], DataType.BITS), + ([1, True], DataType.BITS), ]) def test_simdata_value_not_ok(self, value, value_type): """Test simdata value.""" @@ -92,39 +96,36 @@ def test_simdata_value_invalid(self, values): with pytest.raises(TypeError): SimData(0, values=values, datatype=DataType.INVALID) - @pytest.mark.parametrize(("value", "value_type", "order", "regs"), [ - # test word order - (-27123, DataType.INT16, (True, True), [0x960D]), - ([-27123, 27123], DataType.INT16, (True, True), [0x960D, 0x69F3]), - ([-27123, 27123], DataType.INT16, (False, True), [0x960D, 0x69F3]), - ([32145678, -32145678], DataType.INT32, (True, True), [0x01EA, 0x810E, 0xFE15, 0x7EF2]), - ([32145678, -32145678], DataType.INT32, (False, True), [0x810E, 0x01EA, 0x7EF2, 0xFE15]), - # test byte order - ([-27123, 27123], DataType.REGISTERS, (True, False), [0x0D96, 0xF369]), - ([32145678, -32145678], DataType.INT32, (True, False), [0xEA01, 0x0E81, 0x15FE, 0xF27E]), - # test data types - (27123, DataType.REGISTERS, (True, True), [0x69F3]), - (-27124, DataType.INT16, (True, True), [0x960C]), - (27123, DataType.UINT16, (True, True), [0x69F3]), - (-32145678, DataType.INT32, (True, True), [0xFE15, 0x7EF2]), - (32145678, DataType.UINT32, (True, True), [0x01EA, 0x810E]), - (-1234567890123456789, DataType.INT64, (True, True), [0xEEDD, 0xEF0B, 0x8216, 0x7EEB]), - (1234567890123456789, DataType.UINT64, (True, True), [0x1122, 0x10F4, 0x7DE9, 0x8115]), - (27123.5, DataType.FLOAT32, (True, True), [0x46D3, 0xE700]), - (3.141592, DataType.FLOAT32, (True, True), [0x4049, 0x0FD8]), - (-3.141592, DataType.FLOAT32, (True, True), [0xC049, 0x0FD8]), - (27123.5, DataType.FLOAT64, (True, True), [0x40DA, 0x7CE0, 0x0000, 0x0000]), - (3.14159265358979, DataType.FLOAT64, (True, True), [0x4009, 0x21FB, 0x5444, 0x2D11]), - (-3.14159265358979, DataType.FLOAT64, (True, True), [0xC009, 0x21FB, 0x5444, 0x2D11]), - ([True] + [False] * 15, DataType.BITS, (True, True), [256]), - (0x0100, DataType.BITS, (True, True), [256]), - ([0x0100, 0x0001], DataType.BITS, (True, True), [256, 1]), - + @pytest.mark.parametrize(("value", "value_type", "regs"), [ + (27123, DataType.INT16, [27123]), + (-27123, DataType.INT16, [0x960d]), + ([-27123, 27123], DataType.INT16, [0x960D, 0x69F3]), + ([32145678, -32145678], DataType.INT32, [0x01EA, 0x810E, 0xFE15, 0x7EF2]), + (27123, DataType.REGISTERS, [0x69F3]), + (-27124, DataType.INT16, [0x960C]), + (27123, DataType.UINT16, [0x69F3]), + (-32145678, DataType.INT32, [0xFE15, 0x7EF2]), + (32145678, DataType.UINT32, [0x01EA, 0x810E]), + (-1234567890123456789, DataType.INT64, [0xEEDD, 0xEF0B, 0x8216, 0x7EEB]), + (1234567890123456789, DataType.UINT64, [0x1122, 0x10F4, 0x7DE9, 0x8115]), + (27123.5, DataType.FLOAT32, [0x46D3, 0xE700]), + (3.141592, DataType.FLOAT32, [0x4049, 0x0FD8]), + (-3.141592, DataType.FLOAT32, [0xC049, 0x0FD8]), + (27123.5, DataType.FLOAT64, [0x40DA, 0x7CE0, 0x0000, 0x0000]), + (3.14159265358979, DataType.FLOAT64, [0x4009, 0x21FB, 0x5444, 0x2D11]), + (-3.14159265358979, DataType.FLOAT64, [0xC009, 0x21FB, 0x5444, 0x2D11]), + (0x0100, DataType.BITS, [256]), + (123, DataType.BITS, [123]), + ([123], DataType.BITS, [123]), + ([0x0100, 0x0001], DataType.BITS, [256, 1]), + ([True] + [False] * 15, DataType.BITS, [1]), + ([True] + [False] * 8 + [True] + [False] * 6, DataType.BITS, [513]), + ([True] + [False] * 15 + [False] * 8 + [True] + [False] * 7, DataType.BITS, [1, 256]), ]) - def test_simdata_build_registers(self, value, value_type, order, regs): + def test_simdata_build_registers(self, value, value_type, regs): """Test simdata value.""" sd = SimData(0, values=value, datatype=value_type) - build_regs = sd.build_registers((order[0], order[1]), "utf-8", False) + build_regs = sd.build_registers(False) assert build_regs == regs @pytest.mark.parametrize(("value", "code", "expect"), [ @@ -135,34 +136,36 @@ def test_simdata_build_registers(self, value, value_type, order, regs): ]) def test_simdata_build_string(self, value, code, expect): """Test simdata value.""" - sd = SimData(0, values=value, datatype=DataType.STRING) - build_regs = sd.build_registers((True, True), code, False) + sd = SimData(0, values=value, datatype=DataType.STRING, string_encoding=code) + build_regs = sd.build_registers(False) assert build_regs == expect @pytest.mark.parametrize(("value", "regs"), [ - ([True, False, True], [1, 0, 1]), - (0x0100, [1] + [0]*15), - ([256, 1], [1] + [0]*23 + [1] + [0]*7), + ([True, False, True], [True, False, True]), + ([True] + [False] * 14 + [True, False], [True] + [False] * 14 + [True, False]), + (0x0001, [True] + [False]*15), + (0x0100, [False]*8 + [True] + [False]*7), + ([256, 1], [False]*8 + [True] + [False]*7 + [True] + [False]*15), ]) def test_simdata_build_bit_block(self, value, regs): """Test simdata value.""" sd = SimData(0, values=value, datatype=DataType.BITS) - build_regs = sd.build_registers((True, True), "utf-8", True) + build_regs = sd.build_registers(True) assert build_regs == regs def test_simdata_build_updated_simdata(self): """Test simdata value.""" sd = SimData(0, values="ABC", datatype=DataType.STRING) - build_regs = sd.build_registers((True, True), "utf-8", False) + build_regs = sd.build_registers(False) assert build_regs == [0x4142, 0x4300] sd.values="ABCDEF" - build_regs = sd.build_registers((True, True), "utf-8", False) + build_regs = sd.build_registers(False) assert build_regs == [0x4142, 0x4344, 0x4546] sd.values=123 with pytest.raises(TypeError): - sd.build_registers((True, True), "utf-8", False) + sd.build_registers(False) sd = SimData(0, values=[True, True], datatype=DataType.BITS) with pytest.raises(TypeError): - sd.build_registers((True, True), "utf-8", False) + sd.build_registers(False) diff --git a/test/simulator/test_simdevice.py b/test/simulator/test_simdevice.py index 1eca9d042..538711bab 100644 --- a/test/simulator/test_simdevice.py +++ b/test/simulator/test_simdevice.py @@ -15,34 +15,34 @@ class TestSimDevice: async def my_action( self, _function_code, + _start_address, _address, - current_registers, - _new_registers + _count, + _current_registers, + _set_values ): """Run action.""" - return current_registers def my_sync_action( self, _function_code, + _start_address, _address, - current_registers, - _new_registers + _count, + _current_registers, + _set_values ): """Run action.""" - return current_registers simdata1 = SimData(0, datatype=DataType.INT16, values=15) simdata2 = SimData(1, datatype=DataType.INT16, values=16) - simdata3 = SimData(1, datatype=DataType.BITS, values=16) + simdata3 = SimData(10, datatype=DataType.BITS, values=16) @pytest.mark.parametrize("kwargs", [ - {"id": 0, "simdata": [SimData(2, datatype=DataType.STRING, values="test")], "string_encoding": "utf-8"}, + {"id": 0, "simdata": [SimData(2, datatype=DataType.STRING, values="test")]}, {"id": 0, "simdata": ([simdata3], [simdata3], [simdata1], [simdata3])}, {"id": 0, "simdata": simdata2}, {"id": 0, "simdata": [simdata2, simdata1]}, - {"id": 0, "simdata": [simdata1], "endian": (False, True)}, - {"id": 0, "simdata": [simdata1], "endian": (True, False)}, {"id": 0, "simdata": [simdata1], "identity": ModbusDeviceIdentification()}, ]) def test_simdevice_instanciate(self, kwargs): @@ -59,7 +59,6 @@ def test_simdevice_instanciate(self, kwargs): {"id": 0, "simdata": ([simdata1], [simdata3], [simdata1], [simdata1])}, {"id": 0, "simdata": ([simdata3], [simdata1], [simdata1], [simdata1])}, {"id": 0, "simdata": ([simdata3], [simdata3], [simdata1], "not ok")}, - {"id": 0, "simdata": [simdata1], "string_encoding": "not ok"}, {"id": "not ok", "simdata": [simdata1]}, {"id": 1.0, "simdata": [simdata1]}, {"id": 1, "simdata": [simdata1, simdata1]}, @@ -79,7 +78,7 @@ def test_simdevice_not_ok(self, kwargs): ([SimData(0, values=[0xffff], datatype=DataType.BITS)], 0), ([SimData(0, values=[True]*16, datatype=DataType.BITS)], 0), ([SimData(0, values="hello", datatype=DataType.STRING)], 0), - (SimData(0), 0), + # (SimData(0), 0), ("no valid", 2), (["no valid"], 2), ]) @@ -91,33 +90,25 @@ def test_simdevice_block(self, block, expect): with pytest.raises(TypeError): SimDevice(id=0, simdata=block) - @pytest.mark.parametrize(("endian", "expect"), [ - ("not ok", 1), - (None, 1), - (["not ok"], 1), - (("not ok"), 1), # noqa: PT014 - (("not ok", "not_ok"), 1), - ((True, False), 0), - ]) - def test_simdevice_endian(self, endian, expect): - """Test offset.""" - if expect: - with pytest.raises(TypeError): - SimDevice(id=0, simdata=[self.simdata1], endian=endian) - else: - SimDevice(id=0, simdata=[self.simdata1], endian=endian) - async def test_simdevice_action(self): """Test action.""" - await self.my_action(0, 0, [], None) - self.my_sync_action(0, 0, [], None) + await self.my_action(0, 0, 0, 0, [], None) + self.my_sync_action(0, 0, 0, 0, [], None) SimDevice(1, simdata=[SimData(1)], action=self.my_action) + SimDevice(0, simdata=([self.simdata3], [self.simdata3], [self.simdata1], [self.simdata3]), action=self.my_action) with pytest.raises(TypeError): SimDevice(1, simdata=[SimData(1)], action=self.my_sync_action) with pytest.raises(TypeError): SimDevice(1, simdata=[SimData(1)], action="no good") # type: ignore[arg-type] + + async def test_simdevice_bit_addressing(self): + """Test action.""" + sdblock = ([self.simdata3], [self.simdata3], [self.simdata1], [self.simdata3]) + SimDevice(1, simdata=[SimData(1)], use_bit_addressing=False) + SimDevice(1, simdata=[SimData(1)], use_bit_addressing=True) + SimDevice(1, simdata=sdblock, use_bit_addressing=True) with pytest.raises(TypeError): - SimDevice(0, simdata=([self.simdata3], [self.simdata3], [self.simdata1], [self.simdata3]), action=self.my_action), + SimDevice(1, simdata=sdblock, use_bit_addressing=False) @pytest.mark.parametrize(("block", "result"), [ ([SimData(2, values=125, datatype=DataType.REGISTERS), SimData(1, values=123, datatype=DataType.REGISTERS),], @@ -138,6 +129,12 @@ async def test_simdevice_action(self): ([SimData(0, values=[0xffff, 0xffff], datatype=DataType.BITS)], (0, [65535, 65535, 0], [DataType.BITS, 0, DataType.INVALID])), + ([SimData(0, values=[True] * 2 + [False]*6 + [False]*8, datatype=DataType.BITS)], + (0, [3, 0], + [DataType.BITS, DataType.INVALID])), + ([SimData(0, values=[True] * 2 + [False]*6 + [True] + [False]*6 + [True], datatype=DataType.BITS)], + (0, [33027, 0], + [DataType.BITS, DataType.INVALID])), ([SimData(1, values=123, datatype=DataType.INT16), SimData(3, values=456, datatype=DataType.INT16)], (1, [123, 0, 456, 0], @@ -189,37 +186,44 @@ def test_simdevice_build(self, block, result): def test_simdevice_build_blocks(self): """Test build_device() ok.""" block = ( - [SimData(1, values=123, datatype=DataType.BITS)], - [SimData(1, values=123, datatype=DataType.BITS)], - [SimData(1, values=123, datatype=DataType.INT16)], - [SimData(1, values=123, datatype=DataType.INT16)]) + [SimData(0, values=123, datatype=DataType.BITS)], + [SimData(0, values=123, readonly=True, datatype=DataType.BITS)], + [SimData(0, values=123, datatype=DataType.INT16)], + [SimData(0, values=123, datatype=DataType.INT16)]) result = { - "c": (1, [0]*8 + [1]*2 + [0] + [1]*4 + [0]*2, [DataType.BITS] + [0] * 15 + [DataType.INVALID]), - "d": (1, [0]*8 + [1]*2 + [0] + [1]*4 + [0]*2, [DataType.BITS] + [0] * 15 + [DataType.INVALID]), - "h": (1, [123, 0], [DataType.INT16, DataType.INVALID]), - "i": (1, [123,0], [DataType.INT16, DataType.INVALID]) + "c": (0, [123, 0], [DataType.BITS, DataType.INVALID]), + "d": (0, [123, 0], [DataType.BITS, DataType.INVALID]), + "h": (0, [123, 0], [DataType.INT16, DataType.INVALID]), + "i": (0, [123,0], [DataType.INT16, DataType.INVALID]) } sd = SimDevice(id=1, simdata=block) lists = sd.build_device() assert lists == result - def test_simdevice_build_bits(self): + @pytest.mark.parametrize(("block", "registers", "addr"), [ + ([SimData(0, values=123, datatype=DataType.BITS)], [0x007b, 0x0000], 0), + ([SimData(0, values=True, datatype=DataType.BITS)], [0x0001, 0x0000], 0), + ([SimData(1, values=123, datatype=DataType.BITS)], [0x00F6, 0x0000, 0x0000], 0), + ([SimData(0, values=[True] + [False]*7, datatype=DataType.BITS), + SimData(9, values=[True], datatype=DataType.BITS)], [0x0201, 0x0000], 0), + ([SimData(0, values=[True] + [False]*7, datatype=DataType.BITS), + SimData(10, values=[True], datatype=DataType.BITS)], [0x0401, 0x0000], 0), + ([SimData(16, values=123, datatype=DataType.BITS)], [0x007b, 0x0000], 1), + ([SimData(15, values=123, datatype=DataType.BITS)], [0x8000, 0x003d, 0x0000], 0), + ]) + def test_simdevice_build_bits(self, block, registers, addr): """Test build_device() ok.""" - sd = SimDevice(id=1, simdata=SimData(1, values=123, datatype=DataType.BITS)) - result_shared = cast(SimRegs, sd.build_device()) - assert len(result_shared[1]) == 2 - - sd = SimDevice(id=1, simdata= ( - [SimData(1, values=123, datatype=DataType.BITS)], - [SimData(1, values=123, datatype=DataType.BITS)], - [SimData(1, values=123, datatype=DataType.INT16)], - [SimData(1, values=123, datatype=DataType.INT16)] - )) - result_block = cast(dict[str, SimRegs], sd.build_device()) - assert len(result_block["c"][1]) == 17 - assert len(result_block["d"][1]) == 17 - assert len(result_block["h"][1]) == 2 - assert len(result_block["i"][1]) == 2 + sim123 = [SimData(addr, values=123, datatype=DataType.INT16)] + sd = SimDevice(id=1, simdata=(block, block, sim123, sim123)) + reg_len = len(registers) - 2 + result = { + "c": (addr, registers, [DataType.BITS] + [0] * reg_len + [DataType.INVALID]), + "d": (addr, registers, [DataType.BITS] + [0] * reg_len + [DataType.INVALID]), + "h": (addr, [123, 0], [DataType.INT16, DataType.INVALID]), + "i": (addr, [123, 0], [DataType.INT16, DataType.INVALID]) + } + lists = cast(dict[str, SimRegs], sd.build_device()) + assert lists == result @pytest.mark.parametrize("count", range(1,4)) @pytest.mark.parametrize("data_count", range(1,4)) diff --git a/test/simulator/test_simruntime.py b/test/simulator/test_simruntime.py index 474d7ba2b..29b9b905f 100644 --- a/test/simulator/test_simruntime.py +++ b/test/simulator/test_simruntime.py @@ -1,5 +1,4 @@ """Test SimCore.""" -from typing import cast import pytest @@ -14,89 +13,121 @@ class TestSimRuntime: async def my_action( self, function_code, + _start_address, _address, + _count, current_registers, - _new_registers - ): + set_values + ): """Run action.""" - if function_code == 3: - return current_registers - if function_code == 4: + if function_code in {1, 3}: + current_registers[1] = 17 + if function_code in {2, 4}: return ExcCodes.ILLEGAL_ADDRESS - # function_code == 5: - return None + elif function_code == 15: + set_values[0] = False + elif function_code == 16: + set_values[0] = 17 + sd_block = ( + [SimData(0, values=[14], datatype=DataType.BITS), + SimData(16, values=[15], readonly=True, datatype=DataType.BITS), + SimData(32, values=[16], datatype=DataType.BITS), + ], + [SimData(0, values=[14], datatype=DataType.BITS), + SimData(16, values=[15], readonly=True, datatype=DataType.BITS), + SimData(32, values=[16], datatype=DataType.BITS), + ], + [SimData(0, values=[14], datatype=DataType.REGISTERS), + SimData(1, values=[15], readonly=True, datatype=DataType.REGISTERS), + SimData(2, values=[16], datatype=DataType.REGISTERS), + ], + [SimData(0, values=[14], datatype=DataType.REGISTERS), + SimData(1, values=[15], readonly=True, datatype=DataType.REGISTERS), + SimData(2, values=[16], datatype=DataType.REGISTERS), + ], + ) + sd_shared = [SimData(0, datatype=DataType.REGISTERS, values=[14]), + SimData(1, readonly=True, datatype=DataType.REGISTERS, values=[15]), + SimData(2, datatype=DataType.REGISTERS, values=[16]), + ] + block_ids = { + 1: "c", + 2: "d", + 3: "h", + 4: "i", + 15: "c", + 16: "h" + } - @pytest.mark.parametrize("kwargs", [ - {"id": 0, "simdata": ([SimData(0, datatype=DataType.BITS, values=15)], - [SimData(0, datatype=DataType.BITS, values=15)], - [SimData(0, datatype=DataType.INT16, values=15)], - [SimData(0, datatype=DataType.INT16, values=15)])}, - {"id": 0, "simdata": SimData(0, datatype=DataType.INT16, values=15)}, - ]) - def test_simruntime_instanciate(self, kwargs): + def test_simruntime_instanciate(self): """Test that simdata can be objects.""" - sd = SimDevice(**kwargs) - SimRuntime(sd) + SimRuntime(SimDevice(0, self.sd_block)) + SimRuntime(SimDevice(0, self.sd_shared)) - @pytest.mark.parametrize(("args", "expect"), [ - ((3, 1, 1, None), -1), - ((3, 200, 1, None), -1), - ((3, 15, 200, None), -1), - ((3, 15, 2, None), 2), - ((3, 19, 1, [1, 2 , 3]), -1), - ((3, 19, 1, [1]), 1), - ((3, 10, 2, None), -1), - ((3, 10, 1, [1]), -1), - ]) - async def test_simruntime_block(self, args, expect): + @pytest.mark.parametrize("block", [False, True]) + @pytest.mark.parametrize("fc", range(1, 25)) + async def test_simruntime_fc(self, fc, block): """Test that simdata can be objects.""" - sd = SimDevice(0, simdata=[ - SimData(10, count=1, values=0, datatype=DataType.REGISTERS, readonly=True), - SimData(11, count=1, values=0, datatype=DataType.INVALID), - SimData(12, count=8, values=0, datatype=DataType.REGISTERS), - ]) + sd = SimDevice(1, simdata=(self.sd_block if block else self.sd_shared)) rt = SimRuntime(sd) - ret = await rt.get_block(*args) - if expect == -1: - assert isinstance(ret, ExcCodes) + if fc in (1,2,3,4,5,6,15,16,22,23): + ret = await rt.get_block(fc, 1, 1, None) + assert not isinstance(ret, ExcCodes) else: - assert len(cast(list[int], ret)) == expect - + with pytest.raises(AttributeError): + await rt.get_block(fc, 1, 1, None) - @pytest.mark.parametrize(("args", "expect"), [ - ((3, 10, 1, [1]), [1, 0, 0, 0, 0, 0]), - ((3, 11, 1, [1, 2, 3]), [0, 1, 2, 3, 0, 0]), - ((3, 12, 1, [1, 2 , 3]), [0, 0, 1, 2, 3, 0]), - ]) - async def test_simruntime_block_set(self, args, expect): + @pytest.mark.parametrize("block", [True, False]) + @pytest.mark.parametrize(("fc", "values", "expect"), [ + (1, None, [14, 17, 16, 0]), + (3, None, [14, 17, 16, 0]), + (4, None, -1), + (2, None, -1), + (15, [True, False], [12, 15, 16, 0]), + (16, [12], [17, 15, 16, 0]), + ]) + async def test_simruntime_action(self, block, fc, values, expect): """Test that simdata can be objects.""" - sd = SimDevice(0, simdata= - SimData(10, count=5, values=0, datatype=DataType.REGISTERS) - ) - rt = SimRuntime(sd) - - ret = await rt.get_block(*args) - assert ret == args[3] - assert rt.block["x"][2] == expect + rt = SimRuntime(SimDevice(1, + action=self.my_action, + simdata=(self.sd_block if block else self.sd_shared)) + ) + block_id = self.block_ids[fc] if block else "x" + count = len(values) if values else 2 + ret = await rt.get_block(fc, 0, count, values) + if expect == -1: + assert ret == ExcCodes.ILLEGAL_ADDRESS + else: + assert rt.block[block_id][2] == expect - @pytest.mark.parametrize(("args", "expect"), [ - ((3, 15, 2, None), 2), - ((4, 15, 2, None), -1), - ((5, 15, 2, None), 2), - ]) - async def test_simruntime_action(self, args, expect): - """Test that simdata can be objects.""" - sd = SimDevice(0, action=self.my_action, simdata=[ - SimData(10, count=1, values=0, datatype=DataType.REGISTERS, readonly=True), - SimData(11, count=1, values=0, datatype=DataType.INVALID), - SimData(12, count=8, values=0, datatype=DataType.REGISTERS), + @pytest.mark.parametrize("block", [True, False]) + @pytest.mark.parametrize(("fc", "addr", "count", "values", "expect"), [ + (1, 0, 2, None, [False, True]), + (1, 0, 17, None, [False] + [True]*3 + [False]*12 + [True]), + (2, 0, 2, None, [False, True]), + (3, 0, 2, None, [14, 15]), + (4, 0, 2, None, [14, 15]), + (15, 0, 2, [True, False], [True, False]), + (16, 0, 1, [12], [12]), + (99, 0, 2, None, AttributeError), + (3, 15, 2, None, ExcCodes.ILLEGAL_ADDRESS), + (3, 2, 2, None, ExcCodes.ILLEGAL_ADDRESS), + (16, 2, 2, [12, 12], ExcCodes.ILLEGAL_ADDRESS), + (16, 1, 1, [12], ExcCodes.ILLEGAL_ADDRESS), + (1, 64, 2, None, ExcCodes.ILLEGAL_ADDRESS), + (15, 48, 17, [True]*17, ExcCodes.ILLEGAL_ADDRESS), ]) - rt = SimRuntime(sd) - ret = await rt.get_block(*args) - if expect == -1: - assert isinstance(ret, ExcCodes) + async def test_simruntime_block(self, block, fc, addr, count, values, expect): + """Test that simdata can be objects.""" + rt = SimRuntime(SimDevice(1, + simdata=(self.sd_block if block else self.sd_shared)) + ) + if not isinstance(expect, list) and expect in {AttributeError}: + with pytest.raises(expect): + await rt.get_block(fc, addr, count, values) else: - assert len(cast(list[int], ret)) == expect + ret = await rt.get_block(fc, addr, count, values) + assert ret == expect async def test_simruntime_getValues(self): """Test that simdata can be objects.""" diff --git a/test/simulator/test_simutils.py b/test/simulator/test_simutils.py new file mode 100644 index 000000000..a8f79910a --- /dev/null +++ b/test/simulator/test_simutils.py @@ -0,0 +1,67 @@ +"""Test SimData.""" +import pytest + +from pymodbus.simulator.simutils import SimUtils + + +class TestSimUtils: + """Test simulator utilities.""" + + def test_simutils_instanciate(self): + """Test that simdata cannot be a objects.""" + with pytest.raises(RuntimeError): + SimUtils() + + @pytest.mark.parametrize(("bits", "registers"), [ + ([True] + [False] * 8 + [True] + [False]*6, [513]), + ([True] + [False] * 8 + [True] + [False]*6 + [False] + [True] + [False] * 6 + [True] + [False]*7, [513, 258]), + ]) + def test_simutils_bitsToRegisters(self, bits, registers): + """Test convert list[bool] to list[int].""" + assert registers == SimUtils.bitsToRegisters(bits) + + @pytest.mark.parametrize(("bits"), [ + ([True] + [False] * 8 + [True] + [False]*5), + ([True] + [False] * 8 + [True] + [False]*6 + [False] + [True] + [False] * 6 + [True] + [False]*6), + ]) + def test_simutils_bitsToRegisters_not_ok(self, bits): + """Test convert list[bool] to list[int].""" + with pytest.raises(TypeError): + SimUtils.bitsToRegisters(bits) + + @pytest.mark.parametrize(("bits", "registers"), [ + ([True] + [False] * 8 + [True] + [False]*6, [513]), + ([True] + [False] * 8 + [True] + [False]*6 + [False] + [True] + [False] * 6 + [True] + [False]*7, [513, 258]), + ]) + def test_simutils_registersToBits(self, bits, registers): + """Test convert list[bool] to list[int].""" + new_bits = SimUtils.registersToBits(registers) + assert bits == new_bits + + + @pytest.mark.parametrize(("registers", "offset", "bits", "expect"), [ + ([513], 0, [], [513]), + ([513], 0, [False], [512]), + ([513], 1, [False], [513]), + ([513], 0, [True, True], [515]), + ([513], 1, [True], [515]), + ([513], 15, [True], [33281]), + ([513, 0], 15, [True, True], [33281, 1]), + ([513, 0], 15, [True, False], [33281, 0]), + ]) + def test_simutils_mergeBitsToRegisters(self, registers, offset, bits, expect): + """Test convert list[bool] to list[int].""" + temp_regs = registers.copy() + SimUtils.mergeBitsToRegisters(offset, temp_regs, bits) + assert temp_regs == expect + + + @pytest.mark.parametrize(("byte_list", "expect"), [ + (b'\x01\x01', [257]), + (b'\x01\x02', [258]), + (b'\x01\x01\x01\x02', [257, 258]), + ]) + def test_simutils_bytesToRegisters(self, byte_list, expect): + """Test convert list[bool] to list[int].""" + regs = SimUtils.bytesToRegisters(byte_list) + assert regs == expect