Skip to content

Commit e8fb614

Browse files
committed
test utils.
1 parent 0077d26 commit e8fb614

8 files changed

Lines changed: 102 additions & 108 deletions

File tree

API_changes.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ API changes 3.13.0
88
a proper forwarder should be made at frame level.
99
- datastore get/setValues is removed,
1010
please use server.async_get/setValues instead.
11+
- datastore show a deprecation warning
12+
- SimData/SimDevice have been updated
1113

1214
API changes 3.12.0
1315
------------------

pymodbus/simulator/simdata.py

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from dataclasses import dataclass
66
from typing import TypeAlias, cast
77

8-
from ..pdu.utils import pack_bitstring
98
from .simutils import DataType, SimUtils
109

1110

@@ -118,6 +117,11 @@ class SimData:
118117
#: Used to check access and convert value to/from registers or mark as invalid.
119118
datatype: DataType = DataType.INVALID
120119

120+
#: String encoding
121+
#:
122+
#: Used to convert a SimData(DataType.STRING) to registers.
123+
string_encoding: str = "utf-8"
124+
121125
#: Mark register(s) as readonly.
122126
readonly: bool = False
123127

@@ -136,6 +140,10 @@ def __check_simple(self):
136140
raise TypeError("values= cannot be used with invalid=True")
137141
if isinstance(self.values, list) and not self.values:
138142
raise TypeError("values= list cannot be empty")
143+
try:
144+
"test string".encode(self.string_encoding)
145+
except (UnicodeEncodeError, LookupError) as exc:
146+
raise TypeError("string_encoding= not valid") from exc
139147

140148
def __check_parameters(self):
141149
"""Check all parameters."""
@@ -159,13 +167,9 @@ def __post_init__(self):
159167
def build_registers_bits_block(self) -> list[bool]:
160168
"""Convert values= to registers from bits (1 bit in each register)."""
161169
x_values = self.values if isinstance(self.values, list) else [self.values]
162-
coils: list[bool] = []
163170
if isinstance(x_values[0], bool):
164-
coils = cast(list[bool], x_values)
165-
else:
166-
bool_list = SimUtils.convert_registers_bits(cast(list[int], x_values))
167-
coils.extend(bool_list)
168-
return coils
171+
return cast(list[bool], x_values)
172+
return SimUtils.registersToBits(cast(list[int], x_values))
169173

170174
def build_registers_bits_shared(self) -> list[int]:
171175
"""Convert values= to registers from bits (16 bits in each register)."""
@@ -176,33 +180,31 @@ def build_registers_bits_shared(self) -> list[int]:
176180
raise TypeError(f"SimData address={self.address} values= must be a multiple of 16")
177181
return SimUtils.bitsToRegisters(cast(list[bool], x_values))
178182

179-
def build_registers_string(self, endian: tuple[bool, bool], string_encoding: str) -> list[int]:
183+
def build_registers_string(self) -> list[int]:
180184
"""Convert values= to registers from string(s)."""
181185
x_values = self.values if isinstance(self.values, list) else [self.values]
182186
blocks_regs: list[int] = []
183187
for value in x_values:
184-
bytes_string = cast(str, value).encode(string_encoding)
185-
if len(bytes_string) % 2:
186-
bytes_string += b"\x00"
187-
blocks_regs.extend(SimUtils.convert_bytes_registers(bytearray(bytes_string), endian[0], endian[1], 1))
188+
bytes_string = cast(str, value).encode(self.string_encoding)
189+
regs = SimUtils.bytesToRegisters(bytes_string)
190+
blocks_regs.extend(regs)
188191
return blocks_regs
189192

190193

191-
def build_registers(self, endian: tuple[bool, bool], string_encoding: str, block_bits: bool) -> list[int] | list[bool]:
194+
def build_registers(self, block_bits: bool) -> list[int] | list[bool]:
192195
"""Convert values= to registers."""
193196
self.__check_parameters()
194197
if self.datatype == DataType.STRING:
195-
block_regs = self.build_registers_string(endian, string_encoding)
196-
return block_regs * self.count
198+
return self.build_registers_string() * self.count
197199
if block_bits:
198-
return self.build_registers_bits_block()
200+
return self.build_registers_bits_block() * self.count
199201
if self.datatype == DataType.BITS:
200-
return self.build_registers_bits_shared()
202+
return self.build_registers_bits_shared() * self.count
201203

202204
x_values = self.values if isinstance(self.values, list) else [self.values]
203-
_x_datatype, x_struct, x_len = SimUtils.DATATYPE_STRUCT[self.datatype]
205+
_x_datatype, x_struct, _x_len = SimUtils.DATATYPE_STRUCT[self.datatype]
204206
blocks_regs: list[int] = []
205207
for v in x_values:
206208
byte_list = struct.pack(f">{x_struct}", v)
207-
blocks_regs.extend(SimUtils.convert_bytes_registers(bytearray(byte_list), endian[0], endian[1], x_len))
209+
blocks_regs.extend(SimUtils.bytesToRegisters(byte_list))
208210
return blocks_regs

pymodbus/simulator/simdevice.py

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,6 @@ class SimDevice:
9494
#: ..tip:: Content (word_order, byte_order), True means big-endian.
9595
endian: tuple[bool, bool] = (True, True)
9696

97-
#: String encoding
98-
#:
99-
string_encoding: str = "utf-8"
100-
10197
#: Set device identity
10298
identity: ModbusDeviceIdentification | None = None
10399

@@ -138,11 +134,6 @@ def __check_simple(self):
138134
and isinstance(self.endian[1], bool)
139135
):
140136
raise TypeError("endian= must be a tuple with 2 bool")
141-
test_str = "test string"
142-
try:
143-
test_str.encode(self.string_encoding)
144-
except (UnicodeEncodeError, LookupError) as exc:
145-
raise TypeError("string_encoding= not valid") from exc
146137

147138
def __check_simple2(self):
148139
"""Check simple parameters."""
@@ -186,7 +177,7 @@ def __check_block_entries(self, last_address: int, entry: SimData, use_bits: boo
186177
"""Check block entries."""
187178
if entry.address <= last_address:
188179
raise TypeError(f"SimData address {entry.address} is overlapping!")
189-
blocks_regs = entry.build_registers(self.endian, self.string_encoding, use_bits) * entry.count
180+
blocks_regs = entry.build_registers(use_bits) * entry.count
190181
return last_address + len(blocks_regs)
191182

192183
def __check_parameters(self):
@@ -214,7 +205,7 @@ def __build_flags(self, simdata: SimData) -> int:
214205
def __create_simdata(self, simdata: SimData, flag_list: list[int], reg_list: list[int], use_bits: bool):
215206
"""Build registers for single SimData."""
216207
flag_normal = self.__build_flags(simdata)
217-
blocks_regs = simdata.build_registers(self.endian, self.string_encoding, use_bits)
208+
blocks_regs = simdata.build_registers(use_bits)
218209
for _ in range(simdata.count):
219210
first = True
220211
for register in blocks_regs:
@@ -252,14 +243,14 @@ def __create_block_bits(self, simdata: list[SimData]) -> SimRegs:
252243
if (next_address := start_address + len(bit_list)) < entry.address:
253244
bit_list.extend([False] * (entry.address - next_address))
254245
next_address = start_address + len(bit_list)
255-
entry_bits = entry.build_registers(self.endian, self.string_encoding, True)
246+
entry_bits = entry.build_registers(True)
256247
bit_list.extend(cast(list[bool], entry_bits))
257248
if (remains := len(bit_list) % 16):
258249
bit_list.extend([False] * (16 - remains))
259250

260251
flag_list: list[int] = [DataType.BITS] + [0] * (int(len(bit_list) / 16) -1) + [DataType.INVALID]
261252
bytes_bits = bytearray(pack_bitstring(bit_list))
262-
reg_list = SimUtils.convert_bytes_registers(bytes_bits, self.endian[0], not self.endian[1], 1)
253+
reg_list = SimUtils.bytesToRegisters(bytes_bits)
263254
reg_list.append(0)
264255
return (int(start_address / 16), reg_list, flag_list)
265256

pymodbus/simulator/simruntime.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ async def get_block(self, func_code: int, address: int, count: int, values: list
6868
if (result := await self.__check_block(func_code, block_id, address, count, offset, values)):
6969
return result
7070
if fc_block in {"c", "d"}:
71-
list_bools = SimUtils.convert_registers_bits(registers[offset:offset+count])
71+
list_bools = SimUtils.registersToBits(registers[offset:offset+count])
7272
if not use_bits:
7373
return list_bools
7474
bit_offset = address % 16

pymodbus/simulator/simutils.py

Lines changed: 11 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33

44
import enum
55

6-
from ..pdu.utils import unpack_bitstring
7-
86

97
class DataType(enum.IntEnum):
108
"""Register types, used to define of a group of registers.
@@ -108,38 +106,16 @@ def bitsToRegisters(cls, bits: list[bool]) -> list[int]:
108106
@classmethod
109107
def mergeBitsToRegisters(cls, bit_offset: int, registers: list[int], bits: list[bool]) -> None:
110108
"""Merge list of bits into registers in place."""
111-
bits = cls.registersToBits(registers)
112-
new_bits = bits[0:bit_offset] + bits + bits[bit_offset + len(bits):]
113-
registers = cls.bitsToRegisters(new_bits)
114-
115-
@classmethod
116-
def convert_registers_bits(cls, registers: list[int]):
117-
"""Convert list of registers to list of bool (bit 0 first)."""
118-
coils: list[bool] = []
119-
for v in registers:
120-
x = v.to_bytes(2, byteorder="big")
121-
bool_list = unpack_bitstring(x[1:2] + x[0:1])
122-
coils.extend(bool_list)
123-
return coils
109+
new_bits = cls.registersToBits(registers)
110+
new_bits[bit_offset:bit_offset+len(bits)] = bits
111+
registers[0:] = cls.bitsToRegisters(new_bits)
124112

125113
@classmethod
126-
def convert_bytes_registers(cls, byte_list: bytearray, word_order: bool, byte_order: bool, data_type_len: int) -> list[int]:
127-
"""Convert bytearray to registers."""
128-
if byte_order:
129-
regs = [
130-
int.from_bytes(byte_list[x : x + 2], "big")
131-
for x in range(0, len(byte_list), 2)
132-
]
133-
else:
134-
regs = [
135-
int.from_bytes([byte_list[x+1], byte_list[x]], "big")
136-
for x in range(0, len(byte_list), 2)
137-
]
138-
if word_order:
139-
return regs
140-
reversed_regs: list[int] = []
141-
for x in range(0, len(regs), data_type_len):
142-
single_value_regs = regs[x: x + data_type_len]
143-
single_value_regs.reverse()
144-
reversed_regs = reversed_regs + single_value_regs
145-
return reversed_regs
114+
def bytesToRegisters(cls, byte_list: bytes) -> list[int]:
115+
"""Convert bytes into registers."""
116+
if len(byte_list) % 2:
117+
byte_list += b"\x00"
118+
return[
119+
int.from_bytes(byte_list[x : x + 2], "big")
120+
for x in range(0, len(byte_list), 2)
121+
]

test/simulator/test_simdata.py

Lines changed: 33 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ class TestSimData:
2020
{"address": 6, "count": 10, "datatype": DataType.INT16, "values": [17, 18]},
2121
{"address": 7, "datatype": DataType.STRING, "values": "test"},
2222
{"address": 8, "count": 10, "datatype": DataType.STRING, "values": "test"},
23+
{"address": 8, "count": 10, "datatype": DataType.STRING, "values": "test", "string_encoding": "utf-8"},
2324
{"address": 0, "datatype": DataType.REGISTERS, "values": 17, "count": 5},
2425
{"address": 3, "datatype": DataType.INT16, "values": 17, "readonly": True},
2526
{"address": 0, "count": 2^16 -1},
@@ -45,6 +46,7 @@ def test_simdata_instanciate(self, kwargs):
4546
{"address": 1, "datatype": "not ok"},
4647
{"address": 1, "datatype": 11},
4748
{"address": 2, "values": 17},
49+
{"address": 8, "count": 10, "datatype": DataType.STRING, "values": "test", "string_encoding": "not ok"},
4850
])
4951
def test_simdata_not_ok(self, kwargs):
5052
"""Test that simdata can be objects."""
@@ -94,40 +96,34 @@ def test_simdata_value_invalid(self, values):
9496
with pytest.raises(TypeError):
9597
SimData(0, values=values, datatype=DataType.INVALID)
9698

97-
@pytest.mark.parametrize(("value", "value_type", "order", "regs"), [
98-
# test word order
99-
(-27123, DataType.INT16, (True, True), [0x960D]),
100-
([-27123, 27123], DataType.INT16, (True, True), [0x960D, 0x69F3]),
101-
([-27123, 27123], DataType.INT16, (False, True), [0x960D, 0x69F3]),
102-
([32145678, -32145678], DataType.INT32, (True, True), [0x01EA, 0x810E, 0xFE15, 0x7EF2]),
103-
([32145678, -32145678], DataType.INT32, (False, True), [0x810E, 0x01EA, 0x7EF2, 0xFE15]),
104-
# test byte order
105-
([-27123, 27123], DataType.REGISTERS, (True, False), [0x0D96, 0xF369]),
106-
([32145678, -32145678], DataType.INT32, (True, False), [0xEA01, 0x0E81, 0x15FE, 0xF27E]),
99+
@pytest.mark.parametrize(("value", "value_type", "regs"), [
100+
(-27123, DataType.INT16, [0x960D]),
101+
([-27123, 27123], DataType.INT16, [0x960D, 0x69F3]),
102+
([32145678, -32145678], DataType.INT32, [0x01EA, 0x810E, 0xFE15, 0x7EF2]),
107103
# test data types
108-
(27123, DataType.REGISTERS, (True, True), [0x69F3]),
109-
(-27124, DataType.INT16, (True, True), [0x960C]),
110-
(27123, DataType.UINT16, (True, True), [0x69F3]),
111-
(-32145678, DataType.INT32, (True, True), [0xFE15, 0x7EF2]),
112-
(32145678, DataType.UINT32, (True, True), [0x01EA, 0x810E]),
113-
(-1234567890123456789, DataType.INT64, (True, True), [0xEEDD, 0xEF0B, 0x8216, 0x7EEB]),
114-
(1234567890123456789, DataType.UINT64, (True, True), [0x1122, 0x10F4, 0x7DE9, 0x8115]),
115-
(27123.5, DataType.FLOAT32, (True, True), [0x46D3, 0xE700]),
116-
(3.141592, DataType.FLOAT32, (True, True), [0x4049, 0x0FD8]),
117-
(-3.141592, DataType.FLOAT32, (True, True), [0xC049, 0x0FD8]),
118-
(27123.5, DataType.FLOAT64, (True, True), [0x40DA, 0x7CE0, 0x0000, 0x0000]),
119-
(3.14159265358979, DataType.FLOAT64, (True, True), [0x4009, 0x21FB, 0x5444, 0x2D11]),
120-
(-3.14159265358979, DataType.FLOAT64, (True, True), [0xC009, 0x21FB, 0x5444, 0x2D11]),
121-
(0x0100, DataType.BITS, (True, True), [256]),
122-
([0x0100, 0x0001], DataType.BITS, (True, True), [256, 1]),
123-
([True] + [False] * 15, DataType.BITS, (True, True), [1]),
124-
([True] + [False] * 8 + [True] + [False] * 6, DataType.BITS, (True, True), [513]),
125-
([True] + [False] * 15 + [False] * 8 + [True] + [False] * 7, DataType.BITS, (True, True), [1, 256]),
104+
(27123, DataType.REGISTERS, [0x69F3]),
105+
(-27124, DataType.INT16, [0x960C]),
106+
(27123, DataType.UINT16, [0x69F3]),
107+
(-32145678, DataType.INT32, [0xFE15, 0x7EF2]),
108+
(32145678, DataType.UINT32, [0x01EA, 0x810E]),
109+
(-1234567890123456789, DataType.INT64, [0xEEDD, 0xEF0B, 0x8216, 0x7EEB]),
110+
(1234567890123456789, DataType.UINT64, [0x1122, 0x10F4, 0x7DE9, 0x8115]),
111+
(27123.5, DataType.FLOAT32, [0x46D3, 0xE700]),
112+
(3.141592, DataType.FLOAT32, [0x4049, 0x0FD8]),
113+
(-3.141592, DataType.FLOAT32, [0xC049, 0x0FD8]),
114+
(27123.5, DataType.FLOAT64, [0x40DA, 0x7CE0, 0x0000, 0x0000]),
115+
(3.14159265358979, DataType.FLOAT64, [0x4009, 0x21FB, 0x5444, 0x2D11]),
116+
(-3.14159265358979, DataType.FLOAT64, [0xC009, 0x21FB, 0x5444, 0x2D11]),
117+
(0x0100, DataType.BITS, [256]),
118+
([0x0100, 0x0001], DataType.BITS, [256, 1]),
119+
([True] + [False] * 15, DataType.BITS, [1]),
120+
([True] + [False] * 8 + [True] + [False] * 6, DataType.BITS, [513]),
121+
([True] + [False] * 15 + [False] * 8 + [True] + [False] * 7, DataType.BITS, [1, 256]),
126122
])
127-
def test_simdata_build_registers(self, value, value_type, order, regs):
123+
def test_simdata_build_registers(self, value, value_type, regs):
128124
"""Test simdata value."""
129125
sd = SimData(0, values=value, datatype=value_type)
130-
build_regs = sd.build_registers((order[0], order[1]), "utf-8", False)
126+
build_regs = sd.build_registers(False)
131127
assert build_regs == regs
132128

133129
@pytest.mark.parametrize(("value", "code", "expect"), [
@@ -138,8 +134,8 @@ def test_simdata_build_registers(self, value, value_type, order, regs):
138134
])
139135
def test_simdata_build_string(self, value, code, expect):
140136
"""Test simdata value."""
141-
sd = SimData(0, values=value, datatype=DataType.STRING)
142-
build_regs = sd.build_registers((True, True), code, False)
137+
sd = SimData(0, values=value, datatype=DataType.STRING, string_encoding=code)
138+
build_regs = sd.build_registers(False)
143139
assert build_regs == expect
144140

145141
@pytest.mark.parametrize(("value", "regs"), [
@@ -152,22 +148,22 @@ def test_simdata_build_string(self, value, code, expect):
152148
def test_simdata_build_bit_block(self, value, regs):
153149
"""Test simdata value."""
154150
sd = SimData(0, values=value, datatype=DataType.BITS)
155-
build_regs = sd.build_registers((True, True), "utf-8", True)
151+
build_regs = sd.build_registers(True)
156152
assert build_regs == regs
157153

158154
def test_simdata_build_updated_simdata(self):
159155
"""Test simdata value."""
160156
sd = SimData(0, values="ABC", datatype=DataType.STRING)
161-
build_regs = sd.build_registers((True, True), "utf-8", False)
157+
build_regs = sd.build_registers(False)
162158
assert build_regs == [0x4142, 0x4300]
163159
sd.values="ABCDEF"
164-
build_regs = sd.build_registers((True, True), "utf-8", False)
160+
build_regs = sd.build_registers(False)
165161
assert build_regs == [0x4142, 0x4344, 0x4546]
166162

167163
sd.values=123
168164
with pytest.raises(TypeError):
169-
sd.build_registers((True, True), "utf-8", False)
165+
sd.build_registers(False)
170166
sd = SimData(0, values=[True, True], datatype=DataType.BITS)
171167
with pytest.raises(TypeError):
172-
sd.build_registers((True, True), "utf-8", False)
168+
sd.build_registers(False)
173169

test/simulator/test_simdevice.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ def my_sync_action(
3939
simdata3 = SimData(10, datatype=DataType.BITS, values=16)
4040

4141
@pytest.mark.parametrize("kwargs", [
42-
{"id": 0, "simdata": [SimData(2, datatype=DataType.STRING, values="test")], "string_encoding": "utf-8"},
42+
{"id": 0, "simdata": [SimData(2, datatype=DataType.STRING, values="test")]},
4343
{"id": 0, "simdata": ([simdata3], [simdata3], [simdata1], [simdata3])},
4444
{"id": 0, "simdata": simdata2},
4545
{"id": 0, "simdata": [simdata2, simdata1]},
@@ -61,7 +61,6 @@ def test_simdevice_instanciate(self, kwargs):
6161
{"id": 0, "simdata": ([simdata1], [simdata3], [simdata1], [simdata1])},
6262
{"id": 0, "simdata": ([simdata3], [simdata1], [simdata1], [simdata1])},
6363
{"id": 0, "simdata": ([simdata3], [simdata3], [simdata1], "not ok")},
64-
{"id": 0, "simdata": [simdata1], "string_encoding": "not ok"},
6564
{"id": "not ok", "simdata": [simdata1]},
6665
{"id": 1.0, "simdata": [simdata1]},
6766
{"id": 1, "simdata": [simdata1, simdata1]},

test/simulator/test_simutils.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,35 @@ def test_simutils_bitsToRegisters_not_ok(self, bits):
3333
([True] + [False] * 8 + [True] + [False]*6, [513]),
3434
([True] + [False] * 8 + [True] + [False]*6 + [False] + [True] + [False] * 6 + [True] + [False]*7, [513, 258]),
3535
])
36-
def test_simutils_RegistersToBits(self, bits, registers):
36+
def test_simutils_registersToBits(self, bits, registers):
3737
"""Test convert list[bool] to list[int]."""
3838
new_bits = SimUtils.registersToBits(registers)
3939
assert bits == new_bits
40+
41+
42+
@pytest.mark.parametrize(("registers", "offset", "bits", "expect"), [
43+
([513], 0, [], [513]),
44+
([513], 0, [False], [512]),
45+
([513], 1, [False], [513]),
46+
([513], 0, [True, True], [515]),
47+
([513], 1, [True], [515]),
48+
([513], 15, [True], [33281]),
49+
([513, 0], 15, [True, True], [33281, 1]),
50+
([513, 0], 15, [True, False], [33281, 0]),
51+
])
52+
def test_simutils_mergeBitsToRegisters(self, registers, offset, bits, expect):
53+
"""Test convert list[bool] to list[int]."""
54+
temp_regs = registers.copy()
55+
SimUtils.mergeBitsToRegisters(offset, temp_regs, bits)
56+
assert temp_regs == expect
57+
58+
59+
@pytest.mark.parametrize(("byte_list", "expect"), [
60+
(b'\x01\x01', [257]),
61+
(b'\x01\x02', [258]),
62+
(b'\x01\x01\x01\x02', [257, 258]),
63+
])
64+
def test_simutils_bytesToRegisters(self, byte_list, expect):
65+
"""Test convert list[bool] to list[int]."""
66+
regs = SimUtils.bytesToRegisters(byte_list)
67+
assert regs == expect

0 commit comments

Comments
 (0)