Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion examples/server_datamodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
This file shows examples of how to configure the datamodel for the server/simulator.

There are different examples showing the flexibility of the datamodel.

**REMARK** This code is experimental and not integrated into production.

"""

from pymodbus.constants import DataType
Expand Down Expand Up @@ -45,7 +48,7 @@ def define_datamodel():
#block4 = SimData(17, count=5, values=123, datatype=DataType.INT64)
block5 = SimData(27, 1, "Hello ", datatype=DataType.STRING)

block_def = SimData(0, count=1000, datatype=DataType.REGISTERS, default=True)
block_def = SimData(0, count=1000, datatype=DataType.REGISTERS)

# SimDevice can be instantiated with positional or optional parameters:
assert SimDevice(
Expand Down
8 changes: 6 additions & 2 deletions pymodbus/simulator/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
"""Simulator."""
"""Simulator.

**REMARK** This code is experimental and not integrated into production.
"""

__all__ = [
"SimAction",
"SimCore",
"SimData",
"SimDevice",
"SimDevices",
"SimValueType",
]

Expand All @@ -14,4 +18,4 @@
SimData,
SimValueType,
)
from .simdevice import SimDevice
from .simdevice import SimDevice, SimDevices
5 changes: 4 additions & 1 deletion pymodbus/simulator/simcore.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
"""Simulator data model implementation."""
"""Simulator data model implementation.

**REMARK** This code is experimental and not integrated into production.
"""
from __future__ import annotations

from .simdata import SimData
Expand Down
69 changes: 29 additions & 40 deletions pymodbus/simulator/simdata.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
"""Simulator data model classes."""
"""Simulator data model classes.

**REMARK** This code is experimental and not integrated into production.
"""
from __future__ import annotations

import inspect
Expand All @@ -25,12 +28,12 @@ class SimData:
SimData(
address=100,
count=5,
value=12345678
values=12345678
datatype=DataType.INT32
)
SimData(
address=100,
value=[1, 2, 3, 4, 5]
values=[1, 2, 3, 4, 5]
datatype=DataType.INT32
)

Expand All @@ -40,29 +43,38 @@ class SimData:

SimData(
address=100,
count=17,
value=True
count=16,
values=True
datatype=DataType.BITS
)
SimData(
address=100,
values=[True] * 16
datatype=DataType.BITS
)
SimData(
address=100,
value=[0xffff, 1]
values=0xffff
datatype=DataType.BITS
)
SimData(
address=100,
values=[0xffff]
datatype=DataType.BITS
)

Each SimData defines 17 BITS (coils), with value True.
Each SimData defines 16 BITS (coils), with value True.

In block mode (CO and DI) addresses are 100-116 (each 1 bit)
Value are stored in registers (16bit is 1 register), the address refer to the register.

In shared mode BITS are stored in registers (16bit is 1 register), the address refer to the register,
addresses are 100-101 (with register 101 being padded with 15 bits set to False)
**Remark** when using offsets, only bit 0 of each register is used!

.. code-block:: python

SimData(
address=0,
count=1000,
value=0x1234
values=0x1234
datatype=DataType.REGISTERS
)

Expand All @@ -76,9 +88,10 @@ class SimData:
#:
#: - count=3 datatype=DataType.REGISTERS is 3 registers.
#: - count=3 datatype=DataType.INT32 is 6 registers.
#: - count=1 (default), value="ABCD" is 2 registers
#: - count=1 datatype=DataType.STRING, values="ABCD" is 2 registers
#: - count=2 datatype=DataType.STRING, values="ABCD" is 4 registers
#:
#: Cannot be used if value is a list or datatype is DataType.STRING
#: Count cannot be used if values= is a list
count: int = 1

#: Value/Values of datatype,
Expand Down Expand Up @@ -111,25 +124,6 @@ class SimData:
#: **remark** only to be used with address= and count=
invalid: bool = False

#: Use as default for undefined registers
#: Define legal register range as:
#:
#: address= <= legal addresses <= address= + count=
#:
#: **remark** only to be used with address= and count=
default: bool = False

#: The following are internal variables
register_count: int = -1
type_size: int = -1

def __check_default(self):
"""Check use of default=."""
if self.datatype != DataType.REGISTERS:
raise TypeError("default=True only works with datatype=DataType.REGISTERS")
if isinstance(self.values, list):
raise TypeError("default=True only works with values=<integer>")

def __check_simple(self):
"""Check simple parameters."""
if not isinstance(self.address, int) or not 0 <= self.address <= 65535:
Expand All @@ -142,23 +136,18 @@ def __check_simple(self):
raise TypeError("datatype= must by an DataType")
if self.action and not (callable(self.action) and inspect.iscoroutinefunction(self.action)):
raise TypeError("action= not a async function")
if self.register_count != -1:
raise TypeError("register_count= is illegal")
if self.type_size != -1:
raise TypeError("type_size= is illegal")

def __post_init__(self):
"""Define a group of registers."""
self.__check_simple()
if self.default:
self.__check_default()
x_datatype: type | tuple[type, type]
if self.datatype == DataType.STRING:
if not isinstance(self.values, str):
raise TypeError("datatype=DataType.STRING only allows values=\"string\"")
x_datatype, x_len = str, int((len(self.values) +1) / 2)
else:
x_datatype, x_len = DATATYPE_STRUCT[self.datatype]
x = DATATYPE_STRUCT[self.datatype]
x_len = x[1]
x_datatype = cast(type[str], x[0])
if not isinstance(self.values, list):
super().__setattr__("values", [self.values])
for x_value in cast(list, self.values):
Expand Down
152 changes: 113 additions & 39 deletions pymodbus/simulator/simdevice.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
"""Simulator device model classes."""
"""Simulator device model classes.

**REMARK** This code is experimental and not integrated into production.
"""
from __future__ import annotations

from dataclasses import dataclass
from typing import cast

from .simdata import SimData
from pymodbus.constants import DATATYPE_STRUCT, DataType

from .simdata import SimData

OFFSET_NONE = (-1, -1, -1, -1)

@dataclass(order=True, frozen=True)
class SimDevice:
Expand Down Expand Up @@ -62,70 +66,140 @@ class SimDevice:
#:
registers: list[SimData]

#: Use this for old devices with 4 blocks.
#: Default SimData to be used for registers not defined.
default: SimData | None = None

#: Define starting address for each of the 4 blocks.
#:
#: .. tip:: content is (coil, direct, holding, input)
offset_address: tuple[int, int, int, int] = OFFSET_NONE
#: .. tip:: Content (coil, direct, holding, input) in growing order.
offset_address: tuple[int, int, int, int] | None = None

#: Enforce type checking, if True access are controlled to be conform with datatypes.
#:
#: Type violations like e.g. reading INT32 as INT16 are returned as ExceptionResponses,
#: as well as being logged.
type_check: bool = False

#: Change endianness.
#:
#: Word order is not defined in the modbus standard and thus a device that
#: uses little-endian is still within the modbus standard.
#:
#: Byte order is defined in the modbus standard to be big-endian,
#: however it is definable to test non-standard modbus devices
#:
#: ..tip:: Content (word_order, byte_order)
endian: tuple[bool, bool] = (True, True)

#: Set device identity
#:
identity: str = "pymodbus simulator/server"


def __check_block(self, block: list[SimData]) -> list[SimData]:
"""Check block content."""
if not block:
return block
for inx, entry in enumerate(block):
if not isinstance(entry, SimData):
raise TypeError(f"registers[{inx}]= is a SimData entry")
block.sort(key=lambda x: x.address)
return self.__check_block_entries(block)

def __check_block_entries(self, block: list[SimData]) -> list[SimData]:
"""Check block entries."""
last_address = -1
if len(block) > 1 and block[1].default:
temp = block[0]
block[0] = block[1]
block[1] = temp
first = True
for entry in block:
if entry.default:
if first:
first = False
continue
raise TypeError("Multiple default SimData, not allowed")
first = False
if entry.address <= last_address:
raise TypeError("SimData address {entry.address} is overlapping!")
last_address = entry.address + entry.register_count -1
if not block[0].default:
default = SimData(address=block[0].address, count=last_address - block[0].address +1, default=True)
block.insert(0, default)
max_address = block[0].address + block[0].register_count -1
if last_address > max_address:
raise TypeError("Default set max address {max_address} but {last_address} is defined?")
if len(block) > 1 and block[0].address > block[1].address:
raise TypeError("Default set lowest address to {block[0].address} but {block[1].address} is defined?")
last_address = self.__check_block_entries(last_address, entry)
if self.default and block:
first_address = block[0].address
if self.default.address > first_address:
raise TypeError("Default address is {self.default.address} but {first_address} is defined?")
def_last_address = self.default.address + self.default.count -1
if last_address > def_last_address:
raise TypeError("Default address+count is {def_last_address} but {last_address} is defined?")
return block

def __post_init__(self):
"""Define a device."""
def __check_block_entries(self, last_address: int, entry: SimData) -> int:
"""Check block entries."""
values = entry.values if isinstance(entry.values, list) else [entry.values]
if entry.address <= last_address:
raise TypeError("SimData address {entry.address} is overlapping!")
if entry.datatype == DataType.BITS:
if isinstance(values[0], bool):
reg_count = int((len(values) + 15) / 16)
else:
reg_count = len(values)
return entry.address + reg_count * entry.count -1
if entry.datatype == DataType.STRING:
return entry.address + len(cast(str, entry.values)) * entry.count -1
register_count = DATATYPE_STRUCT[entry.datatype][1]
return entry.address + register_count * entry.count -1

def __check_simple(self):
"""Check simple parameters."""
if not isinstance(self.id, int) or not 0 <= self.id <= 255:
raise TypeError("0 <= id < 255")
if not isinstance(self.registers, list) or not self.registers:
if not isinstance(self.registers, list):
raise TypeError("registers= not a list")
if not self.default and not self.registers:
raise TypeError("Either registers= or default= must contain SimData")
if not isinstance(self.type_check, bool):
raise TypeError("type_check= not a bool")
if (not self.endian
or not isinstance(self.endian, tuple)
or len(self.endian) != 2
or not isinstance(self.endian[0], bool)
or not isinstance(self.endian[1], bool)
):
raise TypeError("endian= must be a tuple with 2 bool")
if not isinstance(self.identity, str):
raise TypeError("identity= must be a string")
if not self.default:
return
if not isinstance(self.default, SimData):
raise TypeError("default= must be a SimData object")
if not self.default.datatype == DataType.REGISTERS:
raise TypeError("default= only allow datatype=DataType.REGISTERS")

def __post_init__(self):
"""Define a device."""
self.__check_simple()
super().__setattr__("registers", self.__check_block(self.registers))
if self.offset_address != OFFSET_NONE:
if self.offset_address is not None:
if not isinstance(self.offset_address, tuple):
raise TypeError("offset_address= must be a tuple")
if len(self.offset_address) != 4:
raise TypeError("offset_address= must have 4 addresses")
reg_start = self.registers[0].address
reg_end = self.registers[0].address + self.registers[0].register_count
raise TypeError("offset_address= must be a tuple with 4 addresses")
if self.default:
reg_start = self.default.address
reg_end = self.default.address + self.default.count -1
else:
reg_start = self.registers[0].address
reg_end = self.registers[-1].address
for i in range(4):
if not (reg_start < self.offset_address[i] < reg_end):
raise TypeError(f"offset_address[{i}] outside defined range")
if i and self.offset_address[i-1] >= self.offset_address[i]:
raise TypeError("offset_address= must be ascending addresses")

@dataclass(order=True, frozen=True)
class SimDevices:
"""Define a group of devices.

If wanting to use multiple devices in a single server,
each SimDevice must be grouped with SimDevices.
"""

#: Add a list of SimDevice
devices: list[SimDevice]

def __post_init__(self):
"""Define a group of devices."""
if not isinstance(self.devices, list):
raise TypeError("devices= must be a list of SimDevice")
if not self.devices:
raise TypeError("devices= must contain at least 1 SimDevice")
list_id = []
for device in self.devices:
if not isinstance(device, SimDevice):
raise TypeError("devices= contains non SimDevice entries")
if device.id in list_id:
raise TypeError(f"device_id={device.id} is duplicated")
list_id.append(device.id)
Loading