diff --git a/CLAUDE.md b/CLAUDE.md
index 5353da6e..4998f4a8 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -8,6 +8,7 @@ Python-snap7 is a pure Python S7 communication library for interfacing with Siem
## Key Architecture
+### snap7/ — Legacy S7 protocol (S7-300/400, PUT/GET on S7-1200/1500)
- **snap7/client.py**: Main Client class for connecting to S7 PLCs
- **snap7/server.py**: Server implementation for PLC simulation
- **snap7/logo.py**: Logo PLC communication
@@ -19,6 +20,20 @@ Python-snap7 is a pure Python S7 communication library for interfacing with Siem
- **snap7/type.py**: Type definitions and enums (Area, Block, WordLen, etc.)
- **snap7/error.py**: Error handling and exceptions
+### s7/ — Unified client with S7CommPlus + legacy fallback
+- **s7/client.py**: Unified Client — tries S7CommPlus, falls back to snap7.Client
+- **s7/async_client.py**: Unified AsyncClient — same pattern, async
+- **s7/server.py**: Unified Server wrapping both legacy and S7CommPlus
+- **s7/_protocol.py**: Protocol enum (AUTO/LEGACY/S7COMMPLUS)
+- **s7/_s7commplus_client.py**: Pure S7CommPlus sync client (internal)
+- **s7/_s7commplus_async_client.py**: Pure S7CommPlus async client (internal)
+- **s7/_s7commplus_server.py**: S7CommPlus server emulator (internal)
+- **s7/connection.py**: S7CommPlus low-level connection
+- **s7/protocol.py**: S7CommPlus protocol constants/enums
+- **s7/codec.py**: S7CommPlus encoding/decoding
+- **s7/vlq.py**: Variable-Length Quantity encoding
+- **s7/legitimation.py**: Authentication helpers
+
## Implementation Details
### Protocol Stack
@@ -41,24 +56,31 @@ The library implements the complete S7 protocol stack:
- Block operations (list, info, upload, download)
- Date/time operations
-### Usage
+### Usage (unified s7 package — recommended for S7-1200/1500)
+
+```python
+from s7 import Client
+
+client = Client()
+client.connect("192.168.1.10", 0, 1) # auto-detects S7CommPlus vs legacy
+data = client.db_read(1, 0, 4)
+client.disconnect()
+```
+
+### Usage (legacy snap7 package — S7-300/400)
```python
import snap7
-# Create and connect client
client = snap7.Client()
client.connect("192.168.1.10", 0, 1)
-# Read/write operations
data = client.db_read(1, 0, 4)
client.db_write(1, 0, bytearray([1, 2, 3, 4]))
-# Memory area access
marker_data = client.mb_read(0, 4)
client.mb_write(0, 4, bytearray([1, 2, 3, 4]))
-# Disconnect
client.disconnect()
```
@@ -98,15 +120,15 @@ pytest tests/test_client.py
### Code Quality
```bash
# Type checking
-mypy snap7 tests example
+mypy snap7 s7 tests example
# Linting and formatting check
-ruff check snap7 tests example
-ruff format --diff snap7 tests example
+ruff check snap7 s7 tests example
+ruff format --diff snap7 s7 tests example
# Auto-format code
-ruff format snap7 tests example
-ruff check --fix snap7 tests example
+ruff format snap7 s7 tests example
+ruff check --fix snap7 s7 tests example
```
### Development with tox
diff --git a/Makefile b/Makefile
index ab74d2ee..fcaf4b07 100644
--- a/Makefile
+++ b/Makefile
@@ -29,8 +29,8 @@ doc: .venv/bin/sphinx-build
.PHONY: check
check: .venv/bin/pytest
- uv run ruff check snap7 tests example
- uv run ruff format --diff snap7 tests example
+ uv run ruff check snap7 s7 tests example
+ uv run ruff format --diff snap7 s7 tests example
.PHONY: ruff
ruff: .venv/bin/tox
diff --git a/README.rst b/README.rst
index db97e768..8a097efb 100644
--- a/README.rst
+++ b/README.rst
@@ -13,55 +13,94 @@
.. image:: https://readthedocs.org/projects/python-snap7/badge/
:target: https://python-snap7.readthedocs.io/en/latest/
-About
-=====
-Python-snap7 is a pure Python S7 communication library for interfacing with Siemens S7 PLCs.
+python-snap7
+============
-The name "python-snap7" is historical — the library originally started as a Python wrapper
-around the `Snap7 `_ C library. As of version 3.0, the C
-library is no longer used, but the name is kept for backwards compatibility.
+Python-snap7 is a pure Python S7 communication library for interfacing with
+Siemens S7 PLCs. It supports Python 3.10+ and runs on Windows, Linux, and macOS
+without any native dependencies.
-Python-snap7 is tested with Python 3.10+, on Windows, Linux and OS X.
+The name "python-snap7" is historical — the library originally started as a
+Python wrapper around the `Snap7 `_ C library.
+As of version 3.0, the C library is no longer used, but the name is kept for
+backwards compatibility.
The full documentation is available on `Read The Docs `_.
-Version 3.0 - Pure Python Rewrite
-==================================
+Installation
+============
+
+Install using pip::
-Version 3.0 is a ground-up rewrite of python-snap7. The library no longer wraps the
-C snap7 shared library — instead, the entire S7 protocol stack (TPKT, COTP, and S7)
-is now implemented in pure Python. This is a **breaking change** from all previous
-versions.
+ $ pip install python-snap7
+
+No native libraries or platform-specific dependencies are required — python-snap7
+is a pure Python package that works on all platforms.
+
+
+Version 3.0 — Pure Python Rewrite
+==================================
-**Why this matters:**
+Version 3.0 was a ground-up rewrite of python-snap7. The library no longer wraps
+the C snap7 shared library — instead, the entire S7 protocol stack (TPKT, COTP,
+and S7) is implemented in pure Python.
-* **Portability**: No more platform-specific shared libraries (`.dll`, `.so`, `.dylib`).
- python-snap7 now works on any platform that runs Python — including ARM, Alpine Linux,
- and other environments where the C library was difficult or impossible to install.
+* **Portability**: No more platform-specific shared libraries (``.dll``, ``.so``, ``.dylib``).
+ Works on any platform that runs Python — including ARM, Alpine Linux, and other
+ environments where the C library was difficult or impossible to install.
* **Easier installation**: Just ``pip install python-snap7``. No native dependencies,
no compiler toolchains, no manual library setup.
* **Easier to extend**: New features and protocol support can be added directly in Python.
**If you experience issues with 3.0:**
-1. Please report them on the `issue tracker `_
- with a clear description of the problem and the version you are using
- (``python -c "import snap7; print(snap7.__version__)"``).
+1. Please report them on the `issue tracker `_.
2. As a workaround, you can pin to the last pre-3.0 release::
$ pip install "python-snap7<3"
- The latest stable pre-3.0 release is version 2.1.0. Documentation for pre-3.0
- versions is available at `Read The Docs `_.
+ Documentation for pre-3.0 versions is available at
+ `Read The Docs `_.
-Installation
-============
+Version 3.1 — S7CommPlus Protocol Support (unreleased)
+=======================================================
-Install using pip::
+Version 3.1 adds support for the S7CommPlus protocol (up to V3), which is required
+for communicating with newer Siemens S7-1200 and S7-1500 PLCs that have PUT/GET
+disabled. This is fully backwards compatible with 3.0.
- $ pip install python-snap7
+The biggest change is the new ``s7`` module, which is now the recommended entry point
+for connecting to any supported S7 PLC::
+
+ from s7 import Client
+
+ client = Client()
+ client.connect("192.168.1.10", 0, 1) # auto-detects S7CommPlus vs legacy S7
+ data = client.db_read(1, 0, 4)
+ client.disconnect()
+
+The ``s7.Client`` automatically tries S7CommPlus first, and falls back to legacy S7
+if the PLC does not support it. The existing ``snap7.Client`` continues to work
+unchanged for legacy S7 connections.
+
+**Help us test!** Version 3.1 needs more real-world testing before release. If you
+have access to any of the following PLCs, we would greatly appreciate testing and
+feedback:
+
+* S7-1200 (any firmware version)
+* S7-1500 (any firmware version)
+* S7-1500 with TLS enabled
+* S7-300
+* S7-400
+* S7-1200/1500 with PUT/GET disabled (S7CommPlus-only)
+* LOGO! 0BA8 and newer
+
+Please report your results — whether it works or not — on the
+`issue tracker `_.
+
+To install the development version::
-No native libraries or platform-specific dependencies are required — python-snap7 is a pure Python package that works on all platforms.
+ $ pip install git+https://github.com/gijzelaerr/python-snap7.git@master
diff --git a/doc/API/s7commplus.rst b/doc/API/s7commplus.rst
index bd5ceecb..48066e91 100644
--- a/doc/API/s7commplus.rst
+++ b/doc/API/s7commplus.rst
@@ -7,24 +7,21 @@ S7CommPlus (S7-1200/1500)
releases. If you encounter problems, please `open an issue
`_.
-The :mod:`snap7.s7commplus` package provides support for Siemens S7-1200 and
-S7-1500 PLCs, which use the S7CommPlus protocol instead of the classic S7
-protocol used by S7-300/400.
-
-Both synchronous and asynchronous clients are available. When a PLC does not
-support S7CommPlus data operations, the clients automatically fall back to the
-legacy S7 protocol transparently.
+The ``s7`` package provides a unified client for Siemens S7-1200 and S7-1500
+PLCs. It automatically tries the S7CommPlus protocol first and falls back to
+the legacy S7 protocol when needed.
Synchronous client
------------------
.. code-block:: python
- from snap7.s7commplus.client import S7CommPlusClient
+ from s7 import Client
- client = S7CommPlusClient()
- client.connect("192.168.1.10")
+ client = Client()
+ client.connect("192.168.1.10", 0, 1)
data = client.db_read(1, 0, 4)
+ print(client.protocol) # Protocol.S7COMMPLUS or Protocol.LEGACY
client.disconnect()
Asynchronous client
@@ -33,11 +30,11 @@ Asynchronous client
.. code-block:: python
import asyncio
- from snap7.s7commplus.async_client import S7CommPlusAsyncClient
+ from s7 import AsyncClient
async def main():
- client = S7CommPlusAsyncClient()
- await client.connect("192.168.1.10")
+ client = AsyncClient()
+ await client.connect("192.168.1.10", 0, 1)
data = await client.db_read(1, 0, 4)
await client.disconnect()
@@ -51,10 +48,10 @@ S7-1500 PLCs with firmware 2.x use S7CommPlus V2, which requires TLS. Pass
.. code-block:: python
- from snap7.s7commplus.client import S7CommPlusClient
+ from s7 import Client
- client = S7CommPlusClient()
- client.connect("192.168.1.10", use_tls=True)
+ client = Client()
+ client.connect("192.168.1.10", 0, 1, use_tls=True)
data = client.db_read(1, 0, 4)
client.disconnect()
@@ -63,7 +60,7 @@ For PLCs with custom certificates, provide the certificate paths:
.. code-block:: python
client.connect(
- "192.168.1.10",
+ "192.168.1.10", 0, 1,
use_tls=True,
tls_cert="/path/to/client.pem",
tls_key="/path/to/client.key",
@@ -73,56 +70,39 @@ For PLCs with custom certificates, provide the certificate paths:
Password authentication
-----------------------
-Password-protected PLCs require authentication after connecting. Call
-``authenticate()`` before performing data operations:
+Password-protected PLCs require the ``password`` keyword argument:
.. code-block:: python
- from snap7.s7commplus.client import S7CommPlusClient
-
- client = S7CommPlusClient()
- client.connect("192.168.1.10", use_tls=True)
- client.authenticate(password="my_plc_password")
+ from s7 import Client
+ client = Client()
+ client.connect("192.168.1.10", 0, 1, use_tls=True, password="my_plc_password")
data = client.db_read(1, 0, 4)
client.disconnect()
-The method auto-detects whether to use legacy (SHA-1 XOR) or new-style
-(AES-256-CBC) authentication based on the PLC firmware version. For new-style
-authentication, you can also provide a username:
-
-.. code-block:: python
-
- client.authenticate(password="my_password", username="admin")
-
-.. note::
-
- Authentication requires TLS to be active. Calling ``authenticate()``
- without ``use_tls=True`` will raise :class:`~snap7.error.S7ConnectionError`.
+Protocol selection
+------------------
+By default the client uses ``Protocol.AUTO`` which tries S7CommPlus first.
+You can force a specific protocol:
-Legacy fallback
----------------
+.. code-block:: python
-If the PLC returns an error for S7CommPlus data operations (common with some
-firmware versions), the client automatically falls back to the classic S7
-protocol. You can check whether fallback is active:
+ from s7 import Client, Protocol
-.. code-block:: python
+ # Force legacy S7 only
+ client = Client()
+ client.connect("192.168.1.10", 0, 1, protocol=Protocol.LEGACY)
- client.connect("192.168.1.10")
- if client.using_legacy_fallback:
- print("Using legacy S7 protocol")
+ # Force S7CommPlus (raises on failure)
+ client.connect("192.168.1.10", 0, 1, protocol=Protocol.S7COMMPLUS)
API reference
-------------
-.. automodule:: snap7.s7commplus.client
- :members:
-
-.. automodule:: snap7.s7commplus.async_client
+.. automodule:: s7.client
:members:
-.. automodule:: snap7.s7commplus.connection
+.. automodule:: s7.async_client
:members:
- :exclude-members: S7CommPlusConnection
diff --git a/doc/connecting.rst b/doc/connecting.rst
index 8eefe4a6..34f2310e 100644
--- a/doc/connecting.rst
+++ b/doc/connecting.rst
@@ -78,6 +78,20 @@ S7-1200 / S7-1500
client = snap7.Client()
client.connect("192.168.1.10", 0, 1)
+.. tip::
+
+ For S7-1200/1500 PLCs you can also use the **experimental** ``s7`` package,
+ which automatically tries the newer S7CommPlus protocol and falls back to
+ legacy S7 when needed::
+
+ from s7 import Client
+
+ client = Client()
+ client.connect("192.168.1.10", 0, 1)
+ print(client.protocol) # Protocol.S7COMMPLUS or Protocol.LEGACY
+
+ See :doc:`API/s7commplus` for full details.
+
S7-200 / Logo (TSAP Connection)
--------------------------------
diff --git a/doc/introduction.rst b/doc/introduction.rst
index cf1d864b..e2583d46 100644
--- a/doc/introduction.rst
+++ b/doc/introduction.rst
@@ -14,6 +14,20 @@ backwards compatibility.
python-snap7 requires Python 3.10+ and runs on Windows, macOS and Linux
without any native dependencies.
+The library provides two packages:
+
+- **snap7** -- the original S7 protocol implementation, supporting S7-300,
+ S7-400, S7-1200 and S7-1500 PLCs via the classic PUT/GET interface.
+- **s7** -- a newer unified client that automatically tries the S7CommPlus
+ protocol (used natively by S7-1200/1500) and falls back to legacy S7 when
+ needed. ``s7.Client`` is a drop-in replacement for ``snap7.Client``.
+
+.. note::
+
+ The ``s7`` package and its S7CommPlus support are **experimental**.
+ The legacy ``snap7`` package remains fully supported and is the safe choice
+ for production use. See :doc:`API/s7commplus` for details.
+
.. note::
**Version 3.0 is a complete rewrite.** Previous versions of python-snap7
diff --git a/doc/limitations.rst b/doc/limitations.rst
index c270ab06..03a220a8 100644
--- a/doc/limitations.rst
+++ b/doc/limitations.rst
@@ -26,5 +26,5 @@ are **not possible** with this protocol:
individual blocks, but this is not a complete backup.
* - Access S7-1200/1500 PLCs with S7CommPlus security
- python-snap7 supports S7CommPlus V1 and V2 (with TLS) via
- :mod:`snap7.s7commplus`. V3 is not yet supported. For PLCs that only
+ the ``s7`` package. V3 is not yet supported. For PLCs that only
support V3, enable PUT/GET as a fallback or use OPC UA.
diff --git a/doc/plc-support.rst b/doc/plc-support.rst
index 1d2b0e59..3df53e30 100644
--- a/doc/plc-support.rst
+++ b/doc/plc-support.rst
@@ -59,7 +59,7 @@ Supported PLCs
- No
- V2
- **Full** (S7CommPlus V2)
- - S7CommPlus V2 with TLS is supported via :mod:`snap7.s7commplus`.
+ - S7CommPlus V2 with TLS is supported via the ``s7`` package.
* - S7-1500 (FW 3.x+)
- ~2022
- PUT/GET only
@@ -143,7 +143,7 @@ Siemens has evolved their PLC communication protocols over time:
python-snap7 implements the **classic S7 protocol** and **S7CommPlus V1/V2**.
The classic protocol remains available on most PLC families via the PUT/GET
mechanism. S7CommPlus V1 and V2 (with TLS) are supported via the
-:mod:`snap7.s7commplus` package. For PLCs that require S7CommPlus V3 (such
+``s7`` package. For PLCs that require S7CommPlus V3 (such
as the S7-1500R/H), consider using OPC UA as an alternative.
diff --git a/pyproject.toml b/pyproject.toml
index b865de58..ca23f81a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -40,9 +40,10 @@ discovery = ["pnio-dcp"]
[tool.setuptools.package-data]
snap7 = ["py.typed"]
+s7 = ["py.typed"]
[tool.setuptools.packages.find]
-include = ["snap7*"]
+include = ["snap7*", "s7*"]
[project.scripts]
snap7-server = "snap7.server:mainloop"
diff --git a/s7/__init__.py b/s7/__init__.py
new file mode 100644
index 00000000..1cfaa189
--- /dev/null
+++ b/s7/__init__.py
@@ -0,0 +1,35 @@
+"""Unified S7 communication library.
+
+Provides protocol-agnostic access to Siemens S7 PLCs with automatic
+protocol discovery (S7CommPlus vs legacy S7).
+
+Usage::
+
+ from s7 import Client
+
+ client = Client()
+ client.connect("192.168.1.10", 0, 1)
+ data = client.db_read(1, 0, 4)
+"""
+
+from .client import Client
+from .async_client import AsyncClient
+from .server import Server
+from ._protocol import Protocol
+
+from snap7.type import Area, Block, WordLen, SrvEvent, SrvArea
+from snap7.util.db import Row, DB
+
+__all__ = [
+ "Client",
+ "AsyncClient",
+ "Server",
+ "Protocol",
+ "Area",
+ "Block",
+ "WordLen",
+ "SrvEvent",
+ "SrvArea",
+ "Row",
+ "DB",
+]
diff --git a/s7/_protocol.py b/s7/_protocol.py
new file mode 100644
index 00000000..bad2be02
--- /dev/null
+++ b/s7/_protocol.py
@@ -0,0 +1,17 @@
+"""Protocol enum for unified S7 client."""
+
+from enum import Enum
+
+
+class Protocol(Enum):
+ """S7 communication protocol selection.
+
+ Attributes:
+ AUTO: Try S7CommPlus first, fall back to legacy S7 if unsupported.
+ LEGACY: Use legacy S7 protocol only (S7-300/400, basic S7-1200/1500).
+ S7COMMPLUS: Use S7CommPlus protocol only (S7-1200/1500 with full access).
+ """
+
+ AUTO = "auto"
+ LEGACY = "legacy"
+ S7COMMPLUS = "s7commplus"
diff --git a/snap7/s7commplus/async_client.py b/s7/_s7commplus_async_client.py
similarity index 73%
rename from snap7/s7commplus/async_client.py
rename to s7/_s7commplus_async_client.py
index 70f831e1..25559c44 100644
--- a/snap7/s7commplus/async_client.py
+++ b/s7/_s7commplus_async_client.py
@@ -1,19 +1,10 @@
-"""
-Async S7CommPlus client for S7-1200/1500 PLCs.
-
-Provides the same API as S7CommPlusClient but using asyncio for
-non-blocking I/O. Uses asyncio.Lock for concurrent safety.
+"""Pure async S7CommPlus client for S7-1200/1500 PLCs (no legacy fallback).
-When a PLC does not support S7CommPlus data operations, the client
-transparently falls back to the legacy S7 protocol for data block
-read/write operations (using synchronous calls in an executor).
+This is an internal module used by the unified ``s7.AsyncClient``. It provides
+raw S7CommPlus data operations without any fallback logic -- the unified
+client is responsible for deciding when to fall back to legacy S7.
-Example::
-
- async with S7CommPlusAsyncClient() as client:
- await client.connect("192.168.1.10")
- data = await client.db_read(1, 0, 4)
- await client.db_write(1, 0, struct.pack(">f", 23.5))
+Reference: thomas-v2/S7CommPlusDriver (C#, LGPL-3.0)
"""
import asyncio
@@ -35,7 +26,7 @@
)
from .codec import encode_header, decode_header, encode_typed_value, encode_object_qualifier
from .vlq import encode_uint32_vlq, decode_uint32_vlq, decode_uint64_vlq
-from .client import _build_read_payload, _parse_read_response, _build_write_payload, _parse_write_response
+from ._s7commplus_client import _build_read_payload, _parse_read_response, _build_write_payload, _parse_write_response
logger = logging.getLogger(__name__)
@@ -46,17 +37,9 @@
class S7CommPlusAsyncClient:
- """Async S7CommPlus client for S7-1200/1500 PLCs.
-
- Supports all S7CommPlus protocol versions (V1/V2/V3/TLS). The protocol
- version is auto-detected from the PLC's CreateObject response during
- connection setup.
+ """Pure async S7CommPlus client without legacy fallback.
- Uses asyncio for all I/O operations and asyncio.Lock for
- concurrent safety when shared between multiple coroutines.
-
- When the PLC does not support S7CommPlus data operations, the client
- automatically falls back to legacy S7 protocol for db_read/db_write.
+ Use ``s7.AsyncClient`` for automatic protocol selection.
"""
def __init__(self) -> None:
@@ -67,12 +50,6 @@ def __init__(self) -> None:
self._protocol_version: int = 0
self._connected = False
self._lock = asyncio.Lock()
- self._legacy_client: Optional[Any] = None
- self._use_legacy_data: bool = False
- self._host: str = ""
- self._port: int = 102
- self._rack: int = 0
- self._slot: int = 1
# V2+ IntegrityId tracking
self._integrity_id_read: int = 0
@@ -83,11 +60,10 @@ def __init__(self) -> None:
self._tls_active: bool = False
self._oms_secret: Optional[bytes] = None
self._server_session_version: Optional[int] = None
+ self._session_setup_ok: bool = False
@property
def connected(self) -> bool:
- if self._use_legacy_data and self._legacy_client is not None:
- return bool(self._legacy_client.connected)
return self._connected
@property
@@ -99,9 +75,9 @@ def session_id(self) -> int:
return self._session_id
@property
- def using_legacy_fallback(self) -> bool:
- """Whether the client is using legacy S7 protocol for data operations."""
- return self._use_legacy_data
+ def session_setup_ok(self) -> bool:
+ """Whether the S7CommPlus session setup succeeded for data operations."""
+ return self._session_setup_ok
@property
def tls_active(self) -> bool:
@@ -125,32 +101,19 @@ async def connect(
tls_key: Optional[str] = None,
tls_ca: Optional[str] = None,
) -> None:
- """Connect to an S7-1200/1500 PLC.
-
- The connection sequence:
- 1. COTP connection (same as legacy S7comm)
- 2. InitSSL handshake
- 3. TLS activation (if use_tls=True, required for V2)
- 4. CreateObject to establish S7CommPlus session
- 5. Enable IntegrityId tracking (V2+)
-
- If the PLC does not support S7CommPlus data operations, a secondary
- legacy S7 connection is established transparently for data access.
+ """Connect to an S7-1200/1500 PLC using S7CommPlus.
Args:
host: PLC IP address or hostname
port: TCP port (default 102)
- rack: PLC rack number
- slot: PLC slot number
+ rack: PLC rack number (unused, kept for API symmetry)
+ slot: PLC slot number (unused, kept for API symmetry)
use_tls: Whether to activate TLS after InitSSL.
tls_cert: Path to client TLS certificate (PEM)
tls_key: Path to client private key (PEM)
tls_ca: Path to CA certificate for PLC verification (PEM)
"""
self._host = host
- self._port = port
- self._rack = rack
- self._slot = slot
# TCP connect
self._reader, self._writer = await asyncio.open_connection(host, port)
@@ -169,7 +132,7 @@ async def connect(
# Step 4: S7CommPlus session setup (CreateObject)
await self._create_session()
- # Step 5: Version-specific validation (before session setup handshake)
+ # Step 5: Version-specific validation
if self._protocol_version >= ProtocolVersion.V3:
if not use_tls:
logger.warning(
@@ -177,10 +140,9 @@ async def connect(
)
elif self._protocol_version == ProtocolVersion.V2:
if not self._tls_active:
- from ..error import S7ConnectionError
+ from snap7.error import S7ConnectionError
raise S7ConnectionError("PLC reports V2 protocol but TLS is not active. V2 requires TLS. Use use_tls=True.")
- # Enable IntegrityId tracking for V2+
self._with_integrity_id = True
self._integrity_id_read = 0
self._integrity_id_write = 0
@@ -190,21 +152,16 @@ async def connect(
# Step 6: Session setup - echo ServerSessionVersion back to PLC
if self._server_session_version is not None:
- session_setup_ok = await self._setup_session()
+ self._session_setup_ok = await self._setup_session()
else:
logger.warning("PLC did not provide ServerSessionVersion - session setup incomplete")
- session_setup_ok = False
+ self._session_setup_ok = False
logger.info(
f"Async S7CommPlus connected to {host}:{port}, "
f"version=V{self._protocol_version}, session={self._session_id}, "
f"tls={self._tls_active}"
)
- # Check if S7CommPlus session setup succeeded
- if not session_setup_ok:
- logger.info("S7CommPlus session setup failed, falling back to legacy S7 protocol")
- await self._setup_legacy_fallback()
-
except Exception:
await self.disconnect()
raise
@@ -212,12 +169,6 @@ async def connect(
async def authenticate(self, password: str, username: str = "") -> None:
"""Perform PLC password authentication (legitimation).
- Must be called after connect() and before data operations on
- password-protected PLCs. Requires TLS to be active (V2+).
-
- The method auto-detects legacy vs new legitimation based on
- the PLC's firmware version.
-
Args:
password: PLC password
username: Username for new-style auth (optional)
@@ -226,20 +177,18 @@ async def authenticate(self, password: str, username: str = "") -> None:
S7ConnectionError: If not connected, TLS not active, or auth fails
"""
if not self._connected:
- from ..error import S7ConnectionError
+ from snap7.error import S7ConnectionError
raise S7ConnectionError("Not connected")
if not self._tls_active or self._oms_secret is None:
- from ..error import S7ConnectionError
+ from snap7.error import S7ConnectionError
raise S7ConnectionError("Legitimation requires TLS. Connect with use_tls=True.")
- # Step 1: Get challenge from PLC
challenge = await self._get_legitimation_challenge()
logger.info(f"Received legitimation challenge ({len(challenge)} bytes)")
- # Step 2: Build response (auto-detect legacy vs new)
from .legitimation import build_legacy_response, build_new_response
if username:
@@ -261,28 +210,17 @@ async def _activate_tls(
tls_key: Optional[str] = None,
tls_ca: Optional[str] = None,
) -> None:
- """Activate TLS 1.3 over the COTP connection.
-
- Called after InitSSL and before CreateObject. Uses asyncio's
- start_tls() to upgrade the existing connection to TLS.
-
- Args:
- tls_cert: Path to client TLS certificate (PEM)
- tls_key: Path to client private key (PEM)
- tls_ca: Path to CA certificate for PLC verification (PEM)
- """
+ """Activate TLS 1.3 over the COTP connection."""
if self._writer is None:
- from ..error import S7ConnectionError
+ from snap7.error import S7ConnectionError
raise S7ConnectionError("Cannot activate TLS: not connected")
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.minimum_version = ssl.TLSVersion.TLSv1_3
- # TLS 1.3 ciphersuites are configured differently from TLS 1.2
if hasattr(ctx, "set_ciphersuites"):
ctx.set_ciphersuites("TLS_AES_256_GCM_SHA384:TLS_AES_128_GCM_SHA256")
- # If set_ciphersuites not available, TLS 1.3 uses its mandatory defaults
if tls_cert and tls_key:
ctx.load_cert_chain(tls_cert, tls_key)
@@ -293,7 +231,6 @@ async def _activate_tls(
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
- # Upgrade existing transport to TLS using asyncio start_tls
transport = self._writer.transport
loop = asyncio.get_event_loop()
new_transport = await loop.start_tls(
@@ -303,13 +240,11 @@ async def _activate_tls(
server_hostname=self._host,
)
- # Update reader/writer to use the TLS transport
self._writer._transport = new_transport
self._tls_active = True
- # Extract OMS exporter secret for legitimation key derivation
if new_transport is None:
- from ..error import S7ConnectionError
+ from snap7.error import S7ConnectionError
raise S7ConnectionError("TLS handshake failed: no transport returned")
@@ -325,13 +260,7 @@ async def _activate_tls(
logger.info("TLS 1.3 activated on async COTP connection")
async def _get_legitimation_challenge(self) -> bytes:
- """Request legitimation challenge from PLC.
-
- Sends GetVarSubStreamed with address ServerSessionRequest (303).
-
- Returns:
- Challenge bytes from PLC
- """
+ """Request legitimation challenge from PLC."""
from .protocol import LegitimationId, DataType as DT
payload = bytearray()
@@ -348,12 +277,12 @@ async def _get_legitimation_challenge(self) -> bytes:
offset += consumed
if return_value != 0:
- from ..error import S7ConnectionError
+ from snap7.error import S7ConnectionError
raise S7ConnectionError(f"GetVarSubStreamed for challenge failed: return_value={return_value}")
if offset + 2 > len(resp_payload):
- from ..error import S7ConnectionError
+ from snap7.error import S7ConnectionError
raise S7ConnectionError("Challenge response too short")
@@ -388,7 +317,7 @@ async def _send_legitimation_new(self, encrypted_response: bytes) -> None:
if len(resp_payload) >= 1:
return_value, _ = decode_uint64_vlq(resp_payload, 0)
if return_value < 0:
- from ..error import S7ConnectionError
+ from snap7.error import S7ConnectionError
raise S7ConnectionError(f"Legitimation rejected by PLC: return_value={return_value}")
logger.debug(f"New legitimation return_value={return_value}")
@@ -411,32 +340,13 @@ async def _send_legitimation_legacy(self, response: bytes) -> None:
if len(resp_payload) >= 1:
return_value, _ = decode_uint64_vlq(resp_payload, 0)
if return_value < 0:
- from ..error import S7ConnectionError
+ from snap7.error import S7ConnectionError
raise S7ConnectionError(f"Legacy legitimation rejected by PLC: return_value={return_value}")
logger.debug(f"Legacy legitimation return_value={return_value}")
- async def _setup_legacy_fallback(self) -> None:
- """Establish a secondary legacy S7 connection for data operations."""
- from ..client import Client
-
- loop = asyncio.get_event_loop()
- client = Client()
- await loop.run_in_executor(None, lambda: client.connect(self._host, self._rack, self._slot, self._port))
- self._legacy_client = client
- self._use_legacy_data = True
- logger.info(f"Legacy S7 fallback connected to {self._host}:{self._port}")
-
async def disconnect(self) -> None:
"""Disconnect from PLC."""
- if self._legacy_client is not None:
- try:
- self._legacy_client.disconnect()
- except Exception:
- pass
- self._legacy_client = None
- self._use_legacy_data = False
-
if self._connected and self._session_id:
try:
await self._delete_session()
@@ -453,6 +363,7 @@ async def disconnect(self) -> None:
self._tls_active = False
self._oms_secret = None
self._server_session_version = None
+ self._session_setup_ok = False
if self._writer:
try:
@@ -464,22 +375,7 @@ async def disconnect(self) -> None:
self._reader = None
async def db_read(self, db_number: int, start: int, size: int) -> bytes:
- """Read raw bytes from a data block.
-
- Args:
- db_number: Data block number
- start: Start byte offset
- size: Number of bytes to read
-
- Returns:
- Raw bytes read from the data block
- """
- if self._use_legacy_data and self._legacy_client is not None:
- client = self._legacy_client
- loop = asyncio.get_event_loop()
- data = await loop.run_in_executor(None, lambda: client.db_read(db_number, start, size))
- return bytes(data)
-
+ """Read raw bytes from a data block."""
payload = _build_read_payload([(db_number, start, size)])
response = await self._send_request(FunctionCode.GET_MULTI_VARIABLES, payload)
@@ -491,67 +387,26 @@ async def db_read(self, db_number: int, start: int, size: int) -> bytes:
return results[0]
async def db_write(self, db_number: int, start: int, data: bytes) -> None:
- """Write raw bytes to a data block.
-
- Args:
- db_number: Data block number
- start: Start byte offset
- data: Bytes to write
- """
- if self._use_legacy_data and self._legacy_client is not None:
- client = self._legacy_client
- loop = asyncio.get_event_loop()
- await loop.run_in_executor(None, lambda: client.db_write(db_number, start, bytearray(data)))
- return
-
+ """Write raw bytes to a data block."""
payload = _build_write_payload([(db_number, start, data)])
response = await self._send_request(FunctionCode.SET_MULTI_VARIABLES, payload)
_parse_write_response(response)
async def db_read_multi(self, items: list[tuple[int, int, int]]) -> list[bytes]:
- """Read multiple data block regions in a single request.
-
- Args:
- items: List of (db_number, start_offset, size) tuples
-
- Returns:
- List of raw bytes for each item
- """
- if self._use_legacy_data and self._legacy_client is not None:
- client = self._legacy_client
- loop = asyncio.get_event_loop()
- multi_results: list[bytes] = []
- for db_number, start, size in items:
-
- def _read(db: int = db_number, s: int = start, sz: int = size) -> bytearray:
- return bytearray(client.db_read(db, s, sz))
-
- data = await loop.run_in_executor(None, _read)
- multi_results.append(bytes(data))
- return multi_results
-
+ """Read multiple data block regions in a single request."""
payload = _build_read_payload(items)
response = await self._send_request(FunctionCode.GET_MULTI_VARIABLES, payload)
-
parsed = _parse_read_response(response)
return [r if r is not None else b"" for r in parsed]
async def explore(self) -> bytes:
- """Browse the PLC object tree.
-
- Returns:
- Raw response payload
- """
+ """Browse the PLC object tree."""
return await self._send_request(FunctionCode.EXPLORE, b"")
# -- Internal methods --
async def _send_request(self, function_code: int, payload: bytes) -> bytes:
- """Send an S7CommPlus request and receive the response.
-
- For V2+ with IntegrityId tracking, inserts IntegrityId after the
- 14-byte request header and strips it from the response.
- """
+ """Send an S7CommPlus request and receive the response."""
async with self._lock:
if not self._connected or self._writer is None or self._reader is None:
raise RuntimeError("Not connected")
@@ -569,7 +424,6 @@ async def _send_request(self, function_code: int, payload: bytes) -> bytes:
0x36,
)
- # For V2+ with IntegrityId, insert after header
integrity_id_bytes = b""
if self._with_integrity_id and self._protocol_version >= ProtocolVersion.V2:
is_read = function_code in READ_FUNCTION_CODES
@@ -582,7 +436,6 @@ async def _send_request(self, function_code: int, payload: bytes) -> bytes:
frame += struct.pack(">BBH", 0x72, self._protocol_version, 0x0000)
await self._send_cotp_dt(frame)
- # Increment appropriate IntegrityId counter
if self._with_integrity_id and self._protocol_version >= ProtocolVersion.V2:
if function_code in READ_FUNCTION_CODES:
self._integrity_id_read = (self._integrity_id_read + 1) & 0xFFFFFFFF
@@ -597,7 +450,6 @@ async def _send_request(self, function_code: int, payload: bytes) -> bytes:
if len(response) < 14:
raise RuntimeError("Response too short")
- # For V2+, skip IntegrityId in response
resp_offset = 14
if self._with_integrity_id and self._protocol_version >= ProtocolVersion.V2:
if resp_offset < len(response):
@@ -611,7 +463,6 @@ async def _cotp_connect(self, local_tsap: int, remote_tsap: bytes) -> None:
if self._writer is None or self._reader is None:
raise RuntimeError("Not connected")
- # Build COTP CR
base_pdu = struct.pack(">BBHHB", 6, _COTP_CR, 0x0000, 0x0001, 0x00)
calling_tsap = struct.pack(">BBH", 0xC1, 2, local_tsap)
called_tsap = struct.pack(">BB", 0xC2, len(remote_tsap)) + remote_tsap
@@ -620,12 +471,10 @@ async def _cotp_connect(self, local_tsap: int, remote_tsap: bytes) -> None:
params = calling_tsap + called_tsap + pdu_size_param
cr_pdu = struct.pack(">B", 6 + len(params)) + base_pdu[1:] + params
- # Send TPKT + CR
tpkt = struct.pack(">BBH", 3, 0, 4 + len(cr_pdu)) + cr_pdu
self._writer.write(tpkt)
await self._writer.drain()
- # Receive TPKT + CC
tpkt_header = await self._reader.readexactly(4)
_, _, length = struct.unpack(">BBH", tpkt_header)
payload = await self._reader.readexactly(length - 4)
@@ -645,7 +494,7 @@ async def _init_ssl(self) -> None:
0x0000,
seq_num,
0x00000000,
- 0x30, # Transport flags for InitSSL
+ 0x30,
)
request += struct.pack(">I", 0)
@@ -666,7 +515,6 @@ async def _create_session(self) -> None:
"""Send CreateObject to establish S7CommPlus session."""
seq_num = self._next_sequence_number()
- # Build CreateObject request header
request = struct.pack(
">BHHHHIB",
Opcode.REQUEST,
@@ -674,32 +522,24 @@ async def _create_session(self) -> None:
FunctionCode.CREATE_OBJECT,
0x0000,
seq_num,
- ObjectId.OBJECT_NULL_SERVER_SESSION, # SessionId = 288
+ ObjectId.OBJECT_NULL_SERVER_SESSION,
0x36,
)
- # RequestId: ObjectServerSessionContainer (285)
request += struct.pack(">I", ObjectId.OBJECT_SERVER_SESSION_CONTAINER)
-
- # RequestValue: ValueUDInt(0)
request += bytes([0x00, DataType.UDINT]) + encode_uint32_vlq(0)
-
- # Unknown padding
request += struct.pack(">I", 0)
- # RequestObject: NullServerSession PObject
request += bytes([ElementID.START_OF_OBJECT])
request += struct.pack(">I", ObjectId.GET_NEW_RID_ON_SERVER)
request += encode_uint32_vlq(ObjectId.CLASS_SERVER_SESSION)
- request += encode_uint32_vlq(0) # ClassFlags
- request += encode_uint32_vlq(0) # AttributeId
+ request += encode_uint32_vlq(0)
+ request += encode_uint32_vlq(0)
- # Attribute: ServerSessionClientRID = 0x80c3c901
request += bytes([ElementID.ATTRIBUTE])
request += encode_uint32_vlq(ObjectId.SERVER_SESSION_CLIENT_RID)
request += encode_typed_value(DataType.RID, 0x80C3C901)
- # Nested: ClassSubscriptions
request += bytes([ElementID.START_OF_OBJECT])
request += struct.pack(">I", ObjectId.GET_NEW_RID_ON_SERVER)
request += encode_uint32_vlq(ObjectId.CLASS_SUBSCRIPTIONS)
@@ -710,7 +550,6 @@ async def _create_session(self) -> None:
request += bytes([ElementID.TERMINATING_OBJECT])
request += struct.pack(">I", 0)
- # Frame header + trailer
frame = encode_header(ProtocolVersion.V1, len(request)) + request
frame += struct.pack(">BBH", 0x72, ProtocolVersion.V1, 0x0000)
await self._send_cotp_dt(frame)
@@ -725,7 +564,6 @@ async def _create_session(self) -> None:
self._session_id = struct.unpack_from(">I", response, 9)[0]
self._protocol_version = version
- # Parse ServerSessionVersion from response payload
self._parse_create_object_response(response[14:])
def _parse_create_object_response(self, payload: bytes) -> None:
@@ -754,13 +592,11 @@ def _parse_create_object_response(self, payload: bytes) -> None:
logger.info(f"ServerSessionVersion = {value}")
return
else:
- # Skip attribute value
if offset + 2 > len(payload):
break
_flags = payload[offset]
_dt = payload[offset + 1]
offset += 2
- # Best-effort skip: advance past common VLQ-encoded values
if offset < len(payload):
_, consumed = decode_uint32_vlq(payload, offset)
offset += consumed
@@ -769,13 +605,13 @@ def _parse_create_object_response(self, payload: bytes) -> None:
offset += 1
if offset + 4 > len(payload):
break
- offset += 4 # RelationId
+ offset += 4
_, consumed = decode_uint32_vlq(payload, offset)
- offset += consumed # ClassId
+ offset += consumed
_, consumed = decode_uint32_vlq(payload, offset)
- offset += consumed # ClassFlags
+ offset += consumed
_, consumed = decode_uint32_vlq(payload, offset)
- offset += consumed # AttributeId
+ offset += consumed
elif tag == ElementID.TERMINATING_OBJECT:
offset += 1
@@ -787,28 +623,21 @@ def _parse_create_object_response(self, payload: bytes) -> None:
logger.debug("ServerSessionVersion not found in CreateObject response")
async def _setup_session(self) -> bool:
- """Echo ServerSessionVersion back to the PLC via SetMultiVariables.
-
- Without this step, the PLC rejects all subsequent data operations
- with ERROR2 (0x05A9).
-
- Returns:
- True if session setup succeeded.
- """
+ """Echo ServerSessionVersion back to the PLC via SetMultiVariables."""
if self._server_session_version is None:
return False
payload = bytearray()
payload += struct.pack(">I", self._session_id)
- payload += encode_uint32_vlq(1) # Item count
- payload += encode_uint32_vlq(1) # Total address field count
+ payload += encode_uint32_vlq(1)
+ payload += encode_uint32_vlq(1)
payload += encode_uint32_vlq(ObjectId.SERVER_SESSION_VERSION)
- payload += encode_uint32_vlq(1) # ItemNumber
+ payload += encode_uint32_vlq(1)
payload += bytes([0x00, DataType.UDINT])
payload += encode_uint32_vlq(self._server_session_version)
- payload += bytes([0x00]) # Fill byte
+ payload += bytes([0x00])
payload += encode_object_qualifier()
- payload += struct.pack(">I", 0) # Trailing padding
+ payload += struct.pack(">I", 0)
try:
resp_payload = await self._send_request(FunctionCode.SET_MULTI_VARIABLES, bytes(payload))
diff --git a/snap7/s7commplus/client.py b/s7/_s7commplus_client.py
similarity index 57%
rename from snap7/s7commplus/client.py
rename to s7/_s7commplus_client.py
index a3f32ab4..d70e9458 100644
--- a/snap7/s7commplus/client.py
+++ b/s7/_s7commplus_client.py
@@ -1,20 +1,8 @@
-"""
-S7CommPlus client for S7-1200/1500 PLCs.
-
-Provides high-level operations over the S7CommPlus protocol, similar to
-the existing snap7.Client but targeting S7-1200/1500 PLCs with full
-engineering access (symbolic addressing, optimized data blocks, etc.).
-
-Supports all S7CommPlus protocol versions (V1/V2/V3/TLS). The protocol
-version is auto-detected from the PLC's CreateObject response during
-connection setup.
+"""Pure S7CommPlus client for S7-1200/1500 PLCs (no legacy fallback).
-When a PLC does not support S7CommPlus data operations (e.g. PLCs that
-accept S7CommPlus sessions but return ERROR2 for GetMultiVariables),
-the client transparently falls back to the legacy S7 protocol for
-data block read/write operations.
-
-Status: V1 and V2 connections are functional. V3/TLS authentication planned.
+This is an internal module used by the unified ``s7.Client``. It provides
+raw S7CommPlus data operations without any fallback logic -- the unified
+client is responsible for deciding when to fall back to legacy S7.
Reference: thomas-v2/S7CommPlusDriver (C#, LGPL-3.0)
"""
@@ -37,46 +25,16 @@
class S7CommPlusClient:
- """S7CommPlus client for S7-1200/1500 PLCs.
-
- Supports all S7CommPlus protocol versions:
- - V1: S7-1200 FW V4.0+
- - V2: S7-1200/1500 with older firmware
- - V3: S7-1200/1500 pre-TIA Portal V17
- - V3 + TLS: TIA Portal V17+ (recommended)
-
- The protocol version is auto-detected during connection.
+ """Pure S7CommPlus client without legacy fallback.
- When the PLC does not support S7CommPlus data operations, the client
- automatically falls back to legacy S7 protocol for db_read/db_write.
-
- Example::
-
- client = S7CommPlusClient()
- client.connect("192.168.1.10")
-
- # Read raw bytes from DB1
- data = client.db_read(1, 0, 4)
-
- # Write raw bytes to DB1
- client.db_write(1, 0, struct.pack(">f", 23.5))
-
- client.disconnect()
+ Use ``s7.Client`` for automatic protocol selection.
"""
def __init__(self) -> None:
self._connection: Optional[S7CommPlusConnection] = None
- self._legacy_client: Optional[Any] = None
- self._use_legacy_data: bool = False
- self._host: str = ""
- self._port: int = 102
- self._rack: int = 0
- self._slot: int = 1
@property
def connected(self) -> bool:
- if self._use_legacy_data and self._legacy_client is not None:
- return bool(self._legacy_client.connected)
return self._connection is not None and self._connection.connected
@property
@@ -94,9 +52,18 @@ def session_id(self) -> int:
return self._connection.session_id
@property
- def using_legacy_fallback(self) -> bool:
- """Whether the client is using legacy S7 protocol for data operations."""
- return self._use_legacy_data
+ def session_setup_ok(self) -> bool:
+ """Whether the S7CommPlus session setup succeeded for data operations."""
+ if self._connection is None:
+ return False
+ return self._connection.session_setup_ok
+
+ @property
+ def tls_active(self) -> bool:
+ """Whether TLS is active on the connection."""
+ if self._connection is None:
+ return False
+ return self._connection.tls_active
def connect(
self,
@@ -112,30 +79,18 @@ def connect(
) -> None:
"""Connect to an S7-1200/1500 PLC using S7CommPlus.
- If the PLC does not support S7CommPlus data operations, a secondary
- legacy S7 connection is established transparently for data access.
-
Args:
host: PLC IP address or hostname
port: TCP port (default 102)
- rack: PLC rack number
- slot: PLC slot number
+ rack: PLC rack number (unused, kept for API symmetry)
+ slot: PLC slot number (unused, kept for API symmetry)
use_tls: Whether to activate TLS (required for V2)
tls_cert: Path to client TLS certificate (PEM)
tls_key: Path to client private key (PEM)
tls_ca: Path to CA certificate for PLC verification (PEM)
password: PLC password for legitimation (V2+ with TLS)
"""
- self._host = host
- self._port = port
- self._rack = rack
- self._slot = slot
-
- self._connection = S7CommPlusConnection(
- host=host,
- port=port,
- )
-
+ self._connection = S7CommPlusConnection(host=host, port=port)
self._connection.connect(
use_tls=use_tls,
tls_cert=tls_cert,
@@ -143,49 +98,19 @@ def connect(
tls_ca=tls_ca,
)
- # Handle legitimation for password-protected PLCs
if password is not None and self._connection.tls_active:
logger.info("Performing PLC legitimation (password authentication)")
self._connection.authenticate(password)
- # Check if S7CommPlus session setup succeeded. If the PLC accepted the
- # ServerSessionVersion echo, data operations should work. If it returned
- # an error (e.g. ERROR2), fall back to legacy S7 protocol.
- if not self._connection.session_setup_ok:
- logger.info("S7CommPlus session setup failed, falling back to legacy S7 protocol")
- self._setup_legacy_fallback()
-
- def _setup_legacy_fallback(self) -> None:
- """Establish a secondary legacy S7 connection for data operations."""
- from ..client import Client
-
- self._legacy_client = Client()
- self._legacy_client.connect(self._host, self._rack, self._slot, self._port)
- self._use_legacy_data = True
- logger.info(f"Legacy S7 fallback connected to {self._host}:{self._port}")
-
def disconnect(self) -> None:
"""Disconnect from PLC."""
- if self._legacy_client is not None:
- try:
- self._legacy_client.disconnect()
- except Exception:
- pass
- self._legacy_client = None
- self._use_legacy_data = False
-
if self._connection:
self._connection.disconnect()
self._connection = None
- # -- Data block read/write --
-
def db_read(self, db_number: int, start: int, size: int) -> bytes:
"""Read raw bytes from a data block.
- Uses S7CommPlus protocol when supported, otherwise falls back to
- legacy S7 protocol transparently.
-
Args:
db_number: Data block number
start: Start byte offset
@@ -194,18 +119,11 @@ def db_read(self, db_number: int, start: int, size: int) -> bytes:
Returns:
Raw bytes read from the data block
"""
- if self._use_legacy_data and self._legacy_client is not None:
- return bytes(self._legacy_client.db_read(db_number, start, size))
-
if self._connection is None:
raise RuntimeError("Not connected")
payload = _build_read_payload([(db_number, start, size)])
- logger.debug(f"db_read: db={db_number} start={start} size={size} payload={payload.hex(' ')}")
-
response = self._connection.send_request(FunctionCode.GET_MULTI_VARIABLES, payload)
- logger.debug(f"db_read: response ({len(response)} bytes): {response.hex(' ')}")
-
results = _parse_read_response(response)
if not results:
raise RuntimeError("Read returned no data")
@@ -216,70 +134,38 @@ def db_read(self, db_number: int, start: int, size: int) -> bytes:
def db_write(self, db_number: int, start: int, data: bytes) -> None:
"""Write raw bytes to a data block.
- Uses S7CommPlus protocol when supported, otherwise falls back to
- legacy S7 protocol transparently.
-
Args:
db_number: Data block number
start: Start byte offset
data: Bytes to write
"""
- if self._use_legacy_data and self._legacy_client is not None:
- self._legacy_client.db_write(db_number, start, bytearray(data))
- return
-
if self._connection is None:
raise RuntimeError("Not connected")
payload = _build_write_payload([(db_number, start, data)])
- logger.debug(
- f"db_write: db={db_number} start={start} data_len={len(data)} data={data.hex(' ')} payload={payload.hex(' ')}"
- )
-
response = self._connection.send_request(FunctionCode.SET_MULTI_VARIABLES, payload)
- logger.debug(f"db_write: response ({len(response)} bytes): {response.hex(' ')}")
-
_parse_write_response(response)
def db_read_multi(self, items: list[tuple[int, int, int]]) -> list[bytes]:
"""Read multiple data block regions in a single request.
- Uses S7CommPlus protocol when supported, otherwise falls back to
- legacy S7 protocol (individual reads) transparently.
-
Args:
items: List of (db_number, start_offset, size) tuples
Returns:
List of raw bytes for each item
"""
- if self._use_legacy_data and self._legacy_client is not None:
- results = []
- for db_number, start, size in items:
- data = self._legacy_client.db_read(db_number, start, size)
- results.append(bytes(data))
- return results
-
if self._connection is None:
raise RuntimeError("Not connected")
payload = _build_read_payload(items)
- logger.debug(f"db_read_multi: {len(items)} items: {items} payload={payload.hex(' ')}")
-
response = self._connection.send_request(FunctionCode.GET_MULTI_VARIABLES, payload)
- logger.debug(f"db_read_multi: response ({len(response)} bytes): {response.hex(' ')}")
-
parsed = _parse_read_response(response)
return [r if r is not None else b"" for r in parsed]
- # -- Explore (browse PLC object tree) --
-
def explore(self) -> bytes:
"""Browse the PLC object tree.
- Returns the raw Explore response payload for parsing.
- Full symbolic exploration will be implemented in a future version.
-
Returns:
Raw response payload
"""
@@ -287,11 +173,8 @@ def explore(self) -> bytes:
raise RuntimeError("Not connected")
response = self._connection.send_request(FunctionCode.EXPLORE, b"")
- logger.debug(f"explore: response ({len(response)} bytes): {response.hex(' ')}")
return response
- # -- Context manager --
-
def __enter__(self) -> "S7CommPlusClient":
return self
@@ -310,10 +193,7 @@ def _build_read_payload(items: list[tuple[int, int, int]]) -> bytes:
Returns:
Encoded payload bytes (after the 14-byte request header)
-
- Reference: thomas-v2/S7CommPlusDriver/Core/GetMultiVariablesRequest.cs
"""
- # Encode all item addresses and compute total field count
addresses: list[bytes] = []
total_field_count = 0
for db_number, start, size in items:
@@ -321,24 +201,18 @@ def _build_read_payload(items: list[tuple[int, int, int]]) -> bytes:
addr_bytes, field_count = encode_item_address(
access_area=access_area,
access_sub_area=Ids.DB_VALUE_ACTUAL,
- lids=[start + 1, size], # LID byte offsets are 1-based in S7CommPlus
+ lids=[start + 1, size],
)
addresses.append(addr_bytes)
total_field_count += field_count
payload = bytearray()
- # LinkId (UInt32 fixed = 0, for reading variables)
payload += struct.pack(">I", 0)
- # Item count
payload += encode_uint32_vlq(len(items))
- # Total field count across all items
payload += encode_uint32_vlq(total_field_count)
- # Item addresses
for addr in addresses:
payload += addr
- # ObjectQualifier
payload += encode_object_qualifier()
- # Padding
payload += struct.pack(">I", 0)
return bytes(payload)
@@ -352,21 +226,16 @@ def _parse_read_response(response: bytes) -> list[Optional[bytes]]:
Returns:
List of raw bytes per item (None for errored items)
-
- Reference: thomas-v2/S7CommPlusDriver/Core/GetMultiVariablesResponse.cs
"""
offset = 0
- # ReturnValue (UInt64 VLQ)
return_value, consumed = decode_uint64_vlq(response, offset)
offset += consumed
- logger.debug(f"_parse_read_response: return_value={return_value}")
if return_value != 0:
logger.error(f"_parse_read_response: PLC returned error: {return_value}")
return []
- # Value list: ItemNumber (VLQ) + PValue, terminated by ItemNumber=0
values: dict[int, bytes] = {}
while offset < len(response):
item_nr, consumed = decode_uint32_vlq(response, offset)
@@ -377,7 +246,6 @@ def _parse_read_response(response: bytes) -> list[Optional[bytes]]:
offset += consumed
values[item_nr] = raw_bytes
- # Error list: ErrorItemNumber (VLQ) + ErrorReturnValue (UInt64 VLQ), terminated by 0
errors: dict[int, int] = {}
while offset < len(response):
err_item_nr, consumed = decode_uint32_vlq(response, offset)
@@ -387,9 +255,7 @@ def _parse_read_response(response: bytes) -> list[Optional[bytes]]:
err_value, consumed = decode_uint64_vlq(response, offset)
offset += consumed
errors[err_item_nr] = err_value
- logger.debug(f"_parse_read_response: error item {err_item_nr}: {err_value}")
- # Build result list (1-based item numbers)
max_item = max(max(values.keys(), default=0), max(errors.keys(), default=0))
results: list[Optional[bytes]] = []
for i in range(1, max_item + 1):
@@ -409,10 +275,7 @@ def _build_write_payload(items: list[tuple[int, int, bytes]]) -> bytes:
Returns:
Encoded payload bytes
-
- Reference: thomas-v2/S7CommPlusDriver/Core/SetMultiVariablesRequest.cs
"""
- # Encode all item addresses and compute total field count
addresses: list[bytes] = []
total_field_count = 0
for db_number, start, data in items:
@@ -420,30 +283,22 @@ def _build_write_payload(items: list[tuple[int, int, bytes]]) -> bytes:
addr_bytes, field_count = encode_item_address(
access_area=access_area,
access_sub_area=Ids.DB_VALUE_ACTUAL,
- lids=[start + 1, len(data)], # LID byte offsets are 1-based in S7CommPlus
+ lids=[start + 1, len(data)],
)
addresses.append(addr_bytes)
total_field_count += field_count
payload = bytearray()
- # InObjectId (UInt32 fixed = 0, for plain variable writes)
payload += struct.pack(">I", 0)
- # Item count
payload += encode_uint32_vlq(len(items))
- # Total field count
payload += encode_uint32_vlq(total_field_count)
- # Item addresses
for addr in addresses:
payload += addr
- # Value list: ItemNumber (1-based) + PValue
for i, (_, _, data) in enumerate(items, 1):
payload += encode_uint32_vlq(i)
payload += encode_pvalue_blob(data)
- # Fill byte
payload += bytes([0x00])
- # ObjectQualifier
payload += encode_object_qualifier()
- # Padding
payload += struct.pack(">I", 0)
return bytes(payload)
@@ -452,25 +307,17 @@ def _build_write_payload(items: list[tuple[int, int, bytes]]) -> bytes:
def _parse_write_response(response: bytes) -> None:
"""Parse a SetMultiVariables response payload.
- Args:
- response: Response payload (after the 14-byte response header)
-
Raises:
RuntimeError: If the write failed
-
- Reference: thomas-v2/S7CommPlusDriver/Core/SetMultiVariablesResponse.cs
"""
offset = 0
- # ReturnValue (UInt64 VLQ)
return_value, consumed = decode_uint64_vlq(response, offset)
offset += consumed
- logger.debug(f"_parse_write_response: return_value={return_value}")
if return_value != 0:
raise RuntimeError(f"Write failed with return value {return_value}")
- # Error list: ErrorItemNumber (VLQ) + ErrorReturnValue (UInt64 VLQ)
errors: list[tuple[int, int]] = []
while offset < len(response):
err_item_nr, consumed = decode_uint32_vlq(response, offset)
diff --git a/snap7/s7commplus/server.py b/s7/_s7commplus_server.py
similarity index 100%
rename from snap7/s7commplus/server.py
rename to s7/_s7commplus_server.py
diff --git a/s7/async_client.py b/s7/async_client.py
new file mode 100644
index 00000000..c7bbeb7e
--- /dev/null
+++ b/s7/async_client.py
@@ -0,0 +1,252 @@
+"""Unified async S7 client with protocol auto-discovery.
+
+Provides a single async client that automatically selects the best protocol
+(S7CommPlus or legacy S7) for communicating with Siemens S7 PLCs.
+
+Usage::
+
+ from s7 import AsyncClient
+
+ async with AsyncClient() as client:
+ await client.connect("192.168.1.10", 0, 1)
+ data = await client.db_read(1, 0, 4)
+"""
+
+import logging
+from typing import Any, Optional
+
+from snap7.async_client import AsyncClient as LegacyAsyncClient
+
+from ._protocol import Protocol
+from ._s7commplus_async_client import S7CommPlusAsyncClient
+
+logger = logging.getLogger(__name__)
+
+
+class AsyncClient:
+ """Unified async S7 client with protocol auto-discovery.
+
+ Async counterpart of :class:`s7.Client`. Automatically selects the
+ best protocol for the target PLC using asyncio for non-blocking I/O.
+
+ Methods not explicitly defined are delegated to the underlying
+ legacy async client via ``__getattr__``.
+
+ Example::
+
+ from s7 import AsyncClient
+
+ async with AsyncClient() as client:
+ await client.connect("192.168.1.10", 0, 1)
+ data = await client.db_read(1, 0, 4)
+ print(client.protocol)
+ """
+
+ def __init__(self) -> None:
+ self._legacy: Optional[LegacyAsyncClient] = None
+ self._plus: Optional[S7CommPlusAsyncClient] = None
+ self._protocol: Protocol = Protocol.AUTO
+ self._host: str = ""
+ self._port: int = 102
+ self._rack: int = 0
+ self._slot: int = 1
+
+ @property
+ def protocol(self) -> Protocol:
+ """The protocol currently in use for DB operations."""
+ return self._protocol
+
+ @property
+ def connected(self) -> bool:
+ """Whether the client is connected to a PLC."""
+ if self._legacy is not None and self._legacy.connected:
+ return True
+ if self._plus is not None and self._plus.connected:
+ return True
+ return False
+
+ async def connect(
+ self,
+ address: str,
+ rack: int = 0,
+ slot: int = 1,
+ tcp_port: int = 102,
+ *,
+ protocol: Protocol = Protocol.AUTO,
+ use_tls: bool = False,
+ tls_cert: Optional[str] = None,
+ tls_key: Optional[str] = None,
+ tls_ca: Optional[str] = None,
+ ) -> "AsyncClient":
+ """Connect to an S7 PLC.
+
+ Args:
+ address: PLC IP address or hostname.
+ rack: PLC rack number.
+ slot: PLC slot number.
+ tcp_port: TCP port (default 102).
+ protocol: Protocol selection. AUTO tries S7CommPlus first,
+ then falls back to legacy S7.
+ use_tls: Whether to activate TLS after InitSSL.
+ tls_cert: Path to client TLS certificate (PEM).
+ tls_key: Path to client private key (PEM).
+ tls_ca: Path to CA certificate for PLC verification (PEM).
+
+ Returns:
+ self, for method chaining.
+ """
+ self._host = address
+ self._port = tcp_port
+ self._rack = rack
+ self._slot = slot
+
+ if protocol in (Protocol.AUTO, Protocol.S7COMMPLUS):
+ if await self._try_s7commplus(
+ address,
+ tcp_port,
+ rack,
+ slot,
+ use_tls=use_tls,
+ tls_cert=tls_cert,
+ tls_key=tls_key,
+ tls_ca=tls_ca,
+ ):
+ self._protocol = Protocol.S7COMMPLUS
+ logger.info(f"Async connected to {address}:{tcp_port} using S7CommPlus")
+ else:
+ if protocol == Protocol.S7COMMPLUS:
+ raise RuntimeError(
+ f"S7CommPlus connection to {address}:{tcp_port} failed and protocol=S7COMMPLUS was explicitly requested"
+ )
+ self._protocol = Protocol.LEGACY
+ logger.info(f"S7CommPlus not available, using legacy S7 for {address}:{tcp_port}")
+ else:
+ self._protocol = Protocol.LEGACY
+
+ # Always connect legacy client
+ self._legacy = LegacyAsyncClient()
+ await self._legacy.connect(address, rack, slot, tcp_port)
+ logger.info(f"Async legacy S7 connected to {address}:{tcp_port}")
+
+ return self
+
+ async def _try_s7commplus(
+ self,
+ address: str,
+ tcp_port: int,
+ rack: int,
+ slot: int,
+ *,
+ use_tls: bool = False,
+ tls_cert: Optional[str] = None,
+ tls_key: Optional[str] = None,
+ tls_ca: Optional[str] = None,
+ ) -> bool:
+ """Try to establish an S7CommPlus connection."""
+ plus = S7CommPlusAsyncClient()
+ try:
+ await plus.connect(
+ host=address,
+ port=tcp_port,
+ rack=rack,
+ slot=slot,
+ use_tls=use_tls,
+ tls_cert=tls_cert,
+ tls_key=tls_key,
+ tls_ca=tls_ca,
+ )
+ except Exception as e:
+ logger.debug(f"S7CommPlus connection failed: {e}")
+ return False
+
+ if not plus.session_setup_ok:
+ logger.debug("S7CommPlus session setup not OK, disconnecting")
+ await plus.disconnect()
+ return False
+
+ self._plus = plus
+ return True
+
+ async def disconnect(self) -> int:
+ """Disconnect from PLC.
+
+ Returns:
+ 0 on success (matches snap7.AsyncClient).
+ """
+ if self._plus is not None:
+ try:
+ await self._plus.disconnect()
+ except Exception:
+ pass
+ self._plus = None
+
+ if self._legacy is not None:
+ try:
+ await self._legacy.disconnect()
+ except Exception:
+ pass
+ self._legacy = None
+
+ self._protocol = Protocol.AUTO
+ return 0
+
+ async def db_read(self, db_number: int, start: int, size: int) -> bytearray:
+ """Read raw bytes from a data block."""
+ if self._protocol == Protocol.S7COMMPLUS and self._plus is not None:
+ return bytearray(await self._plus.db_read(db_number, start, size))
+ if self._legacy is not None:
+ return await self._legacy.db_read(db_number, start, size)
+ raise RuntimeError("Not connected")
+
+ async def db_write(self, db_number: int, start: int, data: bytearray) -> int:
+ """Write raw bytes to a data block.
+
+ Returns:
+ 0 on success (matches snap7.AsyncClient).
+ """
+ if self._protocol == Protocol.S7COMMPLUS and self._plus is not None:
+ await self._plus.db_write(db_number, start, bytes(data))
+ return 0
+ if self._legacy is not None:
+ return await self._legacy.db_write(db_number, start, data)
+ raise RuntimeError("Not connected")
+
+ async def db_read_multi(self, items: list[tuple[int, int, int]]) -> list[bytearray]:
+ """Read multiple data block regions in a single request."""
+ if self._protocol == Protocol.S7COMMPLUS and self._plus is not None:
+ return [bytearray(r) for r in await self._plus.db_read_multi(items)]
+ if self._legacy is not None:
+ results = []
+ for db, start, size in items:
+ results.append(await self._legacy.db_read(db, start, size))
+ return results
+ raise RuntimeError("Not connected")
+
+ async def explore(self) -> bytes:
+ """Browse the PLC object tree (S7CommPlus only).
+
+ Raises:
+ RuntimeError: If not connected via S7CommPlus.
+ """
+ if self._plus is None:
+ raise RuntimeError("explore() requires S7CommPlus connection")
+ return await self._plus.explore()
+
+ def __getattr__(self, name: str) -> Any:
+ """Delegate unknown methods to the legacy client."""
+ if name.startswith("_"):
+ raise AttributeError(name)
+ if self._legacy is not None:
+ return getattr(self._legacy, name)
+ raise AttributeError(f"'AsyncClient' object has no attribute {name!r} (not connected)")
+
+ async def __aenter__(self) -> "AsyncClient":
+ return self
+
+ async def __aexit__(self, *args: Any) -> None:
+ await self.disconnect()
+
+ def __repr__(self) -> str:
+ if self.connected:
+ return f""
+ return ""
diff --git a/s7/client.py b/s7/client.py
new file mode 100644
index 00000000..3b624b88
--- /dev/null
+++ b/s7/client.py
@@ -0,0 +1,266 @@
+"""Unified S7 client with protocol auto-discovery.
+
+Provides a single client that automatically selects the best protocol
+(S7CommPlus or legacy S7) for communicating with Siemens S7 PLCs.
+
+Usage::
+
+ from s7 import Client
+
+ client = Client()
+ client.connect("192.168.1.10", 0, 1)
+ data = client.db_read(1, 0, 4)
+"""
+
+import logging
+from typing import Any, Optional
+
+from snap7.client import Client as LegacyClient
+
+from ._protocol import Protocol
+from ._s7commplus_client import S7CommPlusClient
+
+logger = logging.getLogger(__name__)
+
+
+class Client:
+ """Unified S7 client with protocol auto-discovery.
+
+ Automatically selects the best protocol for the target PLC:
+ - S7CommPlus for S7-1200/1500 PLCs with full data operations
+ - Legacy S7 for S7-300/400 or when S7CommPlus is unavailable
+
+ Methods not explicitly defined are delegated to the underlying
+ legacy client via ``__getattr__``.
+
+ Example::
+
+ from s7 import Client
+
+ client = Client()
+ client.connect("192.168.1.10", 0, 1)
+ data = client.db_read(1, 0, 4)
+ print(client.protocol)
+ """
+
+ def __init__(self) -> None:
+ self._legacy: Optional[LegacyClient] = None
+ self._plus: Optional[S7CommPlusClient] = None
+ self._protocol: Protocol = Protocol.AUTO
+ self._host: str = ""
+ self._port: int = 102
+ self._rack: int = 0
+ self._slot: int = 1
+
+ @property
+ def protocol(self) -> Protocol:
+ """The protocol currently in use for DB operations."""
+ return self._protocol
+
+ @property
+ def connected(self) -> bool:
+ """Whether the client is connected to a PLC."""
+ if self._legacy is not None and self._legacy.connected:
+ return True
+ if self._plus is not None and self._plus.connected:
+ return True
+ return False
+
+ def connect(
+ self,
+ address: str,
+ rack: int = 0,
+ slot: int = 1,
+ tcp_port: int = 102,
+ *,
+ protocol: Protocol = Protocol.AUTO,
+ use_tls: bool = False,
+ tls_cert: Optional[str] = None,
+ tls_key: Optional[str] = None,
+ tls_ca: Optional[str] = None,
+ password: Optional[str] = None,
+ ) -> "Client":
+ """Connect to an S7 PLC.
+
+ Args:
+ address: PLC IP address or hostname.
+ rack: PLC rack number.
+ slot: PLC slot number.
+ tcp_port: TCP port (default 102).
+ protocol: Protocol selection. AUTO tries S7CommPlus first,
+ then falls back to legacy S7.
+ use_tls: Whether to activate TLS (required for V2+).
+ tls_cert: Path to client TLS certificate (PEM).
+ tls_key: Path to client private key (PEM).
+ tls_ca: Path to CA certificate for PLC verification (PEM).
+ password: PLC password for legitimation (V2+ with TLS).
+
+ Returns:
+ self, for method chaining.
+ """
+ self._host = address
+ self._port = tcp_port
+ self._rack = rack
+ self._slot = slot
+
+ if protocol in (Protocol.AUTO, Protocol.S7COMMPLUS):
+ if self._try_s7commplus(
+ address,
+ tcp_port,
+ rack,
+ slot,
+ use_tls=use_tls,
+ tls_cert=tls_cert,
+ tls_key=tls_key,
+ tls_ca=tls_ca,
+ password=password,
+ ):
+ self._protocol = Protocol.S7COMMPLUS
+ logger.info(f"Connected to {address}:{tcp_port} using S7CommPlus")
+ else:
+ if protocol == Protocol.S7COMMPLUS:
+ raise RuntimeError(
+ f"S7CommPlus connection to {address}:{tcp_port} failed and protocol=S7COMMPLUS was explicitly requested"
+ )
+ self._protocol = Protocol.LEGACY
+ logger.info(f"S7CommPlus not available, using legacy S7 for {address}:{tcp_port}")
+ else:
+ self._protocol = Protocol.LEGACY
+
+ # Always connect legacy client (needed for block ops, PLC control, etc.)
+ self._legacy = LegacyClient()
+ self._legacy.connect(address, rack, slot, tcp_port)
+ logger.info(f"Legacy S7 connected to {address}:{tcp_port}")
+
+ return self
+
+ def _try_s7commplus(
+ self,
+ address: str,
+ tcp_port: int,
+ rack: int,
+ slot: int,
+ *,
+ use_tls: bool = False,
+ tls_cert: Optional[str] = None,
+ tls_key: Optional[str] = None,
+ tls_ca: Optional[str] = None,
+ password: Optional[str] = None,
+ ) -> bool:
+ """Try to establish an S7CommPlus connection.
+
+ Returns True if S7CommPlus data operations are available.
+ """
+ plus = S7CommPlusClient()
+ try:
+ plus.connect(
+ host=address,
+ port=tcp_port,
+ rack=rack,
+ slot=slot,
+ use_tls=use_tls,
+ tls_cert=tls_cert,
+ tls_key=tls_key,
+ tls_ca=tls_ca,
+ password=password,
+ )
+ except Exception as e:
+ logger.debug(f"S7CommPlus connection failed: {e}")
+ return False
+
+ if not plus.session_setup_ok:
+ logger.debug("S7CommPlus session setup not OK, disconnecting")
+ plus.disconnect()
+ return False
+
+ self._plus = plus
+ return True
+
+ def disconnect(self) -> int:
+ """Disconnect from PLC.
+
+ Returns:
+ 0 on success (matches snap7.Client).
+ """
+ if self._plus is not None:
+ try:
+ self._plus.disconnect()
+ except Exception:
+ pass
+ self._plus = None
+
+ if self._legacy is not None:
+ try:
+ self._legacy.disconnect()
+ except Exception:
+ pass
+ self._legacy = None
+
+ self._protocol = Protocol.AUTO
+ return 0
+
+ def db_read(self, db_number: int, start: int, size: int) -> bytearray:
+ """Read raw bytes from a data block.
+
+ Uses S7CommPlus when available, otherwise legacy S7.
+ """
+ if self._protocol == Protocol.S7COMMPLUS and self._plus is not None:
+ return bytearray(self._plus.db_read(db_number, start, size))
+ if self._legacy is not None:
+ return self._legacy.db_read(db_number, start, size)
+ raise RuntimeError("Not connected")
+
+ def db_write(self, db_number: int, start: int, data: bytearray) -> int:
+ """Write raw bytes to a data block.
+
+ Uses S7CommPlus when available, otherwise legacy S7.
+
+ Returns:
+ 0 on success (matches snap7.Client).
+ """
+ if self._protocol == Protocol.S7COMMPLUS and self._plus is not None:
+ self._plus.db_write(db_number, start, bytes(data))
+ return 0
+ if self._legacy is not None:
+ return self._legacy.db_write(db_number, start, data)
+ raise RuntimeError("Not connected")
+
+ def db_read_multi(self, items: list[tuple[int, int, int]]) -> list[bytearray]:
+ """Read multiple data block regions in a single request.
+
+ Uses S7CommPlus native multi-read when available.
+ """
+ if self._protocol == Protocol.S7COMMPLUS and self._plus is not None:
+ return [bytearray(r) for r in self._plus.db_read_multi(items)]
+ if self._legacy is not None:
+ return [self._legacy.db_read(db, start, size) for db, start, size in items]
+ raise RuntimeError("Not connected")
+
+ def explore(self) -> bytes:
+ """Browse the PLC object tree (S7CommPlus only).
+
+ Raises:
+ RuntimeError: If not connected via S7CommPlus.
+ """
+ if self._plus is None:
+ raise RuntimeError("explore() requires S7CommPlus connection")
+ return self._plus.explore()
+
+ def __getattr__(self, name: str) -> Any:
+ """Delegate unknown methods to the legacy client."""
+ if name.startswith("_"):
+ raise AttributeError(name)
+ if self._legacy is not None:
+ return getattr(self._legacy, name)
+ raise AttributeError(f"'Client' object has no attribute {name!r} (not connected)")
+
+ def __enter__(self) -> "Client":
+ return self
+
+ def __exit__(self, *args: Any) -> None:
+ self.disconnect()
+
+ def __repr__(self) -> str:
+ if self.connected:
+ return f""
+ return ""
diff --git a/snap7/s7commplus/codec.py b/s7/codec.py
similarity index 100%
rename from snap7/s7commplus/codec.py
rename to s7/codec.py
diff --git a/snap7/s7commplus/connection.py b/s7/connection.py
similarity index 98%
rename from snap7/s7commplus/connection.py
rename to s7/connection.py
index ab4d1b67..878890bd 100644
--- a/snap7/s7commplus/connection.py
+++ b/s7/connection.py
@@ -44,7 +44,7 @@
from typing import Optional, Type
from types import TracebackType
-from ..connection import ISOTCPConnection
+from snap7.connection import ISOTCPConnection
from .protocol import (
FunctionCode,
Opcode,
@@ -216,7 +216,7 @@ def connect(
)
elif self._protocol_version == ProtocolVersion.V2:
if not self._tls_active:
- from ..error import S7ConnectionError
+ from snap7.error import S7ConnectionError
raise S7ConnectionError("PLC reports V2 protocol but TLS is not active. V2 requires TLS. Use use_tls=True.")
# Enable IntegrityId tracking for V2+
@@ -254,12 +254,12 @@ def authenticate(self, password: str, username: str = "") -> None:
S7ConnectionError: If not connected, TLS not active, or auth fails
"""
if not self._connected:
- from ..error import S7ConnectionError
+ from snap7.error import S7ConnectionError
raise S7ConnectionError("Not connected")
if not self._tls_active or self._oms_secret is None:
- from ..error import S7ConnectionError
+ from snap7.error import S7ConnectionError
raise S7ConnectionError("Legitimation requires TLS. Connect with use_tls=True.")
@@ -317,13 +317,13 @@ def _get_legitimation_challenge(self) -> bytes:
offset += consumed
if return_value != 0:
- from ..error import S7ConnectionError
+ from snap7.error import S7ConnectionError
raise S7ConnectionError(f"GetVarSubStreamed for challenge failed: return_value={return_value}")
# Value is a USIntArray (BLOB) - read flags + type + length + data
if offset + 2 > len(resp_payload):
- from ..error import S7ConnectionError
+ from snap7.error import S7ConnectionError
raise S7ConnectionError("Challenge response too short")
@@ -370,7 +370,7 @@ def _send_legitimation_new(self, encrypted_response: bytes) -> None:
if len(resp_payload) >= 1:
return_value, _ = decode_uint64_vlq(resp_payload, 0)
if return_value < 0:
- from ..error import S7ConnectionError
+ from snap7.error import S7ConnectionError
raise S7ConnectionError(f"Legitimation rejected by PLC: return_value={return_value}")
logger.debug(f"New legitimation return_value={return_value}")
@@ -402,7 +402,7 @@ def _send_legitimation_legacy(self, response: bytes) -> None:
if len(resp_payload) >= 1:
return_value, _ = decode_uint64_vlq(resp_payload, 0)
if return_value < 0:
- from ..error import S7ConnectionError
+ from snap7.error import S7ConnectionError
raise S7ConnectionError(f"Legacy legitimation rejected by PLC: return_value={return_value}")
logger.debug(f"Legacy legitimation return_value={return_value}")
@@ -444,7 +444,7 @@ def send_request(self, function_code: int, payload: bytes = b"") -> bytes:
Response payload (after the 14-byte response header)
"""
if not self._connected:
- from ..error import S7ConnectionError
+ from snap7.error import S7ConnectionError
raise S7ConnectionError("Not connected")
@@ -510,7 +510,7 @@ def send_request(self, function_code: int, payload: bytes = b"") -> bytes:
logger.debug(f" Response data ({len(response)} bytes): {response.hex(' ')}")
if len(response) < 14:
- from ..error import S7ConnectionError
+ from snap7.error import S7ConnectionError
raise S7ConnectionError("Response too short")
@@ -586,7 +586,7 @@ def _init_ssl(self) -> None:
response = response_frame[consumed:]
if len(response) < 14:
- from ..error import S7ConnectionError
+ from snap7.error import S7ConnectionError
raise S7ConnectionError("InitSSL response too short")
@@ -675,7 +675,7 @@ def _create_session(self) -> None:
logger.debug(f"CreateObject response body ({len(response)} bytes): {response.hex(' ')}")
if len(response) < 14:
- from ..error import S7ConnectionError
+ from snap7.error import S7ConnectionError
raise S7ConnectionError("CreateObject response too short")
@@ -911,7 +911,7 @@ def _setup_session(self) -> bool:
response = response_frame[consumed : consumed + data_length]
if len(response) < 14:
- from ..error import S7ConnectionError
+ from snap7.error import S7ConnectionError
raise S7ConnectionError("SetupSession response too short")
@@ -988,7 +988,7 @@ def _activate_tls(
# Wrap the raw TCP socket used by ISOTCPConnection
raw_socket = self._iso_conn.socket
if raw_socket is None:
- from ..error import S7ConnectionError
+ from snap7.error import S7ConnectionError
raise S7ConnectionError("Cannot activate TLS: no TCP socket")
diff --git a/snap7/s7commplus/legitimation.py b/s7/legitimation.py
similarity index 100%
rename from snap7/s7commplus/legitimation.py
rename to s7/legitimation.py
diff --git a/snap7/s7commplus/protocol.py b/s7/protocol.py
similarity index 100%
rename from snap7/s7commplus/protocol.py
rename to s7/protocol.py
diff --git a/s7/py.typed b/s7/py.typed
new file mode 100644
index 00000000..e69de29b
diff --git a/s7/server.py b/s7/server.py
new file mode 100644
index 00000000..a0cad1d3
--- /dev/null
+++ b/s7/server.py
@@ -0,0 +1,132 @@
+"""Unified S7 server supporting both legacy S7 and S7CommPlus clients.
+
+Wraps both a legacy :class:`snap7.server.Server` and an
+:class:`S7CommPlusServer` so that test environments can serve both
+protocol stacks simultaneously.
+
+Usage::
+
+ from s7 import Server
+
+ server = Server()
+ server.start(tcp_port=102, s7commplus_port=11020)
+"""
+
+import logging
+from typing import Any, Optional
+
+from snap7.server import Server as LegacyServer
+
+from ._s7commplus_server import S7CommPlusServer, DataBlock
+
+logger = logging.getLogger(__name__)
+
+
+class Server:
+ """Unified S7 server for testing.
+
+ Runs a legacy S7 server and optionally an S7CommPlus server
+ side by side.
+ """
+
+ def __init__(self) -> None:
+ self._legacy = LegacyServer()
+ self._plus = S7CommPlusServer()
+
+ @property
+ def legacy_server(self) -> LegacyServer:
+ """Direct access to the legacy S7 server."""
+ return self._legacy
+
+ @property
+ def s7commplus_server(self) -> S7CommPlusServer:
+ """Direct access to the S7CommPlus server."""
+ return self._plus
+
+ def register_db(
+ self,
+ db_number: int,
+ variables: dict[str, tuple[str, int]],
+ size: int = 0,
+ ) -> DataBlock:
+ """Register a data block on the S7CommPlus server.
+
+ Args:
+ db_number: Data block number
+ variables: Dict of {name: (type_name, offset)}
+ size: Total DB size in bytes (auto-calculated if 0)
+
+ Returns:
+ The created DataBlock
+ """
+ return self._plus.register_db(db_number, variables, size)
+
+ def register_raw_db(self, db_number: int, data: bytearray) -> DataBlock:
+ """Register a raw data block on the S7CommPlus server.
+
+ Args:
+ db_number: Data block number
+ data: Raw bytearray backing the data block
+
+ Returns:
+ The created DataBlock
+ """
+ return self._plus.register_raw_db(db_number, data)
+
+ def get_db(self, db_number: int) -> Optional[DataBlock]:
+ """Get a registered data block."""
+ return self._plus.get_db(db_number)
+
+ def start(
+ self,
+ tcp_port: int = 102,
+ s7commplus_port: Optional[int] = None,
+ *,
+ use_tls: bool = False,
+ tls_cert: Optional[str] = None,
+ tls_key: Optional[str] = None,
+ ) -> None:
+ """Start the server(s).
+
+ Args:
+ tcp_port: Port for the legacy S7 server.
+ s7commplus_port: Port for the S7CommPlus server. If None,
+ only the legacy server is started.
+ use_tls: Whether to enable TLS on the S7CommPlus server.
+ tls_cert: Path to TLS certificate (PEM).
+ tls_key: Path to TLS private key (PEM).
+ """
+ self._legacy.start(tcp_port=tcp_port)
+ logger.info(f"Legacy S7 server started on port {tcp_port}")
+
+ if s7commplus_port is not None:
+ self._plus.start(
+ port=s7commplus_port,
+ use_tls=use_tls,
+ tls_cert=tls_cert,
+ tls_key=tls_key,
+ )
+ logger.info(f"S7CommPlus server started on port {s7commplus_port}")
+
+ def stop(self) -> None:
+ """Stop all servers."""
+ try:
+ self._plus.stop()
+ except Exception:
+ pass
+ try:
+ self._legacy.stop()
+ except Exception:
+ pass
+
+ def __getattr__(self, name: str) -> Any:
+ """Delegate unknown methods to the legacy server."""
+ if name.startswith("_"):
+ raise AttributeError(name)
+ return getattr(self._legacy, name)
+
+ def __enter__(self) -> "Server":
+ return self
+
+ def __exit__(self, *args: Any) -> None:
+ self.stop()
diff --git a/snap7/s7commplus/vlq.py b/s7/vlq.py
similarity index 100%
rename from snap7/s7commplus/vlq.py
rename to s7/vlq.py
diff --git a/snap7/s7commplus/__init__.py b/snap7/s7commplus/__init__.py
deleted file mode 100644
index ab49d09c..00000000
--- a/snap7/s7commplus/__init__.py
+++ /dev/null
@@ -1,36 +0,0 @@
-"""
-S7CommPlus protocol implementation for S7-1200/1500 PLCs.
-
-S7CommPlus (protocol ID 0x72) is the successor to S7comm (protocol ID 0x32),
-used by Siemens S7-1200 (firmware >= V4.0) and S7-1500 PLCs for full
-engineering access (program download/upload, symbolic addressing, etc.).
-
-Supported PLC / firmware targets::
-
- V1: S7-1200 FW V4.0+ (simple session handshake)
- V2: S7-1200/1500 older FW (session authentication)
- V3: S7-1200/1500 pre-TIA V17 (public-key key exchange)
- V3 + TLS: TIA Portal V17+ (TLS 1.3 with per-device certs)
-
-Protocol stack::
-
- +-------------------------------+
- | S7CommPlus (Protocol ID 0x72)|
- +-------------------------------+
- | TLS 1.3 (optional, V17+) |
- +-------------------------------+
- | COTP (ISO 8073) |
- +-------------------------------+
- | TPKT (RFC 1006) |
- +-------------------------------+
- | TCP (port 102) |
- +-------------------------------+
-
-The wire protocol (VLQ encoding, data types, function codes, object model)
-is the same across all versions -- only the session authentication differs.
-
-Status: V1 connection functional, V2 (TLS + IntegrityId) scaffolding complete.
-
-Reference implementation:
- https://github.com/thomas-v2/S7CommPlusDriver (C#, LGPL-3.0)
-"""
diff --git a/tests/conftest.py b/tests/conftest.py
index 4e53e6d3..527c8906 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -69,8 +69,8 @@ def pytest_collection_modifyitems(config: pytest.Config, items: list[pytest.Item
for mod_name in [
"tests.test_client_e2e",
"test_client_e2e",
- "tests.test_s7commplus_e2e",
- "test_s7commplus_e2e",
+ "tests.test_s7_e2e",
+ "test_s7_e2e",
]:
e2e = sys.modules.get(mod_name)
if e2e is not None:
diff --git a/tests/test_coverage_gaps.py b/tests/test_coverage_gaps.py
index 3802b4c7..cbd19b06 100644
--- a/tests/test_coverage_gaps.py
+++ b/tests/test_coverage_gaps.py
@@ -19,8 +19,8 @@
from snap7.error import S7ConnectionError
from snap7.server import Server
from snap7.type import SrvArea
-from snap7.s7commplus.connection import S7CommPlusConnection
-from snap7.s7commplus.legitimation import (
+from s7.connection import S7CommPlusConnection
+from s7.legitimation import (
LegitimationState,
build_legacy_response,
)
@@ -158,8 +158,8 @@ def test_legitimation_state_rotate_changes_key(self) -> None:
# S7CommPlus async client
# ============================================================================
-from snap7.s7commplus.server import S7CommPlusServer # noqa: E402
-from snap7.s7commplus.async_client import S7CommPlusAsyncClient # noqa: E402
+from s7._s7commplus_server import S7CommPlusServer # noqa: E402
+from s7._s7commplus_async_client import S7CommPlusAsyncClient # noqa: E402
ASYNC_TEST_PORT = 11125
@@ -227,13 +227,12 @@ async def test_properties(self, async_server: S7CommPlusServer) -> None:
finally:
await client.disconnect()
- async def test_using_legacy_fallback_property(self, async_server: S7CommPlusServer) -> None:
+ async def test_session_setup_ok_property(self, async_server: S7CommPlusServer) -> None:
client = S7CommPlusAsyncClient()
await client.connect("127.0.0.1", port=ASYNC_TEST_PORT)
try:
- # Server supports S7CommPlus data ops, so no fallback
- # (or fallback, depending on server implementation — just check the property works)
- assert isinstance(client.using_legacy_fallback, bool)
+ # Server supports S7CommPlus data ops, so session setup should succeed
+ assert isinstance(client.session_setup_ok, bool)
finally:
await client.disconnect()
diff --git a/tests/test_s7commplus_codec.py b/tests/test_s7_codec.py
similarity index 99%
rename from tests/test_s7commplus_codec.py
rename to tests/test_s7_codec.py
index 9b03881e..2bce0de0 100644
--- a/tests/test_s7commplus_codec.py
+++ b/tests/test_s7_codec.py
@@ -3,7 +3,7 @@
import struct
import pytest
-from snap7.s7commplus.codec import (
+from s7.codec import (
encode_header,
decode_header,
encode_request_header,
@@ -35,8 +35,8 @@
encode_object_qualifier,
_pvalue_element_size,
)
-from snap7.s7commplus.protocol import PROTOCOL_ID, DataType, Opcode, FunctionCode, Ids
-from snap7.s7commplus.vlq import encode_uint32_vlq, encode_int32_vlq, encode_uint64_vlq, encode_int64_vlq
+from s7.protocol import PROTOCOL_ID, DataType, Opcode, FunctionCode, Ids
+from s7.vlq import encode_uint32_vlq, encode_int32_vlq, encode_uint64_vlq, encode_int64_vlq
class TestFrameHeader:
diff --git a/tests/test_s7commplus_e2e.py b/tests/test_s7_e2e.py
similarity index 97%
rename from tests/test_s7commplus_e2e.py
rename to tests/test_s7_e2e.py
index f8c8bf0d..3ab8bbca 100644
--- a/tests/test_s7commplus_e2e.py
+++ b/tests/test_s7_e2e.py
@@ -2,7 +2,7 @@
These tests require a real PLC connection. Run with:
- pytest tests/test_s7commplus_e2e.py --e2e --plc-ip=YOUR_PLC_IP
+ pytest tests/test_s7_e2e.py --e2e --plc-ip=YOUR_PLC_IP
Available options:
--e2e Enable e2e tests (required)
@@ -47,14 +47,14 @@
import pytest
-from snap7.s7commplus.client import S7CommPlusClient
+from s7._s7commplus_client import S7CommPlusClient
-# Enable DEBUG logging for all s7commplus modules so we get full hex dumps
+# Enable DEBUG logging for all s7 modules so we get full hex dumps
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
-for _mod in ["snap7.s7commplus.client", "snap7.s7commplus.connection", "snap7.connection"]:
+for _mod in ["s7._s7commplus_client", "s7.connection", "snap7.connection"]:
logging.getLogger(_mod).setLevel(logging.DEBUG)
# =============================================================================
@@ -496,8 +496,8 @@ def test_diag_raw_get_multi_variables(self) -> None:
This tries several payload encodings to see which ones the PLC accepts.
"""
- from snap7.s7commplus.protocol import FunctionCode
- from snap7.s7commplus.vlq import encode_uint32_vlq
+ from s7.protocol import FunctionCode
+ from s7.vlq import encode_uint32_vlq
print(f"\n{'=' * 60}")
print("DIAGNOSTIC: Raw GetMultiVariables payload experiments")
@@ -523,7 +523,7 @@ def test_diag_raw_get_multi_variables(self) -> None:
# Try to parse return code
if len(response) > 0:
- from snap7.s7commplus.vlq import decode_uint32_vlq
+ from s7.vlq import decode_uint32_vlq
rc, consumed = decode_uint32_vlq(response, 0)
print(f" Return code (VLQ): {rc} (0x{rc:X})")
@@ -537,7 +537,7 @@ def test_diag_raw_get_multi_variables(self) -> None:
def test_diag_raw_set_variable(self) -> None:
"""Try SetVariable (0x04F2) instead of SetMultiVariables to see if PLC responds differently."""
- from snap7.s7commplus.protocol import FunctionCode
+ from s7.protocol import FunctionCode
print(f"\n{'=' * 60}")
print("DIAGNOSTIC: Raw SetVariable / GetVariable experiments")
@@ -563,8 +563,8 @@ def test_diag_raw_set_variable(self) -> None:
def test_diag_explore_then_read(self) -> None:
"""Explore first to discover object IDs, then try reading using those IDs."""
- from snap7.s7commplus.protocol import FunctionCode, ElementID
- from snap7.s7commplus.vlq import encode_uint32_vlq, decode_uint32_vlq
+ from s7.protocol import FunctionCode, ElementID
+ from s7.vlq import encode_uint32_vlq, decode_uint32_vlq
print(f"\n{'=' * 60}")
print("DIAGNOSTIC: Explore -> extract object IDs -> try reading")
diff --git a/tests/test_s7commplus_server.py b/tests/test_s7_server.py
similarity index 97%
rename from tests/test_s7commplus_server.py
rename to tests/test_s7_server.py
index 2f08f575..3b980e13 100644
--- a/tests/test_s7commplus_server.py
+++ b/tests/test_s7_server.py
@@ -7,10 +7,10 @@
import pytest
import asyncio
-from snap7.s7commplus.server import S7CommPlusServer, CPUState, DataBlock
-from snap7.s7commplus.client import S7CommPlusClient
-from snap7.s7commplus.async_client import S7CommPlusAsyncClient
-from snap7.s7commplus.protocol import ProtocolVersion
+from s7._s7commplus_server import S7CommPlusServer, CPUState, DataBlock
+from s7._s7commplus_client import S7CommPlusClient
+from s7._s7commplus_async_client import S7CommPlusAsyncClient
+from s7.protocol import ProtocolVersion
# Use a high port to avoid conflicts
TEST_PORT = 11120
diff --git a/tests/test_async_tls.py b/tests/test_s7_tls.py
similarity index 97%
rename from tests/test_async_tls.py
rename to tests/test_s7_tls.py
index 8c1bd007..7abc7f79 100644
--- a/tests/test_async_tls.py
+++ b/tests/test_s7_tls.py
@@ -8,9 +8,9 @@
import pytest
from snap7.error import S7ConnectionError
-from snap7.s7commplus.async_client import S7CommPlusAsyncClient
-from snap7.s7commplus.server import S7CommPlusServer
-from snap7.s7commplus.protocol import ProtocolVersion
+from s7._s7commplus_async_client import S7CommPlusAsyncClient
+from s7._s7commplus_server import S7CommPlusServer
+from s7.protocol import ProtocolVersion
TEST_PORT_V2 = 11130
TEST_PORT_V2_TLS = 11131
diff --git a/tests/test_s7commplus_unit.py b/tests/test_s7_unit.py
similarity index 97%
rename from tests/test_s7commplus_unit.py
rename to tests/test_s7_unit.py
index f7c5e57e..f11f0ae3 100644
--- a/tests/test_s7commplus_unit.py
+++ b/tests/test_s7_unit.py
@@ -3,17 +3,17 @@
import struct
import pytest
-from snap7.s7commplus.client import (
+from s7._s7commplus_client import (
S7CommPlusClient,
_build_read_payload,
_parse_read_response,
_build_write_payload,
_parse_write_response,
)
-from snap7.s7commplus.codec import encode_pvalue_blob
-from snap7.s7commplus.connection import S7CommPlusConnection, _element_size
-from snap7.s7commplus.protocol import DataType, ElementID, ObjectId
-from snap7.s7commplus.vlq import (
+from s7.codec import encode_pvalue_blob
+from s7.connection import S7CommPlusConnection, _element_size
+from s7.protocol import DataType, ElementID, ObjectId
+from s7.vlq import (
encode_uint32_vlq,
encode_uint64_vlq,
encode_int32_vlq,
@@ -269,7 +269,7 @@ def test_lword(self, conn: S7CommPlusConnection) -> None:
assert new_offset == len(vlq)
def test_lint(self, conn: S7CommPlusConnection) -> None:
- from snap7.s7commplus.vlq import encode_int64_vlq
+ from s7.vlq import encode_int64_vlq
vlq = encode_int64_vlq(-(2**40))
new_offset = conn._skip_typed_value(vlq, 0, DataType.LINT, 0x00)
@@ -288,7 +288,7 @@ def test_timestamp(self, conn: S7CommPlusConnection) -> None:
assert conn._skip_typed_value(data, 0, DataType.TIMESTAMP, 0x00) == 8
def test_timespan(self, conn: S7CommPlusConnection) -> None:
- from snap7.s7commplus.vlq import encode_int64_vlq
+ from s7.vlq import encode_int64_vlq
vlq = encode_int64_vlq(5000)
# TIMESPAN uses uint64_vlq for skipping in _skip_typed_value
@@ -430,7 +430,7 @@ def test_properties_not_connected(self) -> None:
assert client.connected is False
assert client.protocol_version == 0
assert client.session_id == 0
- assert client.using_legacy_fallback is False
+ assert client.session_setup_ok is False
def test_db_read_not_connected(self) -> None:
client = S7CommPlusClient()
diff --git a/tests/test_s7commplus_v2.py b/tests/test_s7_v2.py
similarity index 94%
rename from tests/test_s7commplus_v2.py
rename to tests/test_s7_v2.py
index 1a9fc8e7..e8a2250f 100644
--- a/tests/test_s7commplus_v2.py
+++ b/tests/test_s7_v2.py
@@ -8,20 +8,20 @@
import pytest
-from snap7.s7commplus.protocol import (
+from s7.protocol import (
FunctionCode,
LegitimationId,
ProtocolVersion,
READ_FUNCTION_CODES,
)
-from snap7.s7commplus.legitimation import (
+from s7.legitimation import (
LegitimationState,
build_legacy_response,
derive_legitimation_key,
_build_legitimation_payload,
)
-from snap7.s7commplus.vlq import encode_uint32_vlq, decode_uint32_vlq
-from snap7.s7commplus.connection import S7CommPlusConnection
+from s7.vlq import encode_uint32_vlq, decode_uint32_vlq
+from s7.connection import S7CommPlusConnection
class TestReadFunctionCodes:
@@ -243,7 +243,7 @@ class TestBuildNewResponse:
"""Test AES-256-CBC legitimation response building."""
def test_new_response_returns_bytes(self) -> None:
- from snap7.s7commplus.legitimation import build_new_response
+ from s7.legitimation import build_new_response
result = build_new_response(
password="test",
@@ -253,7 +253,7 @@ def test_new_response_returns_bytes(self) -> None:
assert isinstance(result, bytes)
def test_new_response_is_aes_block_aligned(self) -> None:
- from snap7.s7commplus.legitimation import build_new_response
+ from s7.legitimation import build_new_response
result = build_new_response(
password="test",
@@ -264,7 +264,7 @@ def test_new_response_is_aes_block_aligned(self) -> None:
assert len(result) % 16 == 0
def test_new_response_different_passwords_differ(self) -> None:
- from snap7.s7commplus.legitimation import build_new_response
+ from s7.legitimation import build_new_response
challenge = b"\xab" * 16
oms = b"\xcd" * 32
@@ -273,7 +273,7 @@ def test_new_response_different_passwords_differ(self) -> None:
assert r1 != r2
def test_new_response_different_secrets_differ(self) -> None:
- from snap7.s7commplus.legitimation import build_new_response
+ from s7.legitimation import build_new_response
challenge = b"\xab" * 16
r1 = build_new_response("test", challenge, b"\x00" * 32)
@@ -281,7 +281,7 @@ def test_new_response_different_secrets_differ(self) -> None:
assert r1 != r2
def test_new_response_with_username(self) -> None:
- from snap7.s7commplus.legitimation import build_new_response
+ from s7.legitimation import build_new_response
result = build_new_response(
password="test",
@@ -294,7 +294,7 @@ def test_new_response_with_username(self) -> None:
def test_new_response_decryptable(self) -> None:
"""Verify the response can be decrypted back to the original payload."""
- from snap7.s7commplus.legitimation import (
+ from s7.legitimation import (
build_new_response,
derive_legitimation_key,
_build_legitimation_payload,
diff --git a/tests/test_s7commplus_vlq.py b/tests/test_s7_vlq.py
similarity index 99%
rename from tests/test_s7commplus_vlq.py
rename to tests/test_s7_vlq.py
index d7dbb596..70ed5917 100644
--- a/tests/test_s7commplus_vlq.py
+++ b/tests/test_s7_vlq.py
@@ -2,7 +2,7 @@
import pytest
-from snap7.s7commplus.vlq import (
+from s7.vlq import (
encode_uint32_vlq,
decode_uint32_vlq,
encode_int32_vlq,
diff --git a/tox.ini b/tox.ini
index 4b10afdb..0b56471b 100644
--- a/tox.ini
+++ b/tox.ini
@@ -19,18 +19,18 @@ commands =
[testenv:mypy]
basepython = python3.13
extras = test
-commands = mypy {toxinidir}/snap7 {toxinidir}/tests {toxinidir}/example
+commands = mypy {toxinidir}/snap7 {toxinidir}/s7 {toxinidir}/tests {toxinidir}/example
[testenv:lint-ruff]
basepython = python3.13
extras = test
commands =
- ruff check {toxinidir}/snap7 {toxinidir}/tests {toxinidir}/example
- ruff format --diff {toxinidir}/snap7 {toxinidir}/tests {toxinidir}/example
+ ruff check {toxinidir}/snap7 {toxinidir}/s7 {toxinidir}/tests {toxinidir}/example
+ ruff format --diff {toxinidir}/snap7 {toxinidir}/s7 {toxinidir}/tests {toxinidir}/example
[testenv:ruff]
basepython = python3.13
extras = test
commands =
- ruff format {toxinidir}/snap7 {toxinidir}/tests {toxinidir}/example
- ruff check --fix {toxinidir}/snap7 {toxinidir}/tests {toxinidir}/example
+ ruff format {toxinidir}/snap7 {toxinidir}/s7 {toxinidir}/tests {toxinidir}/example
+ ruff check --fix {toxinidir}/snap7 {toxinidir}/s7 {toxinidir}/tests {toxinidir}/example