Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 272 additions & 5 deletions snap7/s7commplus/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import asyncio
import logging
import ssl
import struct
from typing import Any, Optional

Expand Down Expand Up @@ -47,7 +48,9 @@
class S7CommPlusAsyncClient:
"""Async S7CommPlus client for S7-1200/1500 PLCs.

Supports V1 and V2 protocols. V3/TLS planned for future.
Supports all S7CommPlus protocol versions (V1/V2/V3/TLS). The protocol
version is auto-detected from the PLC's CreateObject response during
connection setup.

Uses asyncio for all I/O operations and asyncio.Lock for
concurrent safety when shared between multiple coroutines.
Expand Down Expand Up @@ -76,6 +79,11 @@ def __init__(self) -> None:
self._integrity_id_write: int = 0
self._with_integrity_id: bool = False

# TLS state
self._tls_active: bool = False
self._oms_secret: Optional[bytes] = None
self._server_session_version: Optional[int] = None

@property
def connected(self) -> bool:
if self._use_legacy_data and self._legacy_client is not None:
Expand All @@ -95,15 +103,37 @@ def using_legacy_fallback(self) -> bool:
"""Whether the client is using legacy S7 protocol for data operations."""
return self._use_legacy_data

@property
def tls_active(self) -> bool:
"""Whether TLS is active on the connection."""
return self._tls_active

@property
def oms_secret(self) -> Optional[bytes]:
"""OMS exporter secret from TLS session (None if TLS not active)."""
return self._oms_secret

async def connect(
self,
host: str,
port: int = 102,
rack: int = 0,
slot: int = 1,
*,
use_tls: bool = False,
tls_cert: Optional[str] = None,
tls_key: Optional[str] = None,
tls_ca: Optional[str] = None,
) -> None:
"""Connect to an S7-1200/1500 PLC.

The connection sequence:
1. COTP connection (same as legacy S7comm)
2. InitSSL handshake
3. TLS activation (if use_tls=True, required for V2)
4. CreateObject to establish S7CommPlus session
5. Enable IntegrityId tracking (V2+)

If the PLC does not support S7CommPlus data operations, a secondary
legacy S7 connection is established transparently for data access.

Expand All @@ -112,6 +142,10 @@ async def connect(
port: TCP port (default 102)
rack: PLC rack number
slot: PLC slot number
use_tls: Whether to activate TLS after InitSSL.
tls_cert: Path to client TLS certificate (PEM)
tls_key: Path to client private key (PEM)
tls_ca: Path to CA certificate for PLC verification (PEM)
"""
self._host = host
self._port = port
Expand All @@ -122,18 +156,41 @@ async def connect(
self._reader, self._writer = await asyncio.open_connection(host, port)

try:
# COTP handshake with S7CommPlus TSAP values
# Step 1: COTP handshake with S7CommPlus TSAP values
await self._cotp_connect(S7COMMPLUS_LOCAL_TSAP, S7COMMPLUS_REMOTE_TSAP)

# InitSSL handshake
# Step 2: InitSSL handshake
await self._init_ssl()

# S7CommPlus session setup
# Step 3: TLS activation (between InitSSL and CreateObject)
if use_tls:
await self._activate_tls(tls_cert=tls_cert, tls_key=tls_key, tls_ca=tls_ca)

# Step 4: S7CommPlus session setup
await self._create_session()

# Step 5: Version-specific post-setup
if self._protocol_version >= ProtocolVersion.V3:
if not use_tls:
logger.warning(
"PLC reports V3 protocol but TLS is not enabled. Connection may not work without use_tls=True."
)
elif self._protocol_version == ProtocolVersion.V2:
if not self._tls_active:
from ..error import S7ConnectionError

raise S7ConnectionError("PLC reports V2 protocol but TLS is not active. V2 requires TLS. Use use_tls=True.")
# Enable IntegrityId tracking for V2+
self._with_integrity_id = True
self._integrity_id_read = 0
self._integrity_id_write = 0
logger.info("V2 IntegrityId tracking enabled")

self._connected = True
logger.info(
f"Async S7CommPlus connected to {host}:{port}, version=V{self._protocol_version}, session={self._session_id}"
f"Async S7CommPlus connected to {host}:{port}, "
f"version=V{self._protocol_version}, session={self._session_id}, "
f"tls={self._tls_active}"
)

# Probe S7CommPlus data operations
Expand All @@ -145,6 +202,213 @@ async def connect(
await self.disconnect()
raise

async def authenticate(self, password: str, username: str = "") -> None:
"""Perform PLC password authentication (legitimation).

Must be called after connect() and before data operations on
password-protected PLCs. Requires TLS to be active (V2+).

The method auto-detects legacy vs new legitimation based on
the PLC's firmware version.

Args:
password: PLC password
username: Username for new-style auth (optional)

Raises:
S7ConnectionError: If not connected, TLS not active, or auth fails
"""
if not self._connected:
from ..error import S7ConnectionError

raise S7ConnectionError("Not connected")

if not self._tls_active or self._oms_secret is None:
from ..error import S7ConnectionError

raise S7ConnectionError("Legitimation requires TLS. Connect with use_tls=True.")

# Step 1: Get challenge from PLC
challenge = await self._get_legitimation_challenge()
logger.info(f"Received legitimation challenge ({len(challenge)} bytes)")

# Step 2: Build response (auto-detect legacy vs new)
from .legitimation import build_legacy_response, build_new_response

if username:
response_data = build_new_response(password, challenge, self._oms_secret, username)
await self._send_legitimation_new(response_data)
else:
try:
response_data = build_new_response(password, challenge, self._oms_secret, "")
await self._send_legitimation_new(response_data)
except NotImplementedError:
response_data = build_legacy_response(password, challenge)
await self._send_legitimation_legacy(response_data)

logger.info("PLC legitimation completed successfully")

async def _activate_tls(
self,
tls_cert: Optional[str] = None,
tls_key: Optional[str] = None,
tls_ca: Optional[str] = None,
) -> None:
"""Activate TLS 1.3 over the COTP connection.

Called after InitSSL and before CreateObject. Uses asyncio's
start_tls() to upgrade the existing connection to TLS.

Args:
tls_cert: Path to client TLS certificate (PEM)
tls_key: Path to client private key (PEM)
tls_ca: Path to CA certificate for PLC verification (PEM)
"""
if self._writer is None:
from ..error import S7ConnectionError

raise S7ConnectionError("Cannot activate TLS: not connected")

ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.minimum_version = ssl.TLSVersion.TLSv1_3

# TLS 1.3 ciphersuites are configured differently from TLS 1.2
if hasattr(ctx, "set_ciphersuites"):
ctx.set_ciphersuites("TLS_AES_256_GCM_SHA384:TLS_AES_128_GCM_SHA256")
# If set_ciphersuites not available, TLS 1.3 uses its mandatory defaults

if tls_cert and tls_key:
ctx.load_cert_chain(tls_cert, tls_key)

if tls_ca:
ctx.load_verify_locations(tls_ca)
else:
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE

# Upgrade existing transport to TLS using asyncio start_tls
transport = self._writer.transport
loop = asyncio.get_event_loop()
new_transport = await loop.start_tls(
transport,
transport.get_protocol(),
ctx,
server_hostname=self._host,
)

# Update reader/writer to use the TLS transport
self._writer._transport = new_transport
self._tls_active = True

# Extract OMS exporter secret for legitimation key derivation
if new_transport is None:
from ..error import S7ConnectionError

raise S7ConnectionError("TLS handshake failed: no transport returned")

ssl_object = new_transport.get_extra_info("ssl_object")
if ssl_object is not None:
try:
self._oms_secret = ssl_object.export_keying_material("EXPERIMENTAL_OMS", 32, None)
logger.debug("OMS exporter secret extracted from TLS session")
except (AttributeError, ssl.SSLError) as e:
logger.warning(f"Could not extract OMS exporter secret: {e}")
self._oms_secret = None

logger.info("TLS 1.3 activated on async COTP connection")

async def _get_legitimation_challenge(self) -> bytes:
"""Request legitimation challenge from PLC.

Sends GetVarSubStreamed with address ServerSessionRequest (303).

Returns:
Challenge bytes from PLC
"""
from .protocol import LegitimationId, DataType as DT

payload = bytearray()
payload += struct.pack(">I", self._session_id)
payload += encode_uint32_vlq(1)
payload += encode_uint32_vlq(1)
payload += encode_uint32_vlq(LegitimationId.SERVER_SESSION_REQUEST)
payload += struct.pack(">I", 0)

resp_payload = await self._send_request(FunctionCode.GET_VAR_SUBSTREAMED, bytes(payload))

offset = 0
return_value, consumed = decode_uint64_vlq(resp_payload, offset)
offset += consumed

if return_value != 0:
from ..error import S7ConnectionError

raise S7ConnectionError(f"GetVarSubStreamed for challenge failed: return_value={return_value}")

if offset + 2 > len(resp_payload):
from ..error import S7ConnectionError

raise S7ConnectionError("Challenge response too short")

_flags = resp_payload[offset]
datatype = resp_payload[offset + 1]
offset += 2

if datatype == DT.BLOB:
length, consumed = decode_uint32_vlq(resp_payload, offset)
offset += consumed
return bytes(resp_payload[offset : offset + length])
else:
count, consumed = decode_uint32_vlq(resp_payload, offset)
offset += consumed
return bytes(resp_payload[offset : offset + count])

async def _send_legitimation_new(self, encrypted_response: bytes) -> None:
"""Send new-style legitimation response (AES-256-CBC encrypted)."""
from .protocol import LegitimationId, DataType as DT

payload = bytearray()
payload += struct.pack(">I", self._session_id)
payload += encode_uint32_vlq(1)
payload += encode_uint32_vlq(LegitimationId.LEGITIMATE)
payload += bytes([0x00, DT.BLOB])
payload += encode_uint32_vlq(len(encrypted_response))
payload += encrypted_response
payload += struct.pack(">I", 0)

resp_payload = await self._send_request(FunctionCode.SET_VARIABLE, bytes(payload))

if len(resp_payload) >= 1:
return_value, _ = decode_uint64_vlq(resp_payload, 0)
if return_value < 0:
from ..error import S7ConnectionError

raise S7ConnectionError(f"Legitimation rejected by PLC: return_value={return_value}")
logger.debug(f"New legitimation return_value={return_value}")

async def _send_legitimation_legacy(self, response: bytes) -> None:
"""Send legacy legitimation response (SHA-1 XOR)."""
from .protocol import LegitimationId, DataType as DT

payload = bytearray()
payload += struct.pack(">I", self._session_id)
payload += encode_uint32_vlq(1)
payload += encode_uint32_vlq(LegitimationId.SERVER_SESSION_RESPONSE)
payload += bytes([0x10, DT.USINT]) # flags=0x10 (array)
payload += encode_uint32_vlq(len(response))
payload += response
payload += struct.pack(">I", 0)

resp_payload = await self._send_request(FunctionCode.SET_VARIABLE, bytes(payload))

if len(resp_payload) >= 1:
return_value, _ = decode_uint64_vlq(resp_payload, 0)
if return_value < 0:
from ..error import S7ConnectionError

raise S7ConnectionError(f"Legacy legitimation rejected by PLC: return_value={return_value}")
logger.debug(f"Legacy legitimation return_value={return_value}")

async def _probe_s7commplus_data(self) -> bool:
"""Test if the PLC supports S7CommPlus data operations."""
try:
Expand Down Expand Up @@ -198,6 +462,9 @@ async def disconnect(self) -> None:
self._with_integrity_id = False
self._integrity_id_read = 0
self._integrity_id_write = 0
self._tls_active = False
self._oms_secret = None
self._server_session_version = None

if self._writer:
try:
Expand Down
6 changes: 5 additions & 1 deletion snap7/s7commplus/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,11 @@ def _setup_ssl_context(
"""
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.minimum_version = ssl.TLSVersion.TLSv1_3
ctx.set_ciphers("TLS_AES_256_GCM_SHA384:TLS_AES_128_GCM_SHA256")

# TLS 1.3 ciphersuites are configured differently from TLS 1.2
if hasattr(ctx, "set_ciphersuites"):
ctx.set_ciphersuites("TLS_AES_256_GCM_SHA384:TLS_AES_128_GCM_SHA256")
# If set_ciphersuites not available, TLS 1.3 uses its mandatory defaults

if cert_path and key_path:
ctx.load_cert_chain(cert_path, key_path)
Expand Down
Loading
Loading