diff --git a/CLAUDE.md b/CLAUDE.md index 5353da6e..4998f4a8 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -8,6 +8,7 @@ Python-snap7 is a pure Python S7 communication library for interfacing with Siem ## Key Architecture +### snap7/ — Legacy S7 protocol (S7-300/400, PUT/GET on S7-1200/1500) - **snap7/client.py**: Main Client class for connecting to S7 PLCs - **snap7/server.py**: Server implementation for PLC simulation - **snap7/logo.py**: Logo PLC communication @@ -19,6 +20,20 @@ Python-snap7 is a pure Python S7 communication library for interfacing with Siem - **snap7/type.py**: Type definitions and enums (Area, Block, WordLen, etc.) - **snap7/error.py**: Error handling and exceptions +### s7/ — Unified client with S7CommPlus + legacy fallback +- **s7/client.py**: Unified Client — tries S7CommPlus, falls back to snap7.Client +- **s7/async_client.py**: Unified AsyncClient — same pattern, async +- **s7/server.py**: Unified Server wrapping both legacy and S7CommPlus +- **s7/_protocol.py**: Protocol enum (AUTO/LEGACY/S7COMMPLUS) +- **s7/_s7commplus_client.py**: Pure S7CommPlus sync client (internal) +- **s7/_s7commplus_async_client.py**: Pure S7CommPlus async client (internal) +- **s7/_s7commplus_server.py**: S7CommPlus server emulator (internal) +- **s7/connection.py**: S7CommPlus low-level connection +- **s7/protocol.py**: S7CommPlus protocol constants/enums +- **s7/codec.py**: S7CommPlus encoding/decoding +- **s7/vlq.py**: Variable-Length Quantity encoding +- **s7/legitimation.py**: Authentication helpers + ## Implementation Details ### Protocol Stack @@ -41,24 +56,31 @@ The library implements the complete S7 protocol stack: - Block operations (list, info, upload, download) - Date/time operations -### Usage +### Usage (unified s7 package — recommended for S7-1200/1500) + +```python +from s7 import Client + +client = Client() +client.connect("192.168.1.10", 0, 1) # auto-detects S7CommPlus vs legacy +data = client.db_read(1, 0, 4) +client.disconnect() +``` + +### Usage (legacy snap7 package — S7-300/400) ```python import snap7 -# Create and connect client client = snap7.Client() client.connect("192.168.1.10", 0, 1) -# Read/write operations data = client.db_read(1, 0, 4) client.db_write(1, 0, bytearray([1, 2, 3, 4])) -# Memory area access marker_data = client.mb_read(0, 4) client.mb_write(0, 4, bytearray([1, 2, 3, 4])) -# Disconnect client.disconnect() ``` @@ -98,15 +120,15 @@ pytest tests/test_client.py ### Code Quality ```bash # Type checking -mypy snap7 tests example +mypy snap7 s7 tests example # Linting and formatting check -ruff check snap7 tests example -ruff format --diff snap7 tests example +ruff check snap7 s7 tests example +ruff format --diff snap7 s7 tests example # Auto-format code -ruff format snap7 tests example -ruff check --fix snap7 tests example +ruff format snap7 s7 tests example +ruff check --fix snap7 s7 tests example ``` ### Development with tox diff --git a/Makefile b/Makefile index ab74d2ee..fcaf4b07 100644 --- a/Makefile +++ b/Makefile @@ -29,8 +29,8 @@ doc: .venv/bin/sphinx-build .PHONY: check check: .venv/bin/pytest - uv run ruff check snap7 tests example - uv run ruff format --diff snap7 tests example + uv run ruff check snap7 s7 tests example + uv run ruff format --diff snap7 s7 tests example .PHONY: ruff ruff: .venv/bin/tox diff --git a/README.rst b/README.rst index db97e768..8a097efb 100644 --- a/README.rst +++ b/README.rst @@ -13,55 +13,94 @@ .. image:: https://readthedocs.org/projects/python-snap7/badge/ :target: https://python-snap7.readthedocs.io/en/latest/ -About -===== -Python-snap7 is a pure Python S7 communication library for interfacing with Siemens S7 PLCs. +python-snap7 +============ -The name "python-snap7" is historical — the library originally started as a Python wrapper -around the `Snap7 `_ C library. As of version 3.0, the C -library is no longer used, but the name is kept for backwards compatibility. +Python-snap7 is a pure Python S7 communication library for interfacing with +Siemens S7 PLCs. It supports Python 3.10+ and runs on Windows, Linux, and macOS +without any native dependencies. -Python-snap7 is tested with Python 3.10+, on Windows, Linux and OS X. +The name "python-snap7" is historical — the library originally started as a +Python wrapper around the `Snap7 `_ C library. +As of version 3.0, the C library is no longer used, but the name is kept for +backwards compatibility. The full documentation is available on `Read The Docs `_. -Version 3.0 - Pure Python Rewrite -================================== +Installation +============ + +Install using pip:: -Version 3.0 is a ground-up rewrite of python-snap7. The library no longer wraps the -C snap7 shared library — instead, the entire S7 protocol stack (TPKT, COTP, and S7) -is now implemented in pure Python. This is a **breaking change** from all previous -versions. + $ pip install python-snap7 + +No native libraries or platform-specific dependencies are required — python-snap7 +is a pure Python package that works on all platforms. + + +Version 3.0 — Pure Python Rewrite +================================== -**Why this matters:** +Version 3.0 was a ground-up rewrite of python-snap7. The library no longer wraps +the C snap7 shared library — instead, the entire S7 protocol stack (TPKT, COTP, +and S7) is implemented in pure Python. -* **Portability**: No more platform-specific shared libraries (`.dll`, `.so`, `.dylib`). - python-snap7 now works on any platform that runs Python — including ARM, Alpine Linux, - and other environments where the C library was difficult or impossible to install. +* **Portability**: No more platform-specific shared libraries (``.dll``, ``.so``, ``.dylib``). + Works on any platform that runs Python — including ARM, Alpine Linux, and other + environments where the C library was difficult or impossible to install. * **Easier installation**: Just ``pip install python-snap7``. No native dependencies, no compiler toolchains, no manual library setup. * **Easier to extend**: New features and protocol support can be added directly in Python. **If you experience issues with 3.0:** -1. Please report them on the `issue tracker `_ - with a clear description of the problem and the version you are using - (``python -c "import snap7; print(snap7.__version__)"``). +1. Please report them on the `issue tracker `_. 2. As a workaround, you can pin to the last pre-3.0 release:: $ pip install "python-snap7<3" - The latest stable pre-3.0 release is version 2.1.0. Documentation for pre-3.0 - versions is available at `Read The Docs `_. + Documentation for pre-3.0 versions is available at + `Read The Docs `_. -Installation -============ +Version 3.1 — S7CommPlus Protocol Support (unreleased) +======================================================= -Install using pip:: +Version 3.1 adds support for the S7CommPlus protocol (up to V3), which is required +for communicating with newer Siemens S7-1200 and S7-1500 PLCs that have PUT/GET +disabled. This is fully backwards compatible with 3.0. - $ pip install python-snap7 +The biggest change is the new ``s7`` module, which is now the recommended entry point +for connecting to any supported S7 PLC:: + + from s7 import Client + + client = Client() + client.connect("192.168.1.10", 0, 1) # auto-detects S7CommPlus vs legacy S7 + data = client.db_read(1, 0, 4) + client.disconnect() + +The ``s7.Client`` automatically tries S7CommPlus first, and falls back to legacy S7 +if the PLC does not support it. The existing ``snap7.Client`` continues to work +unchanged for legacy S7 connections. + +**Help us test!** Version 3.1 needs more real-world testing before release. If you +have access to any of the following PLCs, we would greatly appreciate testing and +feedback: + +* S7-1200 (any firmware version) +* S7-1500 (any firmware version) +* S7-1500 with TLS enabled +* S7-300 +* S7-400 +* S7-1200/1500 with PUT/GET disabled (S7CommPlus-only) +* LOGO! 0BA8 and newer + +Please report your results — whether it works or not — on the +`issue tracker `_. + +To install the development version:: -No native libraries or platform-specific dependencies are required — python-snap7 is a pure Python package that works on all platforms. + $ pip install git+https://github.com/gijzelaerr/python-snap7.git@master diff --git a/doc/API/s7commplus.rst b/doc/API/s7commplus.rst index bd5ceecb..48066e91 100644 --- a/doc/API/s7commplus.rst +++ b/doc/API/s7commplus.rst @@ -7,24 +7,21 @@ S7CommPlus (S7-1200/1500) releases. If you encounter problems, please `open an issue `_. -The :mod:`snap7.s7commplus` package provides support for Siemens S7-1200 and -S7-1500 PLCs, which use the S7CommPlus protocol instead of the classic S7 -protocol used by S7-300/400. - -Both synchronous and asynchronous clients are available. When a PLC does not -support S7CommPlus data operations, the clients automatically fall back to the -legacy S7 protocol transparently. +The ``s7`` package provides a unified client for Siemens S7-1200 and S7-1500 +PLCs. It automatically tries the S7CommPlus protocol first and falls back to +the legacy S7 protocol when needed. Synchronous client ------------------ .. code-block:: python - from snap7.s7commplus.client import S7CommPlusClient + from s7 import Client - client = S7CommPlusClient() - client.connect("192.168.1.10") + client = Client() + client.connect("192.168.1.10", 0, 1) data = client.db_read(1, 0, 4) + print(client.protocol) # Protocol.S7COMMPLUS or Protocol.LEGACY client.disconnect() Asynchronous client @@ -33,11 +30,11 @@ Asynchronous client .. code-block:: python import asyncio - from snap7.s7commplus.async_client import S7CommPlusAsyncClient + from s7 import AsyncClient async def main(): - client = S7CommPlusAsyncClient() - await client.connect("192.168.1.10") + client = AsyncClient() + await client.connect("192.168.1.10", 0, 1) data = await client.db_read(1, 0, 4) await client.disconnect() @@ -51,10 +48,10 @@ S7-1500 PLCs with firmware 2.x use S7CommPlus V2, which requires TLS. Pass .. code-block:: python - from snap7.s7commplus.client import S7CommPlusClient + from s7 import Client - client = S7CommPlusClient() - client.connect("192.168.1.10", use_tls=True) + client = Client() + client.connect("192.168.1.10", 0, 1, use_tls=True) data = client.db_read(1, 0, 4) client.disconnect() @@ -63,7 +60,7 @@ For PLCs with custom certificates, provide the certificate paths: .. code-block:: python client.connect( - "192.168.1.10", + "192.168.1.10", 0, 1, use_tls=True, tls_cert="/path/to/client.pem", tls_key="/path/to/client.key", @@ -73,56 +70,39 @@ For PLCs with custom certificates, provide the certificate paths: Password authentication ----------------------- -Password-protected PLCs require authentication after connecting. Call -``authenticate()`` before performing data operations: +Password-protected PLCs require the ``password`` keyword argument: .. code-block:: python - from snap7.s7commplus.client import S7CommPlusClient - - client = S7CommPlusClient() - client.connect("192.168.1.10", use_tls=True) - client.authenticate(password="my_plc_password") + from s7 import Client + client = Client() + client.connect("192.168.1.10", 0, 1, use_tls=True, password="my_plc_password") data = client.db_read(1, 0, 4) client.disconnect() -The method auto-detects whether to use legacy (SHA-1 XOR) or new-style -(AES-256-CBC) authentication based on the PLC firmware version. For new-style -authentication, you can also provide a username: - -.. code-block:: python - - client.authenticate(password="my_password", username="admin") - -.. note:: - - Authentication requires TLS to be active. Calling ``authenticate()`` - without ``use_tls=True`` will raise :class:`~snap7.error.S7ConnectionError`. +Protocol selection +------------------ +By default the client uses ``Protocol.AUTO`` which tries S7CommPlus first. +You can force a specific protocol: -Legacy fallback ---------------- +.. code-block:: python -If the PLC returns an error for S7CommPlus data operations (common with some -firmware versions), the client automatically falls back to the classic S7 -protocol. You can check whether fallback is active: + from s7 import Client, Protocol -.. code-block:: python + # Force legacy S7 only + client = Client() + client.connect("192.168.1.10", 0, 1, protocol=Protocol.LEGACY) - client.connect("192.168.1.10") - if client.using_legacy_fallback: - print("Using legacy S7 protocol") + # Force S7CommPlus (raises on failure) + client.connect("192.168.1.10", 0, 1, protocol=Protocol.S7COMMPLUS) API reference ------------- -.. automodule:: snap7.s7commplus.client - :members: - -.. automodule:: snap7.s7commplus.async_client +.. automodule:: s7.client :members: -.. automodule:: snap7.s7commplus.connection +.. automodule:: s7.async_client :members: - :exclude-members: S7CommPlusConnection diff --git a/doc/connecting.rst b/doc/connecting.rst index 8eefe4a6..34f2310e 100644 --- a/doc/connecting.rst +++ b/doc/connecting.rst @@ -78,6 +78,20 @@ S7-1200 / S7-1500 client = snap7.Client() client.connect("192.168.1.10", 0, 1) +.. tip:: + + For S7-1200/1500 PLCs you can also use the **experimental** ``s7`` package, + which automatically tries the newer S7CommPlus protocol and falls back to + legacy S7 when needed:: + + from s7 import Client + + client = Client() + client.connect("192.168.1.10", 0, 1) + print(client.protocol) # Protocol.S7COMMPLUS or Protocol.LEGACY + + See :doc:`API/s7commplus` for full details. + S7-200 / Logo (TSAP Connection) -------------------------------- diff --git a/doc/introduction.rst b/doc/introduction.rst index cf1d864b..e2583d46 100644 --- a/doc/introduction.rst +++ b/doc/introduction.rst @@ -14,6 +14,20 @@ backwards compatibility. python-snap7 requires Python 3.10+ and runs on Windows, macOS and Linux without any native dependencies. +The library provides two packages: + +- **snap7** -- the original S7 protocol implementation, supporting S7-300, + S7-400, S7-1200 and S7-1500 PLCs via the classic PUT/GET interface. +- **s7** -- a newer unified client that automatically tries the S7CommPlus + protocol (used natively by S7-1200/1500) and falls back to legacy S7 when + needed. ``s7.Client`` is a drop-in replacement for ``snap7.Client``. + +.. note:: + + The ``s7`` package and its S7CommPlus support are **experimental**. + The legacy ``snap7`` package remains fully supported and is the safe choice + for production use. See :doc:`API/s7commplus` for details. + .. note:: **Version 3.0 is a complete rewrite.** Previous versions of python-snap7 diff --git a/doc/limitations.rst b/doc/limitations.rst index c270ab06..03a220a8 100644 --- a/doc/limitations.rst +++ b/doc/limitations.rst @@ -26,5 +26,5 @@ are **not possible** with this protocol: individual blocks, but this is not a complete backup. * - Access S7-1200/1500 PLCs with S7CommPlus security - python-snap7 supports S7CommPlus V1 and V2 (with TLS) via - :mod:`snap7.s7commplus`. V3 is not yet supported. For PLCs that only + the ``s7`` package. V3 is not yet supported. For PLCs that only support V3, enable PUT/GET as a fallback or use OPC UA. diff --git a/doc/plc-support.rst b/doc/plc-support.rst index 1d2b0e59..3df53e30 100644 --- a/doc/plc-support.rst +++ b/doc/plc-support.rst @@ -59,7 +59,7 @@ Supported PLCs - No - V2 - **Full** (S7CommPlus V2) - - S7CommPlus V2 with TLS is supported via :mod:`snap7.s7commplus`. + - S7CommPlus V2 with TLS is supported via the ``s7`` package. * - S7-1500 (FW 3.x+) - ~2022 - PUT/GET only @@ -143,7 +143,7 @@ Siemens has evolved their PLC communication protocols over time: python-snap7 implements the **classic S7 protocol** and **S7CommPlus V1/V2**. The classic protocol remains available on most PLC families via the PUT/GET mechanism. S7CommPlus V1 and V2 (with TLS) are supported via the -:mod:`snap7.s7commplus` package. For PLCs that require S7CommPlus V3 (such +``s7`` package. For PLCs that require S7CommPlus V3 (such as the S7-1500R/H), consider using OPC UA as an alternative. diff --git a/pyproject.toml b/pyproject.toml index b865de58..ca23f81a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,9 +40,10 @@ discovery = ["pnio-dcp"] [tool.setuptools.package-data] snap7 = ["py.typed"] +s7 = ["py.typed"] [tool.setuptools.packages.find] -include = ["snap7*"] +include = ["snap7*", "s7*"] [project.scripts] snap7-server = "snap7.server:mainloop" diff --git a/s7/__init__.py b/s7/__init__.py new file mode 100644 index 00000000..1cfaa189 --- /dev/null +++ b/s7/__init__.py @@ -0,0 +1,35 @@ +"""Unified S7 communication library. + +Provides protocol-agnostic access to Siemens S7 PLCs with automatic +protocol discovery (S7CommPlus vs legacy S7). + +Usage:: + + from s7 import Client + + client = Client() + client.connect("192.168.1.10", 0, 1) + data = client.db_read(1, 0, 4) +""" + +from .client import Client +from .async_client import AsyncClient +from .server import Server +from ._protocol import Protocol + +from snap7.type import Area, Block, WordLen, SrvEvent, SrvArea +from snap7.util.db import Row, DB + +__all__ = [ + "Client", + "AsyncClient", + "Server", + "Protocol", + "Area", + "Block", + "WordLen", + "SrvEvent", + "SrvArea", + "Row", + "DB", +] diff --git a/s7/_protocol.py b/s7/_protocol.py new file mode 100644 index 00000000..bad2be02 --- /dev/null +++ b/s7/_protocol.py @@ -0,0 +1,17 @@ +"""Protocol enum for unified S7 client.""" + +from enum import Enum + + +class Protocol(Enum): + """S7 communication protocol selection. + + Attributes: + AUTO: Try S7CommPlus first, fall back to legacy S7 if unsupported. + LEGACY: Use legacy S7 protocol only (S7-300/400, basic S7-1200/1500). + S7COMMPLUS: Use S7CommPlus protocol only (S7-1200/1500 with full access). + """ + + AUTO = "auto" + LEGACY = "legacy" + S7COMMPLUS = "s7commplus" diff --git a/snap7/s7commplus/async_client.py b/s7/_s7commplus_async_client.py similarity index 73% rename from snap7/s7commplus/async_client.py rename to s7/_s7commplus_async_client.py index 70f831e1..25559c44 100644 --- a/snap7/s7commplus/async_client.py +++ b/s7/_s7commplus_async_client.py @@ -1,19 +1,10 @@ -""" -Async S7CommPlus client for S7-1200/1500 PLCs. - -Provides the same API as S7CommPlusClient but using asyncio for -non-blocking I/O. Uses asyncio.Lock for concurrent safety. +"""Pure async S7CommPlus client for S7-1200/1500 PLCs (no legacy fallback). -When a PLC does not support S7CommPlus data operations, the client -transparently falls back to the legacy S7 protocol for data block -read/write operations (using synchronous calls in an executor). +This is an internal module used by the unified ``s7.AsyncClient``. It provides +raw S7CommPlus data operations without any fallback logic -- the unified +client is responsible for deciding when to fall back to legacy S7. -Example:: - - async with S7CommPlusAsyncClient() as client: - await client.connect("192.168.1.10") - data = await client.db_read(1, 0, 4) - await client.db_write(1, 0, struct.pack(">f", 23.5)) +Reference: thomas-v2/S7CommPlusDriver (C#, LGPL-3.0) """ import asyncio @@ -35,7 +26,7 @@ ) from .codec import encode_header, decode_header, encode_typed_value, encode_object_qualifier from .vlq import encode_uint32_vlq, decode_uint32_vlq, decode_uint64_vlq -from .client import _build_read_payload, _parse_read_response, _build_write_payload, _parse_write_response +from ._s7commplus_client import _build_read_payload, _parse_read_response, _build_write_payload, _parse_write_response logger = logging.getLogger(__name__) @@ -46,17 +37,9 @@ class S7CommPlusAsyncClient: - """Async S7CommPlus client for S7-1200/1500 PLCs. - - Supports all S7CommPlus protocol versions (V1/V2/V3/TLS). The protocol - version is auto-detected from the PLC's CreateObject response during - connection setup. + """Pure async S7CommPlus client without legacy fallback. - Uses asyncio for all I/O operations and asyncio.Lock for - concurrent safety when shared between multiple coroutines. - - When the PLC does not support S7CommPlus data operations, the client - automatically falls back to legacy S7 protocol for db_read/db_write. + Use ``s7.AsyncClient`` for automatic protocol selection. """ def __init__(self) -> None: @@ -67,12 +50,6 @@ def __init__(self) -> None: self._protocol_version: int = 0 self._connected = False self._lock = asyncio.Lock() - self._legacy_client: Optional[Any] = None - self._use_legacy_data: bool = False - self._host: str = "" - self._port: int = 102 - self._rack: int = 0 - self._slot: int = 1 # V2+ IntegrityId tracking self._integrity_id_read: int = 0 @@ -83,11 +60,10 @@ def __init__(self) -> None: self._tls_active: bool = False self._oms_secret: Optional[bytes] = None self._server_session_version: Optional[int] = None + self._session_setup_ok: bool = False @property def connected(self) -> bool: - if self._use_legacy_data and self._legacy_client is not None: - return bool(self._legacy_client.connected) return self._connected @property @@ -99,9 +75,9 @@ def session_id(self) -> int: return self._session_id @property - def using_legacy_fallback(self) -> bool: - """Whether the client is using legacy S7 protocol for data operations.""" - return self._use_legacy_data + def session_setup_ok(self) -> bool: + """Whether the S7CommPlus session setup succeeded for data operations.""" + return self._session_setup_ok @property def tls_active(self) -> bool: @@ -125,32 +101,19 @@ async def connect( tls_key: Optional[str] = None, tls_ca: Optional[str] = None, ) -> None: - """Connect to an S7-1200/1500 PLC. - - The connection sequence: - 1. COTP connection (same as legacy S7comm) - 2. InitSSL handshake - 3. TLS activation (if use_tls=True, required for V2) - 4. CreateObject to establish S7CommPlus session - 5. Enable IntegrityId tracking (V2+) - - If the PLC does not support S7CommPlus data operations, a secondary - legacy S7 connection is established transparently for data access. + """Connect to an S7-1200/1500 PLC using S7CommPlus. Args: host: PLC IP address or hostname port: TCP port (default 102) - rack: PLC rack number - slot: PLC slot number + rack: PLC rack number (unused, kept for API symmetry) + slot: PLC slot number (unused, kept for API symmetry) use_tls: Whether to activate TLS after InitSSL. tls_cert: Path to client TLS certificate (PEM) tls_key: Path to client private key (PEM) tls_ca: Path to CA certificate for PLC verification (PEM) """ self._host = host - self._port = port - self._rack = rack - self._slot = slot # TCP connect self._reader, self._writer = await asyncio.open_connection(host, port) @@ -169,7 +132,7 @@ async def connect( # Step 4: S7CommPlus session setup (CreateObject) await self._create_session() - # Step 5: Version-specific validation (before session setup handshake) + # Step 5: Version-specific validation if self._protocol_version >= ProtocolVersion.V3: if not use_tls: logger.warning( @@ -177,10 +140,9 @@ async def connect( ) elif self._protocol_version == ProtocolVersion.V2: if not self._tls_active: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("PLC reports V2 protocol but TLS is not active. V2 requires TLS. Use use_tls=True.") - # Enable IntegrityId tracking for V2+ self._with_integrity_id = True self._integrity_id_read = 0 self._integrity_id_write = 0 @@ -190,21 +152,16 @@ async def connect( # Step 6: Session setup - echo ServerSessionVersion back to PLC if self._server_session_version is not None: - session_setup_ok = await self._setup_session() + self._session_setup_ok = await self._setup_session() else: logger.warning("PLC did not provide ServerSessionVersion - session setup incomplete") - session_setup_ok = False + self._session_setup_ok = False logger.info( f"Async S7CommPlus connected to {host}:{port}, " f"version=V{self._protocol_version}, session={self._session_id}, " f"tls={self._tls_active}" ) - # Check if S7CommPlus session setup succeeded - if not session_setup_ok: - logger.info("S7CommPlus session setup failed, falling back to legacy S7 protocol") - await self._setup_legacy_fallback() - except Exception: await self.disconnect() raise @@ -212,12 +169,6 @@ async def connect( async def authenticate(self, password: str, username: str = "") -> None: """Perform PLC password authentication (legitimation). - Must be called after connect() and before data operations on - password-protected PLCs. Requires TLS to be active (V2+). - - The method auto-detects legacy vs new legitimation based on - the PLC's firmware version. - Args: password: PLC password username: Username for new-style auth (optional) @@ -226,20 +177,18 @@ async def authenticate(self, password: str, username: str = "") -> None: S7ConnectionError: If not connected, TLS not active, or auth fails """ if not self._connected: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("Not connected") if not self._tls_active or self._oms_secret is None: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("Legitimation requires TLS. Connect with use_tls=True.") - # Step 1: Get challenge from PLC challenge = await self._get_legitimation_challenge() logger.info(f"Received legitimation challenge ({len(challenge)} bytes)") - # Step 2: Build response (auto-detect legacy vs new) from .legitimation import build_legacy_response, build_new_response if username: @@ -261,28 +210,17 @@ async def _activate_tls( tls_key: Optional[str] = None, tls_ca: Optional[str] = None, ) -> None: - """Activate TLS 1.3 over the COTP connection. - - Called after InitSSL and before CreateObject. Uses asyncio's - start_tls() to upgrade the existing connection to TLS. - - Args: - tls_cert: Path to client TLS certificate (PEM) - tls_key: Path to client private key (PEM) - tls_ca: Path to CA certificate for PLC verification (PEM) - """ + """Activate TLS 1.3 over the COTP connection.""" if self._writer is None: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("Cannot activate TLS: not connected") ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ctx.minimum_version = ssl.TLSVersion.TLSv1_3 - # TLS 1.3 ciphersuites are configured differently from TLS 1.2 if hasattr(ctx, "set_ciphersuites"): ctx.set_ciphersuites("TLS_AES_256_GCM_SHA384:TLS_AES_128_GCM_SHA256") - # If set_ciphersuites not available, TLS 1.3 uses its mandatory defaults if tls_cert and tls_key: ctx.load_cert_chain(tls_cert, tls_key) @@ -293,7 +231,6 @@ async def _activate_tls( ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE - # Upgrade existing transport to TLS using asyncio start_tls transport = self._writer.transport loop = asyncio.get_event_loop() new_transport = await loop.start_tls( @@ -303,13 +240,11 @@ async def _activate_tls( server_hostname=self._host, ) - # Update reader/writer to use the TLS transport self._writer._transport = new_transport self._tls_active = True - # Extract OMS exporter secret for legitimation key derivation if new_transport is None: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("TLS handshake failed: no transport returned") @@ -325,13 +260,7 @@ async def _activate_tls( logger.info("TLS 1.3 activated on async COTP connection") async def _get_legitimation_challenge(self) -> bytes: - """Request legitimation challenge from PLC. - - Sends GetVarSubStreamed with address ServerSessionRequest (303). - - Returns: - Challenge bytes from PLC - """ + """Request legitimation challenge from PLC.""" from .protocol import LegitimationId, DataType as DT payload = bytearray() @@ -348,12 +277,12 @@ async def _get_legitimation_challenge(self) -> bytes: offset += consumed if return_value != 0: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError(f"GetVarSubStreamed for challenge failed: return_value={return_value}") if offset + 2 > len(resp_payload): - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("Challenge response too short") @@ -388,7 +317,7 @@ async def _send_legitimation_new(self, encrypted_response: bytes) -> None: if len(resp_payload) >= 1: return_value, _ = decode_uint64_vlq(resp_payload, 0) if return_value < 0: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError(f"Legitimation rejected by PLC: return_value={return_value}") logger.debug(f"New legitimation return_value={return_value}") @@ -411,32 +340,13 @@ async def _send_legitimation_legacy(self, response: bytes) -> None: if len(resp_payload) >= 1: return_value, _ = decode_uint64_vlq(resp_payload, 0) if return_value < 0: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError(f"Legacy legitimation rejected by PLC: return_value={return_value}") logger.debug(f"Legacy legitimation return_value={return_value}") - async def _setup_legacy_fallback(self) -> None: - """Establish a secondary legacy S7 connection for data operations.""" - from ..client import Client - - loop = asyncio.get_event_loop() - client = Client() - await loop.run_in_executor(None, lambda: client.connect(self._host, self._rack, self._slot, self._port)) - self._legacy_client = client - self._use_legacy_data = True - logger.info(f"Legacy S7 fallback connected to {self._host}:{self._port}") - async def disconnect(self) -> None: """Disconnect from PLC.""" - if self._legacy_client is not None: - try: - self._legacy_client.disconnect() - except Exception: - pass - self._legacy_client = None - self._use_legacy_data = False - if self._connected and self._session_id: try: await self._delete_session() @@ -453,6 +363,7 @@ async def disconnect(self) -> None: self._tls_active = False self._oms_secret = None self._server_session_version = None + self._session_setup_ok = False if self._writer: try: @@ -464,22 +375,7 @@ async def disconnect(self) -> None: self._reader = None async def db_read(self, db_number: int, start: int, size: int) -> bytes: - """Read raw bytes from a data block. - - Args: - db_number: Data block number - start: Start byte offset - size: Number of bytes to read - - Returns: - Raw bytes read from the data block - """ - if self._use_legacy_data and self._legacy_client is not None: - client = self._legacy_client - loop = asyncio.get_event_loop() - data = await loop.run_in_executor(None, lambda: client.db_read(db_number, start, size)) - return bytes(data) - + """Read raw bytes from a data block.""" payload = _build_read_payload([(db_number, start, size)]) response = await self._send_request(FunctionCode.GET_MULTI_VARIABLES, payload) @@ -491,67 +387,26 @@ async def db_read(self, db_number: int, start: int, size: int) -> bytes: return results[0] async def db_write(self, db_number: int, start: int, data: bytes) -> None: - """Write raw bytes to a data block. - - Args: - db_number: Data block number - start: Start byte offset - data: Bytes to write - """ - if self._use_legacy_data and self._legacy_client is not None: - client = self._legacy_client - loop = asyncio.get_event_loop() - await loop.run_in_executor(None, lambda: client.db_write(db_number, start, bytearray(data))) - return - + """Write raw bytes to a data block.""" payload = _build_write_payload([(db_number, start, data)]) response = await self._send_request(FunctionCode.SET_MULTI_VARIABLES, payload) _parse_write_response(response) async def db_read_multi(self, items: list[tuple[int, int, int]]) -> list[bytes]: - """Read multiple data block regions in a single request. - - Args: - items: List of (db_number, start_offset, size) tuples - - Returns: - List of raw bytes for each item - """ - if self._use_legacy_data and self._legacy_client is not None: - client = self._legacy_client - loop = asyncio.get_event_loop() - multi_results: list[bytes] = [] - for db_number, start, size in items: - - def _read(db: int = db_number, s: int = start, sz: int = size) -> bytearray: - return bytearray(client.db_read(db, s, sz)) - - data = await loop.run_in_executor(None, _read) - multi_results.append(bytes(data)) - return multi_results - + """Read multiple data block regions in a single request.""" payload = _build_read_payload(items) response = await self._send_request(FunctionCode.GET_MULTI_VARIABLES, payload) - parsed = _parse_read_response(response) return [r if r is not None else b"" for r in parsed] async def explore(self) -> bytes: - """Browse the PLC object tree. - - Returns: - Raw response payload - """ + """Browse the PLC object tree.""" return await self._send_request(FunctionCode.EXPLORE, b"") # -- Internal methods -- async def _send_request(self, function_code: int, payload: bytes) -> bytes: - """Send an S7CommPlus request and receive the response. - - For V2+ with IntegrityId tracking, inserts IntegrityId after the - 14-byte request header and strips it from the response. - """ + """Send an S7CommPlus request and receive the response.""" async with self._lock: if not self._connected or self._writer is None or self._reader is None: raise RuntimeError("Not connected") @@ -569,7 +424,6 @@ async def _send_request(self, function_code: int, payload: bytes) -> bytes: 0x36, ) - # For V2+ with IntegrityId, insert after header integrity_id_bytes = b"" if self._with_integrity_id and self._protocol_version >= ProtocolVersion.V2: is_read = function_code in READ_FUNCTION_CODES @@ -582,7 +436,6 @@ async def _send_request(self, function_code: int, payload: bytes) -> bytes: frame += struct.pack(">BBH", 0x72, self._protocol_version, 0x0000) await self._send_cotp_dt(frame) - # Increment appropriate IntegrityId counter if self._with_integrity_id and self._protocol_version >= ProtocolVersion.V2: if function_code in READ_FUNCTION_CODES: self._integrity_id_read = (self._integrity_id_read + 1) & 0xFFFFFFFF @@ -597,7 +450,6 @@ async def _send_request(self, function_code: int, payload: bytes) -> bytes: if len(response) < 14: raise RuntimeError("Response too short") - # For V2+, skip IntegrityId in response resp_offset = 14 if self._with_integrity_id and self._protocol_version >= ProtocolVersion.V2: if resp_offset < len(response): @@ -611,7 +463,6 @@ async def _cotp_connect(self, local_tsap: int, remote_tsap: bytes) -> None: if self._writer is None or self._reader is None: raise RuntimeError("Not connected") - # Build COTP CR base_pdu = struct.pack(">BBHHB", 6, _COTP_CR, 0x0000, 0x0001, 0x00) calling_tsap = struct.pack(">BBH", 0xC1, 2, local_tsap) called_tsap = struct.pack(">BB", 0xC2, len(remote_tsap)) + remote_tsap @@ -620,12 +471,10 @@ async def _cotp_connect(self, local_tsap: int, remote_tsap: bytes) -> None: params = calling_tsap + called_tsap + pdu_size_param cr_pdu = struct.pack(">B", 6 + len(params)) + base_pdu[1:] + params - # Send TPKT + CR tpkt = struct.pack(">BBH", 3, 0, 4 + len(cr_pdu)) + cr_pdu self._writer.write(tpkt) await self._writer.drain() - # Receive TPKT + CC tpkt_header = await self._reader.readexactly(4) _, _, length = struct.unpack(">BBH", tpkt_header) payload = await self._reader.readexactly(length - 4) @@ -645,7 +494,7 @@ async def _init_ssl(self) -> None: 0x0000, seq_num, 0x00000000, - 0x30, # Transport flags for InitSSL + 0x30, ) request += struct.pack(">I", 0) @@ -666,7 +515,6 @@ async def _create_session(self) -> None: """Send CreateObject to establish S7CommPlus session.""" seq_num = self._next_sequence_number() - # Build CreateObject request header request = struct.pack( ">BHHHHIB", Opcode.REQUEST, @@ -674,32 +522,24 @@ async def _create_session(self) -> None: FunctionCode.CREATE_OBJECT, 0x0000, seq_num, - ObjectId.OBJECT_NULL_SERVER_SESSION, # SessionId = 288 + ObjectId.OBJECT_NULL_SERVER_SESSION, 0x36, ) - # RequestId: ObjectServerSessionContainer (285) request += struct.pack(">I", ObjectId.OBJECT_SERVER_SESSION_CONTAINER) - - # RequestValue: ValueUDInt(0) request += bytes([0x00, DataType.UDINT]) + encode_uint32_vlq(0) - - # Unknown padding request += struct.pack(">I", 0) - # RequestObject: NullServerSession PObject request += bytes([ElementID.START_OF_OBJECT]) request += struct.pack(">I", ObjectId.GET_NEW_RID_ON_SERVER) request += encode_uint32_vlq(ObjectId.CLASS_SERVER_SESSION) - request += encode_uint32_vlq(0) # ClassFlags - request += encode_uint32_vlq(0) # AttributeId + request += encode_uint32_vlq(0) + request += encode_uint32_vlq(0) - # Attribute: ServerSessionClientRID = 0x80c3c901 request += bytes([ElementID.ATTRIBUTE]) request += encode_uint32_vlq(ObjectId.SERVER_SESSION_CLIENT_RID) request += encode_typed_value(DataType.RID, 0x80C3C901) - # Nested: ClassSubscriptions request += bytes([ElementID.START_OF_OBJECT]) request += struct.pack(">I", ObjectId.GET_NEW_RID_ON_SERVER) request += encode_uint32_vlq(ObjectId.CLASS_SUBSCRIPTIONS) @@ -710,7 +550,6 @@ async def _create_session(self) -> None: request += bytes([ElementID.TERMINATING_OBJECT]) request += struct.pack(">I", 0) - # Frame header + trailer frame = encode_header(ProtocolVersion.V1, len(request)) + request frame += struct.pack(">BBH", 0x72, ProtocolVersion.V1, 0x0000) await self._send_cotp_dt(frame) @@ -725,7 +564,6 @@ async def _create_session(self) -> None: self._session_id = struct.unpack_from(">I", response, 9)[0] self._protocol_version = version - # Parse ServerSessionVersion from response payload self._parse_create_object_response(response[14:]) def _parse_create_object_response(self, payload: bytes) -> None: @@ -754,13 +592,11 @@ def _parse_create_object_response(self, payload: bytes) -> None: logger.info(f"ServerSessionVersion = {value}") return else: - # Skip attribute value if offset + 2 > len(payload): break _flags = payload[offset] _dt = payload[offset + 1] offset += 2 - # Best-effort skip: advance past common VLQ-encoded values if offset < len(payload): _, consumed = decode_uint32_vlq(payload, offset) offset += consumed @@ -769,13 +605,13 @@ def _parse_create_object_response(self, payload: bytes) -> None: offset += 1 if offset + 4 > len(payload): break - offset += 4 # RelationId + offset += 4 _, consumed = decode_uint32_vlq(payload, offset) - offset += consumed # ClassId + offset += consumed _, consumed = decode_uint32_vlq(payload, offset) - offset += consumed # ClassFlags + offset += consumed _, consumed = decode_uint32_vlq(payload, offset) - offset += consumed # AttributeId + offset += consumed elif tag == ElementID.TERMINATING_OBJECT: offset += 1 @@ -787,28 +623,21 @@ def _parse_create_object_response(self, payload: bytes) -> None: logger.debug("ServerSessionVersion not found in CreateObject response") async def _setup_session(self) -> bool: - """Echo ServerSessionVersion back to the PLC via SetMultiVariables. - - Without this step, the PLC rejects all subsequent data operations - with ERROR2 (0x05A9). - - Returns: - True if session setup succeeded. - """ + """Echo ServerSessionVersion back to the PLC via SetMultiVariables.""" if self._server_session_version is None: return False payload = bytearray() payload += struct.pack(">I", self._session_id) - payload += encode_uint32_vlq(1) # Item count - payload += encode_uint32_vlq(1) # Total address field count + payload += encode_uint32_vlq(1) + payload += encode_uint32_vlq(1) payload += encode_uint32_vlq(ObjectId.SERVER_SESSION_VERSION) - payload += encode_uint32_vlq(1) # ItemNumber + payload += encode_uint32_vlq(1) payload += bytes([0x00, DataType.UDINT]) payload += encode_uint32_vlq(self._server_session_version) - payload += bytes([0x00]) # Fill byte + payload += bytes([0x00]) payload += encode_object_qualifier() - payload += struct.pack(">I", 0) # Trailing padding + payload += struct.pack(">I", 0) try: resp_payload = await self._send_request(FunctionCode.SET_MULTI_VARIABLES, bytes(payload)) diff --git a/snap7/s7commplus/client.py b/s7/_s7commplus_client.py similarity index 57% rename from snap7/s7commplus/client.py rename to s7/_s7commplus_client.py index a3f32ab4..d70e9458 100644 --- a/snap7/s7commplus/client.py +++ b/s7/_s7commplus_client.py @@ -1,20 +1,8 @@ -""" -S7CommPlus client for S7-1200/1500 PLCs. - -Provides high-level operations over the S7CommPlus protocol, similar to -the existing snap7.Client but targeting S7-1200/1500 PLCs with full -engineering access (symbolic addressing, optimized data blocks, etc.). - -Supports all S7CommPlus protocol versions (V1/V2/V3/TLS). The protocol -version is auto-detected from the PLC's CreateObject response during -connection setup. +"""Pure S7CommPlus client for S7-1200/1500 PLCs (no legacy fallback). -When a PLC does not support S7CommPlus data operations (e.g. PLCs that -accept S7CommPlus sessions but return ERROR2 for GetMultiVariables), -the client transparently falls back to the legacy S7 protocol for -data block read/write operations. - -Status: V1 and V2 connections are functional. V3/TLS authentication planned. +This is an internal module used by the unified ``s7.Client``. It provides +raw S7CommPlus data operations without any fallback logic -- the unified +client is responsible for deciding when to fall back to legacy S7. Reference: thomas-v2/S7CommPlusDriver (C#, LGPL-3.0) """ @@ -37,46 +25,16 @@ class S7CommPlusClient: - """S7CommPlus client for S7-1200/1500 PLCs. - - Supports all S7CommPlus protocol versions: - - V1: S7-1200 FW V4.0+ - - V2: S7-1200/1500 with older firmware - - V3: S7-1200/1500 pre-TIA Portal V17 - - V3 + TLS: TIA Portal V17+ (recommended) - - The protocol version is auto-detected during connection. + """Pure S7CommPlus client without legacy fallback. - When the PLC does not support S7CommPlus data operations, the client - automatically falls back to legacy S7 protocol for db_read/db_write. - - Example:: - - client = S7CommPlusClient() - client.connect("192.168.1.10") - - # Read raw bytes from DB1 - data = client.db_read(1, 0, 4) - - # Write raw bytes to DB1 - client.db_write(1, 0, struct.pack(">f", 23.5)) - - client.disconnect() + Use ``s7.Client`` for automatic protocol selection. """ def __init__(self) -> None: self._connection: Optional[S7CommPlusConnection] = None - self._legacy_client: Optional[Any] = None - self._use_legacy_data: bool = False - self._host: str = "" - self._port: int = 102 - self._rack: int = 0 - self._slot: int = 1 @property def connected(self) -> bool: - if self._use_legacy_data and self._legacy_client is not None: - return bool(self._legacy_client.connected) return self._connection is not None and self._connection.connected @property @@ -94,9 +52,18 @@ def session_id(self) -> int: return self._connection.session_id @property - def using_legacy_fallback(self) -> bool: - """Whether the client is using legacy S7 protocol for data operations.""" - return self._use_legacy_data + def session_setup_ok(self) -> bool: + """Whether the S7CommPlus session setup succeeded for data operations.""" + if self._connection is None: + return False + return self._connection.session_setup_ok + + @property + def tls_active(self) -> bool: + """Whether TLS is active on the connection.""" + if self._connection is None: + return False + return self._connection.tls_active def connect( self, @@ -112,30 +79,18 @@ def connect( ) -> None: """Connect to an S7-1200/1500 PLC using S7CommPlus. - If the PLC does not support S7CommPlus data operations, a secondary - legacy S7 connection is established transparently for data access. - Args: host: PLC IP address or hostname port: TCP port (default 102) - rack: PLC rack number - slot: PLC slot number + rack: PLC rack number (unused, kept for API symmetry) + slot: PLC slot number (unused, kept for API symmetry) use_tls: Whether to activate TLS (required for V2) tls_cert: Path to client TLS certificate (PEM) tls_key: Path to client private key (PEM) tls_ca: Path to CA certificate for PLC verification (PEM) password: PLC password for legitimation (V2+ with TLS) """ - self._host = host - self._port = port - self._rack = rack - self._slot = slot - - self._connection = S7CommPlusConnection( - host=host, - port=port, - ) - + self._connection = S7CommPlusConnection(host=host, port=port) self._connection.connect( use_tls=use_tls, tls_cert=tls_cert, @@ -143,49 +98,19 @@ def connect( tls_ca=tls_ca, ) - # Handle legitimation for password-protected PLCs if password is not None and self._connection.tls_active: logger.info("Performing PLC legitimation (password authentication)") self._connection.authenticate(password) - # Check if S7CommPlus session setup succeeded. If the PLC accepted the - # ServerSessionVersion echo, data operations should work. If it returned - # an error (e.g. ERROR2), fall back to legacy S7 protocol. - if not self._connection.session_setup_ok: - logger.info("S7CommPlus session setup failed, falling back to legacy S7 protocol") - self._setup_legacy_fallback() - - def _setup_legacy_fallback(self) -> None: - """Establish a secondary legacy S7 connection for data operations.""" - from ..client import Client - - self._legacy_client = Client() - self._legacy_client.connect(self._host, self._rack, self._slot, self._port) - self._use_legacy_data = True - logger.info(f"Legacy S7 fallback connected to {self._host}:{self._port}") - def disconnect(self) -> None: """Disconnect from PLC.""" - if self._legacy_client is not None: - try: - self._legacy_client.disconnect() - except Exception: - pass - self._legacy_client = None - self._use_legacy_data = False - if self._connection: self._connection.disconnect() self._connection = None - # -- Data block read/write -- - def db_read(self, db_number: int, start: int, size: int) -> bytes: """Read raw bytes from a data block. - Uses S7CommPlus protocol when supported, otherwise falls back to - legacy S7 protocol transparently. - Args: db_number: Data block number start: Start byte offset @@ -194,18 +119,11 @@ def db_read(self, db_number: int, start: int, size: int) -> bytes: Returns: Raw bytes read from the data block """ - if self._use_legacy_data and self._legacy_client is not None: - return bytes(self._legacy_client.db_read(db_number, start, size)) - if self._connection is None: raise RuntimeError("Not connected") payload = _build_read_payload([(db_number, start, size)]) - logger.debug(f"db_read: db={db_number} start={start} size={size} payload={payload.hex(' ')}") - response = self._connection.send_request(FunctionCode.GET_MULTI_VARIABLES, payload) - logger.debug(f"db_read: response ({len(response)} bytes): {response.hex(' ')}") - results = _parse_read_response(response) if not results: raise RuntimeError("Read returned no data") @@ -216,70 +134,38 @@ def db_read(self, db_number: int, start: int, size: int) -> bytes: def db_write(self, db_number: int, start: int, data: bytes) -> None: """Write raw bytes to a data block. - Uses S7CommPlus protocol when supported, otherwise falls back to - legacy S7 protocol transparently. - Args: db_number: Data block number start: Start byte offset data: Bytes to write """ - if self._use_legacy_data and self._legacy_client is not None: - self._legacy_client.db_write(db_number, start, bytearray(data)) - return - if self._connection is None: raise RuntimeError("Not connected") payload = _build_write_payload([(db_number, start, data)]) - logger.debug( - f"db_write: db={db_number} start={start} data_len={len(data)} data={data.hex(' ')} payload={payload.hex(' ')}" - ) - response = self._connection.send_request(FunctionCode.SET_MULTI_VARIABLES, payload) - logger.debug(f"db_write: response ({len(response)} bytes): {response.hex(' ')}") - _parse_write_response(response) def db_read_multi(self, items: list[tuple[int, int, int]]) -> list[bytes]: """Read multiple data block regions in a single request. - Uses S7CommPlus protocol when supported, otherwise falls back to - legacy S7 protocol (individual reads) transparently. - Args: items: List of (db_number, start_offset, size) tuples Returns: List of raw bytes for each item """ - if self._use_legacy_data and self._legacy_client is not None: - results = [] - for db_number, start, size in items: - data = self._legacy_client.db_read(db_number, start, size) - results.append(bytes(data)) - return results - if self._connection is None: raise RuntimeError("Not connected") payload = _build_read_payload(items) - logger.debug(f"db_read_multi: {len(items)} items: {items} payload={payload.hex(' ')}") - response = self._connection.send_request(FunctionCode.GET_MULTI_VARIABLES, payload) - logger.debug(f"db_read_multi: response ({len(response)} bytes): {response.hex(' ')}") - parsed = _parse_read_response(response) return [r if r is not None else b"" for r in parsed] - # -- Explore (browse PLC object tree) -- - def explore(self) -> bytes: """Browse the PLC object tree. - Returns the raw Explore response payload for parsing. - Full symbolic exploration will be implemented in a future version. - Returns: Raw response payload """ @@ -287,11 +173,8 @@ def explore(self) -> bytes: raise RuntimeError("Not connected") response = self._connection.send_request(FunctionCode.EXPLORE, b"") - logger.debug(f"explore: response ({len(response)} bytes): {response.hex(' ')}") return response - # -- Context manager -- - def __enter__(self) -> "S7CommPlusClient": return self @@ -310,10 +193,7 @@ def _build_read_payload(items: list[tuple[int, int, int]]) -> bytes: Returns: Encoded payload bytes (after the 14-byte request header) - - Reference: thomas-v2/S7CommPlusDriver/Core/GetMultiVariablesRequest.cs """ - # Encode all item addresses and compute total field count addresses: list[bytes] = [] total_field_count = 0 for db_number, start, size in items: @@ -321,24 +201,18 @@ def _build_read_payload(items: list[tuple[int, int, int]]) -> bytes: addr_bytes, field_count = encode_item_address( access_area=access_area, access_sub_area=Ids.DB_VALUE_ACTUAL, - lids=[start + 1, size], # LID byte offsets are 1-based in S7CommPlus + lids=[start + 1, size], ) addresses.append(addr_bytes) total_field_count += field_count payload = bytearray() - # LinkId (UInt32 fixed = 0, for reading variables) payload += struct.pack(">I", 0) - # Item count payload += encode_uint32_vlq(len(items)) - # Total field count across all items payload += encode_uint32_vlq(total_field_count) - # Item addresses for addr in addresses: payload += addr - # ObjectQualifier payload += encode_object_qualifier() - # Padding payload += struct.pack(">I", 0) return bytes(payload) @@ -352,21 +226,16 @@ def _parse_read_response(response: bytes) -> list[Optional[bytes]]: Returns: List of raw bytes per item (None for errored items) - - Reference: thomas-v2/S7CommPlusDriver/Core/GetMultiVariablesResponse.cs """ offset = 0 - # ReturnValue (UInt64 VLQ) return_value, consumed = decode_uint64_vlq(response, offset) offset += consumed - logger.debug(f"_parse_read_response: return_value={return_value}") if return_value != 0: logger.error(f"_parse_read_response: PLC returned error: {return_value}") return [] - # Value list: ItemNumber (VLQ) + PValue, terminated by ItemNumber=0 values: dict[int, bytes] = {} while offset < len(response): item_nr, consumed = decode_uint32_vlq(response, offset) @@ -377,7 +246,6 @@ def _parse_read_response(response: bytes) -> list[Optional[bytes]]: offset += consumed values[item_nr] = raw_bytes - # Error list: ErrorItemNumber (VLQ) + ErrorReturnValue (UInt64 VLQ), terminated by 0 errors: dict[int, int] = {} while offset < len(response): err_item_nr, consumed = decode_uint32_vlq(response, offset) @@ -387,9 +255,7 @@ def _parse_read_response(response: bytes) -> list[Optional[bytes]]: err_value, consumed = decode_uint64_vlq(response, offset) offset += consumed errors[err_item_nr] = err_value - logger.debug(f"_parse_read_response: error item {err_item_nr}: {err_value}") - # Build result list (1-based item numbers) max_item = max(max(values.keys(), default=0), max(errors.keys(), default=0)) results: list[Optional[bytes]] = [] for i in range(1, max_item + 1): @@ -409,10 +275,7 @@ def _build_write_payload(items: list[tuple[int, int, bytes]]) -> bytes: Returns: Encoded payload bytes - - Reference: thomas-v2/S7CommPlusDriver/Core/SetMultiVariablesRequest.cs """ - # Encode all item addresses and compute total field count addresses: list[bytes] = [] total_field_count = 0 for db_number, start, data in items: @@ -420,30 +283,22 @@ def _build_write_payload(items: list[tuple[int, int, bytes]]) -> bytes: addr_bytes, field_count = encode_item_address( access_area=access_area, access_sub_area=Ids.DB_VALUE_ACTUAL, - lids=[start + 1, len(data)], # LID byte offsets are 1-based in S7CommPlus + lids=[start + 1, len(data)], ) addresses.append(addr_bytes) total_field_count += field_count payload = bytearray() - # InObjectId (UInt32 fixed = 0, for plain variable writes) payload += struct.pack(">I", 0) - # Item count payload += encode_uint32_vlq(len(items)) - # Total field count payload += encode_uint32_vlq(total_field_count) - # Item addresses for addr in addresses: payload += addr - # Value list: ItemNumber (1-based) + PValue for i, (_, _, data) in enumerate(items, 1): payload += encode_uint32_vlq(i) payload += encode_pvalue_blob(data) - # Fill byte payload += bytes([0x00]) - # ObjectQualifier payload += encode_object_qualifier() - # Padding payload += struct.pack(">I", 0) return bytes(payload) @@ -452,25 +307,17 @@ def _build_write_payload(items: list[tuple[int, int, bytes]]) -> bytes: def _parse_write_response(response: bytes) -> None: """Parse a SetMultiVariables response payload. - Args: - response: Response payload (after the 14-byte response header) - Raises: RuntimeError: If the write failed - - Reference: thomas-v2/S7CommPlusDriver/Core/SetMultiVariablesResponse.cs """ offset = 0 - # ReturnValue (UInt64 VLQ) return_value, consumed = decode_uint64_vlq(response, offset) offset += consumed - logger.debug(f"_parse_write_response: return_value={return_value}") if return_value != 0: raise RuntimeError(f"Write failed with return value {return_value}") - # Error list: ErrorItemNumber (VLQ) + ErrorReturnValue (UInt64 VLQ) errors: list[tuple[int, int]] = [] while offset < len(response): err_item_nr, consumed = decode_uint32_vlq(response, offset) diff --git a/snap7/s7commplus/server.py b/s7/_s7commplus_server.py similarity index 100% rename from snap7/s7commplus/server.py rename to s7/_s7commplus_server.py diff --git a/s7/async_client.py b/s7/async_client.py new file mode 100644 index 00000000..c7bbeb7e --- /dev/null +++ b/s7/async_client.py @@ -0,0 +1,252 @@ +"""Unified async S7 client with protocol auto-discovery. + +Provides a single async client that automatically selects the best protocol +(S7CommPlus or legacy S7) for communicating with Siemens S7 PLCs. + +Usage:: + + from s7 import AsyncClient + + async with AsyncClient() as client: + await client.connect("192.168.1.10", 0, 1) + data = await client.db_read(1, 0, 4) +""" + +import logging +from typing import Any, Optional + +from snap7.async_client import AsyncClient as LegacyAsyncClient + +from ._protocol import Protocol +from ._s7commplus_async_client import S7CommPlusAsyncClient + +logger = logging.getLogger(__name__) + + +class AsyncClient: + """Unified async S7 client with protocol auto-discovery. + + Async counterpart of :class:`s7.Client`. Automatically selects the + best protocol for the target PLC using asyncio for non-blocking I/O. + + Methods not explicitly defined are delegated to the underlying + legacy async client via ``__getattr__``. + + Example:: + + from s7 import AsyncClient + + async with AsyncClient() as client: + await client.connect("192.168.1.10", 0, 1) + data = await client.db_read(1, 0, 4) + print(client.protocol) + """ + + def __init__(self) -> None: + self._legacy: Optional[LegacyAsyncClient] = None + self._plus: Optional[S7CommPlusAsyncClient] = None + self._protocol: Protocol = Protocol.AUTO + self._host: str = "" + self._port: int = 102 + self._rack: int = 0 + self._slot: int = 1 + + @property + def protocol(self) -> Protocol: + """The protocol currently in use for DB operations.""" + return self._protocol + + @property + def connected(self) -> bool: + """Whether the client is connected to a PLC.""" + if self._legacy is not None and self._legacy.connected: + return True + if self._plus is not None and self._plus.connected: + return True + return False + + async def connect( + self, + address: str, + rack: int = 0, + slot: int = 1, + tcp_port: int = 102, + *, + protocol: Protocol = Protocol.AUTO, + use_tls: bool = False, + tls_cert: Optional[str] = None, + tls_key: Optional[str] = None, + tls_ca: Optional[str] = None, + ) -> "AsyncClient": + """Connect to an S7 PLC. + + Args: + address: PLC IP address or hostname. + rack: PLC rack number. + slot: PLC slot number. + tcp_port: TCP port (default 102). + protocol: Protocol selection. AUTO tries S7CommPlus first, + then falls back to legacy S7. + use_tls: Whether to activate TLS after InitSSL. + tls_cert: Path to client TLS certificate (PEM). + tls_key: Path to client private key (PEM). + tls_ca: Path to CA certificate for PLC verification (PEM). + + Returns: + self, for method chaining. + """ + self._host = address + self._port = tcp_port + self._rack = rack + self._slot = slot + + if protocol in (Protocol.AUTO, Protocol.S7COMMPLUS): + if await self._try_s7commplus( + address, + tcp_port, + rack, + slot, + use_tls=use_tls, + tls_cert=tls_cert, + tls_key=tls_key, + tls_ca=tls_ca, + ): + self._protocol = Protocol.S7COMMPLUS + logger.info(f"Async connected to {address}:{tcp_port} using S7CommPlus") + else: + if protocol == Protocol.S7COMMPLUS: + raise RuntimeError( + f"S7CommPlus connection to {address}:{tcp_port} failed and protocol=S7COMMPLUS was explicitly requested" + ) + self._protocol = Protocol.LEGACY + logger.info(f"S7CommPlus not available, using legacy S7 for {address}:{tcp_port}") + else: + self._protocol = Protocol.LEGACY + + # Always connect legacy client + self._legacy = LegacyAsyncClient() + await self._legacy.connect(address, rack, slot, tcp_port) + logger.info(f"Async legacy S7 connected to {address}:{tcp_port}") + + return self + + async def _try_s7commplus( + self, + address: str, + tcp_port: int, + rack: int, + slot: int, + *, + use_tls: bool = False, + tls_cert: Optional[str] = None, + tls_key: Optional[str] = None, + tls_ca: Optional[str] = None, + ) -> bool: + """Try to establish an S7CommPlus connection.""" + plus = S7CommPlusAsyncClient() + try: + await plus.connect( + host=address, + port=tcp_port, + rack=rack, + slot=slot, + use_tls=use_tls, + tls_cert=tls_cert, + tls_key=tls_key, + tls_ca=tls_ca, + ) + except Exception as e: + logger.debug(f"S7CommPlus connection failed: {e}") + return False + + if not plus.session_setup_ok: + logger.debug("S7CommPlus session setup not OK, disconnecting") + await plus.disconnect() + return False + + self._plus = plus + return True + + async def disconnect(self) -> int: + """Disconnect from PLC. + + Returns: + 0 on success (matches snap7.AsyncClient). + """ + if self._plus is not None: + try: + await self._plus.disconnect() + except Exception: + pass + self._plus = None + + if self._legacy is not None: + try: + await self._legacy.disconnect() + except Exception: + pass + self._legacy = None + + self._protocol = Protocol.AUTO + return 0 + + async def db_read(self, db_number: int, start: int, size: int) -> bytearray: + """Read raw bytes from a data block.""" + if self._protocol == Protocol.S7COMMPLUS and self._plus is not None: + return bytearray(await self._plus.db_read(db_number, start, size)) + if self._legacy is not None: + return await self._legacy.db_read(db_number, start, size) + raise RuntimeError("Not connected") + + async def db_write(self, db_number: int, start: int, data: bytearray) -> int: + """Write raw bytes to a data block. + + Returns: + 0 on success (matches snap7.AsyncClient). + """ + if self._protocol == Protocol.S7COMMPLUS and self._plus is not None: + await self._plus.db_write(db_number, start, bytes(data)) + return 0 + if self._legacy is not None: + return await self._legacy.db_write(db_number, start, data) + raise RuntimeError("Not connected") + + async def db_read_multi(self, items: list[tuple[int, int, int]]) -> list[bytearray]: + """Read multiple data block regions in a single request.""" + if self._protocol == Protocol.S7COMMPLUS and self._plus is not None: + return [bytearray(r) for r in await self._plus.db_read_multi(items)] + if self._legacy is not None: + results = [] + for db, start, size in items: + results.append(await self._legacy.db_read(db, start, size)) + return results + raise RuntimeError("Not connected") + + async def explore(self) -> bytes: + """Browse the PLC object tree (S7CommPlus only). + + Raises: + RuntimeError: If not connected via S7CommPlus. + """ + if self._plus is None: + raise RuntimeError("explore() requires S7CommPlus connection") + return await self._plus.explore() + + def __getattr__(self, name: str) -> Any: + """Delegate unknown methods to the legacy client.""" + if name.startswith("_"): + raise AttributeError(name) + if self._legacy is not None: + return getattr(self._legacy, name) + raise AttributeError(f"'AsyncClient' object has no attribute {name!r} (not connected)") + + async def __aenter__(self) -> "AsyncClient": + return self + + async def __aexit__(self, *args: Any) -> None: + await self.disconnect() + + def __repr__(self) -> str: + if self.connected: + return f"" + return "" diff --git a/s7/client.py b/s7/client.py new file mode 100644 index 00000000..3b624b88 --- /dev/null +++ b/s7/client.py @@ -0,0 +1,266 @@ +"""Unified S7 client with protocol auto-discovery. + +Provides a single client that automatically selects the best protocol +(S7CommPlus or legacy S7) for communicating with Siemens S7 PLCs. + +Usage:: + + from s7 import Client + + client = Client() + client.connect("192.168.1.10", 0, 1) + data = client.db_read(1, 0, 4) +""" + +import logging +from typing import Any, Optional + +from snap7.client import Client as LegacyClient + +from ._protocol import Protocol +from ._s7commplus_client import S7CommPlusClient + +logger = logging.getLogger(__name__) + + +class Client: + """Unified S7 client with protocol auto-discovery. + + Automatically selects the best protocol for the target PLC: + - S7CommPlus for S7-1200/1500 PLCs with full data operations + - Legacy S7 for S7-300/400 or when S7CommPlus is unavailable + + Methods not explicitly defined are delegated to the underlying + legacy client via ``__getattr__``. + + Example:: + + from s7 import Client + + client = Client() + client.connect("192.168.1.10", 0, 1) + data = client.db_read(1, 0, 4) + print(client.protocol) + """ + + def __init__(self) -> None: + self._legacy: Optional[LegacyClient] = None + self._plus: Optional[S7CommPlusClient] = None + self._protocol: Protocol = Protocol.AUTO + self._host: str = "" + self._port: int = 102 + self._rack: int = 0 + self._slot: int = 1 + + @property + def protocol(self) -> Protocol: + """The protocol currently in use for DB operations.""" + return self._protocol + + @property + def connected(self) -> bool: + """Whether the client is connected to a PLC.""" + if self._legacy is not None and self._legacy.connected: + return True + if self._plus is not None and self._plus.connected: + return True + return False + + def connect( + self, + address: str, + rack: int = 0, + slot: int = 1, + tcp_port: int = 102, + *, + protocol: Protocol = Protocol.AUTO, + use_tls: bool = False, + tls_cert: Optional[str] = None, + tls_key: Optional[str] = None, + tls_ca: Optional[str] = None, + password: Optional[str] = None, + ) -> "Client": + """Connect to an S7 PLC. + + Args: + address: PLC IP address or hostname. + rack: PLC rack number. + slot: PLC slot number. + tcp_port: TCP port (default 102). + protocol: Protocol selection. AUTO tries S7CommPlus first, + then falls back to legacy S7. + use_tls: Whether to activate TLS (required for V2+). + tls_cert: Path to client TLS certificate (PEM). + tls_key: Path to client private key (PEM). + tls_ca: Path to CA certificate for PLC verification (PEM). + password: PLC password for legitimation (V2+ with TLS). + + Returns: + self, for method chaining. + """ + self._host = address + self._port = tcp_port + self._rack = rack + self._slot = slot + + if protocol in (Protocol.AUTO, Protocol.S7COMMPLUS): + if self._try_s7commplus( + address, + tcp_port, + rack, + slot, + use_tls=use_tls, + tls_cert=tls_cert, + tls_key=tls_key, + tls_ca=tls_ca, + password=password, + ): + self._protocol = Protocol.S7COMMPLUS + logger.info(f"Connected to {address}:{tcp_port} using S7CommPlus") + else: + if protocol == Protocol.S7COMMPLUS: + raise RuntimeError( + f"S7CommPlus connection to {address}:{tcp_port} failed and protocol=S7COMMPLUS was explicitly requested" + ) + self._protocol = Protocol.LEGACY + logger.info(f"S7CommPlus not available, using legacy S7 for {address}:{tcp_port}") + else: + self._protocol = Protocol.LEGACY + + # Always connect legacy client (needed for block ops, PLC control, etc.) + self._legacy = LegacyClient() + self._legacy.connect(address, rack, slot, tcp_port) + logger.info(f"Legacy S7 connected to {address}:{tcp_port}") + + return self + + def _try_s7commplus( + self, + address: str, + tcp_port: int, + rack: int, + slot: int, + *, + use_tls: bool = False, + tls_cert: Optional[str] = None, + tls_key: Optional[str] = None, + tls_ca: Optional[str] = None, + password: Optional[str] = None, + ) -> bool: + """Try to establish an S7CommPlus connection. + + Returns True if S7CommPlus data operations are available. + """ + plus = S7CommPlusClient() + try: + plus.connect( + host=address, + port=tcp_port, + rack=rack, + slot=slot, + use_tls=use_tls, + tls_cert=tls_cert, + tls_key=tls_key, + tls_ca=tls_ca, + password=password, + ) + except Exception as e: + logger.debug(f"S7CommPlus connection failed: {e}") + return False + + if not plus.session_setup_ok: + logger.debug("S7CommPlus session setup not OK, disconnecting") + plus.disconnect() + return False + + self._plus = plus + return True + + def disconnect(self) -> int: + """Disconnect from PLC. + + Returns: + 0 on success (matches snap7.Client). + """ + if self._plus is not None: + try: + self._plus.disconnect() + except Exception: + pass + self._plus = None + + if self._legacy is not None: + try: + self._legacy.disconnect() + except Exception: + pass + self._legacy = None + + self._protocol = Protocol.AUTO + return 0 + + def db_read(self, db_number: int, start: int, size: int) -> bytearray: + """Read raw bytes from a data block. + + Uses S7CommPlus when available, otherwise legacy S7. + """ + if self._protocol == Protocol.S7COMMPLUS and self._plus is not None: + return bytearray(self._plus.db_read(db_number, start, size)) + if self._legacy is not None: + return self._legacy.db_read(db_number, start, size) + raise RuntimeError("Not connected") + + def db_write(self, db_number: int, start: int, data: bytearray) -> int: + """Write raw bytes to a data block. + + Uses S7CommPlus when available, otherwise legacy S7. + + Returns: + 0 on success (matches snap7.Client). + """ + if self._protocol == Protocol.S7COMMPLUS and self._plus is not None: + self._plus.db_write(db_number, start, bytes(data)) + return 0 + if self._legacy is not None: + return self._legacy.db_write(db_number, start, data) + raise RuntimeError("Not connected") + + def db_read_multi(self, items: list[tuple[int, int, int]]) -> list[bytearray]: + """Read multiple data block regions in a single request. + + Uses S7CommPlus native multi-read when available. + """ + if self._protocol == Protocol.S7COMMPLUS and self._plus is not None: + return [bytearray(r) for r in self._plus.db_read_multi(items)] + if self._legacy is not None: + return [self._legacy.db_read(db, start, size) for db, start, size in items] + raise RuntimeError("Not connected") + + def explore(self) -> bytes: + """Browse the PLC object tree (S7CommPlus only). + + Raises: + RuntimeError: If not connected via S7CommPlus. + """ + if self._plus is None: + raise RuntimeError("explore() requires S7CommPlus connection") + return self._plus.explore() + + def __getattr__(self, name: str) -> Any: + """Delegate unknown methods to the legacy client.""" + if name.startswith("_"): + raise AttributeError(name) + if self._legacy is not None: + return getattr(self._legacy, name) + raise AttributeError(f"'Client' object has no attribute {name!r} (not connected)") + + def __enter__(self) -> "Client": + return self + + def __exit__(self, *args: Any) -> None: + self.disconnect() + + def __repr__(self) -> str: + if self.connected: + return f"" + return "" diff --git a/snap7/s7commplus/codec.py b/s7/codec.py similarity index 100% rename from snap7/s7commplus/codec.py rename to s7/codec.py diff --git a/snap7/s7commplus/connection.py b/s7/connection.py similarity index 98% rename from snap7/s7commplus/connection.py rename to s7/connection.py index ab4d1b67..878890bd 100644 --- a/snap7/s7commplus/connection.py +++ b/s7/connection.py @@ -44,7 +44,7 @@ from typing import Optional, Type from types import TracebackType -from ..connection import ISOTCPConnection +from snap7.connection import ISOTCPConnection from .protocol import ( FunctionCode, Opcode, @@ -216,7 +216,7 @@ def connect( ) elif self._protocol_version == ProtocolVersion.V2: if not self._tls_active: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("PLC reports V2 protocol but TLS is not active. V2 requires TLS. Use use_tls=True.") # Enable IntegrityId tracking for V2+ @@ -254,12 +254,12 @@ def authenticate(self, password: str, username: str = "") -> None: S7ConnectionError: If not connected, TLS not active, or auth fails """ if not self._connected: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("Not connected") if not self._tls_active or self._oms_secret is None: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("Legitimation requires TLS. Connect with use_tls=True.") @@ -317,13 +317,13 @@ def _get_legitimation_challenge(self) -> bytes: offset += consumed if return_value != 0: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError(f"GetVarSubStreamed for challenge failed: return_value={return_value}") # Value is a USIntArray (BLOB) - read flags + type + length + data if offset + 2 > len(resp_payload): - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("Challenge response too short") @@ -370,7 +370,7 @@ def _send_legitimation_new(self, encrypted_response: bytes) -> None: if len(resp_payload) >= 1: return_value, _ = decode_uint64_vlq(resp_payload, 0) if return_value < 0: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError(f"Legitimation rejected by PLC: return_value={return_value}") logger.debug(f"New legitimation return_value={return_value}") @@ -402,7 +402,7 @@ def _send_legitimation_legacy(self, response: bytes) -> None: if len(resp_payload) >= 1: return_value, _ = decode_uint64_vlq(resp_payload, 0) if return_value < 0: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError(f"Legacy legitimation rejected by PLC: return_value={return_value}") logger.debug(f"Legacy legitimation return_value={return_value}") @@ -444,7 +444,7 @@ def send_request(self, function_code: int, payload: bytes = b"") -> bytes: Response payload (after the 14-byte response header) """ if not self._connected: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("Not connected") @@ -510,7 +510,7 @@ def send_request(self, function_code: int, payload: bytes = b"") -> bytes: logger.debug(f" Response data ({len(response)} bytes): {response.hex(' ')}") if len(response) < 14: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("Response too short") @@ -586,7 +586,7 @@ def _init_ssl(self) -> None: response = response_frame[consumed:] if len(response) < 14: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("InitSSL response too short") @@ -675,7 +675,7 @@ def _create_session(self) -> None: logger.debug(f"CreateObject response body ({len(response)} bytes): {response.hex(' ')}") if len(response) < 14: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("CreateObject response too short") @@ -911,7 +911,7 @@ def _setup_session(self) -> bool: response = response_frame[consumed : consumed + data_length] if len(response) < 14: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("SetupSession response too short") @@ -988,7 +988,7 @@ def _activate_tls( # Wrap the raw TCP socket used by ISOTCPConnection raw_socket = self._iso_conn.socket if raw_socket is None: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("Cannot activate TLS: no TCP socket") diff --git a/snap7/s7commplus/legitimation.py b/s7/legitimation.py similarity index 100% rename from snap7/s7commplus/legitimation.py rename to s7/legitimation.py diff --git a/snap7/s7commplus/protocol.py b/s7/protocol.py similarity index 100% rename from snap7/s7commplus/protocol.py rename to s7/protocol.py diff --git a/s7/py.typed b/s7/py.typed new file mode 100644 index 00000000..e69de29b diff --git a/s7/server.py b/s7/server.py new file mode 100644 index 00000000..a0cad1d3 --- /dev/null +++ b/s7/server.py @@ -0,0 +1,132 @@ +"""Unified S7 server supporting both legacy S7 and S7CommPlus clients. + +Wraps both a legacy :class:`snap7.server.Server` and an +:class:`S7CommPlusServer` so that test environments can serve both +protocol stacks simultaneously. + +Usage:: + + from s7 import Server + + server = Server() + server.start(tcp_port=102, s7commplus_port=11020) +""" + +import logging +from typing import Any, Optional + +from snap7.server import Server as LegacyServer + +from ._s7commplus_server import S7CommPlusServer, DataBlock + +logger = logging.getLogger(__name__) + + +class Server: + """Unified S7 server for testing. + + Runs a legacy S7 server and optionally an S7CommPlus server + side by side. + """ + + def __init__(self) -> None: + self._legacy = LegacyServer() + self._plus = S7CommPlusServer() + + @property + def legacy_server(self) -> LegacyServer: + """Direct access to the legacy S7 server.""" + return self._legacy + + @property + def s7commplus_server(self) -> S7CommPlusServer: + """Direct access to the S7CommPlus server.""" + return self._plus + + def register_db( + self, + db_number: int, + variables: dict[str, tuple[str, int]], + size: int = 0, + ) -> DataBlock: + """Register a data block on the S7CommPlus server. + + Args: + db_number: Data block number + variables: Dict of {name: (type_name, offset)} + size: Total DB size in bytes (auto-calculated if 0) + + Returns: + The created DataBlock + """ + return self._plus.register_db(db_number, variables, size) + + def register_raw_db(self, db_number: int, data: bytearray) -> DataBlock: + """Register a raw data block on the S7CommPlus server. + + Args: + db_number: Data block number + data: Raw bytearray backing the data block + + Returns: + The created DataBlock + """ + return self._plus.register_raw_db(db_number, data) + + def get_db(self, db_number: int) -> Optional[DataBlock]: + """Get a registered data block.""" + return self._plus.get_db(db_number) + + def start( + self, + tcp_port: int = 102, + s7commplus_port: Optional[int] = None, + *, + use_tls: bool = False, + tls_cert: Optional[str] = None, + tls_key: Optional[str] = None, + ) -> None: + """Start the server(s). + + Args: + tcp_port: Port for the legacy S7 server. + s7commplus_port: Port for the S7CommPlus server. If None, + only the legacy server is started. + use_tls: Whether to enable TLS on the S7CommPlus server. + tls_cert: Path to TLS certificate (PEM). + tls_key: Path to TLS private key (PEM). + """ + self._legacy.start(tcp_port=tcp_port) + logger.info(f"Legacy S7 server started on port {tcp_port}") + + if s7commplus_port is not None: + self._plus.start( + port=s7commplus_port, + use_tls=use_tls, + tls_cert=tls_cert, + tls_key=tls_key, + ) + logger.info(f"S7CommPlus server started on port {s7commplus_port}") + + def stop(self) -> None: + """Stop all servers.""" + try: + self._plus.stop() + except Exception: + pass + try: + self._legacy.stop() + except Exception: + pass + + def __getattr__(self, name: str) -> Any: + """Delegate unknown methods to the legacy server.""" + if name.startswith("_"): + raise AttributeError(name) + return getattr(self._legacy, name) + + def __enter__(self) -> "Server": + return self + + def __exit__(self, *args: Any) -> None: + self.stop() diff --git a/snap7/s7commplus/vlq.py b/s7/vlq.py similarity index 100% rename from snap7/s7commplus/vlq.py rename to s7/vlq.py diff --git a/snap7/s7commplus/__init__.py b/snap7/s7commplus/__init__.py deleted file mode 100644 index ab49d09c..00000000 --- a/snap7/s7commplus/__init__.py +++ /dev/null @@ -1,36 +0,0 @@ -""" -S7CommPlus protocol implementation for S7-1200/1500 PLCs. - -S7CommPlus (protocol ID 0x72) is the successor to S7comm (protocol ID 0x32), -used by Siemens S7-1200 (firmware >= V4.0) and S7-1500 PLCs for full -engineering access (program download/upload, symbolic addressing, etc.). - -Supported PLC / firmware targets:: - - V1: S7-1200 FW V4.0+ (simple session handshake) - V2: S7-1200/1500 older FW (session authentication) - V3: S7-1200/1500 pre-TIA V17 (public-key key exchange) - V3 + TLS: TIA Portal V17+ (TLS 1.3 with per-device certs) - -Protocol stack:: - - +-------------------------------+ - | S7CommPlus (Protocol ID 0x72)| - +-------------------------------+ - | TLS 1.3 (optional, V17+) | - +-------------------------------+ - | COTP (ISO 8073) | - +-------------------------------+ - | TPKT (RFC 1006) | - +-------------------------------+ - | TCP (port 102) | - +-------------------------------+ - -The wire protocol (VLQ encoding, data types, function codes, object model) -is the same across all versions -- only the session authentication differs. - -Status: V1 connection functional, V2 (TLS + IntegrityId) scaffolding complete. - -Reference implementation: - https://github.com/thomas-v2/S7CommPlusDriver (C#, LGPL-3.0) -""" diff --git a/tests/conftest.py b/tests/conftest.py index 4e53e6d3..527c8906 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -69,8 +69,8 @@ def pytest_collection_modifyitems(config: pytest.Config, items: list[pytest.Item for mod_name in [ "tests.test_client_e2e", "test_client_e2e", - "tests.test_s7commplus_e2e", - "test_s7commplus_e2e", + "tests.test_s7_e2e", + "test_s7_e2e", ]: e2e = sys.modules.get(mod_name) if e2e is not None: diff --git a/tests/test_coverage_gaps.py b/tests/test_coverage_gaps.py index 3802b4c7..cbd19b06 100644 --- a/tests/test_coverage_gaps.py +++ b/tests/test_coverage_gaps.py @@ -19,8 +19,8 @@ from snap7.error import S7ConnectionError from snap7.server import Server from snap7.type import SrvArea -from snap7.s7commplus.connection import S7CommPlusConnection -from snap7.s7commplus.legitimation import ( +from s7.connection import S7CommPlusConnection +from s7.legitimation import ( LegitimationState, build_legacy_response, ) @@ -158,8 +158,8 @@ def test_legitimation_state_rotate_changes_key(self) -> None: # S7CommPlus async client # ============================================================================ -from snap7.s7commplus.server import S7CommPlusServer # noqa: E402 -from snap7.s7commplus.async_client import S7CommPlusAsyncClient # noqa: E402 +from s7._s7commplus_server import S7CommPlusServer # noqa: E402 +from s7._s7commplus_async_client import S7CommPlusAsyncClient # noqa: E402 ASYNC_TEST_PORT = 11125 @@ -227,13 +227,12 @@ async def test_properties(self, async_server: S7CommPlusServer) -> None: finally: await client.disconnect() - async def test_using_legacy_fallback_property(self, async_server: S7CommPlusServer) -> None: + async def test_session_setup_ok_property(self, async_server: S7CommPlusServer) -> None: client = S7CommPlusAsyncClient() await client.connect("127.0.0.1", port=ASYNC_TEST_PORT) try: - # Server supports S7CommPlus data ops, so no fallback - # (or fallback, depending on server implementation — just check the property works) - assert isinstance(client.using_legacy_fallback, bool) + # Server supports S7CommPlus data ops, so session setup should succeed + assert isinstance(client.session_setup_ok, bool) finally: await client.disconnect() diff --git a/tests/test_s7commplus_codec.py b/tests/test_s7_codec.py similarity index 99% rename from tests/test_s7commplus_codec.py rename to tests/test_s7_codec.py index 9b03881e..2bce0de0 100644 --- a/tests/test_s7commplus_codec.py +++ b/tests/test_s7_codec.py @@ -3,7 +3,7 @@ import struct import pytest -from snap7.s7commplus.codec import ( +from s7.codec import ( encode_header, decode_header, encode_request_header, @@ -35,8 +35,8 @@ encode_object_qualifier, _pvalue_element_size, ) -from snap7.s7commplus.protocol import PROTOCOL_ID, DataType, Opcode, FunctionCode, Ids -from snap7.s7commplus.vlq import encode_uint32_vlq, encode_int32_vlq, encode_uint64_vlq, encode_int64_vlq +from s7.protocol import PROTOCOL_ID, DataType, Opcode, FunctionCode, Ids +from s7.vlq import encode_uint32_vlq, encode_int32_vlq, encode_uint64_vlq, encode_int64_vlq class TestFrameHeader: diff --git a/tests/test_s7commplus_e2e.py b/tests/test_s7_e2e.py similarity index 97% rename from tests/test_s7commplus_e2e.py rename to tests/test_s7_e2e.py index f8c8bf0d..3ab8bbca 100644 --- a/tests/test_s7commplus_e2e.py +++ b/tests/test_s7_e2e.py @@ -2,7 +2,7 @@ These tests require a real PLC connection. Run with: - pytest tests/test_s7commplus_e2e.py --e2e --plc-ip=YOUR_PLC_IP + pytest tests/test_s7_e2e.py --e2e --plc-ip=YOUR_PLC_IP Available options: --e2e Enable e2e tests (required) @@ -47,14 +47,14 @@ import pytest -from snap7.s7commplus.client import S7CommPlusClient +from s7._s7commplus_client import S7CommPlusClient -# Enable DEBUG logging for all s7commplus modules so we get full hex dumps +# Enable DEBUG logging for all s7 modules so we get full hex dumps logging.basicConfig( level=logging.DEBUG, format="%(asctime)s %(name)s %(levelname)s %(message)s", ) -for _mod in ["snap7.s7commplus.client", "snap7.s7commplus.connection", "snap7.connection"]: +for _mod in ["s7._s7commplus_client", "s7.connection", "snap7.connection"]: logging.getLogger(_mod).setLevel(logging.DEBUG) # ============================================================================= @@ -496,8 +496,8 @@ def test_diag_raw_get_multi_variables(self) -> None: This tries several payload encodings to see which ones the PLC accepts. """ - from snap7.s7commplus.protocol import FunctionCode - from snap7.s7commplus.vlq import encode_uint32_vlq + from s7.protocol import FunctionCode + from s7.vlq import encode_uint32_vlq print(f"\n{'=' * 60}") print("DIAGNOSTIC: Raw GetMultiVariables payload experiments") @@ -523,7 +523,7 @@ def test_diag_raw_get_multi_variables(self) -> None: # Try to parse return code if len(response) > 0: - from snap7.s7commplus.vlq import decode_uint32_vlq + from s7.vlq import decode_uint32_vlq rc, consumed = decode_uint32_vlq(response, 0) print(f" Return code (VLQ): {rc} (0x{rc:X})") @@ -537,7 +537,7 @@ def test_diag_raw_get_multi_variables(self) -> None: def test_diag_raw_set_variable(self) -> None: """Try SetVariable (0x04F2) instead of SetMultiVariables to see if PLC responds differently.""" - from snap7.s7commplus.protocol import FunctionCode + from s7.protocol import FunctionCode print(f"\n{'=' * 60}") print("DIAGNOSTIC: Raw SetVariable / GetVariable experiments") @@ -563,8 +563,8 @@ def test_diag_raw_set_variable(self) -> None: def test_diag_explore_then_read(self) -> None: """Explore first to discover object IDs, then try reading using those IDs.""" - from snap7.s7commplus.protocol import FunctionCode, ElementID - from snap7.s7commplus.vlq import encode_uint32_vlq, decode_uint32_vlq + from s7.protocol import FunctionCode, ElementID + from s7.vlq import encode_uint32_vlq, decode_uint32_vlq print(f"\n{'=' * 60}") print("DIAGNOSTIC: Explore -> extract object IDs -> try reading") diff --git a/tests/test_s7commplus_server.py b/tests/test_s7_server.py similarity index 97% rename from tests/test_s7commplus_server.py rename to tests/test_s7_server.py index 2f08f575..3b980e13 100644 --- a/tests/test_s7commplus_server.py +++ b/tests/test_s7_server.py @@ -7,10 +7,10 @@ import pytest import asyncio -from snap7.s7commplus.server import S7CommPlusServer, CPUState, DataBlock -from snap7.s7commplus.client import S7CommPlusClient -from snap7.s7commplus.async_client import S7CommPlusAsyncClient -from snap7.s7commplus.protocol import ProtocolVersion +from s7._s7commplus_server import S7CommPlusServer, CPUState, DataBlock +from s7._s7commplus_client import S7CommPlusClient +from s7._s7commplus_async_client import S7CommPlusAsyncClient +from s7.protocol import ProtocolVersion # Use a high port to avoid conflicts TEST_PORT = 11120 diff --git a/tests/test_async_tls.py b/tests/test_s7_tls.py similarity index 97% rename from tests/test_async_tls.py rename to tests/test_s7_tls.py index 8c1bd007..7abc7f79 100644 --- a/tests/test_async_tls.py +++ b/tests/test_s7_tls.py @@ -8,9 +8,9 @@ import pytest from snap7.error import S7ConnectionError -from snap7.s7commplus.async_client import S7CommPlusAsyncClient -from snap7.s7commplus.server import S7CommPlusServer -from snap7.s7commplus.protocol import ProtocolVersion +from s7._s7commplus_async_client import S7CommPlusAsyncClient +from s7._s7commplus_server import S7CommPlusServer +from s7.protocol import ProtocolVersion TEST_PORT_V2 = 11130 TEST_PORT_V2_TLS = 11131 diff --git a/tests/test_s7commplus_unit.py b/tests/test_s7_unit.py similarity index 97% rename from tests/test_s7commplus_unit.py rename to tests/test_s7_unit.py index f7c5e57e..f11f0ae3 100644 --- a/tests/test_s7commplus_unit.py +++ b/tests/test_s7_unit.py @@ -3,17 +3,17 @@ import struct import pytest -from snap7.s7commplus.client import ( +from s7._s7commplus_client import ( S7CommPlusClient, _build_read_payload, _parse_read_response, _build_write_payload, _parse_write_response, ) -from snap7.s7commplus.codec import encode_pvalue_blob -from snap7.s7commplus.connection import S7CommPlusConnection, _element_size -from snap7.s7commplus.protocol import DataType, ElementID, ObjectId -from snap7.s7commplus.vlq import ( +from s7.codec import encode_pvalue_blob +from s7.connection import S7CommPlusConnection, _element_size +from s7.protocol import DataType, ElementID, ObjectId +from s7.vlq import ( encode_uint32_vlq, encode_uint64_vlq, encode_int32_vlq, @@ -269,7 +269,7 @@ def test_lword(self, conn: S7CommPlusConnection) -> None: assert new_offset == len(vlq) def test_lint(self, conn: S7CommPlusConnection) -> None: - from snap7.s7commplus.vlq import encode_int64_vlq + from s7.vlq import encode_int64_vlq vlq = encode_int64_vlq(-(2**40)) new_offset = conn._skip_typed_value(vlq, 0, DataType.LINT, 0x00) @@ -288,7 +288,7 @@ def test_timestamp(self, conn: S7CommPlusConnection) -> None: assert conn._skip_typed_value(data, 0, DataType.TIMESTAMP, 0x00) == 8 def test_timespan(self, conn: S7CommPlusConnection) -> None: - from snap7.s7commplus.vlq import encode_int64_vlq + from s7.vlq import encode_int64_vlq vlq = encode_int64_vlq(5000) # TIMESPAN uses uint64_vlq for skipping in _skip_typed_value @@ -430,7 +430,7 @@ def test_properties_not_connected(self) -> None: assert client.connected is False assert client.protocol_version == 0 assert client.session_id == 0 - assert client.using_legacy_fallback is False + assert client.session_setup_ok is False def test_db_read_not_connected(self) -> None: client = S7CommPlusClient() diff --git a/tests/test_s7commplus_v2.py b/tests/test_s7_v2.py similarity index 94% rename from tests/test_s7commplus_v2.py rename to tests/test_s7_v2.py index 1a9fc8e7..e8a2250f 100644 --- a/tests/test_s7commplus_v2.py +++ b/tests/test_s7_v2.py @@ -8,20 +8,20 @@ import pytest -from snap7.s7commplus.protocol import ( +from s7.protocol import ( FunctionCode, LegitimationId, ProtocolVersion, READ_FUNCTION_CODES, ) -from snap7.s7commplus.legitimation import ( +from s7.legitimation import ( LegitimationState, build_legacy_response, derive_legitimation_key, _build_legitimation_payload, ) -from snap7.s7commplus.vlq import encode_uint32_vlq, decode_uint32_vlq -from snap7.s7commplus.connection import S7CommPlusConnection +from s7.vlq import encode_uint32_vlq, decode_uint32_vlq +from s7.connection import S7CommPlusConnection class TestReadFunctionCodes: @@ -243,7 +243,7 @@ class TestBuildNewResponse: """Test AES-256-CBC legitimation response building.""" def test_new_response_returns_bytes(self) -> None: - from snap7.s7commplus.legitimation import build_new_response + from s7.legitimation import build_new_response result = build_new_response( password="test", @@ -253,7 +253,7 @@ def test_new_response_returns_bytes(self) -> None: assert isinstance(result, bytes) def test_new_response_is_aes_block_aligned(self) -> None: - from snap7.s7commplus.legitimation import build_new_response + from s7.legitimation import build_new_response result = build_new_response( password="test", @@ -264,7 +264,7 @@ def test_new_response_is_aes_block_aligned(self) -> None: assert len(result) % 16 == 0 def test_new_response_different_passwords_differ(self) -> None: - from snap7.s7commplus.legitimation import build_new_response + from s7.legitimation import build_new_response challenge = b"\xab" * 16 oms = b"\xcd" * 32 @@ -273,7 +273,7 @@ def test_new_response_different_passwords_differ(self) -> None: assert r1 != r2 def test_new_response_different_secrets_differ(self) -> None: - from snap7.s7commplus.legitimation import build_new_response + from s7.legitimation import build_new_response challenge = b"\xab" * 16 r1 = build_new_response("test", challenge, b"\x00" * 32) @@ -281,7 +281,7 @@ def test_new_response_different_secrets_differ(self) -> None: assert r1 != r2 def test_new_response_with_username(self) -> None: - from snap7.s7commplus.legitimation import build_new_response + from s7.legitimation import build_new_response result = build_new_response( password="test", @@ -294,7 +294,7 @@ def test_new_response_with_username(self) -> None: def test_new_response_decryptable(self) -> None: """Verify the response can be decrypted back to the original payload.""" - from snap7.s7commplus.legitimation import ( + from s7.legitimation import ( build_new_response, derive_legitimation_key, _build_legitimation_payload, diff --git a/tests/test_s7commplus_vlq.py b/tests/test_s7_vlq.py similarity index 99% rename from tests/test_s7commplus_vlq.py rename to tests/test_s7_vlq.py index d7dbb596..70ed5917 100644 --- a/tests/test_s7commplus_vlq.py +++ b/tests/test_s7_vlq.py @@ -2,7 +2,7 @@ import pytest -from snap7.s7commplus.vlq import ( +from s7.vlq import ( encode_uint32_vlq, decode_uint32_vlq, encode_int32_vlq, diff --git a/tox.ini b/tox.ini index 4b10afdb..0b56471b 100644 --- a/tox.ini +++ b/tox.ini @@ -19,18 +19,18 @@ commands = [testenv:mypy] basepython = python3.13 extras = test -commands = mypy {toxinidir}/snap7 {toxinidir}/tests {toxinidir}/example +commands = mypy {toxinidir}/snap7 {toxinidir}/s7 {toxinidir}/tests {toxinidir}/example [testenv:lint-ruff] basepython = python3.13 extras = test commands = - ruff check {toxinidir}/snap7 {toxinidir}/tests {toxinidir}/example - ruff format --diff {toxinidir}/snap7 {toxinidir}/tests {toxinidir}/example + ruff check {toxinidir}/snap7 {toxinidir}/s7 {toxinidir}/tests {toxinidir}/example + ruff format --diff {toxinidir}/snap7 {toxinidir}/s7 {toxinidir}/tests {toxinidir}/example [testenv:ruff] basepython = python3.13 extras = test commands = - ruff format {toxinidir}/snap7 {toxinidir}/tests {toxinidir}/example - ruff check --fix {toxinidir}/snap7 {toxinidir}/tests {toxinidir}/example + ruff format {toxinidir}/snap7 {toxinidir}/s7 {toxinidir}/tests {toxinidir}/example + ruff check --fix {toxinidir}/snap7 {toxinidir}/s7 {toxinidir}/tests {toxinidir}/example