Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 112 additions & 24 deletions snap7/s7commplus/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,10 @@ async def connect(
if use_tls:
await self._activate_tls(tls_cert=tls_cert, tls_key=tls_key, tls_ca=tls_ca)

# Step 4: S7CommPlus session setup
# Step 4: S7CommPlus session setup (CreateObject)
await self._create_session()

# Step 5: Version-specific post-setup
# Step 5: Version-specific validation (before session setup handshake)
if self._protocol_version >= ProtocolVersion.V3:
if not use_tls:
logger.warning(
Expand All @@ -187,15 +187,22 @@ async def connect(
logger.info("V2 IntegrityId tracking enabled")

self._connected = True

# Step 6: Session setup - echo ServerSessionVersion back to PLC
if self._server_session_version is not None:
session_setup_ok = await self._setup_session()
else:
logger.warning("PLC did not provide ServerSessionVersion - session setup incomplete")
session_setup_ok = False
logger.info(
f"Async S7CommPlus connected to {host}:{port}, "
f"version=V{self._protocol_version}, session={self._session_id}, "
f"tls={self._tls_active}"
)

# Probe S7CommPlus data operations
if not await self._probe_s7commplus_data():
logger.info("S7CommPlus data operations not supported, falling back to legacy S7 protocol")
# Check if S7CommPlus session setup succeeded
if not session_setup_ok:
logger.info("S7CommPlus session setup failed, falling back to legacy S7 protocol")
await self._setup_legacy_fallback()

except Exception:
Expand Down Expand Up @@ -409,25 +416,6 @@ async def _send_legitimation_legacy(self, response: bytes) -> None:
raise S7ConnectionError(f"Legacy legitimation rejected by PLC: return_value={return_value}")
logger.debug(f"Legacy legitimation return_value={return_value}")

async def _probe_s7commplus_data(self) -> bool:
"""Test if the PLC supports S7CommPlus data operations."""
try:
payload = struct.pack(">I", 0) + encode_uint32_vlq(0) + encode_uint32_vlq(0)
payload += encode_object_qualifier()
payload += struct.pack(">I", 0)

response = await self._send_request(FunctionCode.GET_MULTI_VARIABLES, payload)
if len(response) < 1:
return False
return_value, _ = decode_uint64_vlq(response, 0)
if return_value != 0:
logger.debug(f"S7CommPlus probe: PLC returned error {return_value}")
return False
return True
except Exception as e:
logger.debug(f"S7CommPlus probe failed: {e}")
return False

async def _setup_legacy_fallback(self) -> None:
"""Establish a secondary legacy S7 connection for data operations."""
from ..client import Client
Expand Down Expand Up @@ -737,6 +725,106 @@ async def _create_session(self) -> None:
self._session_id = struct.unpack_from(">I", response, 9)[0]
self._protocol_version = version

# Parse ServerSessionVersion from response payload
self._parse_create_object_response(response[14:])

def _parse_create_object_response(self, payload: bytes) -> None:
"""Parse CreateObject response to extract ServerSessionVersion (attribute 306)."""
offset = 0
while offset < len(payload):
tag = payload[offset]

if tag == ElementID.ATTRIBUTE:
offset += 1
if offset >= len(payload):
break
attr_id, consumed = decode_uint32_vlq(payload, offset)
offset += consumed

if attr_id == ObjectId.SERVER_SESSION_VERSION:
if offset + 2 > len(payload):
break
_flags = payload[offset]
datatype = payload[offset + 1]
offset += 2
if datatype in (DataType.UDINT, DataType.DWORD):
value, consumed = decode_uint32_vlq(payload, offset)
offset += consumed
self._server_session_version = value
logger.info(f"ServerSessionVersion = {value}")
return
else:
# Skip attribute value
if offset + 2 > len(payload):
break
_flags = payload[offset]
_dt = payload[offset + 1]
offset += 2
# Best-effort skip: advance past common VLQ-encoded values
if offset < len(payload):
_, consumed = decode_uint32_vlq(payload, offset)
offset += consumed

elif tag == ElementID.START_OF_OBJECT:
offset += 1
if offset + 4 > len(payload):
break
offset += 4 # RelationId
_, consumed = decode_uint32_vlq(payload, offset)
offset += consumed # ClassId
_, consumed = decode_uint32_vlq(payload, offset)
offset += consumed # ClassFlags
_, consumed = decode_uint32_vlq(payload, offset)
offset += consumed # AttributeId

elif tag == ElementID.TERMINATING_OBJECT:
offset += 1
elif tag == 0x00:
offset += 1
else:
offset += 1

logger.debug("ServerSessionVersion not found in CreateObject response")

async def _setup_session(self) -> bool:
"""Echo ServerSessionVersion back to the PLC via SetMultiVariables.

Without this step, the PLC rejects all subsequent data operations
with ERROR2 (0x05A9).

Returns:
True if session setup succeeded.
"""
if self._server_session_version is None:
return False

payload = bytearray()
payload += struct.pack(">I", self._session_id)
payload += encode_uint32_vlq(1) # Item count
payload += encode_uint32_vlq(1) # Total address field count
payload += encode_uint32_vlq(ObjectId.SERVER_SESSION_VERSION)
payload += encode_uint32_vlq(1) # ItemNumber
payload += bytes([0x00, DataType.UDINT])
payload += encode_uint32_vlq(self._server_session_version)
payload += bytes([0x00]) # Fill byte
payload += encode_object_qualifier()
payload += struct.pack(">I", 0) # Trailing padding

try:
resp_payload = await self._send_request(FunctionCode.SET_MULTI_VARIABLES, bytes(payload))
if len(resp_payload) >= 1:
return_value, _ = decode_uint64_vlq(resp_payload, 0)
if return_value != 0:
logger.warning(f"SetupSession: PLC returned error {return_value}")
return False
else:
logger.info("Session setup completed successfully")
return True
return False
except Exception as e:
logger.warning(f"SetupSession failed: {e}")
return False

async def _delete_session(self) -> None:
"""Send DeleteObject to close the session."""
seq_num = self._next_sequence_number()
Expand Down
41 changes: 5 additions & 36 deletions snap7/s7commplus/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,44 +148,13 @@ def connect(
logger.info("Performing PLC legitimation (password authentication)")
self._connection.authenticate(password)

# Probe S7CommPlus data operations with a minimal request
if not self._probe_s7commplus_data():
logger.info("S7CommPlus data operations not supported, falling back to legacy S7 protocol")
# Check if S7CommPlus session setup succeeded. If the PLC accepted the
# ServerSessionVersion echo, data operations should work. If it returned
# an error (e.g. ERROR2), fall back to legacy S7 protocol.
if not self._connection.session_setup_ok:
logger.info("S7CommPlus session setup failed, falling back to legacy S7 protocol")
self._setup_legacy_fallback()

def _probe_s7commplus_data(self) -> bool:
"""Test if the PLC supports S7CommPlus data operations.

Sends a minimal GetMultiVariables request with zero items. If the PLC
responds with ERROR2 or a non-zero return code, data operations are
not supported.

Returns:
True if S7CommPlus data operations work.
"""
if self._connection is None:
return False

try:
# Send a minimal GetMultiVariables with 0 items
payload = struct.pack(">I", 0) + encode_uint32_vlq(0) + encode_uint32_vlq(0)
payload += encode_object_qualifier()
payload += struct.pack(">I", 0)

response = self._connection.send_request(FunctionCode.GET_MULTI_VARIABLES, payload)

# Check if we got a valid response (return value = 0)
if len(response) < 1:
return False
return_value, _ = decode_uint64_vlq(response, 0)
if return_value != 0:
logger.debug(f"S7CommPlus probe: PLC returned error {return_value}")
return False
return True
except Exception as e:
logger.debug(f"S7CommPlus probe failed: {e}")
return False

def _setup_legacy_fallback(self) -> None:
"""Establish a secondary legacy S7 connection for data operations."""
from ..client import Client
Expand Down
20 changes: 17 additions & 3 deletions snap7/s7commplus/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def __init__(
self._tls_active: bool = False
self._connected = False
self._server_session_version: Optional[int] = None
self._session_setup_ok: bool = False

# V2+ IntegrityId tracking
self._integrity_id_read: int = 0
Expand Down Expand Up @@ -150,6 +151,11 @@ def integrity_id_write(self) -> int:
"""Current write IntegrityId counter (V2+)."""
return self._integrity_id_write

@property
def session_setup_ok(self) -> bool:
"""Whether the session setup (ServerSessionVersion echo) succeeded."""
return self._session_setup_ok

@property
def oms_secret(self) -> Optional[bytes]:
"""OMS exporter secret from TLS session (for legitimation)."""
Expand Down Expand Up @@ -197,9 +203,10 @@ def connect(

# Step 5: Session setup - echo ServerSessionVersion back to PLC
if self._server_session_version is not None:
self._setup_session()
self._session_setup_ok = self._setup_session()
else:
logger.warning("PLC did not provide ServerSessionVersion - session setup incomplete")
self._session_setup_ok = False

# Step 6: Version-specific post-setup
if self._protocol_version >= ProtocolVersion.V3:
Expand Down Expand Up @@ -409,6 +416,7 @@ def disconnect(self) -> None:
pass

self._connected = False
self._session_setup_ok = False
self._tls_active = False
self._ssl_socket = None
self._oms_secret = None
Expand Down Expand Up @@ -836,17 +844,20 @@ def _skip_typed_value(self, data: bytes, offset: int, datatype: int, flags: int)
# Unknown type - can't skip reliably
return offset

def _setup_session(self) -> None:
def _setup_session(self) -> bool:
"""Send SetMultiVariables to echo ServerSessionVersion back to the PLC.

This completes the session handshake by writing the ServerSessionVersion
attribute back to the session object. Without this step, the PLC rejects
all subsequent data operations with ERROR2 (0x05A9).

Returns:
True if session setup succeeded (return_value == 0).

Reference: thomas-v2/S7CommPlusDriver SetSessionSetupData
"""
if self._server_session_version is None:
return
return False

seq_num = self._next_sequence_number()

Expand Down Expand Up @@ -913,8 +924,11 @@ def _setup_session(self) -> None:
return_value, _ = decode_uint64_vlq(resp_payload, 0)
if return_value != 0:
logger.warning(f"SetupSession: PLC returned error {return_value}")
return False
else:
logger.info("Session setup completed successfully")
return True
return False

def _delete_session(self) -> None:
"""Send DeleteObject to close the session."""
Expand Down
Loading