From ceefe5d10455a204105becbc47d99f4829286c2f Mon Sep 17 00:00:00 2001 From: Gijs Molenaar Date: Fri, 27 Mar 2026 08:42:08 +0200 Subject: [PATCH 1/4] Complete S7CommPlus V1 session handshake by echoing ServerSessionVersion Instead of probing data operations and falling back on ERROR2, properly complete the V1 session setup by parsing ServerSessionVersion (attribute 306) from the CreateObject response and echoing it back via SetMultiVariables. This should fix the ERROR2 (0x05A9) rejections from real S7-1200/1500 PLCs with firmware v4.0+. Co-Authored-By: Claude Opus 4.6 --- snap7/s7commplus/async_client.py | 136 +++++++++++++++++++++++++------ snap7/s7commplus/client.py | 41 ++-------- snap7/s7commplus/connection.py | 20 ++++- 3 files changed, 134 insertions(+), 63 deletions(-) diff --git a/snap7/s7commplus/async_client.py b/snap7/s7commplus/async_client.py index 4ace2241..70f831e1 100644 --- a/snap7/s7commplus/async_client.py +++ b/snap7/s7commplus/async_client.py @@ -166,10 +166,10 @@ async def connect( if use_tls: await self._activate_tls(tls_cert=tls_cert, tls_key=tls_key, tls_ca=tls_ca) - # Step 4: S7CommPlus session setup + # Step 4: S7CommPlus session setup (CreateObject) await self._create_session() - # Step 5: Version-specific post-setup + # Step 5: Version-specific validation (before session setup handshake) if self._protocol_version >= ProtocolVersion.V3: if not use_tls: logger.warning( @@ -187,15 +187,22 @@ async def connect( logger.info("V2 IntegrityId tracking enabled") self._connected = True + + # Step 6: Session setup - echo ServerSessionVersion back to PLC + if self._server_session_version is not None: + session_setup_ok = await self._setup_session() + else: + logger.warning("PLC did not provide ServerSessionVersion - session setup incomplete") + session_setup_ok = False logger.info( f"Async S7CommPlus connected to {host}:{port}, " f"version=V{self._protocol_version}, session={self._session_id}, " f"tls={self._tls_active}" ) - # Probe S7CommPlus data operations - if not await self._probe_s7commplus_data(): - logger.info("S7CommPlus data operations not supported, falling back to legacy S7 protocol") + # Check if S7CommPlus session setup succeeded + if not session_setup_ok: + logger.info("S7CommPlus session setup failed, falling back to legacy S7 protocol") await self._setup_legacy_fallback() except Exception: @@ -409,25 +416,6 @@ async def _send_legitimation_legacy(self, response: bytes) -> None: raise S7ConnectionError(f"Legacy legitimation rejected by PLC: return_value={return_value}") logger.debug(f"Legacy legitimation return_value={return_value}") - async def _probe_s7commplus_data(self) -> bool: - """Test if the PLC supports S7CommPlus data operations.""" - try: - payload = struct.pack(">I", 0) + encode_uint32_vlq(0) + encode_uint32_vlq(0) - payload += encode_object_qualifier() - payload += struct.pack(">I", 0) - - response = await self._send_request(FunctionCode.GET_MULTI_VARIABLES, payload) - if len(response) < 1: - return False - return_value, _ = decode_uint64_vlq(response, 0) - if return_value != 0: - logger.debug(f"S7CommPlus probe: PLC returned error {return_value}") - return False - return True - except Exception as e: - logger.debug(f"S7CommPlus probe failed: {e}") - return False - async def _setup_legacy_fallback(self) -> None: """Establish a secondary legacy S7 connection for data operations.""" from ..client import Client @@ -737,6 +725,106 @@ async def _create_session(self) -> None: self._session_id = struct.unpack_from(">I", response, 9)[0] self._protocol_version = version + # Parse ServerSessionVersion from response payload + self._parse_create_object_response(response[14:]) + + def _parse_create_object_response(self, payload: bytes) -> None: + """Parse CreateObject response to extract ServerSessionVersion (attribute 306).""" + offset = 0 + while offset < len(payload): + tag = payload[offset] + + if tag == ElementID.ATTRIBUTE: + offset += 1 + if offset >= len(payload): + break + attr_id, consumed = decode_uint32_vlq(payload, offset) + offset += consumed + + if attr_id == ObjectId.SERVER_SESSION_VERSION: + if offset + 2 > len(payload): + break + _flags = payload[offset] + datatype = payload[offset + 1] + offset += 2 + if datatype in (DataType.UDINT, DataType.DWORD): + value, consumed = decode_uint32_vlq(payload, offset) + offset += consumed + self._server_session_version = value + logger.info(f"ServerSessionVersion = {value}") + return + else: + # Skip attribute value + if offset + 2 > len(payload): + break + _flags = payload[offset] + _dt = payload[offset + 1] + offset += 2 + # Best-effort skip: advance past common VLQ-encoded values + if offset < len(payload): + _, consumed = decode_uint32_vlq(payload, offset) + offset += consumed + + elif tag == ElementID.START_OF_OBJECT: + offset += 1 + if offset + 4 > len(payload): + break + offset += 4 # RelationId + _, consumed = decode_uint32_vlq(payload, offset) + offset += consumed # ClassId + _, consumed = decode_uint32_vlq(payload, offset) + offset += consumed # ClassFlags + _, consumed = decode_uint32_vlq(payload, offset) + offset += consumed # AttributeId + + elif tag == ElementID.TERMINATING_OBJECT: + offset += 1 + elif tag == 0x00: + offset += 1 + else: + offset += 1 + + logger.debug("ServerSessionVersion not found in CreateObject response") + + async def _setup_session(self) -> bool: + """Echo ServerSessionVersion back to the PLC via SetMultiVariables. + + Without this step, the PLC rejects all subsequent data operations + with ERROR2 (0x05A9). + + Returns: + True if session setup succeeded. + """ + if self._server_session_version is None: + return False + + payload = bytearray() + payload += struct.pack(">I", self._session_id) + payload += encode_uint32_vlq(1) # Item count + payload += encode_uint32_vlq(1) # Total address field count + payload += encode_uint32_vlq(ObjectId.SERVER_SESSION_VERSION) + payload += encode_uint32_vlq(1) # ItemNumber + payload += bytes([0x00, DataType.UDINT]) + payload += encode_uint32_vlq(self._server_session_version) + payload += bytes([0x00]) # Fill byte + payload += encode_object_qualifier() + payload += struct.pack(">I", 0) # Trailing padding + + try: + resp_payload = await self._send_request(FunctionCode.SET_MULTI_VARIABLES, bytes(payload)) + if len(resp_payload) >= 1: + return_value, _ = decode_uint64_vlq(resp_payload, 0) + if return_value != 0: + logger.warning(f"SetupSession: PLC returned error {return_value}") + return False + else: + logger.info("Session setup completed successfully") + return True + return False + except Exception as e: + logger.warning(f"SetupSession failed: {e}") + return False + async def _delete_session(self) -> None: """Send DeleteObject to close the session.""" seq_num = self._next_sequence_number() diff --git a/snap7/s7commplus/client.py b/snap7/s7commplus/client.py index 44112c9c..a3f32ab4 100644 --- a/snap7/s7commplus/client.py +++ b/snap7/s7commplus/client.py @@ -148,44 +148,13 @@ def connect( logger.info("Performing PLC legitimation (password authentication)") self._connection.authenticate(password) - # Probe S7CommPlus data operations with a minimal request - if not self._probe_s7commplus_data(): - logger.info("S7CommPlus data operations not supported, falling back to legacy S7 protocol") + # Check if S7CommPlus session setup succeeded. If the PLC accepted the + # ServerSessionVersion echo, data operations should work. If it returned + # an error (e.g. ERROR2), fall back to legacy S7 protocol. + if not self._connection.session_setup_ok: + logger.info("S7CommPlus session setup failed, falling back to legacy S7 protocol") self._setup_legacy_fallback() - def _probe_s7commplus_data(self) -> bool: - """Test if the PLC supports S7CommPlus data operations. - - Sends a minimal GetMultiVariables request with zero items. If the PLC - responds with ERROR2 or a non-zero return code, data operations are - not supported. - - Returns: - True if S7CommPlus data operations work. - """ - if self._connection is None: - return False - - try: - # Send a minimal GetMultiVariables with 0 items - payload = struct.pack(">I", 0) + encode_uint32_vlq(0) + encode_uint32_vlq(0) - payload += encode_object_qualifier() - payload += struct.pack(">I", 0) - - response = self._connection.send_request(FunctionCode.GET_MULTI_VARIABLES, payload) - - # Check if we got a valid response (return value = 0) - if len(response) < 1: - return False - return_value, _ = decode_uint64_vlq(response, 0) - if return_value != 0: - logger.debug(f"S7CommPlus probe: PLC returned error {return_value}") - return False - return True - except Exception as e: - logger.debug(f"S7CommPlus probe failed: {e}") - return False - def _setup_legacy_fallback(self) -> None: """Establish a secondary legacy S7 connection for data operations.""" from ..client import Client diff --git a/snap7/s7commplus/connection.py b/snap7/s7commplus/connection.py index 1b0a013e..ab4d1b67 100644 --- a/snap7/s7commplus/connection.py +++ b/snap7/s7commplus/connection.py @@ -112,6 +112,7 @@ def __init__( self._tls_active: bool = False self._connected = False self._server_session_version: Optional[int] = None + self._session_setup_ok: bool = False # V2+ IntegrityId tracking self._integrity_id_read: int = 0 @@ -150,6 +151,11 @@ def integrity_id_write(self) -> int: """Current write IntegrityId counter (V2+).""" return self._integrity_id_write + @property + def session_setup_ok(self) -> bool: + """Whether the session setup (ServerSessionVersion echo) succeeded.""" + return self._session_setup_ok + @property def oms_secret(self) -> Optional[bytes]: """OMS exporter secret from TLS session (for legitimation).""" @@ -197,9 +203,10 @@ def connect( # Step 5: Session setup - echo ServerSessionVersion back to PLC if self._server_session_version is not None: - self._setup_session() + self._session_setup_ok = self._setup_session() else: logger.warning("PLC did not provide ServerSessionVersion - session setup incomplete") + self._session_setup_ok = False # Step 6: Version-specific post-setup if self._protocol_version >= ProtocolVersion.V3: @@ -409,6 +416,7 @@ def disconnect(self) -> None: pass self._connected = False + self._session_setup_ok = False self._tls_active = False self._ssl_socket = None self._oms_secret = None @@ -836,17 +844,20 @@ def _skip_typed_value(self, data: bytes, offset: int, datatype: int, flags: int) # Unknown type - can't skip reliably return offset - def _setup_session(self) -> None: + def _setup_session(self) -> bool: """Send SetMultiVariables to echo ServerSessionVersion back to the PLC. This completes the session handshake by writing the ServerSessionVersion attribute back to the session object. Without this step, the PLC rejects all subsequent data operations with ERROR2 (0x05A9). + Returns: + True if session setup succeeded (return_value == 0). + Reference: thomas-v2/S7CommPlusDriver SetSessionSetupData """ if self._server_session_version is None: - return + return False seq_num = self._next_sequence_number() @@ -913,8 +924,11 @@ def _setup_session(self) -> None: return_value, _ = decode_uint64_vlq(resp_payload, 0) if return_value != 0: logger.warning(f"SetupSession: PLC returned error {return_value}") + return False else: logger.info("Session setup completed successfully") + return True + return False def _delete_session(self) -> None: """Send DeleteObject to close the session.""" From d1199f54faf0822a0285b5fadf0752d9401056ba Mon Sep 17 00:00:00 2001 From: Gijs Molenaar Date: Fri, 27 Mar 2026 10:09:30 +0200 Subject: [PATCH 2/4] Move S7CommPlus into s7/ package, simplify to 2-layer architecture MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Collapse 3 layers (snap7, snap7.s7commplus, s7) into 2: - snap7/ stays untouched as legacy S7 protocol - s7/ is the new unified frontend with S7CommPlus + legacy fallback Key changes: - Move all S7CommPlus protocol code from snap7/s7commplus/ to s7/ - Remove duplicated fallback logic — now lives only in s7.Client - Pure S7CommPlus clients (_s7commplus_client.py) have no fallback - s7.Client tries S7CommPlus first, falls back to snap7.Client - Rename tests: test_s7commplus_* → test_s7_* - Update all imports, docs, tooling (mypy, ruff, tox) Co-Authored-By: Claude Opus 4.6 --- CLAUDE.md | 42 ++- Makefile | 4 +- doc/API/s7commplus.rst | 82 ++---- doc/limitations.rst | 2 +- doc/plc-support.rst | 4 +- pyproject.toml | 3 +- s7/__init__.py | 35 +++ s7/_protocol.py | 17 ++ .../_s7commplus_async_client.py | 269 ++++-------------- .../client.py => s7/_s7commplus_client.py | 199 ++----------- .../server.py => s7/_s7commplus_server.py | 0 s7/async_client.py | 244 ++++++++++++++++ s7/client.py | 259 +++++++++++++++++ {snap7/s7commplus => s7}/codec.py | 0 {snap7/s7commplus => s7}/connection.py | 28 +- {snap7/s7commplus => s7}/legitimation.py | 0 {snap7/s7commplus => s7}/protocol.py | 0 s7/py.typed | 0 s7/server.py | 132 +++++++++ {snap7/s7commplus => s7}/vlq.py | 0 snap7/s7commplus/__init__.py | 36 --- tests/conftest.py | 4 +- tests/test_coverage_gaps.py | 15 +- ...t_s7commplus_codec.py => test_s7_codec.py} | 6 +- ...{test_s7commplus_e2e.py => test_s7_e2e.py} | 20 +- ...s7commplus_server.py => test_s7_server.py} | 8 +- tests/{test_async_tls.py => test_s7_tls.py} | 6 +- ...est_s7commplus_unit.py => test_s7_unit.py} | 16 +- .../{test_s7commplus_v2.py => test_s7_v2.py} | 20 +- ...{test_s7commplus_vlq.py => test_s7_vlq.py} | 2 +- tox.ini | 10 +- 31 files changed, 896 insertions(+), 567 deletions(-) create mode 100644 s7/__init__.py create mode 100644 s7/_protocol.py rename snap7/s7commplus/async_client.py => s7/_s7commplus_async_client.py (73%) rename snap7/s7commplus/client.py => s7/_s7commplus_client.py (57%) rename snap7/s7commplus/server.py => s7/_s7commplus_server.py (100%) create mode 100644 s7/async_client.py create mode 100644 s7/client.py rename {snap7/s7commplus => s7}/codec.py (100%) rename {snap7/s7commplus => s7}/connection.py (98%) rename {snap7/s7commplus => s7}/legitimation.py (100%) rename {snap7/s7commplus => s7}/protocol.py (100%) create mode 100644 s7/py.typed create mode 100644 s7/server.py rename {snap7/s7commplus => s7}/vlq.py (100%) delete mode 100644 snap7/s7commplus/__init__.py rename tests/{test_s7commplus_codec.py => test_s7_codec.py} (99%) rename tests/{test_s7commplus_e2e.py => test_s7_e2e.py} (97%) rename tests/{test_s7commplus_server.py => test_s7_server.py} (97%) rename tests/{test_async_tls.py => test_s7_tls.py} (97%) rename tests/{test_s7commplus_unit.py => test_s7_unit.py} (97%) rename tests/{test_s7commplus_v2.py => test_s7_v2.py} (94%) rename tests/{test_s7commplus_vlq.py => test_s7_vlq.py} (99%) diff --git a/CLAUDE.md b/CLAUDE.md index 5353da6e..4998f4a8 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -8,6 +8,7 @@ Python-snap7 is a pure Python S7 communication library for interfacing with Siem ## Key Architecture +### snap7/ — Legacy S7 protocol (S7-300/400, PUT/GET on S7-1200/1500) - **snap7/client.py**: Main Client class for connecting to S7 PLCs - **snap7/server.py**: Server implementation for PLC simulation - **snap7/logo.py**: Logo PLC communication @@ -19,6 +20,20 @@ Python-snap7 is a pure Python S7 communication library for interfacing with Siem - **snap7/type.py**: Type definitions and enums (Area, Block, WordLen, etc.) - **snap7/error.py**: Error handling and exceptions +### s7/ — Unified client with S7CommPlus + legacy fallback +- **s7/client.py**: Unified Client — tries S7CommPlus, falls back to snap7.Client +- **s7/async_client.py**: Unified AsyncClient — same pattern, async +- **s7/server.py**: Unified Server wrapping both legacy and S7CommPlus +- **s7/_protocol.py**: Protocol enum (AUTO/LEGACY/S7COMMPLUS) +- **s7/_s7commplus_client.py**: Pure S7CommPlus sync client (internal) +- **s7/_s7commplus_async_client.py**: Pure S7CommPlus async client (internal) +- **s7/_s7commplus_server.py**: S7CommPlus server emulator (internal) +- **s7/connection.py**: S7CommPlus low-level connection +- **s7/protocol.py**: S7CommPlus protocol constants/enums +- **s7/codec.py**: S7CommPlus encoding/decoding +- **s7/vlq.py**: Variable-Length Quantity encoding +- **s7/legitimation.py**: Authentication helpers + ## Implementation Details ### Protocol Stack @@ -41,24 +56,31 @@ The library implements the complete S7 protocol stack: - Block operations (list, info, upload, download) - Date/time operations -### Usage +### Usage (unified s7 package — recommended for S7-1200/1500) + +```python +from s7 import Client + +client = Client() +client.connect("192.168.1.10", 0, 1) # auto-detects S7CommPlus vs legacy +data = client.db_read(1, 0, 4) +client.disconnect() +``` + +### Usage (legacy snap7 package — S7-300/400) ```python import snap7 -# Create and connect client client = snap7.Client() client.connect("192.168.1.10", 0, 1) -# Read/write operations data = client.db_read(1, 0, 4) client.db_write(1, 0, bytearray([1, 2, 3, 4])) -# Memory area access marker_data = client.mb_read(0, 4) client.mb_write(0, 4, bytearray([1, 2, 3, 4])) -# Disconnect client.disconnect() ``` @@ -98,15 +120,15 @@ pytest tests/test_client.py ### Code Quality ```bash # Type checking -mypy snap7 tests example +mypy snap7 s7 tests example # Linting and formatting check -ruff check snap7 tests example -ruff format --diff snap7 tests example +ruff check snap7 s7 tests example +ruff format --diff snap7 s7 tests example # Auto-format code -ruff format snap7 tests example -ruff check --fix snap7 tests example +ruff format snap7 s7 tests example +ruff check --fix snap7 s7 tests example ``` ### Development with tox diff --git a/Makefile b/Makefile index ab74d2ee..fcaf4b07 100644 --- a/Makefile +++ b/Makefile @@ -29,8 +29,8 @@ doc: .venv/bin/sphinx-build .PHONY: check check: .venv/bin/pytest - uv run ruff check snap7 tests example - uv run ruff format --diff snap7 tests example + uv run ruff check snap7 s7 tests example + uv run ruff format --diff snap7 s7 tests example .PHONY: ruff ruff: .venv/bin/tox diff --git a/doc/API/s7commplus.rst b/doc/API/s7commplus.rst index bd5ceecb..48066e91 100644 --- a/doc/API/s7commplus.rst +++ b/doc/API/s7commplus.rst @@ -7,24 +7,21 @@ S7CommPlus (S7-1200/1500) releases. If you encounter problems, please `open an issue `_. -The :mod:`snap7.s7commplus` package provides support for Siemens S7-1200 and -S7-1500 PLCs, which use the S7CommPlus protocol instead of the classic S7 -protocol used by S7-300/400. - -Both synchronous and asynchronous clients are available. When a PLC does not -support S7CommPlus data operations, the clients automatically fall back to the -legacy S7 protocol transparently. +The ``s7`` package provides a unified client for Siemens S7-1200 and S7-1500 +PLCs. It automatically tries the S7CommPlus protocol first and falls back to +the legacy S7 protocol when needed. Synchronous client ------------------ .. code-block:: python - from snap7.s7commplus.client import S7CommPlusClient + from s7 import Client - client = S7CommPlusClient() - client.connect("192.168.1.10") + client = Client() + client.connect("192.168.1.10", 0, 1) data = client.db_read(1, 0, 4) + print(client.protocol) # Protocol.S7COMMPLUS or Protocol.LEGACY client.disconnect() Asynchronous client @@ -33,11 +30,11 @@ Asynchronous client .. code-block:: python import asyncio - from snap7.s7commplus.async_client import S7CommPlusAsyncClient + from s7 import AsyncClient async def main(): - client = S7CommPlusAsyncClient() - await client.connect("192.168.1.10") + client = AsyncClient() + await client.connect("192.168.1.10", 0, 1) data = await client.db_read(1, 0, 4) await client.disconnect() @@ -51,10 +48,10 @@ S7-1500 PLCs with firmware 2.x use S7CommPlus V2, which requires TLS. Pass .. code-block:: python - from snap7.s7commplus.client import S7CommPlusClient + from s7 import Client - client = S7CommPlusClient() - client.connect("192.168.1.10", use_tls=True) + client = Client() + client.connect("192.168.1.10", 0, 1, use_tls=True) data = client.db_read(1, 0, 4) client.disconnect() @@ -63,7 +60,7 @@ For PLCs with custom certificates, provide the certificate paths: .. code-block:: python client.connect( - "192.168.1.10", + "192.168.1.10", 0, 1, use_tls=True, tls_cert="/path/to/client.pem", tls_key="/path/to/client.key", @@ -73,56 +70,39 @@ For PLCs with custom certificates, provide the certificate paths: Password authentication ----------------------- -Password-protected PLCs require authentication after connecting. Call -``authenticate()`` before performing data operations: +Password-protected PLCs require the ``password`` keyword argument: .. code-block:: python - from snap7.s7commplus.client import S7CommPlusClient - - client = S7CommPlusClient() - client.connect("192.168.1.10", use_tls=True) - client.authenticate(password="my_plc_password") + from s7 import Client + client = Client() + client.connect("192.168.1.10", 0, 1, use_tls=True, password="my_plc_password") data = client.db_read(1, 0, 4) client.disconnect() -The method auto-detects whether to use legacy (SHA-1 XOR) or new-style -(AES-256-CBC) authentication based on the PLC firmware version. For new-style -authentication, you can also provide a username: - -.. code-block:: python - - client.authenticate(password="my_password", username="admin") - -.. note:: - - Authentication requires TLS to be active. Calling ``authenticate()`` - without ``use_tls=True`` will raise :class:`~snap7.error.S7ConnectionError`. +Protocol selection +------------------ +By default the client uses ``Protocol.AUTO`` which tries S7CommPlus first. +You can force a specific protocol: -Legacy fallback ---------------- +.. code-block:: python -If the PLC returns an error for S7CommPlus data operations (common with some -firmware versions), the client automatically falls back to the classic S7 -protocol. You can check whether fallback is active: + from s7 import Client, Protocol -.. code-block:: python + # Force legacy S7 only + client = Client() + client.connect("192.168.1.10", 0, 1, protocol=Protocol.LEGACY) - client.connect("192.168.1.10") - if client.using_legacy_fallback: - print("Using legacy S7 protocol") + # Force S7CommPlus (raises on failure) + client.connect("192.168.1.10", 0, 1, protocol=Protocol.S7COMMPLUS) API reference ------------- -.. automodule:: snap7.s7commplus.client - :members: - -.. automodule:: snap7.s7commplus.async_client +.. automodule:: s7.client :members: -.. automodule:: snap7.s7commplus.connection +.. automodule:: s7.async_client :members: - :exclude-members: S7CommPlusConnection diff --git a/doc/limitations.rst b/doc/limitations.rst index c270ab06..03a220a8 100644 --- a/doc/limitations.rst +++ b/doc/limitations.rst @@ -26,5 +26,5 @@ are **not possible** with this protocol: individual blocks, but this is not a complete backup. * - Access S7-1200/1500 PLCs with S7CommPlus security - python-snap7 supports S7CommPlus V1 and V2 (with TLS) via - :mod:`snap7.s7commplus`. V3 is not yet supported. For PLCs that only + the ``s7`` package. V3 is not yet supported. For PLCs that only support V3, enable PUT/GET as a fallback or use OPC UA. diff --git a/doc/plc-support.rst b/doc/plc-support.rst index 1d2b0e59..3df53e30 100644 --- a/doc/plc-support.rst +++ b/doc/plc-support.rst @@ -59,7 +59,7 @@ Supported PLCs - No - V2 - **Full** (S7CommPlus V2) - - S7CommPlus V2 with TLS is supported via :mod:`snap7.s7commplus`. + - S7CommPlus V2 with TLS is supported via the ``s7`` package. * - S7-1500 (FW 3.x+) - ~2022 - PUT/GET only @@ -143,7 +143,7 @@ Siemens has evolved their PLC communication protocols over time: python-snap7 implements the **classic S7 protocol** and **S7CommPlus V1/V2**. The classic protocol remains available on most PLC families via the PUT/GET mechanism. S7CommPlus V1 and V2 (with TLS) are supported via the -:mod:`snap7.s7commplus` package. For PLCs that require S7CommPlus V3 (such +``s7`` package. For PLCs that require S7CommPlus V3 (such as the S7-1500R/H), consider using OPC UA as an alternative. diff --git a/pyproject.toml b/pyproject.toml index b865de58..ca23f81a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,9 +40,10 @@ discovery = ["pnio-dcp"] [tool.setuptools.package-data] snap7 = ["py.typed"] +s7 = ["py.typed"] [tool.setuptools.packages.find] -include = ["snap7*"] +include = ["snap7*", "s7*"] [project.scripts] snap7-server = "snap7.server:mainloop" diff --git a/s7/__init__.py b/s7/__init__.py new file mode 100644 index 00000000..1cfaa189 --- /dev/null +++ b/s7/__init__.py @@ -0,0 +1,35 @@ +"""Unified S7 communication library. + +Provides protocol-agnostic access to Siemens S7 PLCs with automatic +protocol discovery (S7CommPlus vs legacy S7). + +Usage:: + + from s7 import Client + + client = Client() + client.connect("192.168.1.10", 0, 1) + data = client.db_read(1, 0, 4) +""" + +from .client import Client +from .async_client import AsyncClient +from .server import Server +from ._protocol import Protocol + +from snap7.type import Area, Block, WordLen, SrvEvent, SrvArea +from snap7.util.db import Row, DB + +__all__ = [ + "Client", + "AsyncClient", + "Server", + "Protocol", + "Area", + "Block", + "WordLen", + "SrvEvent", + "SrvArea", + "Row", + "DB", +] diff --git a/s7/_protocol.py b/s7/_protocol.py new file mode 100644 index 00000000..bad2be02 --- /dev/null +++ b/s7/_protocol.py @@ -0,0 +1,17 @@ +"""Protocol enum for unified S7 client.""" + +from enum import Enum + + +class Protocol(Enum): + """S7 communication protocol selection. + + Attributes: + AUTO: Try S7CommPlus first, fall back to legacy S7 if unsupported. + LEGACY: Use legacy S7 protocol only (S7-300/400, basic S7-1200/1500). + S7COMMPLUS: Use S7CommPlus protocol only (S7-1200/1500 with full access). + """ + + AUTO = "auto" + LEGACY = "legacy" + S7COMMPLUS = "s7commplus" diff --git a/snap7/s7commplus/async_client.py b/s7/_s7commplus_async_client.py similarity index 73% rename from snap7/s7commplus/async_client.py rename to s7/_s7commplus_async_client.py index 70f831e1..25559c44 100644 --- a/snap7/s7commplus/async_client.py +++ b/s7/_s7commplus_async_client.py @@ -1,19 +1,10 @@ -""" -Async S7CommPlus client for S7-1200/1500 PLCs. - -Provides the same API as S7CommPlusClient but using asyncio for -non-blocking I/O. Uses asyncio.Lock for concurrent safety. +"""Pure async S7CommPlus client for S7-1200/1500 PLCs (no legacy fallback). -When a PLC does not support S7CommPlus data operations, the client -transparently falls back to the legacy S7 protocol for data block -read/write operations (using synchronous calls in an executor). +This is an internal module used by the unified ``s7.AsyncClient``. It provides +raw S7CommPlus data operations without any fallback logic -- the unified +client is responsible for deciding when to fall back to legacy S7. -Example:: - - async with S7CommPlusAsyncClient() as client: - await client.connect("192.168.1.10") - data = await client.db_read(1, 0, 4) - await client.db_write(1, 0, struct.pack(">f", 23.5)) +Reference: thomas-v2/S7CommPlusDriver (C#, LGPL-3.0) """ import asyncio @@ -35,7 +26,7 @@ ) from .codec import encode_header, decode_header, encode_typed_value, encode_object_qualifier from .vlq import encode_uint32_vlq, decode_uint32_vlq, decode_uint64_vlq -from .client import _build_read_payload, _parse_read_response, _build_write_payload, _parse_write_response +from ._s7commplus_client import _build_read_payload, _parse_read_response, _build_write_payload, _parse_write_response logger = logging.getLogger(__name__) @@ -46,17 +37,9 @@ class S7CommPlusAsyncClient: - """Async S7CommPlus client for S7-1200/1500 PLCs. - - Supports all S7CommPlus protocol versions (V1/V2/V3/TLS). The protocol - version is auto-detected from the PLC's CreateObject response during - connection setup. + """Pure async S7CommPlus client without legacy fallback. - Uses asyncio for all I/O operations and asyncio.Lock for - concurrent safety when shared between multiple coroutines. - - When the PLC does not support S7CommPlus data operations, the client - automatically falls back to legacy S7 protocol for db_read/db_write. + Use ``s7.AsyncClient`` for automatic protocol selection. """ def __init__(self) -> None: @@ -67,12 +50,6 @@ def __init__(self) -> None: self._protocol_version: int = 0 self._connected = False self._lock = asyncio.Lock() - self._legacy_client: Optional[Any] = None - self._use_legacy_data: bool = False - self._host: str = "" - self._port: int = 102 - self._rack: int = 0 - self._slot: int = 1 # V2+ IntegrityId tracking self._integrity_id_read: int = 0 @@ -83,11 +60,10 @@ def __init__(self) -> None: self._tls_active: bool = False self._oms_secret: Optional[bytes] = None self._server_session_version: Optional[int] = None + self._session_setup_ok: bool = False @property def connected(self) -> bool: - if self._use_legacy_data and self._legacy_client is not None: - return bool(self._legacy_client.connected) return self._connected @property @@ -99,9 +75,9 @@ def session_id(self) -> int: return self._session_id @property - def using_legacy_fallback(self) -> bool: - """Whether the client is using legacy S7 protocol for data operations.""" - return self._use_legacy_data + def session_setup_ok(self) -> bool: + """Whether the S7CommPlus session setup succeeded for data operations.""" + return self._session_setup_ok @property def tls_active(self) -> bool: @@ -125,32 +101,19 @@ async def connect( tls_key: Optional[str] = None, tls_ca: Optional[str] = None, ) -> None: - """Connect to an S7-1200/1500 PLC. - - The connection sequence: - 1. COTP connection (same as legacy S7comm) - 2. InitSSL handshake - 3. TLS activation (if use_tls=True, required for V2) - 4. CreateObject to establish S7CommPlus session - 5. Enable IntegrityId tracking (V2+) - - If the PLC does not support S7CommPlus data operations, a secondary - legacy S7 connection is established transparently for data access. + """Connect to an S7-1200/1500 PLC using S7CommPlus. Args: host: PLC IP address or hostname port: TCP port (default 102) - rack: PLC rack number - slot: PLC slot number + rack: PLC rack number (unused, kept for API symmetry) + slot: PLC slot number (unused, kept for API symmetry) use_tls: Whether to activate TLS after InitSSL. tls_cert: Path to client TLS certificate (PEM) tls_key: Path to client private key (PEM) tls_ca: Path to CA certificate for PLC verification (PEM) """ self._host = host - self._port = port - self._rack = rack - self._slot = slot # TCP connect self._reader, self._writer = await asyncio.open_connection(host, port) @@ -169,7 +132,7 @@ async def connect( # Step 4: S7CommPlus session setup (CreateObject) await self._create_session() - # Step 5: Version-specific validation (before session setup handshake) + # Step 5: Version-specific validation if self._protocol_version >= ProtocolVersion.V3: if not use_tls: logger.warning( @@ -177,10 +140,9 @@ async def connect( ) elif self._protocol_version == ProtocolVersion.V2: if not self._tls_active: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("PLC reports V2 protocol but TLS is not active. V2 requires TLS. Use use_tls=True.") - # Enable IntegrityId tracking for V2+ self._with_integrity_id = True self._integrity_id_read = 0 self._integrity_id_write = 0 @@ -190,21 +152,16 @@ async def connect( # Step 6: Session setup - echo ServerSessionVersion back to PLC if self._server_session_version is not None: - session_setup_ok = await self._setup_session() + self._session_setup_ok = await self._setup_session() else: logger.warning("PLC did not provide ServerSessionVersion - session setup incomplete") - session_setup_ok = False + self._session_setup_ok = False logger.info( f"Async S7CommPlus connected to {host}:{port}, " f"version=V{self._protocol_version}, session={self._session_id}, " f"tls={self._tls_active}" ) - # Check if S7CommPlus session setup succeeded - if not session_setup_ok: - logger.info("S7CommPlus session setup failed, falling back to legacy S7 protocol") - await self._setup_legacy_fallback() - except Exception: await self.disconnect() raise @@ -212,12 +169,6 @@ async def connect( async def authenticate(self, password: str, username: str = "") -> None: """Perform PLC password authentication (legitimation). - Must be called after connect() and before data operations on - password-protected PLCs. Requires TLS to be active (V2+). - - The method auto-detects legacy vs new legitimation based on - the PLC's firmware version. - Args: password: PLC password username: Username for new-style auth (optional) @@ -226,20 +177,18 @@ async def authenticate(self, password: str, username: str = "") -> None: S7ConnectionError: If not connected, TLS not active, or auth fails """ if not self._connected: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("Not connected") if not self._tls_active or self._oms_secret is None: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("Legitimation requires TLS. Connect with use_tls=True.") - # Step 1: Get challenge from PLC challenge = await self._get_legitimation_challenge() logger.info(f"Received legitimation challenge ({len(challenge)} bytes)") - # Step 2: Build response (auto-detect legacy vs new) from .legitimation import build_legacy_response, build_new_response if username: @@ -261,28 +210,17 @@ async def _activate_tls( tls_key: Optional[str] = None, tls_ca: Optional[str] = None, ) -> None: - """Activate TLS 1.3 over the COTP connection. - - Called after InitSSL and before CreateObject. Uses asyncio's - start_tls() to upgrade the existing connection to TLS. - - Args: - tls_cert: Path to client TLS certificate (PEM) - tls_key: Path to client private key (PEM) - tls_ca: Path to CA certificate for PLC verification (PEM) - """ + """Activate TLS 1.3 over the COTP connection.""" if self._writer is None: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("Cannot activate TLS: not connected") ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ctx.minimum_version = ssl.TLSVersion.TLSv1_3 - # TLS 1.3 ciphersuites are configured differently from TLS 1.2 if hasattr(ctx, "set_ciphersuites"): ctx.set_ciphersuites("TLS_AES_256_GCM_SHA384:TLS_AES_128_GCM_SHA256") - # If set_ciphersuites not available, TLS 1.3 uses its mandatory defaults if tls_cert and tls_key: ctx.load_cert_chain(tls_cert, tls_key) @@ -293,7 +231,6 @@ async def _activate_tls( ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE - # Upgrade existing transport to TLS using asyncio start_tls transport = self._writer.transport loop = asyncio.get_event_loop() new_transport = await loop.start_tls( @@ -303,13 +240,11 @@ async def _activate_tls( server_hostname=self._host, ) - # Update reader/writer to use the TLS transport self._writer._transport = new_transport self._tls_active = True - # Extract OMS exporter secret for legitimation key derivation if new_transport is None: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("TLS handshake failed: no transport returned") @@ -325,13 +260,7 @@ async def _activate_tls( logger.info("TLS 1.3 activated on async COTP connection") async def _get_legitimation_challenge(self) -> bytes: - """Request legitimation challenge from PLC. - - Sends GetVarSubStreamed with address ServerSessionRequest (303). - - Returns: - Challenge bytes from PLC - """ + """Request legitimation challenge from PLC.""" from .protocol import LegitimationId, DataType as DT payload = bytearray() @@ -348,12 +277,12 @@ async def _get_legitimation_challenge(self) -> bytes: offset += consumed if return_value != 0: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError(f"GetVarSubStreamed for challenge failed: return_value={return_value}") if offset + 2 > len(resp_payload): - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("Challenge response too short") @@ -388,7 +317,7 @@ async def _send_legitimation_new(self, encrypted_response: bytes) -> None: if len(resp_payload) >= 1: return_value, _ = decode_uint64_vlq(resp_payload, 0) if return_value < 0: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError(f"Legitimation rejected by PLC: return_value={return_value}") logger.debug(f"New legitimation return_value={return_value}") @@ -411,32 +340,13 @@ async def _send_legitimation_legacy(self, response: bytes) -> None: if len(resp_payload) >= 1: return_value, _ = decode_uint64_vlq(resp_payload, 0) if return_value < 0: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError(f"Legacy legitimation rejected by PLC: return_value={return_value}") logger.debug(f"Legacy legitimation return_value={return_value}") - async def _setup_legacy_fallback(self) -> None: - """Establish a secondary legacy S7 connection for data operations.""" - from ..client import Client - - loop = asyncio.get_event_loop() - client = Client() - await loop.run_in_executor(None, lambda: client.connect(self._host, self._rack, self._slot, self._port)) - self._legacy_client = client - self._use_legacy_data = True - logger.info(f"Legacy S7 fallback connected to {self._host}:{self._port}") - async def disconnect(self) -> None: """Disconnect from PLC.""" - if self._legacy_client is not None: - try: - self._legacy_client.disconnect() - except Exception: - pass - self._legacy_client = None - self._use_legacy_data = False - if self._connected and self._session_id: try: await self._delete_session() @@ -453,6 +363,7 @@ async def disconnect(self) -> None: self._tls_active = False self._oms_secret = None self._server_session_version = None + self._session_setup_ok = False if self._writer: try: @@ -464,22 +375,7 @@ async def disconnect(self) -> None: self._reader = None async def db_read(self, db_number: int, start: int, size: int) -> bytes: - """Read raw bytes from a data block. - - Args: - db_number: Data block number - start: Start byte offset - size: Number of bytes to read - - Returns: - Raw bytes read from the data block - """ - if self._use_legacy_data and self._legacy_client is not None: - client = self._legacy_client - loop = asyncio.get_event_loop() - data = await loop.run_in_executor(None, lambda: client.db_read(db_number, start, size)) - return bytes(data) - + """Read raw bytes from a data block.""" payload = _build_read_payload([(db_number, start, size)]) response = await self._send_request(FunctionCode.GET_MULTI_VARIABLES, payload) @@ -491,67 +387,26 @@ async def db_read(self, db_number: int, start: int, size: int) -> bytes: return results[0] async def db_write(self, db_number: int, start: int, data: bytes) -> None: - """Write raw bytes to a data block. - - Args: - db_number: Data block number - start: Start byte offset - data: Bytes to write - """ - if self._use_legacy_data and self._legacy_client is not None: - client = self._legacy_client - loop = asyncio.get_event_loop() - await loop.run_in_executor(None, lambda: client.db_write(db_number, start, bytearray(data))) - return - + """Write raw bytes to a data block.""" payload = _build_write_payload([(db_number, start, data)]) response = await self._send_request(FunctionCode.SET_MULTI_VARIABLES, payload) _parse_write_response(response) async def db_read_multi(self, items: list[tuple[int, int, int]]) -> list[bytes]: - """Read multiple data block regions in a single request. - - Args: - items: List of (db_number, start_offset, size) tuples - - Returns: - List of raw bytes for each item - """ - if self._use_legacy_data and self._legacy_client is not None: - client = self._legacy_client - loop = asyncio.get_event_loop() - multi_results: list[bytes] = [] - for db_number, start, size in items: - - def _read(db: int = db_number, s: int = start, sz: int = size) -> bytearray: - return bytearray(client.db_read(db, s, sz)) - - data = await loop.run_in_executor(None, _read) - multi_results.append(bytes(data)) - return multi_results - + """Read multiple data block regions in a single request.""" payload = _build_read_payload(items) response = await self._send_request(FunctionCode.GET_MULTI_VARIABLES, payload) - parsed = _parse_read_response(response) return [r if r is not None else b"" for r in parsed] async def explore(self) -> bytes: - """Browse the PLC object tree. - - Returns: - Raw response payload - """ + """Browse the PLC object tree.""" return await self._send_request(FunctionCode.EXPLORE, b"") # -- Internal methods -- async def _send_request(self, function_code: int, payload: bytes) -> bytes: - """Send an S7CommPlus request and receive the response. - - For V2+ with IntegrityId tracking, inserts IntegrityId after the - 14-byte request header and strips it from the response. - """ + """Send an S7CommPlus request and receive the response.""" async with self._lock: if not self._connected or self._writer is None or self._reader is None: raise RuntimeError("Not connected") @@ -569,7 +424,6 @@ async def _send_request(self, function_code: int, payload: bytes) -> bytes: 0x36, ) - # For V2+ with IntegrityId, insert after header integrity_id_bytes = b"" if self._with_integrity_id and self._protocol_version >= ProtocolVersion.V2: is_read = function_code in READ_FUNCTION_CODES @@ -582,7 +436,6 @@ async def _send_request(self, function_code: int, payload: bytes) -> bytes: frame += struct.pack(">BBH", 0x72, self._protocol_version, 0x0000) await self._send_cotp_dt(frame) - # Increment appropriate IntegrityId counter if self._with_integrity_id and self._protocol_version >= ProtocolVersion.V2: if function_code in READ_FUNCTION_CODES: self._integrity_id_read = (self._integrity_id_read + 1) & 0xFFFFFFFF @@ -597,7 +450,6 @@ async def _send_request(self, function_code: int, payload: bytes) -> bytes: if len(response) < 14: raise RuntimeError("Response too short") - # For V2+, skip IntegrityId in response resp_offset = 14 if self._with_integrity_id and self._protocol_version >= ProtocolVersion.V2: if resp_offset < len(response): @@ -611,7 +463,6 @@ async def _cotp_connect(self, local_tsap: int, remote_tsap: bytes) -> None: if self._writer is None or self._reader is None: raise RuntimeError("Not connected") - # Build COTP CR base_pdu = struct.pack(">BBHHB", 6, _COTP_CR, 0x0000, 0x0001, 0x00) calling_tsap = struct.pack(">BBH", 0xC1, 2, local_tsap) called_tsap = struct.pack(">BB", 0xC2, len(remote_tsap)) + remote_tsap @@ -620,12 +471,10 @@ async def _cotp_connect(self, local_tsap: int, remote_tsap: bytes) -> None: params = calling_tsap + called_tsap + pdu_size_param cr_pdu = struct.pack(">B", 6 + len(params)) + base_pdu[1:] + params - # Send TPKT + CR tpkt = struct.pack(">BBH", 3, 0, 4 + len(cr_pdu)) + cr_pdu self._writer.write(tpkt) await self._writer.drain() - # Receive TPKT + CC tpkt_header = await self._reader.readexactly(4) _, _, length = struct.unpack(">BBH", tpkt_header) payload = await self._reader.readexactly(length - 4) @@ -645,7 +494,7 @@ async def _init_ssl(self) -> None: 0x0000, seq_num, 0x00000000, - 0x30, # Transport flags for InitSSL + 0x30, ) request += struct.pack(">I", 0) @@ -666,7 +515,6 @@ async def _create_session(self) -> None: """Send CreateObject to establish S7CommPlus session.""" seq_num = self._next_sequence_number() - # Build CreateObject request header request = struct.pack( ">BHHHHIB", Opcode.REQUEST, @@ -674,32 +522,24 @@ async def _create_session(self) -> None: FunctionCode.CREATE_OBJECT, 0x0000, seq_num, - ObjectId.OBJECT_NULL_SERVER_SESSION, # SessionId = 288 + ObjectId.OBJECT_NULL_SERVER_SESSION, 0x36, ) - # RequestId: ObjectServerSessionContainer (285) request += struct.pack(">I", ObjectId.OBJECT_SERVER_SESSION_CONTAINER) - - # RequestValue: ValueUDInt(0) request += bytes([0x00, DataType.UDINT]) + encode_uint32_vlq(0) - - # Unknown padding request += struct.pack(">I", 0) - # RequestObject: NullServerSession PObject request += bytes([ElementID.START_OF_OBJECT]) request += struct.pack(">I", ObjectId.GET_NEW_RID_ON_SERVER) request += encode_uint32_vlq(ObjectId.CLASS_SERVER_SESSION) - request += encode_uint32_vlq(0) # ClassFlags - request += encode_uint32_vlq(0) # AttributeId + request += encode_uint32_vlq(0) + request += encode_uint32_vlq(0) - # Attribute: ServerSessionClientRID = 0x80c3c901 request += bytes([ElementID.ATTRIBUTE]) request += encode_uint32_vlq(ObjectId.SERVER_SESSION_CLIENT_RID) request += encode_typed_value(DataType.RID, 0x80C3C901) - # Nested: ClassSubscriptions request += bytes([ElementID.START_OF_OBJECT]) request += struct.pack(">I", ObjectId.GET_NEW_RID_ON_SERVER) request += encode_uint32_vlq(ObjectId.CLASS_SUBSCRIPTIONS) @@ -710,7 +550,6 @@ async def _create_session(self) -> None: request += bytes([ElementID.TERMINATING_OBJECT]) request += struct.pack(">I", 0) - # Frame header + trailer frame = encode_header(ProtocolVersion.V1, len(request)) + request frame += struct.pack(">BBH", 0x72, ProtocolVersion.V1, 0x0000) await self._send_cotp_dt(frame) @@ -725,7 +564,6 @@ async def _create_session(self) -> None: self._session_id = struct.unpack_from(">I", response, 9)[0] self._protocol_version = version - # Parse ServerSessionVersion from response payload self._parse_create_object_response(response[14:]) def _parse_create_object_response(self, payload: bytes) -> None: @@ -754,13 +592,11 @@ def _parse_create_object_response(self, payload: bytes) -> None: logger.info(f"ServerSessionVersion = {value}") return else: - # Skip attribute value if offset + 2 > len(payload): break _flags = payload[offset] _dt = payload[offset + 1] offset += 2 - # Best-effort skip: advance past common VLQ-encoded values if offset < len(payload): _, consumed = decode_uint32_vlq(payload, offset) offset += consumed @@ -769,13 +605,13 @@ def _parse_create_object_response(self, payload: bytes) -> None: offset += 1 if offset + 4 > len(payload): break - offset += 4 # RelationId + offset += 4 _, consumed = decode_uint32_vlq(payload, offset) - offset += consumed # ClassId + offset += consumed _, consumed = decode_uint32_vlq(payload, offset) - offset += consumed # ClassFlags + offset += consumed _, consumed = decode_uint32_vlq(payload, offset) - offset += consumed # AttributeId + offset += consumed elif tag == ElementID.TERMINATING_OBJECT: offset += 1 @@ -787,28 +623,21 @@ def _parse_create_object_response(self, payload: bytes) -> None: logger.debug("ServerSessionVersion not found in CreateObject response") async def _setup_session(self) -> bool: - """Echo ServerSessionVersion back to the PLC via SetMultiVariables. - - Without this step, the PLC rejects all subsequent data operations - with ERROR2 (0x05A9). - - Returns: - True if session setup succeeded. - """ + """Echo ServerSessionVersion back to the PLC via SetMultiVariables.""" if self._server_session_version is None: return False payload = bytearray() payload += struct.pack(">I", self._session_id) - payload += encode_uint32_vlq(1) # Item count - payload += encode_uint32_vlq(1) # Total address field count + payload += encode_uint32_vlq(1) + payload += encode_uint32_vlq(1) payload += encode_uint32_vlq(ObjectId.SERVER_SESSION_VERSION) - payload += encode_uint32_vlq(1) # ItemNumber + payload += encode_uint32_vlq(1) payload += bytes([0x00, DataType.UDINT]) payload += encode_uint32_vlq(self._server_session_version) - payload += bytes([0x00]) # Fill byte + payload += bytes([0x00]) payload += encode_object_qualifier() - payload += struct.pack(">I", 0) # Trailing padding + payload += struct.pack(">I", 0) try: resp_payload = await self._send_request(FunctionCode.SET_MULTI_VARIABLES, bytes(payload)) diff --git a/snap7/s7commplus/client.py b/s7/_s7commplus_client.py similarity index 57% rename from snap7/s7commplus/client.py rename to s7/_s7commplus_client.py index a3f32ab4..d70e9458 100644 --- a/snap7/s7commplus/client.py +++ b/s7/_s7commplus_client.py @@ -1,20 +1,8 @@ -""" -S7CommPlus client for S7-1200/1500 PLCs. - -Provides high-level operations over the S7CommPlus protocol, similar to -the existing snap7.Client but targeting S7-1200/1500 PLCs with full -engineering access (symbolic addressing, optimized data blocks, etc.). - -Supports all S7CommPlus protocol versions (V1/V2/V3/TLS). The protocol -version is auto-detected from the PLC's CreateObject response during -connection setup. +"""Pure S7CommPlus client for S7-1200/1500 PLCs (no legacy fallback). -When a PLC does not support S7CommPlus data operations (e.g. PLCs that -accept S7CommPlus sessions but return ERROR2 for GetMultiVariables), -the client transparently falls back to the legacy S7 protocol for -data block read/write operations. - -Status: V1 and V2 connections are functional. V3/TLS authentication planned. +This is an internal module used by the unified ``s7.Client``. It provides +raw S7CommPlus data operations without any fallback logic -- the unified +client is responsible for deciding when to fall back to legacy S7. Reference: thomas-v2/S7CommPlusDriver (C#, LGPL-3.0) """ @@ -37,46 +25,16 @@ class S7CommPlusClient: - """S7CommPlus client for S7-1200/1500 PLCs. - - Supports all S7CommPlus protocol versions: - - V1: S7-1200 FW V4.0+ - - V2: S7-1200/1500 with older firmware - - V3: S7-1200/1500 pre-TIA Portal V17 - - V3 + TLS: TIA Portal V17+ (recommended) - - The protocol version is auto-detected during connection. + """Pure S7CommPlus client without legacy fallback. - When the PLC does not support S7CommPlus data operations, the client - automatically falls back to legacy S7 protocol for db_read/db_write. - - Example:: - - client = S7CommPlusClient() - client.connect("192.168.1.10") - - # Read raw bytes from DB1 - data = client.db_read(1, 0, 4) - - # Write raw bytes to DB1 - client.db_write(1, 0, struct.pack(">f", 23.5)) - - client.disconnect() + Use ``s7.Client`` for automatic protocol selection. """ def __init__(self) -> None: self._connection: Optional[S7CommPlusConnection] = None - self._legacy_client: Optional[Any] = None - self._use_legacy_data: bool = False - self._host: str = "" - self._port: int = 102 - self._rack: int = 0 - self._slot: int = 1 @property def connected(self) -> bool: - if self._use_legacy_data and self._legacy_client is not None: - return bool(self._legacy_client.connected) return self._connection is not None and self._connection.connected @property @@ -94,9 +52,18 @@ def session_id(self) -> int: return self._connection.session_id @property - def using_legacy_fallback(self) -> bool: - """Whether the client is using legacy S7 protocol for data operations.""" - return self._use_legacy_data + def session_setup_ok(self) -> bool: + """Whether the S7CommPlus session setup succeeded for data operations.""" + if self._connection is None: + return False + return self._connection.session_setup_ok + + @property + def tls_active(self) -> bool: + """Whether TLS is active on the connection.""" + if self._connection is None: + return False + return self._connection.tls_active def connect( self, @@ -112,30 +79,18 @@ def connect( ) -> None: """Connect to an S7-1200/1500 PLC using S7CommPlus. - If the PLC does not support S7CommPlus data operations, a secondary - legacy S7 connection is established transparently for data access. - Args: host: PLC IP address or hostname port: TCP port (default 102) - rack: PLC rack number - slot: PLC slot number + rack: PLC rack number (unused, kept for API symmetry) + slot: PLC slot number (unused, kept for API symmetry) use_tls: Whether to activate TLS (required for V2) tls_cert: Path to client TLS certificate (PEM) tls_key: Path to client private key (PEM) tls_ca: Path to CA certificate for PLC verification (PEM) password: PLC password for legitimation (V2+ with TLS) """ - self._host = host - self._port = port - self._rack = rack - self._slot = slot - - self._connection = S7CommPlusConnection( - host=host, - port=port, - ) - + self._connection = S7CommPlusConnection(host=host, port=port) self._connection.connect( use_tls=use_tls, tls_cert=tls_cert, @@ -143,49 +98,19 @@ def connect( tls_ca=tls_ca, ) - # Handle legitimation for password-protected PLCs if password is not None and self._connection.tls_active: logger.info("Performing PLC legitimation (password authentication)") self._connection.authenticate(password) - # Check if S7CommPlus session setup succeeded. If the PLC accepted the - # ServerSessionVersion echo, data operations should work. If it returned - # an error (e.g. ERROR2), fall back to legacy S7 protocol. - if not self._connection.session_setup_ok: - logger.info("S7CommPlus session setup failed, falling back to legacy S7 protocol") - self._setup_legacy_fallback() - - def _setup_legacy_fallback(self) -> None: - """Establish a secondary legacy S7 connection for data operations.""" - from ..client import Client - - self._legacy_client = Client() - self._legacy_client.connect(self._host, self._rack, self._slot, self._port) - self._use_legacy_data = True - logger.info(f"Legacy S7 fallback connected to {self._host}:{self._port}") - def disconnect(self) -> None: """Disconnect from PLC.""" - if self._legacy_client is not None: - try: - self._legacy_client.disconnect() - except Exception: - pass - self._legacy_client = None - self._use_legacy_data = False - if self._connection: self._connection.disconnect() self._connection = None - # -- Data block read/write -- - def db_read(self, db_number: int, start: int, size: int) -> bytes: """Read raw bytes from a data block. - Uses S7CommPlus protocol when supported, otherwise falls back to - legacy S7 protocol transparently. - Args: db_number: Data block number start: Start byte offset @@ -194,18 +119,11 @@ def db_read(self, db_number: int, start: int, size: int) -> bytes: Returns: Raw bytes read from the data block """ - if self._use_legacy_data and self._legacy_client is not None: - return bytes(self._legacy_client.db_read(db_number, start, size)) - if self._connection is None: raise RuntimeError("Not connected") payload = _build_read_payload([(db_number, start, size)]) - logger.debug(f"db_read: db={db_number} start={start} size={size} payload={payload.hex(' ')}") - response = self._connection.send_request(FunctionCode.GET_MULTI_VARIABLES, payload) - logger.debug(f"db_read: response ({len(response)} bytes): {response.hex(' ')}") - results = _parse_read_response(response) if not results: raise RuntimeError("Read returned no data") @@ -216,70 +134,38 @@ def db_read(self, db_number: int, start: int, size: int) -> bytes: def db_write(self, db_number: int, start: int, data: bytes) -> None: """Write raw bytes to a data block. - Uses S7CommPlus protocol when supported, otherwise falls back to - legacy S7 protocol transparently. - Args: db_number: Data block number start: Start byte offset data: Bytes to write """ - if self._use_legacy_data and self._legacy_client is not None: - self._legacy_client.db_write(db_number, start, bytearray(data)) - return - if self._connection is None: raise RuntimeError("Not connected") payload = _build_write_payload([(db_number, start, data)]) - logger.debug( - f"db_write: db={db_number} start={start} data_len={len(data)} data={data.hex(' ')} payload={payload.hex(' ')}" - ) - response = self._connection.send_request(FunctionCode.SET_MULTI_VARIABLES, payload) - logger.debug(f"db_write: response ({len(response)} bytes): {response.hex(' ')}") - _parse_write_response(response) def db_read_multi(self, items: list[tuple[int, int, int]]) -> list[bytes]: """Read multiple data block regions in a single request. - Uses S7CommPlus protocol when supported, otherwise falls back to - legacy S7 protocol (individual reads) transparently. - Args: items: List of (db_number, start_offset, size) tuples Returns: List of raw bytes for each item """ - if self._use_legacy_data and self._legacy_client is not None: - results = [] - for db_number, start, size in items: - data = self._legacy_client.db_read(db_number, start, size) - results.append(bytes(data)) - return results - if self._connection is None: raise RuntimeError("Not connected") payload = _build_read_payload(items) - logger.debug(f"db_read_multi: {len(items)} items: {items} payload={payload.hex(' ')}") - response = self._connection.send_request(FunctionCode.GET_MULTI_VARIABLES, payload) - logger.debug(f"db_read_multi: response ({len(response)} bytes): {response.hex(' ')}") - parsed = _parse_read_response(response) return [r if r is not None else b"" for r in parsed] - # -- Explore (browse PLC object tree) -- - def explore(self) -> bytes: """Browse the PLC object tree. - Returns the raw Explore response payload for parsing. - Full symbolic exploration will be implemented in a future version. - Returns: Raw response payload """ @@ -287,11 +173,8 @@ def explore(self) -> bytes: raise RuntimeError("Not connected") response = self._connection.send_request(FunctionCode.EXPLORE, b"") - logger.debug(f"explore: response ({len(response)} bytes): {response.hex(' ')}") return response - # -- Context manager -- - def __enter__(self) -> "S7CommPlusClient": return self @@ -310,10 +193,7 @@ def _build_read_payload(items: list[tuple[int, int, int]]) -> bytes: Returns: Encoded payload bytes (after the 14-byte request header) - - Reference: thomas-v2/S7CommPlusDriver/Core/GetMultiVariablesRequest.cs """ - # Encode all item addresses and compute total field count addresses: list[bytes] = [] total_field_count = 0 for db_number, start, size in items: @@ -321,24 +201,18 @@ def _build_read_payload(items: list[tuple[int, int, int]]) -> bytes: addr_bytes, field_count = encode_item_address( access_area=access_area, access_sub_area=Ids.DB_VALUE_ACTUAL, - lids=[start + 1, size], # LID byte offsets are 1-based in S7CommPlus + lids=[start + 1, size], ) addresses.append(addr_bytes) total_field_count += field_count payload = bytearray() - # LinkId (UInt32 fixed = 0, for reading variables) payload += struct.pack(">I", 0) - # Item count payload += encode_uint32_vlq(len(items)) - # Total field count across all items payload += encode_uint32_vlq(total_field_count) - # Item addresses for addr in addresses: payload += addr - # ObjectQualifier payload += encode_object_qualifier() - # Padding payload += struct.pack(">I", 0) return bytes(payload) @@ -352,21 +226,16 @@ def _parse_read_response(response: bytes) -> list[Optional[bytes]]: Returns: List of raw bytes per item (None for errored items) - - Reference: thomas-v2/S7CommPlusDriver/Core/GetMultiVariablesResponse.cs """ offset = 0 - # ReturnValue (UInt64 VLQ) return_value, consumed = decode_uint64_vlq(response, offset) offset += consumed - logger.debug(f"_parse_read_response: return_value={return_value}") if return_value != 0: logger.error(f"_parse_read_response: PLC returned error: {return_value}") return [] - # Value list: ItemNumber (VLQ) + PValue, terminated by ItemNumber=0 values: dict[int, bytes] = {} while offset < len(response): item_nr, consumed = decode_uint32_vlq(response, offset) @@ -377,7 +246,6 @@ def _parse_read_response(response: bytes) -> list[Optional[bytes]]: offset += consumed values[item_nr] = raw_bytes - # Error list: ErrorItemNumber (VLQ) + ErrorReturnValue (UInt64 VLQ), terminated by 0 errors: dict[int, int] = {} while offset < len(response): err_item_nr, consumed = decode_uint32_vlq(response, offset) @@ -387,9 +255,7 @@ def _parse_read_response(response: bytes) -> list[Optional[bytes]]: err_value, consumed = decode_uint64_vlq(response, offset) offset += consumed errors[err_item_nr] = err_value - logger.debug(f"_parse_read_response: error item {err_item_nr}: {err_value}") - # Build result list (1-based item numbers) max_item = max(max(values.keys(), default=0), max(errors.keys(), default=0)) results: list[Optional[bytes]] = [] for i in range(1, max_item + 1): @@ -409,10 +275,7 @@ def _build_write_payload(items: list[tuple[int, int, bytes]]) -> bytes: Returns: Encoded payload bytes - - Reference: thomas-v2/S7CommPlusDriver/Core/SetMultiVariablesRequest.cs """ - # Encode all item addresses and compute total field count addresses: list[bytes] = [] total_field_count = 0 for db_number, start, data in items: @@ -420,30 +283,22 @@ def _build_write_payload(items: list[tuple[int, int, bytes]]) -> bytes: addr_bytes, field_count = encode_item_address( access_area=access_area, access_sub_area=Ids.DB_VALUE_ACTUAL, - lids=[start + 1, len(data)], # LID byte offsets are 1-based in S7CommPlus + lids=[start + 1, len(data)], ) addresses.append(addr_bytes) total_field_count += field_count payload = bytearray() - # InObjectId (UInt32 fixed = 0, for plain variable writes) payload += struct.pack(">I", 0) - # Item count payload += encode_uint32_vlq(len(items)) - # Total field count payload += encode_uint32_vlq(total_field_count) - # Item addresses for addr in addresses: payload += addr - # Value list: ItemNumber (1-based) + PValue for i, (_, _, data) in enumerate(items, 1): payload += encode_uint32_vlq(i) payload += encode_pvalue_blob(data) - # Fill byte payload += bytes([0x00]) - # ObjectQualifier payload += encode_object_qualifier() - # Padding payload += struct.pack(">I", 0) return bytes(payload) @@ -452,25 +307,17 @@ def _build_write_payload(items: list[tuple[int, int, bytes]]) -> bytes: def _parse_write_response(response: bytes) -> None: """Parse a SetMultiVariables response payload. - Args: - response: Response payload (after the 14-byte response header) - Raises: RuntimeError: If the write failed - - Reference: thomas-v2/S7CommPlusDriver/Core/SetMultiVariablesResponse.cs """ offset = 0 - # ReturnValue (UInt64 VLQ) return_value, consumed = decode_uint64_vlq(response, offset) offset += consumed - logger.debug(f"_parse_write_response: return_value={return_value}") if return_value != 0: raise RuntimeError(f"Write failed with return value {return_value}") - # Error list: ErrorItemNumber (VLQ) + ErrorReturnValue (UInt64 VLQ) errors: list[tuple[int, int]] = [] while offset < len(response): err_item_nr, consumed = decode_uint32_vlq(response, offset) diff --git a/snap7/s7commplus/server.py b/s7/_s7commplus_server.py similarity index 100% rename from snap7/s7commplus/server.py rename to s7/_s7commplus_server.py diff --git a/s7/async_client.py b/s7/async_client.py new file mode 100644 index 00000000..88995abf --- /dev/null +++ b/s7/async_client.py @@ -0,0 +1,244 @@ +"""Unified async S7 client with protocol auto-discovery. + +Provides a single async client that automatically selects the best protocol +(S7CommPlus or legacy S7) for communicating with Siemens S7 PLCs. + +Usage:: + + from s7 import AsyncClient + + async with AsyncClient() as client: + await client.connect("192.168.1.10", 0, 1) + data = await client.db_read(1, 0, 4) +""" + +import logging +from typing import Any, Optional + +from snap7.async_client import AsyncClient as LegacyAsyncClient + +from ._protocol import Protocol +from ._s7commplus_async_client import S7CommPlusAsyncClient + +logger = logging.getLogger(__name__) + + +class AsyncClient: + """Unified async S7 client with protocol auto-discovery. + + Async counterpart of :class:`s7.Client`. Automatically selects the + best protocol for the target PLC using asyncio for non-blocking I/O. + + Methods not explicitly defined are delegated to the underlying + legacy async client via ``__getattr__``. + + Example:: + + from s7 import AsyncClient + + async with AsyncClient() as client: + await client.connect("192.168.1.10", 0, 1) + data = await client.db_read(1, 0, 4) + print(client.protocol) + """ + + def __init__(self) -> None: + self._legacy: Optional[LegacyAsyncClient] = None + self._plus: Optional[S7CommPlusAsyncClient] = None + self._protocol: Protocol = Protocol.AUTO + self._host: str = "" + self._port: int = 102 + self._rack: int = 0 + self._slot: int = 1 + + @property + def protocol(self) -> Protocol: + """The protocol currently in use for DB operations.""" + return self._protocol + + @property + def connected(self) -> bool: + """Whether the client is connected to a PLC.""" + if self._legacy is not None and self._legacy.connected: + return True + if self._plus is not None and self._plus.connected: + return True + return False + + async def connect( + self, + address: str, + rack: int = 0, + slot: int = 1, + tcp_port: int = 102, + *, + protocol: Protocol = Protocol.AUTO, + use_tls: bool = False, + tls_cert: Optional[str] = None, + tls_key: Optional[str] = None, + tls_ca: Optional[str] = None, + ) -> "AsyncClient": + """Connect to an S7 PLC. + + Args: + address: PLC IP address or hostname. + rack: PLC rack number. + slot: PLC slot number. + tcp_port: TCP port (default 102). + protocol: Protocol selection. AUTO tries S7CommPlus first, + then falls back to legacy S7. + use_tls: Whether to activate TLS after InitSSL. + tls_cert: Path to client TLS certificate (PEM). + tls_key: Path to client private key (PEM). + tls_ca: Path to CA certificate for PLC verification (PEM). + + Returns: + self, for method chaining. + """ + self._host = address + self._port = tcp_port + self._rack = rack + self._slot = slot + + if protocol in (Protocol.AUTO, Protocol.S7COMMPLUS): + if await self._try_s7commplus( + address, + tcp_port, + rack, + slot, + use_tls=use_tls, + tls_cert=tls_cert, + tls_key=tls_key, + tls_ca=tls_ca, + ): + self._protocol = Protocol.S7COMMPLUS + logger.info(f"Async connected to {address}:{tcp_port} using S7CommPlus") + else: + if protocol == Protocol.S7COMMPLUS: + raise RuntimeError( + f"S7CommPlus connection to {address}:{tcp_port} failed and protocol=S7COMMPLUS was explicitly requested" + ) + self._protocol = Protocol.LEGACY + logger.info(f"S7CommPlus not available, using legacy S7 for {address}:{tcp_port}") + else: + self._protocol = Protocol.LEGACY + + # Always connect legacy client + self._legacy = LegacyAsyncClient() + await self._legacy.connect(address, rack, slot, tcp_port) + logger.info(f"Async legacy S7 connected to {address}:{tcp_port}") + + return self + + async def _try_s7commplus( + self, + address: str, + tcp_port: int, + rack: int, + slot: int, + *, + use_tls: bool = False, + tls_cert: Optional[str] = None, + tls_key: Optional[str] = None, + tls_ca: Optional[str] = None, + ) -> bool: + """Try to establish an S7CommPlus connection.""" + plus = S7CommPlusAsyncClient() + try: + await plus.connect( + host=address, + port=tcp_port, + rack=rack, + slot=slot, + use_tls=use_tls, + tls_cert=tls_cert, + tls_key=tls_key, + tls_ca=tls_ca, + ) + except Exception as e: + logger.debug(f"S7CommPlus connection failed: {e}") + return False + + if not plus.session_setup_ok: + logger.debug("S7CommPlus session setup not OK, disconnecting") + await plus.disconnect() + return False + + self._plus = plus + return True + + async def disconnect(self) -> None: + """Disconnect from PLC.""" + if self._plus is not None: + try: + await self._plus.disconnect() + except Exception: + pass + self._plus = None + + if self._legacy is not None: + try: + await self._legacy.disconnect() + except Exception: + pass + self._legacy = None + + self._protocol = Protocol.AUTO + + async def db_read(self, db_number: int, start: int, size: int) -> bytearray: + """Read raw bytes from a data block.""" + if self._protocol == Protocol.S7COMMPLUS and self._plus is not None: + return bytearray(await self._plus.db_read(db_number, start, size)) + if self._legacy is not None: + return await self._legacy.db_read(db_number, start, size) + raise RuntimeError("Not connected") + + async def db_write(self, db_number: int, start: int, data: bytearray) -> None: + """Write raw bytes to a data block.""" + if self._protocol == Protocol.S7COMMPLUS and self._plus is not None: + await self._plus.db_write(db_number, start, bytes(data)) + return + if self._legacy is not None: + await self._legacy.db_write(db_number, start, data) + return + raise RuntimeError("Not connected") + + async def db_read_multi(self, items: list[tuple[int, int, int]]) -> list[bytearray]: + """Read multiple data block regions in a single request.""" + if self._protocol == Protocol.S7COMMPLUS and self._plus is not None: + return [bytearray(r) for r in await self._plus.db_read_multi(items)] + if self._legacy is not None: + results = [] + for db, start, size in items: + results.append(await self._legacy.db_read(db, start, size)) + return results + raise RuntimeError("Not connected") + + async def explore(self) -> bytes: + """Browse the PLC object tree (S7CommPlus only). + + Raises: + RuntimeError: If not connected via S7CommPlus. + """ + if self._plus is None: + raise RuntimeError("explore() requires S7CommPlus connection") + return await self._plus.explore() + + def __getattr__(self, name: str) -> Any: + """Delegate unknown methods to the legacy client.""" + if name.startswith("_"): + raise AttributeError(name) + if self._legacy is not None: + return getattr(self._legacy, name) + raise AttributeError(f"'AsyncClient' object has no attribute {name!r} (not connected)") + + async def __aenter__(self) -> "AsyncClient": + return self + + async def __aexit__(self, *args: Any) -> None: + await self.disconnect() + + def __repr__(self) -> str: + if self.connected: + return f"" + return "" diff --git a/s7/client.py b/s7/client.py new file mode 100644 index 00000000..d74d86e7 --- /dev/null +++ b/s7/client.py @@ -0,0 +1,259 @@ +"""Unified S7 client with protocol auto-discovery. + +Provides a single client that automatically selects the best protocol +(S7CommPlus or legacy S7) for communicating with Siemens S7 PLCs. + +Usage:: + + from s7 import Client + + client = Client() + client.connect("192.168.1.10", 0, 1) + data = client.db_read(1, 0, 4) +""" + +import logging +from typing import Any, Optional + +from snap7.client import Client as LegacyClient + +from ._protocol import Protocol +from ._s7commplus_client import S7CommPlusClient + +logger = logging.getLogger(__name__) + + +class Client: + """Unified S7 client with protocol auto-discovery. + + Automatically selects the best protocol for the target PLC: + - S7CommPlus for S7-1200/1500 PLCs with full data operations + - Legacy S7 for S7-300/400 or when S7CommPlus is unavailable + + Methods not explicitly defined are delegated to the underlying + legacy client via ``__getattr__``. + + Example:: + + from s7 import Client + + client = Client() + client.connect("192.168.1.10", 0, 1) + data = client.db_read(1, 0, 4) + print(client.protocol) + """ + + def __init__(self) -> None: + self._legacy: Optional[LegacyClient] = None + self._plus: Optional[S7CommPlusClient] = None + self._protocol: Protocol = Protocol.AUTO + self._host: str = "" + self._port: int = 102 + self._rack: int = 0 + self._slot: int = 1 + + @property + def protocol(self) -> Protocol: + """The protocol currently in use for DB operations.""" + return self._protocol + + @property + def connected(self) -> bool: + """Whether the client is connected to a PLC.""" + if self._legacy is not None and self._legacy.connected: + return True + if self._plus is not None and self._plus.connected: + return True + return False + + def connect( + self, + address: str, + rack: int = 0, + slot: int = 1, + tcp_port: int = 102, + *, + protocol: Protocol = Protocol.AUTO, + use_tls: bool = False, + tls_cert: Optional[str] = None, + tls_key: Optional[str] = None, + tls_ca: Optional[str] = None, + password: Optional[str] = None, + ) -> "Client": + """Connect to an S7 PLC. + + Args: + address: PLC IP address or hostname. + rack: PLC rack number. + slot: PLC slot number. + tcp_port: TCP port (default 102). + protocol: Protocol selection. AUTO tries S7CommPlus first, + then falls back to legacy S7. + use_tls: Whether to activate TLS (required for V2+). + tls_cert: Path to client TLS certificate (PEM). + tls_key: Path to client private key (PEM). + tls_ca: Path to CA certificate for PLC verification (PEM). + password: PLC password for legitimation (V2+ with TLS). + + Returns: + self, for method chaining. + """ + self._host = address + self._port = tcp_port + self._rack = rack + self._slot = slot + + if protocol in (Protocol.AUTO, Protocol.S7COMMPLUS): + if self._try_s7commplus( + address, + tcp_port, + rack, + slot, + use_tls=use_tls, + tls_cert=tls_cert, + tls_key=tls_key, + tls_ca=tls_ca, + password=password, + ): + self._protocol = Protocol.S7COMMPLUS + logger.info(f"Connected to {address}:{tcp_port} using S7CommPlus") + else: + if protocol == Protocol.S7COMMPLUS: + raise RuntimeError( + f"S7CommPlus connection to {address}:{tcp_port} failed and protocol=S7COMMPLUS was explicitly requested" + ) + self._protocol = Protocol.LEGACY + logger.info(f"S7CommPlus not available, using legacy S7 for {address}:{tcp_port}") + else: + self._protocol = Protocol.LEGACY + + # Always connect legacy client (needed for block ops, PLC control, etc.) + self._legacy = LegacyClient() + self._legacy.connect(address, rack, slot, tcp_port) + logger.info(f"Legacy S7 connected to {address}:{tcp_port}") + + return self + + def _try_s7commplus( + self, + address: str, + tcp_port: int, + rack: int, + slot: int, + *, + use_tls: bool = False, + tls_cert: Optional[str] = None, + tls_key: Optional[str] = None, + tls_ca: Optional[str] = None, + password: Optional[str] = None, + ) -> bool: + """Try to establish an S7CommPlus connection. + + Returns True if S7CommPlus data operations are available. + """ + plus = S7CommPlusClient() + try: + plus.connect( + host=address, + port=tcp_port, + rack=rack, + slot=slot, + use_tls=use_tls, + tls_cert=tls_cert, + tls_key=tls_key, + tls_ca=tls_ca, + password=password, + ) + except Exception as e: + logger.debug(f"S7CommPlus connection failed: {e}") + return False + + if not plus.session_setup_ok: + logger.debug("S7CommPlus session setup not OK, disconnecting") + plus.disconnect() + return False + + self._plus = plus + return True + + def disconnect(self) -> None: + """Disconnect from PLC.""" + if self._plus is not None: + try: + self._plus.disconnect() + except Exception: + pass + self._plus = None + + if self._legacy is not None: + try: + self._legacy.disconnect() + except Exception: + pass + self._legacy = None + + self._protocol = Protocol.AUTO + + def db_read(self, db_number: int, start: int, size: int) -> bytearray: + """Read raw bytes from a data block. + + Uses S7CommPlus when available, otherwise legacy S7. + """ + if self._protocol == Protocol.S7COMMPLUS and self._plus is not None: + return bytearray(self._plus.db_read(db_number, start, size)) + if self._legacy is not None: + return self._legacy.db_read(db_number, start, size) + raise RuntimeError("Not connected") + + def db_write(self, db_number: int, start: int, data: bytearray) -> None: + """Write raw bytes to a data block. + + Uses S7CommPlus when available, otherwise legacy S7. + """ + if self._protocol == Protocol.S7COMMPLUS and self._plus is not None: + self._plus.db_write(db_number, start, bytes(data)) + return + if self._legacy is not None: + self._legacy.db_write(db_number, start, data) + return + raise RuntimeError("Not connected") + + def db_read_multi(self, items: list[tuple[int, int, int]]) -> list[bytearray]: + """Read multiple data block regions in a single request. + + Uses S7CommPlus native multi-read when available. + """ + if self._protocol == Protocol.S7COMMPLUS and self._plus is not None: + return [bytearray(r) for r in self._plus.db_read_multi(items)] + if self._legacy is not None: + return [self._legacy.db_read(db, start, size) for db, start, size in items] + raise RuntimeError("Not connected") + + def explore(self) -> bytes: + """Browse the PLC object tree (S7CommPlus only). + + Raises: + RuntimeError: If not connected via S7CommPlus. + """ + if self._plus is None: + raise RuntimeError("explore() requires S7CommPlus connection") + return self._plus.explore() + + def __getattr__(self, name: str) -> Any: + """Delegate unknown methods to the legacy client.""" + if name.startswith("_"): + raise AttributeError(name) + if self._legacy is not None: + return getattr(self._legacy, name) + raise AttributeError(f"'Client' object has no attribute {name!r} (not connected)") + + def __enter__(self) -> "Client": + return self + + def __exit__(self, *args: Any) -> None: + self.disconnect() + + def __repr__(self) -> str: + if self.connected: + return f"" + return "" diff --git a/snap7/s7commplus/codec.py b/s7/codec.py similarity index 100% rename from snap7/s7commplus/codec.py rename to s7/codec.py diff --git a/snap7/s7commplus/connection.py b/s7/connection.py similarity index 98% rename from snap7/s7commplus/connection.py rename to s7/connection.py index ab4d1b67..878890bd 100644 --- a/snap7/s7commplus/connection.py +++ b/s7/connection.py @@ -44,7 +44,7 @@ from typing import Optional, Type from types import TracebackType -from ..connection import ISOTCPConnection +from snap7.connection import ISOTCPConnection from .protocol import ( FunctionCode, Opcode, @@ -216,7 +216,7 @@ def connect( ) elif self._protocol_version == ProtocolVersion.V2: if not self._tls_active: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("PLC reports V2 protocol but TLS is not active. V2 requires TLS. Use use_tls=True.") # Enable IntegrityId tracking for V2+ @@ -254,12 +254,12 @@ def authenticate(self, password: str, username: str = "") -> None: S7ConnectionError: If not connected, TLS not active, or auth fails """ if not self._connected: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("Not connected") if not self._tls_active or self._oms_secret is None: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("Legitimation requires TLS. Connect with use_tls=True.") @@ -317,13 +317,13 @@ def _get_legitimation_challenge(self) -> bytes: offset += consumed if return_value != 0: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError(f"GetVarSubStreamed for challenge failed: return_value={return_value}") # Value is a USIntArray (BLOB) - read flags + type + length + data if offset + 2 > len(resp_payload): - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("Challenge response too short") @@ -370,7 +370,7 @@ def _send_legitimation_new(self, encrypted_response: bytes) -> None: if len(resp_payload) >= 1: return_value, _ = decode_uint64_vlq(resp_payload, 0) if return_value < 0: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError(f"Legitimation rejected by PLC: return_value={return_value}") logger.debug(f"New legitimation return_value={return_value}") @@ -402,7 +402,7 @@ def _send_legitimation_legacy(self, response: bytes) -> None: if len(resp_payload) >= 1: return_value, _ = decode_uint64_vlq(resp_payload, 0) if return_value < 0: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError(f"Legacy legitimation rejected by PLC: return_value={return_value}") logger.debug(f"Legacy legitimation return_value={return_value}") @@ -444,7 +444,7 @@ def send_request(self, function_code: int, payload: bytes = b"") -> bytes: Response payload (after the 14-byte response header) """ if not self._connected: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("Not connected") @@ -510,7 +510,7 @@ def send_request(self, function_code: int, payload: bytes = b"") -> bytes: logger.debug(f" Response data ({len(response)} bytes): {response.hex(' ')}") if len(response) < 14: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("Response too short") @@ -586,7 +586,7 @@ def _init_ssl(self) -> None: response = response_frame[consumed:] if len(response) < 14: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("InitSSL response too short") @@ -675,7 +675,7 @@ def _create_session(self) -> None: logger.debug(f"CreateObject response body ({len(response)} bytes): {response.hex(' ')}") if len(response) < 14: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("CreateObject response too short") @@ -911,7 +911,7 @@ def _setup_session(self) -> bool: response = response_frame[consumed : consumed + data_length] if len(response) < 14: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("SetupSession response too short") @@ -988,7 +988,7 @@ def _activate_tls( # Wrap the raw TCP socket used by ISOTCPConnection raw_socket = self._iso_conn.socket if raw_socket is None: - from ..error import S7ConnectionError + from snap7.error import S7ConnectionError raise S7ConnectionError("Cannot activate TLS: no TCP socket") diff --git a/snap7/s7commplus/legitimation.py b/s7/legitimation.py similarity index 100% rename from snap7/s7commplus/legitimation.py rename to s7/legitimation.py diff --git a/snap7/s7commplus/protocol.py b/s7/protocol.py similarity index 100% rename from snap7/s7commplus/protocol.py rename to s7/protocol.py diff --git a/s7/py.typed b/s7/py.typed new file mode 100644 index 00000000..e69de29b diff --git a/s7/server.py b/s7/server.py new file mode 100644 index 00000000..a0cad1d3 --- /dev/null +++ b/s7/server.py @@ -0,0 +1,132 @@ +"""Unified S7 server supporting both legacy S7 and S7CommPlus clients. + +Wraps both a legacy :class:`snap7.server.Server` and an +:class:`S7CommPlusServer` so that test environments can serve both +protocol stacks simultaneously. + +Usage:: + + from s7 import Server + + server = Server() + server.start(tcp_port=102, s7commplus_port=11020) +""" + +import logging +from typing import Any, Optional + +from snap7.server import Server as LegacyServer + +from ._s7commplus_server import S7CommPlusServer, DataBlock + +logger = logging.getLogger(__name__) + + +class Server: + """Unified S7 server for testing. + + Runs a legacy S7 server and optionally an S7CommPlus server + side by side. + """ + + def __init__(self) -> None: + self._legacy = LegacyServer() + self._plus = S7CommPlusServer() + + @property + def legacy_server(self) -> LegacyServer: + """Direct access to the legacy S7 server.""" + return self._legacy + + @property + def s7commplus_server(self) -> S7CommPlusServer: + """Direct access to the S7CommPlus server.""" + return self._plus + + def register_db( + self, + db_number: int, + variables: dict[str, tuple[str, int]], + size: int = 0, + ) -> DataBlock: + """Register a data block on the S7CommPlus server. + + Args: + db_number: Data block number + variables: Dict of {name: (type_name, offset)} + size: Total DB size in bytes (auto-calculated if 0) + + Returns: + The created DataBlock + """ + return self._plus.register_db(db_number, variables, size) + + def register_raw_db(self, db_number: int, data: bytearray) -> DataBlock: + """Register a raw data block on the S7CommPlus server. + + Args: + db_number: Data block number + data: Raw bytearray backing the data block + + Returns: + The created DataBlock + """ + return self._plus.register_raw_db(db_number, data) + + def get_db(self, db_number: int) -> Optional[DataBlock]: + """Get a registered data block.""" + return self._plus.get_db(db_number) + + def start( + self, + tcp_port: int = 102, + s7commplus_port: Optional[int] = None, + *, + use_tls: bool = False, + tls_cert: Optional[str] = None, + tls_key: Optional[str] = None, + ) -> None: + """Start the server(s). + + Args: + tcp_port: Port for the legacy S7 server. + s7commplus_port: Port for the S7CommPlus server. If None, + only the legacy server is started. + use_tls: Whether to enable TLS on the S7CommPlus server. + tls_cert: Path to TLS certificate (PEM). + tls_key: Path to TLS private key (PEM). + """ + self._legacy.start(tcp_port=tcp_port) + logger.info(f"Legacy S7 server started on port {tcp_port}") + + if s7commplus_port is not None: + self._plus.start( + port=s7commplus_port, + use_tls=use_tls, + tls_cert=tls_cert, + tls_key=tls_key, + ) + logger.info(f"S7CommPlus server started on port {s7commplus_port}") + + def stop(self) -> None: + """Stop all servers.""" + try: + self._plus.stop() + except Exception: + pass + try: + self._legacy.stop() + except Exception: + pass + + def __getattr__(self, name: str) -> Any: + """Delegate unknown methods to the legacy server.""" + if name.startswith("_"): + raise AttributeError(name) + return getattr(self._legacy, name) + + def __enter__(self) -> "Server": + return self + + def __exit__(self, *args: Any) -> None: + self.stop() diff --git a/snap7/s7commplus/vlq.py b/s7/vlq.py similarity index 100% rename from snap7/s7commplus/vlq.py rename to s7/vlq.py diff --git a/snap7/s7commplus/__init__.py b/snap7/s7commplus/__init__.py deleted file mode 100644 index ab49d09c..00000000 --- a/snap7/s7commplus/__init__.py +++ /dev/null @@ -1,36 +0,0 @@ -""" -S7CommPlus protocol implementation for S7-1200/1500 PLCs. - -S7CommPlus (protocol ID 0x72) is the successor to S7comm (protocol ID 0x32), -used by Siemens S7-1200 (firmware >= V4.0) and S7-1500 PLCs for full -engineering access (program download/upload, symbolic addressing, etc.). - -Supported PLC / firmware targets:: - - V1: S7-1200 FW V4.0+ (simple session handshake) - V2: S7-1200/1500 older FW (session authentication) - V3: S7-1200/1500 pre-TIA V17 (public-key key exchange) - V3 + TLS: TIA Portal V17+ (TLS 1.3 with per-device certs) - -Protocol stack:: - - +-------------------------------+ - | S7CommPlus (Protocol ID 0x72)| - +-------------------------------+ - | TLS 1.3 (optional, V17+) | - +-------------------------------+ - | COTP (ISO 8073) | - +-------------------------------+ - | TPKT (RFC 1006) | - +-------------------------------+ - | TCP (port 102) | - +-------------------------------+ - -The wire protocol (VLQ encoding, data types, function codes, object model) -is the same across all versions -- only the session authentication differs. - -Status: V1 connection functional, V2 (TLS + IntegrityId) scaffolding complete. - -Reference implementation: - https://github.com/thomas-v2/S7CommPlusDriver (C#, LGPL-3.0) -""" diff --git a/tests/conftest.py b/tests/conftest.py index 4e53e6d3..527c8906 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -69,8 +69,8 @@ def pytest_collection_modifyitems(config: pytest.Config, items: list[pytest.Item for mod_name in [ "tests.test_client_e2e", "test_client_e2e", - "tests.test_s7commplus_e2e", - "test_s7commplus_e2e", + "tests.test_s7_e2e", + "test_s7_e2e", ]: e2e = sys.modules.get(mod_name) if e2e is not None: diff --git a/tests/test_coverage_gaps.py b/tests/test_coverage_gaps.py index 3802b4c7..cbd19b06 100644 --- a/tests/test_coverage_gaps.py +++ b/tests/test_coverage_gaps.py @@ -19,8 +19,8 @@ from snap7.error import S7ConnectionError from snap7.server import Server from snap7.type import SrvArea -from snap7.s7commplus.connection import S7CommPlusConnection -from snap7.s7commplus.legitimation import ( +from s7.connection import S7CommPlusConnection +from s7.legitimation import ( LegitimationState, build_legacy_response, ) @@ -158,8 +158,8 @@ def test_legitimation_state_rotate_changes_key(self) -> None: # S7CommPlus async client # ============================================================================ -from snap7.s7commplus.server import S7CommPlusServer # noqa: E402 -from snap7.s7commplus.async_client import S7CommPlusAsyncClient # noqa: E402 +from s7._s7commplus_server import S7CommPlusServer # noqa: E402 +from s7._s7commplus_async_client import S7CommPlusAsyncClient # noqa: E402 ASYNC_TEST_PORT = 11125 @@ -227,13 +227,12 @@ async def test_properties(self, async_server: S7CommPlusServer) -> None: finally: await client.disconnect() - async def test_using_legacy_fallback_property(self, async_server: S7CommPlusServer) -> None: + async def test_session_setup_ok_property(self, async_server: S7CommPlusServer) -> None: client = S7CommPlusAsyncClient() await client.connect("127.0.0.1", port=ASYNC_TEST_PORT) try: - # Server supports S7CommPlus data ops, so no fallback - # (or fallback, depending on server implementation — just check the property works) - assert isinstance(client.using_legacy_fallback, bool) + # Server supports S7CommPlus data ops, so session setup should succeed + assert isinstance(client.session_setup_ok, bool) finally: await client.disconnect() diff --git a/tests/test_s7commplus_codec.py b/tests/test_s7_codec.py similarity index 99% rename from tests/test_s7commplus_codec.py rename to tests/test_s7_codec.py index 9b03881e..2bce0de0 100644 --- a/tests/test_s7commplus_codec.py +++ b/tests/test_s7_codec.py @@ -3,7 +3,7 @@ import struct import pytest -from snap7.s7commplus.codec import ( +from s7.codec import ( encode_header, decode_header, encode_request_header, @@ -35,8 +35,8 @@ encode_object_qualifier, _pvalue_element_size, ) -from snap7.s7commplus.protocol import PROTOCOL_ID, DataType, Opcode, FunctionCode, Ids -from snap7.s7commplus.vlq import encode_uint32_vlq, encode_int32_vlq, encode_uint64_vlq, encode_int64_vlq +from s7.protocol import PROTOCOL_ID, DataType, Opcode, FunctionCode, Ids +from s7.vlq import encode_uint32_vlq, encode_int32_vlq, encode_uint64_vlq, encode_int64_vlq class TestFrameHeader: diff --git a/tests/test_s7commplus_e2e.py b/tests/test_s7_e2e.py similarity index 97% rename from tests/test_s7commplus_e2e.py rename to tests/test_s7_e2e.py index f8c8bf0d..3ab8bbca 100644 --- a/tests/test_s7commplus_e2e.py +++ b/tests/test_s7_e2e.py @@ -2,7 +2,7 @@ These tests require a real PLC connection. Run with: - pytest tests/test_s7commplus_e2e.py --e2e --plc-ip=YOUR_PLC_IP + pytest tests/test_s7_e2e.py --e2e --plc-ip=YOUR_PLC_IP Available options: --e2e Enable e2e tests (required) @@ -47,14 +47,14 @@ import pytest -from snap7.s7commplus.client import S7CommPlusClient +from s7._s7commplus_client import S7CommPlusClient -# Enable DEBUG logging for all s7commplus modules so we get full hex dumps +# Enable DEBUG logging for all s7 modules so we get full hex dumps logging.basicConfig( level=logging.DEBUG, format="%(asctime)s %(name)s %(levelname)s %(message)s", ) -for _mod in ["snap7.s7commplus.client", "snap7.s7commplus.connection", "snap7.connection"]: +for _mod in ["s7._s7commplus_client", "s7.connection", "snap7.connection"]: logging.getLogger(_mod).setLevel(logging.DEBUG) # ============================================================================= @@ -496,8 +496,8 @@ def test_diag_raw_get_multi_variables(self) -> None: This tries several payload encodings to see which ones the PLC accepts. """ - from snap7.s7commplus.protocol import FunctionCode - from snap7.s7commplus.vlq import encode_uint32_vlq + from s7.protocol import FunctionCode + from s7.vlq import encode_uint32_vlq print(f"\n{'=' * 60}") print("DIAGNOSTIC: Raw GetMultiVariables payload experiments") @@ -523,7 +523,7 @@ def test_diag_raw_get_multi_variables(self) -> None: # Try to parse return code if len(response) > 0: - from snap7.s7commplus.vlq import decode_uint32_vlq + from s7.vlq import decode_uint32_vlq rc, consumed = decode_uint32_vlq(response, 0) print(f" Return code (VLQ): {rc} (0x{rc:X})") @@ -537,7 +537,7 @@ def test_diag_raw_get_multi_variables(self) -> None: def test_diag_raw_set_variable(self) -> None: """Try SetVariable (0x04F2) instead of SetMultiVariables to see if PLC responds differently.""" - from snap7.s7commplus.protocol import FunctionCode + from s7.protocol import FunctionCode print(f"\n{'=' * 60}") print("DIAGNOSTIC: Raw SetVariable / GetVariable experiments") @@ -563,8 +563,8 @@ def test_diag_raw_set_variable(self) -> None: def test_diag_explore_then_read(self) -> None: """Explore first to discover object IDs, then try reading using those IDs.""" - from snap7.s7commplus.protocol import FunctionCode, ElementID - from snap7.s7commplus.vlq import encode_uint32_vlq, decode_uint32_vlq + from s7.protocol import FunctionCode, ElementID + from s7.vlq import encode_uint32_vlq, decode_uint32_vlq print(f"\n{'=' * 60}") print("DIAGNOSTIC: Explore -> extract object IDs -> try reading") diff --git a/tests/test_s7commplus_server.py b/tests/test_s7_server.py similarity index 97% rename from tests/test_s7commplus_server.py rename to tests/test_s7_server.py index 2f08f575..3b980e13 100644 --- a/tests/test_s7commplus_server.py +++ b/tests/test_s7_server.py @@ -7,10 +7,10 @@ import pytest import asyncio -from snap7.s7commplus.server import S7CommPlusServer, CPUState, DataBlock -from snap7.s7commplus.client import S7CommPlusClient -from snap7.s7commplus.async_client import S7CommPlusAsyncClient -from snap7.s7commplus.protocol import ProtocolVersion +from s7._s7commplus_server import S7CommPlusServer, CPUState, DataBlock +from s7._s7commplus_client import S7CommPlusClient +from s7._s7commplus_async_client import S7CommPlusAsyncClient +from s7.protocol import ProtocolVersion # Use a high port to avoid conflicts TEST_PORT = 11120 diff --git a/tests/test_async_tls.py b/tests/test_s7_tls.py similarity index 97% rename from tests/test_async_tls.py rename to tests/test_s7_tls.py index 8c1bd007..7abc7f79 100644 --- a/tests/test_async_tls.py +++ b/tests/test_s7_tls.py @@ -8,9 +8,9 @@ import pytest from snap7.error import S7ConnectionError -from snap7.s7commplus.async_client import S7CommPlusAsyncClient -from snap7.s7commplus.server import S7CommPlusServer -from snap7.s7commplus.protocol import ProtocolVersion +from s7._s7commplus_async_client import S7CommPlusAsyncClient +from s7._s7commplus_server import S7CommPlusServer +from s7.protocol import ProtocolVersion TEST_PORT_V2 = 11130 TEST_PORT_V2_TLS = 11131 diff --git a/tests/test_s7commplus_unit.py b/tests/test_s7_unit.py similarity index 97% rename from tests/test_s7commplus_unit.py rename to tests/test_s7_unit.py index f7c5e57e..f11f0ae3 100644 --- a/tests/test_s7commplus_unit.py +++ b/tests/test_s7_unit.py @@ -3,17 +3,17 @@ import struct import pytest -from snap7.s7commplus.client import ( +from s7._s7commplus_client import ( S7CommPlusClient, _build_read_payload, _parse_read_response, _build_write_payload, _parse_write_response, ) -from snap7.s7commplus.codec import encode_pvalue_blob -from snap7.s7commplus.connection import S7CommPlusConnection, _element_size -from snap7.s7commplus.protocol import DataType, ElementID, ObjectId -from snap7.s7commplus.vlq import ( +from s7.codec import encode_pvalue_blob +from s7.connection import S7CommPlusConnection, _element_size +from s7.protocol import DataType, ElementID, ObjectId +from s7.vlq import ( encode_uint32_vlq, encode_uint64_vlq, encode_int32_vlq, @@ -269,7 +269,7 @@ def test_lword(self, conn: S7CommPlusConnection) -> None: assert new_offset == len(vlq) def test_lint(self, conn: S7CommPlusConnection) -> None: - from snap7.s7commplus.vlq import encode_int64_vlq + from s7.vlq import encode_int64_vlq vlq = encode_int64_vlq(-(2**40)) new_offset = conn._skip_typed_value(vlq, 0, DataType.LINT, 0x00) @@ -288,7 +288,7 @@ def test_timestamp(self, conn: S7CommPlusConnection) -> None: assert conn._skip_typed_value(data, 0, DataType.TIMESTAMP, 0x00) == 8 def test_timespan(self, conn: S7CommPlusConnection) -> None: - from snap7.s7commplus.vlq import encode_int64_vlq + from s7.vlq import encode_int64_vlq vlq = encode_int64_vlq(5000) # TIMESPAN uses uint64_vlq for skipping in _skip_typed_value @@ -430,7 +430,7 @@ def test_properties_not_connected(self) -> None: assert client.connected is False assert client.protocol_version == 0 assert client.session_id == 0 - assert client.using_legacy_fallback is False + assert client.session_setup_ok is False def test_db_read_not_connected(self) -> None: client = S7CommPlusClient() diff --git a/tests/test_s7commplus_v2.py b/tests/test_s7_v2.py similarity index 94% rename from tests/test_s7commplus_v2.py rename to tests/test_s7_v2.py index 1a9fc8e7..e8a2250f 100644 --- a/tests/test_s7commplus_v2.py +++ b/tests/test_s7_v2.py @@ -8,20 +8,20 @@ import pytest -from snap7.s7commplus.protocol import ( +from s7.protocol import ( FunctionCode, LegitimationId, ProtocolVersion, READ_FUNCTION_CODES, ) -from snap7.s7commplus.legitimation import ( +from s7.legitimation import ( LegitimationState, build_legacy_response, derive_legitimation_key, _build_legitimation_payload, ) -from snap7.s7commplus.vlq import encode_uint32_vlq, decode_uint32_vlq -from snap7.s7commplus.connection import S7CommPlusConnection +from s7.vlq import encode_uint32_vlq, decode_uint32_vlq +from s7.connection import S7CommPlusConnection class TestReadFunctionCodes: @@ -243,7 +243,7 @@ class TestBuildNewResponse: """Test AES-256-CBC legitimation response building.""" def test_new_response_returns_bytes(self) -> None: - from snap7.s7commplus.legitimation import build_new_response + from s7.legitimation import build_new_response result = build_new_response( password="test", @@ -253,7 +253,7 @@ def test_new_response_returns_bytes(self) -> None: assert isinstance(result, bytes) def test_new_response_is_aes_block_aligned(self) -> None: - from snap7.s7commplus.legitimation import build_new_response + from s7.legitimation import build_new_response result = build_new_response( password="test", @@ -264,7 +264,7 @@ def test_new_response_is_aes_block_aligned(self) -> None: assert len(result) % 16 == 0 def test_new_response_different_passwords_differ(self) -> None: - from snap7.s7commplus.legitimation import build_new_response + from s7.legitimation import build_new_response challenge = b"\xab" * 16 oms = b"\xcd" * 32 @@ -273,7 +273,7 @@ def test_new_response_different_passwords_differ(self) -> None: assert r1 != r2 def test_new_response_different_secrets_differ(self) -> None: - from snap7.s7commplus.legitimation import build_new_response + from s7.legitimation import build_new_response challenge = b"\xab" * 16 r1 = build_new_response("test", challenge, b"\x00" * 32) @@ -281,7 +281,7 @@ def test_new_response_different_secrets_differ(self) -> None: assert r1 != r2 def test_new_response_with_username(self) -> None: - from snap7.s7commplus.legitimation import build_new_response + from s7.legitimation import build_new_response result = build_new_response( password="test", @@ -294,7 +294,7 @@ def test_new_response_with_username(self) -> None: def test_new_response_decryptable(self) -> None: """Verify the response can be decrypted back to the original payload.""" - from snap7.s7commplus.legitimation import ( + from s7.legitimation import ( build_new_response, derive_legitimation_key, _build_legitimation_payload, diff --git a/tests/test_s7commplus_vlq.py b/tests/test_s7_vlq.py similarity index 99% rename from tests/test_s7commplus_vlq.py rename to tests/test_s7_vlq.py index d7dbb596..70ed5917 100644 --- a/tests/test_s7commplus_vlq.py +++ b/tests/test_s7_vlq.py @@ -2,7 +2,7 @@ import pytest -from snap7.s7commplus.vlq import ( +from s7.vlq import ( encode_uint32_vlq, decode_uint32_vlq, encode_int32_vlq, diff --git a/tox.ini b/tox.ini index 4b10afdb..0b56471b 100644 --- a/tox.ini +++ b/tox.ini @@ -19,18 +19,18 @@ commands = [testenv:mypy] basepython = python3.13 extras = test -commands = mypy {toxinidir}/snap7 {toxinidir}/tests {toxinidir}/example +commands = mypy {toxinidir}/snap7 {toxinidir}/s7 {toxinidir}/tests {toxinidir}/example [testenv:lint-ruff] basepython = python3.13 extras = test commands = - ruff check {toxinidir}/snap7 {toxinidir}/tests {toxinidir}/example - ruff format --diff {toxinidir}/snap7 {toxinidir}/tests {toxinidir}/example + ruff check {toxinidir}/snap7 {toxinidir}/s7 {toxinidir}/tests {toxinidir}/example + ruff format --diff {toxinidir}/snap7 {toxinidir}/s7 {toxinidir}/tests {toxinidir}/example [testenv:ruff] basepython = python3.13 extras = test commands = - ruff format {toxinidir}/snap7 {toxinidir}/tests {toxinidir}/example - ruff check --fix {toxinidir}/snap7 {toxinidir}/tests {toxinidir}/example + ruff format {toxinidir}/snap7 {toxinidir}/s7 {toxinidir}/tests {toxinidir}/example + ruff check --fix {toxinidir}/snap7 {toxinidir}/s7 {toxinidir}/tests {toxinidir}/example From adacd76ab1bad325020b3cbe29822a0cbe76ed26 Mon Sep 17 00:00:00 2001 From: Gijs Molenaar Date: Fri, 27 Mar 2026 10:55:56 +0200 Subject: [PATCH 3/4] Fix return types for drop-in compatibility and update docs - s7.Client.db_write() and s7.AsyncClient.db_write() now return int (0) matching snap7.Client behavior - s7.AsyncClient.disconnect() now returns int (0) matching snap7.AsyncClient - doc/introduction.rst: explain two-package architecture (snap7 vs s7) - doc/connecting.rst: add tip about s7 package for S7-1200/1500 - Keep S7CommPlus marked as experimental throughout Co-Authored-By: Claude Opus 4.6 --- doc/connecting.rst | 14 ++++++++++++++ doc/introduction.rst | 14 ++++++++++++++ s7/async_client.py | 22 +++++++++++++++------- s7/client.py | 19 +++++++++++++------ 4 files changed, 56 insertions(+), 13 deletions(-) diff --git a/doc/connecting.rst b/doc/connecting.rst index 8eefe4a6..34f2310e 100644 --- a/doc/connecting.rst +++ b/doc/connecting.rst @@ -78,6 +78,20 @@ S7-1200 / S7-1500 client = snap7.Client() client.connect("192.168.1.10", 0, 1) +.. tip:: + + For S7-1200/1500 PLCs you can also use the **experimental** ``s7`` package, + which automatically tries the newer S7CommPlus protocol and falls back to + legacy S7 when needed:: + + from s7 import Client + + client = Client() + client.connect("192.168.1.10", 0, 1) + print(client.protocol) # Protocol.S7COMMPLUS or Protocol.LEGACY + + See :doc:`API/s7commplus` for full details. + S7-200 / Logo (TSAP Connection) -------------------------------- diff --git a/doc/introduction.rst b/doc/introduction.rst index cf1d864b..e2583d46 100644 --- a/doc/introduction.rst +++ b/doc/introduction.rst @@ -14,6 +14,20 @@ backwards compatibility. python-snap7 requires Python 3.10+ and runs on Windows, macOS and Linux without any native dependencies. +The library provides two packages: + +- **snap7** -- the original S7 protocol implementation, supporting S7-300, + S7-400, S7-1200 and S7-1500 PLCs via the classic PUT/GET interface. +- **s7** -- a newer unified client that automatically tries the S7CommPlus + protocol (used natively by S7-1200/1500) and falls back to legacy S7 when + needed. ``s7.Client`` is a drop-in replacement for ``snap7.Client``. + +.. note:: + + The ``s7`` package and its S7CommPlus support are **experimental**. + The legacy ``snap7`` package remains fully supported and is the safe choice + for production use. See :doc:`API/s7commplus` for details. + .. note:: **Version 3.0 is a complete rewrite.** Previous versions of python-snap7 diff --git a/s7/async_client.py b/s7/async_client.py index 88995abf..c7bbeb7e 100644 --- a/s7/async_client.py +++ b/s7/async_client.py @@ -167,8 +167,12 @@ async def _try_s7commplus( self._plus = plus return True - async def disconnect(self) -> None: - """Disconnect from PLC.""" + async def disconnect(self) -> int: + """Disconnect from PLC. + + Returns: + 0 on success (matches snap7.AsyncClient). + """ if self._plus is not None: try: await self._plus.disconnect() @@ -184,6 +188,7 @@ async def disconnect(self) -> None: self._legacy = None self._protocol = Protocol.AUTO + return 0 async def db_read(self, db_number: int, start: int, size: int) -> bytearray: """Read raw bytes from a data block.""" @@ -193,14 +198,17 @@ async def db_read(self, db_number: int, start: int, size: int) -> bytearray: return await self._legacy.db_read(db_number, start, size) raise RuntimeError("Not connected") - async def db_write(self, db_number: int, start: int, data: bytearray) -> None: - """Write raw bytes to a data block.""" + async def db_write(self, db_number: int, start: int, data: bytearray) -> int: + """Write raw bytes to a data block. + + Returns: + 0 on success (matches snap7.AsyncClient). + """ if self._protocol == Protocol.S7COMMPLUS and self._plus is not None: await self._plus.db_write(db_number, start, bytes(data)) - return + return 0 if self._legacy is not None: - await self._legacy.db_write(db_number, start, data) - return + return await self._legacy.db_write(db_number, start, data) raise RuntimeError("Not connected") async def db_read_multi(self, items: list[tuple[int, int, int]]) -> list[bytearray]: diff --git a/s7/client.py b/s7/client.py index d74d86e7..3b624b88 100644 --- a/s7/client.py +++ b/s7/client.py @@ -176,8 +176,12 @@ def _try_s7commplus( self._plus = plus return True - def disconnect(self) -> None: - """Disconnect from PLC.""" + def disconnect(self) -> int: + """Disconnect from PLC. + + Returns: + 0 on success (matches snap7.Client). + """ if self._plus is not None: try: self._plus.disconnect() @@ -193,6 +197,7 @@ def disconnect(self) -> None: self._legacy = None self._protocol = Protocol.AUTO + return 0 def db_read(self, db_number: int, start: int, size: int) -> bytearray: """Read raw bytes from a data block. @@ -205,17 +210,19 @@ def db_read(self, db_number: int, start: int, size: int) -> bytearray: return self._legacy.db_read(db_number, start, size) raise RuntimeError("Not connected") - def db_write(self, db_number: int, start: int, data: bytearray) -> None: + def db_write(self, db_number: int, start: int, data: bytearray) -> int: """Write raw bytes to a data block. Uses S7CommPlus when available, otherwise legacy S7. + + Returns: + 0 on success (matches snap7.Client). """ if self._protocol == Protocol.S7COMMPLUS and self._plus is not None: self._plus.db_write(db_number, start, bytes(data)) - return + return 0 if self._legacy is not None: - self._legacy.db_write(db_number, start, data) - return + return self._legacy.db_write(db_number, start, data) raise RuntimeError("Not connected") def db_read_multi(self, items: list[tuple[int, int, int]]) -> list[bytearray]: From 651ca496f33cc602faa962c06d26a475fd97ef92 Mon Sep 17 00:00:00 2001 From: Gijs Molenaar Date: Wed, 1 Apr 2026 09:08:46 +0200 Subject: [PATCH 4/4] Update README with restructured landing page and 3.1 release notes Reorganize README: introduction first, then installation, then release notes for 3.0 and the upcoming 3.1 (S7CommPlus support). Add call for testing with a list of PLCs that need verification. Co-Authored-By: Claude Opus 4.6 --- README.rst | 93 ++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 66 insertions(+), 27 deletions(-) diff --git a/README.rst b/README.rst index db97e768..8a097efb 100644 --- a/README.rst +++ b/README.rst @@ -13,55 +13,94 @@ .. image:: https://readthedocs.org/projects/python-snap7/badge/ :target: https://python-snap7.readthedocs.io/en/latest/ -About -===== -Python-snap7 is a pure Python S7 communication library for interfacing with Siemens S7 PLCs. +python-snap7 +============ -The name "python-snap7" is historical — the library originally started as a Python wrapper -around the `Snap7 `_ C library. As of version 3.0, the C -library is no longer used, but the name is kept for backwards compatibility. +Python-snap7 is a pure Python S7 communication library for interfacing with +Siemens S7 PLCs. It supports Python 3.10+ and runs on Windows, Linux, and macOS +without any native dependencies. -Python-snap7 is tested with Python 3.10+, on Windows, Linux and OS X. +The name "python-snap7" is historical — the library originally started as a +Python wrapper around the `Snap7 `_ C library. +As of version 3.0, the C library is no longer used, but the name is kept for +backwards compatibility. The full documentation is available on `Read The Docs `_. -Version 3.0 - Pure Python Rewrite -================================== +Installation +============ + +Install using pip:: -Version 3.0 is a ground-up rewrite of python-snap7. The library no longer wraps the -C snap7 shared library — instead, the entire S7 protocol stack (TPKT, COTP, and S7) -is now implemented in pure Python. This is a **breaking change** from all previous -versions. + $ pip install python-snap7 + +No native libraries or platform-specific dependencies are required — python-snap7 +is a pure Python package that works on all platforms. + + +Version 3.0 — Pure Python Rewrite +================================== -**Why this matters:** +Version 3.0 was a ground-up rewrite of python-snap7. The library no longer wraps +the C snap7 shared library — instead, the entire S7 protocol stack (TPKT, COTP, +and S7) is implemented in pure Python. -* **Portability**: No more platform-specific shared libraries (`.dll`, `.so`, `.dylib`). - python-snap7 now works on any platform that runs Python — including ARM, Alpine Linux, - and other environments where the C library was difficult or impossible to install. +* **Portability**: No more platform-specific shared libraries (``.dll``, ``.so``, ``.dylib``). + Works on any platform that runs Python — including ARM, Alpine Linux, and other + environments where the C library was difficult or impossible to install. * **Easier installation**: Just ``pip install python-snap7``. No native dependencies, no compiler toolchains, no manual library setup. * **Easier to extend**: New features and protocol support can be added directly in Python. **If you experience issues with 3.0:** -1. Please report them on the `issue tracker `_ - with a clear description of the problem and the version you are using - (``python -c "import snap7; print(snap7.__version__)"``). +1. Please report them on the `issue tracker `_. 2. As a workaround, you can pin to the last pre-3.0 release:: $ pip install "python-snap7<3" - The latest stable pre-3.0 release is version 2.1.0. Documentation for pre-3.0 - versions is available at `Read The Docs `_. + Documentation for pre-3.0 versions is available at + `Read The Docs `_. -Installation -============ +Version 3.1 — S7CommPlus Protocol Support (unreleased) +======================================================= -Install using pip:: +Version 3.1 adds support for the S7CommPlus protocol (up to V3), which is required +for communicating with newer Siemens S7-1200 and S7-1500 PLCs that have PUT/GET +disabled. This is fully backwards compatible with 3.0. - $ pip install python-snap7 +The biggest change is the new ``s7`` module, which is now the recommended entry point +for connecting to any supported S7 PLC:: + + from s7 import Client + + client = Client() + client.connect("192.168.1.10", 0, 1) # auto-detects S7CommPlus vs legacy S7 + data = client.db_read(1, 0, 4) + client.disconnect() + +The ``s7.Client`` automatically tries S7CommPlus first, and falls back to legacy S7 +if the PLC does not support it. The existing ``snap7.Client`` continues to work +unchanged for legacy S7 connections. + +**Help us test!** Version 3.1 needs more real-world testing before release. If you +have access to any of the following PLCs, we would greatly appreciate testing and +feedback: + +* S7-1200 (any firmware version) +* S7-1500 (any firmware version) +* S7-1500 with TLS enabled +* S7-300 +* S7-400 +* S7-1200/1500 with PUT/GET disabled (S7CommPlus-only) +* LOGO! 0BA8 and newer + +Please report your results — whether it works or not — on the +`issue tracker `_. + +To install the development version:: -No native libraries or platform-specific dependencies are required — python-snap7 is a pure Python package that works on all platforms. + $ pip install git+https://github.com/gijzelaerr/python-snap7.git@master