diff --git a/snap7/__init__.py b/snap7/__init__.py index ba87536d..348919a3 100644 --- a/snap7/__init__.py +++ b/snap7/__init__.py @@ -13,6 +13,7 @@ from .partner import Partner from .logo import Logo from .util.db import Row, DB +from .util.symbols import SymbolTable from .type import Area, Block, WordLen, SrvEvent, SrvArea __all__ = [ @@ -23,6 +24,7 @@ "Logo", "Row", "DB", + "SymbolTable", "Area", "Block", "WordLen", diff --git a/snap7/util/symbols.py b/snap7/util/symbols.py new file mode 100644 index 00000000..7ff62a14 --- /dev/null +++ b/snap7/util/symbols.py @@ -0,0 +1,499 @@ +""" +Symbolic addressing for S7 PLC data blocks. + +Provides a SymbolTable class that maps human-readable tag names to PLC +addresses (db_number, byte_offset, data_type), enabling read/write operations +by tag name instead of raw addresses. + +Example:: + + from snap7.util.symbols import SymbolTable + + symbols = SymbolTable.from_csv("tags.csv") + value = symbols.read(client, "Motor1.Speed") + symbols.write(client, "Motor1.Speed", 1500.0) +""" + +import csv +import io +import json +import re +from dataclasses import dataclass +from logging import getLogger +from pathlib import Path +from typing import Any, Dict, Union + +from snap7.client import Client +from snap7.type import ValueType +from snap7.util import ( + get_bool, + get_byte, + get_char, + get_dint, + get_dword, + get_dt, + get_dtl, + get_int, + get_lreal, + get_real, + get_sint, + get_string, + get_tod, + get_udint, + get_uint, + get_usint, + get_wchar, + get_word, + get_wstring, + get_date, + get_time, + get_lword, + set_bool, + set_byte, + set_char, + set_date, + set_dint, + set_dword, + set_dt, + set_dtl, + set_int, + set_lreal, + set_real, + set_sint, + set_string, + set_tod, + set_udint, + set_uint, + set_usint, + set_wchar, + set_word, + set_wstring, + set_time, + set_lword, +) + +logger = getLogger(__name__) + +# --------------------------------------------------------------------------- +# Module-level getter/setter dispatch maps (built once, not per call) +# --------------------------------------------------------------------------- + +_GETTER_MAP: Dict[str, Any] = { + "BYTE": get_byte, + "SINT": get_sint, + "USINT": get_usint, + "CHAR": get_char, + "INT": get_int, + "UINT": get_uint, + # NOTE: get_word is annotated as returning bytearray but actually returns + # int at runtime (struct.unpack(">H", ...) -> int). It behaves correctly. + "WORD": get_word, + "DATE": get_date, + "DINT": get_dint, + "UDINT": get_udint, + "DWORD": get_dword, + "REAL": get_real, + "TIME": get_time, + "TOD": get_tod, + "TIME_OF_DAY": get_tod, + "DATE_AND_TIME": get_dt, + "DT": get_dt, + "LREAL": get_lreal, + "LWORD": get_lword, + "WCHAR": get_wchar, + "DTL": get_dtl, +} + +# Setters that cast value to int before calling +_INT_SETTER_MAP: Dict[str, Any] = { + "BYTE": set_byte, + "SINT": set_sint, + "USINT": set_usint, + "INT": set_int, + "UINT": set_uint, + "WORD": set_word, + "DINT": set_dint, + "UDINT": set_udint, + "DWORD": set_dword, +} + +# Setters that pass value through without casting +_SIMPLE_SETTER_MAP: Dict[str, Any] = { + "REAL": set_real, + "LREAL": set_lreal, + "CHAR": set_char, + "WCHAR": set_wchar, + "TIME": set_time, + "DATE": set_date, + "TOD": set_tod, + "TIME_OF_DAY": set_tod, + "DATE_AND_TIME": set_dt, + "DT": set_dt, + "DTL": set_dtl, + "LWORD": set_lword, +} + +# Mapping from S7 type name to the number of bytes needed to read +_TYPE_SIZE: Dict[str, int] = { + "BOOL": 1, + "BYTE": 1, + "SINT": 1, + "USINT": 1, + "CHAR": 1, + "INT": 2, + "UINT": 2, + "WORD": 2, + "DATE": 2, + "DINT": 4, + "UDINT": 4, + "DWORD": 4, + "REAL": 4, + "TIME": 4, + "TOD": 4, + "TIME_OF_DAY": 4, + "DATE_AND_TIME": 8, + "DT": 8, + "LREAL": 8, + "LWORD": 8, + "WCHAR": 2, + "DTL": 12, +} + +# Regex to extract STRING[n] or WSTRING[n] with size parameter +_STRING_RE = re.compile(r"^(STRING|WSTRING|FSTRING)\[(\d+)]$", re.IGNORECASE) + + +def _read_source(source: Union[str, Path]) -> str: + """Resolve *source* to text content. + + If *source* is a :class:`~pathlib.Path` it is always read as a file. + If it is a string that contains a newline character it is treated as + inline content (CSV / JSON). Otherwise the string is checked as a + file path and read if it exists; if not it is returned verbatim. + """ + if isinstance(source, Path): + return source.read_text() + s = str(source) + if "\n" in s: + return s + path = Path(s) + if path.exists(): + return path.read_text() + return s + + +@dataclass(frozen=True) +class TagAddress: + """Resolved address for a single PLC tag.""" + + db: int + offset: int + bit: int + type: str + + @property + def byte_offset(self) -> int: + """Return the byte offset (without bit component).""" + return self.offset + + def read_size(self) -> int: + """Return the number of bytes that need to be read from the PLC for this tag.""" + upper = self.type.upper() + match = _STRING_RE.match(upper) + if match: + kind = match.group(1) + length = int(match.group(2)) + if kind == "FSTRING": + return length + elif kind == "STRING": + # S7 STRING: 2-byte header + max_length characters + return 2 + length + elif kind == "WSTRING": + # S7 WSTRING: 4-byte header + max_length * 2 bytes + return 4 + length * 2 + if upper in _TYPE_SIZE: + return _TYPE_SIZE[upper] + raise ValueError(f"Unknown S7 type: {self.type}") + + +def _parse_offset(offset_str: str) -> tuple[int, int]: + """Parse an offset string like '4' or '4.0' into (byte_offset, bit_index). + + Args: + offset_str: offset value, e.g. '4', '4.0', '12.3' + + Returns: + Tuple of (byte_offset, bit_index). + """ + if "." in str(offset_str): + parts = str(offset_str).split(".") + return int(parts[0]), int(parts[1]) + return int(float(offset_str)), 0 + + +class SymbolTable: + """Map symbolic tag names to PLC addresses and perform typed reads/writes. + + Supports construction from: + - A Python dict mapping tag names to address dicts + - A CSV file or string (via :meth:`from_csv`) + - A JSON file or string (via :meth:`from_json`) + + Tag names support dot-separated nested paths (e.g. ``"Motor1.Speed"``) + and array indexing (e.g. ``"Motors[3].Speed"``). + + Example:: + + symbols = SymbolTable({ + "Motor1.Speed": {"db": 1, "offset": 0, "type": "REAL"}, + "Motor1.Running": {"db": 1, "offset": 4, "bit": 0, "type": "BOOL"}, + }) + value = symbols.read(client, "Motor1.Speed") + symbols.write(client, "Motor1.Speed", 1500.0) + """ + + def __init__(self, tags: Dict[str, Dict[str, Any]]) -> None: + self._tags: Dict[str, TagAddress] = {} + for name, info in tags.items(): + self._add_tag(name, info) + + # ------------------------------------------------------------------ + # Construction helpers + # ------------------------------------------------------------------ + + def _add_tag(self, name: str, info: Dict[str, Any]) -> None: + db = int(info["db"]) + offset_raw = info.get("offset", 0) + byte_offset, default_bit = _parse_offset(str(offset_raw)) + bit = int(info.get("bit", default_bit)) + type_ = str(info["type"]) + self._tags[name] = TagAddress(db=db, offset=byte_offset, bit=bit, type=type_) + + @classmethod + def from_csv(cls, source: Union[str, Path]) -> "SymbolTable": + """Create a SymbolTable from a CSV file or CSV string. + + The CSV must have columns: ``tag``, ``db``, ``offset``, ``type``. + An optional ``bit`` column overrides the bit index parsed from the offset. + + Args: + source: path to a CSV file, or a CSV-formatted string. Strings + that contain newlines are always treated as inline CSV content. + + Returns: + A new :class:`SymbolTable`. + """ + text = _read_source(source) + + reader = csv.DictReader(io.StringIO(text)) + tags: Dict[str, Dict[str, Any]] = {} + for row in reader: + name = row["tag"].strip() + entry: Dict[str, Any] = { + "db": row["db"].strip(), + "offset": row["offset"].strip(), + "type": row["type"].strip(), + } + if "bit" in row and row["bit"] is not None and row["bit"].strip(): + entry["bit"] = row["bit"].strip() + tags[name] = entry + return cls(tags) + + @classmethod + def from_json(cls, source: Union[str, Path]) -> "SymbolTable": + """Create a SymbolTable from a JSON file or JSON string. + + The JSON should be an object mapping tag names to address objects, + each with keys ``db``, ``offset``, ``type``, and optionally ``bit``. + + Args: + source: path to a JSON file, or a JSON-formatted string. Strings + that contain newlines are always treated as inline JSON content. + + Returns: + A new :class:`SymbolTable`. + """ + text = _read_source(source) + + data: Dict[str, Dict[str, Any]] = json.loads(text) + return cls(data) + + # ------------------------------------------------------------------ + # Lookup + # ------------------------------------------------------------------ + + def resolve(self, tag: str) -> TagAddress: + """Resolve a tag name to its :class:`TagAddress`. + + Args: + tag: the symbolic name (e.g. ``"Motor1.Speed"``). + + Returns: + The resolved address. + + Raises: + KeyError: if the tag is not defined in this table. + """ + if tag in self._tags: + return self._tags[tag] + raise KeyError(f"Unknown tag: {tag!r}") + + @property + def tags(self) -> Dict[str, TagAddress]: + """Return a copy of the internal tag mapping.""" + return dict(self._tags) + + def __len__(self) -> int: + return len(self._tags) + + def __contains__(self, tag: str) -> bool: + return tag in self._tags + + # ------------------------------------------------------------------ + # Read / write + # ------------------------------------------------------------------ + + def read(self, client: Client, tag: str) -> ValueType: + """Read a single tag value from the PLC. + + Args: + client: a connected :class:`~snap7.client.Client`. + tag: symbolic tag name. + + Returns: + The value, typed according to the tag's S7 data type. + """ + addr = self.resolve(tag) + size = addr.read_size() + data = client.db_read(addr.db, addr.byte_offset, size) + return self._get_value(data, 0, addr) + + def write(self, client: Client, tag: str, value: Any) -> None: + """Write a single tag value to the PLC. + + Args: + client: a connected :class:`~snap7.client.Client`. + tag: symbolic tag name. + value: the value to write. + """ + addr = self.resolve(tag) + size = addr.read_size() + + upper = addr.type.upper() + if upper == "BOOL": + # For BOOL we need to read-modify-write the byte + data = client.db_read(addr.db, addr.byte_offset, 1) + set_bool(data, 0, addr.bit, bool(value)) + client.db_write(addr.db, addr.byte_offset, data) + return + + # For non-BOOL types we can write directly + data = bytearray(size) + self._set_value(data, 0, addr, value) + client.db_write(addr.db, addr.byte_offset, data) + + def read_many(self, client: Client, tags: list[str]) -> Dict[str, ValueType]: + """Read multiple tags individually and return them as a dictionary. + + This is a convenience method that reads each tag one at a time via + :meth:`read`. It does **not** batch or group reads. + + Args: + client: a connected :class:`~snap7.client.Client`. + tags: list of tag names to read. + + Returns: + Dictionary mapping tag names to their values. + """ + result: Dict[str, ValueType] = {} + for tag in tags: + result[tag] = self.read(client, tag) + return result + + # ------------------------------------------------------------------ + # Internal getter / setter dispatch + # ------------------------------------------------------------------ + + @staticmethod + def _get_value(data: bytearray, base_offset: int, addr: TagAddress) -> ValueType: + """Extract a typed value from a bytearray at the given offset.""" + upper = addr.type.upper() + offset = base_offset + + if upper == "BOOL": + return get_bool(data, offset, addr.bit) + + match = _STRING_RE.match(upper) + if match: + kind = match.group(1) + length = int(match.group(2)) + if kind == "FSTRING": + from snap7.util import get_fstring + + return get_fstring(data, offset, length) + elif kind == "STRING": + return get_string(data, offset) + elif kind == "WSTRING": + return get_wstring(data, offset) + + if upper in _GETTER_MAP: + return _GETTER_MAP[upper](data, offset) # type: ignore[no-any-return] + + raise ValueError(f"Unsupported S7 type for reading: {addr.type}") + + @staticmethod + def _set_value(data: bytearray, base_offset: int, addr: TagAddress, value: Any) -> None: + """Write a typed value into a bytearray at the given offset.""" + upper = addr.type.upper() + offset = base_offset + + if upper == "BOOL": + set_bool(data, offset, addr.bit, bool(value)) + return + + match = _STRING_RE.match(upper) + if match: + kind = match.group(1) + length = int(match.group(2)) + if kind == "FSTRING": + from snap7.util import set_fstring + + set_fstring(data, offset, str(value), length) + return + elif kind == "STRING": + set_string(data, offset, str(value), length) + return + elif kind == "WSTRING": + set_wstring(data, offset, str(value), length) + return + + if upper in _INT_SETTER_MAP: + _INT_SETTER_MAP[upper](data, offset, int(value)) + return + + if upper in _SIMPLE_SETTER_MAP: + _SIMPLE_SETTER_MAP[upper](data, offset, value) + return + + raise ValueError(f"Unsupported S7 type for writing: {addr.type}") + + # ------------------------------------------------------------------ + # Merge + # ------------------------------------------------------------------ + + def merge(self, other: "SymbolTable") -> "SymbolTable": + """Return a new SymbolTable containing tags from both tables. + + Args: + other: another :class:`SymbolTable` to merge with. + + Returns: + A new merged :class:`SymbolTable`. Tags from *other* override + duplicates from *self*. + """ + combined: Dict[str, Dict[str, Any]] = {} + for name, addr in self._tags.items(): + combined[name] = {"db": addr.db, "offset": addr.offset, "bit": addr.bit, "type": addr.type} + for name, addr in other._tags.items(): + combined[name] = {"db": addr.db, "offset": addr.offset, "bit": addr.bit, "type": addr.type} + return SymbolTable(combined) diff --git a/tests/test_symbols.py b/tests/test_symbols.py new file mode 100644 index 00000000..2223c80f --- /dev/null +++ b/tests/test_symbols.py @@ -0,0 +1,443 @@ +"""Tests for snap7.util.symbols — symbolic addressing.""" + +import json +import struct +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +from snap7.util.symbols import SymbolTable, TagAddress, _parse_offset + + +# --------------------------------------------------------------------------- +# TagAddress basics +# --------------------------------------------------------------------------- + + +class TestTagAddress: + def test_read_size_real(self) -> None: + addr = TagAddress(db=1, offset=0, bit=0, type="REAL") + assert addr.read_size() == 4 + + def test_read_size_bool(self) -> None: + addr = TagAddress(db=1, offset=4, bit=0, type="BOOL") + assert addr.read_size() == 1 + + def test_read_size_int(self) -> None: + addr = TagAddress(db=1, offset=6, bit=0, type="INT") + assert addr.read_size() == 2 + + def test_read_size_string(self) -> None: + addr = TagAddress(db=1, offset=8, bit=0, type="STRING[20]") + assert addr.read_size() == 22 # 2-byte header + 20 + + def test_read_size_wstring(self) -> None: + addr = TagAddress(db=1, offset=0, bit=0, type="WSTRING[10]") + assert addr.read_size() == 24 # 4-byte header + 10*2 + + def test_read_size_fstring(self) -> None: + addr = TagAddress(db=1, offset=0, bit=0, type="FSTRING[15]") + assert addr.read_size() == 15 + + def test_read_size_unknown_raises(self) -> None: + addr = TagAddress(db=1, offset=0, bit=0, type="UNKNOWN_TYPE") + with pytest.raises(ValueError, match="Unknown S7 type"): + addr.read_size() + + def test_read_size_lreal(self) -> None: + addr = TagAddress(db=1, offset=0, bit=0, type="LREAL") + assert addr.read_size() == 8 + + def test_read_size_dtl(self) -> None: + addr = TagAddress(db=1, offset=0, bit=0, type="DTL") + assert addr.read_size() == 12 + + def test_byte_offset_property(self) -> None: + addr = TagAddress(db=1, offset=10, bit=3, type="BOOL") + assert addr.byte_offset == 10 + + +# --------------------------------------------------------------------------- +# _parse_offset helper +# --------------------------------------------------------------------------- + + +class TestParseOffset: + def test_integer_offset(self) -> None: + assert _parse_offset("4") == (4, 0) + + def test_decimal_offset(self) -> None: + assert _parse_offset("12.3") == (12, 3) + + def test_zero_bit(self) -> None: + assert _parse_offset("4.0") == (4, 0) + + +# --------------------------------------------------------------------------- +# Construction from dict +# --------------------------------------------------------------------------- + + +class TestDictConstruction: + def test_basic_construction(self) -> None: + table = SymbolTable( + { + "Motor1.Speed": {"db": 1, "offset": 0, "type": "REAL"}, + "Motor1.Running": {"db": 1, "offset": 4, "bit": 0, "type": "BOOL"}, + } + ) + assert len(table) == 2 + assert "Motor1.Speed" in table + assert "Motor1.Running" in table + + def test_resolve_address(self) -> None: + table = SymbolTable({"Tank.Level": {"db": 2, "offset": 10, "type": "INT"}}) + addr = table.resolve("Tank.Level") + assert addr.db == 2 + assert addr.offset == 10 + assert addr.type == "INT" + assert addr.bit == 0 + + def test_resolve_with_bit(self) -> None: + table = SymbolTable({"Valve.Open": {"db": 1, "offset": 4, "bit": 3, "type": "BOOL"}}) + addr = table.resolve("Valve.Open") + assert addr.bit == 3 + + def test_resolve_offset_with_dot_notation(self) -> None: + table = SymbolTable({"Sensor.Active": {"db": 1, "offset": "12.5", "type": "BOOL"}}) + addr = table.resolve("Sensor.Active") + assert addr.offset == 12 + assert addr.bit == 5 + + def test_resolve_unknown_tag_raises(self) -> None: + table = SymbolTable({}) + with pytest.raises(KeyError, match="Unknown tag"): + table.resolve("NonExistent") + + def test_nested_path(self) -> None: + table = SymbolTable({"Motors[3].Speed": {"db": 1, "offset": 24, "type": "REAL"}}) + addr = table.resolve("Motors[3].Speed") + assert addr.db == 1 + assert addr.offset == 24 + + def test_tags_property_returns_copy(self) -> None: + table = SymbolTable({"X": {"db": 1, "offset": 0, "type": "INT"}}) + tags = table.tags + tags["Y"] = TagAddress(db=2, offset=0, bit=0, type="INT") + assert "Y" not in table + + +# --------------------------------------------------------------------------- +# Construction from CSV +# --------------------------------------------------------------------------- + + +CSV_CONTENT = """\ +tag,db,offset,type +Motor1.Speed,1,0,REAL +Motor1.Running,1,4.0,BOOL +Tank.Level,1,6,INT +Tank.Name,1,8,STRING[20] +""" + + +class TestCSVConstruction: + def test_from_csv_string(self) -> None: + table = SymbolTable.from_csv(CSV_CONTENT) + assert len(table) == 4 + addr = table.resolve("Motor1.Speed") + assert addr.db == 1 + assert addr.offset == 0 + assert addr.type == "REAL" + + def test_from_csv_bool_bit(self) -> None: + table = SymbolTable.from_csv(CSV_CONTENT) + addr = table.resolve("Motor1.Running") + assert addr.type == "BOOL" + assert addr.bit == 0 + + def test_from_csv_file(self, tmp_path: Path) -> None: + csv_file = tmp_path / "tags.csv" + csv_file.write_text(CSV_CONTENT) + table = SymbolTable.from_csv(csv_file) + assert len(table) == 4 + + def test_from_csv_with_bit_column(self) -> None: + csv_with_bit = """\ +tag,db,offset,bit,type +Valve.Open,1,4,3,BOOL +""" + table = SymbolTable.from_csv(csv_with_bit) + addr = table.resolve("Valve.Open") + assert addr.bit == 3 + + +# --------------------------------------------------------------------------- +# Construction from JSON +# --------------------------------------------------------------------------- + + +class TestJSONConstruction: + def test_from_json_string(self) -> None: + data = { + "Motor1.Speed": {"db": 1, "offset": 0, "type": "REAL"}, + "Motor1.Running": {"db": 1, "offset": 4, "bit": 0, "type": "BOOL"}, + } + table = SymbolTable.from_json(json.dumps(data)) + assert len(table) == 2 + assert table.resolve("Motor1.Speed").type == "REAL" + + def test_from_json_file(self, tmp_path: Path) -> None: + data = {"Temp": {"db": 3, "offset": 0, "type": "REAL"}} + json_file = tmp_path / "tags.json" + json_file.write_text(json.dumps(data)) + table = SymbolTable.from_json(json_file) + assert len(table) == 1 + + +# --------------------------------------------------------------------------- +# Read / Write with mocked client +# --------------------------------------------------------------------------- + + +def _make_client() -> MagicMock: + """Create a MagicMock that behaves enough like snap7.Client.""" + return MagicMock(spec=["db_read", "db_write"]) + + +class TestRead: + def test_read_real(self) -> None: + client = _make_client() + data = bytearray(4) + struct.pack_into(">f", data, 0, 123.5) + client.db_read.return_value = data + + table = SymbolTable({"Speed": {"db": 1, "offset": 0, "type": "REAL"}}) + value = table.read(client, "Speed") + + client.db_read.assert_called_once_with(1, 0, 4) + assert isinstance(value, float) + assert abs(value - 123.5) < 0.01 + + def test_read_int(self) -> None: + client = _make_client() + data = bytearray(2) + struct.pack_into(">h", data, 0, -42) + client.db_read.return_value = data + + table = SymbolTable({"Level": {"db": 2, "offset": 10, "type": "INT"}}) + value = table.read(client, "Level") + + client.db_read.assert_called_once_with(2, 10, 2) + assert value == -42 + + def test_read_bool_true(self) -> None: + client = _make_client() + client.db_read.return_value = bytearray([0b00001000]) + + table = SymbolTable({"Flag": {"db": 1, "offset": 4, "bit": 3, "type": "BOOL"}}) + value = table.read(client, "Flag") + + assert value is True + + def test_read_bool_false(self) -> None: + client = _make_client() + client.db_read.return_value = bytearray([0b00000000]) + + table = SymbolTable({"Flag": {"db": 1, "offset": 4, "bit": 3, "type": "BOOL"}}) + value = table.read(client, "Flag") + + assert value is False + + def test_read_dint(self) -> None: + client = _make_client() + data = bytearray(4) + struct.pack_into(">i", data, 0, -100000) + client.db_read.return_value = data + + table = SymbolTable({"Counter": {"db": 1, "offset": 0, "type": "DINT"}}) + assert table.read(client, "Counter") == -100000 + + def test_read_lreal(self) -> None: + client = _make_client() + data = bytearray(8) + struct.pack_into(">d", data, 0, 3.14159265358979) + client.db_read.return_value = data + + table = SymbolTable({"Pi": {"db": 1, "offset": 0, "type": "LREAL"}}) + pi_val = table.read(client, "Pi") + assert isinstance(pi_val, float) + assert abs(pi_val - 3.14159265358979) < 1e-10 + + def test_read_byte(self) -> None: + client = _make_client() + client.db_read.return_value = bytearray([0xAB]) + + table = SymbolTable({"Status": {"db": 1, "offset": 0, "type": "BYTE"}}) + assert table.read(client, "Status") == 0xAB + + def test_read_string(self) -> None: + client = _make_client() + text = "hello" + # STRING format: max_size(1 byte), current_len(1 byte), chars... + data = bytearray(22) + data[0] = 20 # max size + data[1] = len(text) # current length + for i, c in enumerate(text): + data[2 + i] = ord(c) + client.db_read.return_value = data + + table = SymbolTable({"Name": {"db": 1, "offset": 0, "type": "STRING[20]"}}) + assert table.read(client, "Name") == "hello" + + def test_read_char(self) -> None: + client = _make_client() + client.db_read.return_value = bytearray([ord("A")]) + + table = SymbolTable({"Letter": {"db": 1, "offset": 0, "type": "CHAR"}}) + assert table.read(client, "Letter") == "A" + + def test_read_unknown_tag_raises(self) -> None: + client = _make_client() + table = SymbolTable({}) + with pytest.raises(KeyError): + table.read(client, "NonExistent") + + +class TestWrite: + def test_write_real(self) -> None: + client = _make_client() + table = SymbolTable({"Speed": {"db": 1, "offset": 0, "type": "REAL"}}) + table.write(client, "Speed", 123.5) + + client.db_write.assert_called_once() + args = client.db_write.call_args + assert args[0][0] == 1 # db + assert args[0][1] == 0 # offset + written = args[0][2] + value = struct.unpack(">f", written)[0] + assert abs(value - 123.5) < 0.01 + + def test_write_int(self) -> None: + client = _make_client() + table = SymbolTable({"Level": {"db": 2, "offset": 10, "type": "INT"}}) + table.write(client, "Level", -42) + + args = client.db_write.call_args + assert args[0][0] == 2 + assert args[0][1] == 10 + value = struct.unpack(">h", args[0][2])[0] + assert value == -42 + + def test_write_bool_set_true(self) -> None: + client = _make_client() + client.db_read.return_value = bytearray([0b00000000]) + + table = SymbolTable({"Flag": {"db": 1, "offset": 4, "bit": 3, "type": "BOOL"}}) + table.write(client, "Flag", True) + + # Should read first then write + client.db_read.assert_called_once_with(1, 4, 1) + args = client.db_write.call_args + assert args[0][0] == 1 + assert args[0][1] == 4 + assert args[0][2][0] & 0b00001000 # bit 3 should be set + + def test_write_bool_set_false(self) -> None: + client = _make_client() + client.db_read.return_value = bytearray([0b00001000]) + + table = SymbolTable({"Flag": {"db": 1, "offset": 4, "bit": 3, "type": "BOOL"}}) + table.write(client, "Flag", False) + + args = client.db_write.call_args + assert not (args[0][2][0] & 0b00001000) # bit 3 should be cleared + + def test_write_dint(self) -> None: + client = _make_client() + table = SymbolTable({"Counter": {"db": 1, "offset": 0, "type": "DINT"}}) + table.write(client, "Counter", -100000) + + args = client.db_write.call_args + value = struct.unpack(">i", args[0][2])[0] + assert value == -100000 + + def test_write_unknown_tag_raises(self) -> None: + client = _make_client() + table = SymbolTable({}) + with pytest.raises(KeyError): + table.write(client, "NonExistent", 0) + + +# --------------------------------------------------------------------------- +# read_many +# --------------------------------------------------------------------------- + + +class TestReadMany: + def test_read_many(self) -> None: + client = _make_client() + real_data = bytearray(4) + struct.pack_into(">f", real_data, 0, 50.0) + int_data = bytearray(2) + struct.pack_into(">h", int_data, 0, 100) + + client.db_read.side_effect = [real_data, int_data] + + table = SymbolTable( + { + "Speed": {"db": 1, "offset": 0, "type": "REAL"}, + "Level": {"db": 1, "offset": 4, "type": "INT"}, + } + ) + values = table.read_many(client, ["Speed", "Level"]) + speed = values["Speed"] + assert isinstance(speed, float) + assert abs(speed - 50.0) < 0.01 + assert values["Level"] == 100 + + +# --------------------------------------------------------------------------- +# Merge +# --------------------------------------------------------------------------- + + +class TestMerge: + def test_merge_two_tables(self) -> None: + t1 = SymbolTable({"A": {"db": 1, "offset": 0, "type": "INT"}}) + t2 = SymbolTable({"B": {"db": 2, "offset": 0, "type": "REAL"}}) + merged = t1.merge(t2) + assert len(merged) == 2 + assert "A" in merged + assert "B" in merged + + def test_merge_override(self) -> None: + t1 = SymbolTable({"A": {"db": 1, "offset": 0, "type": "INT"}}) + t2 = SymbolTable({"A": {"db": 2, "offset": 10, "type": "REAL"}}) + merged = t1.merge(t2) + assert merged.resolve("A").db == 2 + assert merged.resolve("A").type == "REAL" + + +# --------------------------------------------------------------------------- +# Unsupported type errors +# --------------------------------------------------------------------------- + + +class TestUnsupportedType: + def test_read_unsupported_type(self) -> None: + client = _make_client() + client.db_read.return_value = bytearray(4) + + table = SymbolTable({"X": {"db": 1, "offset": 0, "type": "STRUCT"}}) + # STRUCT is not in the type size map, so read_size will raise + with pytest.raises(ValueError, match="Unknown S7 type"): + table.read(client, "X") + + def test_write_unsupported_type(self) -> None: + client = _make_client() + + table = SymbolTable({"X": {"db": 1, "offset": 0, "type": "STRUCT"}}) + with pytest.raises(ValueError, match="Unknown S7 type"): + table.write(client, "X", 0)