diff --git a/snap7/s7commplus/async_client.py b/snap7/s7commplus/async_client.py index 4ace2241..70f831e1 100644 --- a/snap7/s7commplus/async_client.py +++ b/snap7/s7commplus/async_client.py @@ -166,10 +166,10 @@ async def connect( if use_tls: await self._activate_tls(tls_cert=tls_cert, tls_key=tls_key, tls_ca=tls_ca) - # Step 4: S7CommPlus session setup + # Step 4: S7CommPlus session setup (CreateObject) await self._create_session() - # Step 5: Version-specific post-setup + # Step 5: Version-specific validation (before session setup handshake) if self._protocol_version >= ProtocolVersion.V3: if not use_tls: logger.warning( @@ -187,15 +187,22 @@ async def connect( logger.info("V2 IntegrityId tracking enabled") self._connected = True + + # Step 6: Session setup - echo ServerSessionVersion back to PLC + if self._server_session_version is not None: + session_setup_ok = await self._setup_session() + else: + logger.warning("PLC did not provide ServerSessionVersion - session setup incomplete") + session_setup_ok = False logger.info( f"Async S7CommPlus connected to {host}:{port}, " f"version=V{self._protocol_version}, session={self._session_id}, " f"tls={self._tls_active}" ) - # Probe S7CommPlus data operations - if not await self._probe_s7commplus_data(): - logger.info("S7CommPlus data operations not supported, falling back to legacy S7 protocol") + # Check if S7CommPlus session setup succeeded + if not session_setup_ok: + logger.info("S7CommPlus session setup failed, falling back to legacy S7 protocol") await self._setup_legacy_fallback() except Exception: @@ -409,25 +416,6 @@ async def _send_legitimation_legacy(self, response: bytes) -> None: raise S7ConnectionError(f"Legacy legitimation rejected by PLC: return_value={return_value}") logger.debug(f"Legacy legitimation return_value={return_value}") - async def _probe_s7commplus_data(self) -> bool: - """Test if the PLC supports S7CommPlus data operations.""" - try: - payload = struct.pack(">I", 0) + encode_uint32_vlq(0) + encode_uint32_vlq(0) - payload += encode_object_qualifier() - payload += struct.pack(">I", 0) - - response = await self._send_request(FunctionCode.GET_MULTI_VARIABLES, payload) - if len(response) < 1: - return False - return_value, _ = decode_uint64_vlq(response, 0) - if return_value != 0: - logger.debug(f"S7CommPlus probe: PLC returned error {return_value}") - return False - return True - except Exception as e: - logger.debug(f"S7CommPlus probe failed: {e}") - return False - async def _setup_legacy_fallback(self) -> None: """Establish a secondary legacy S7 connection for data operations.""" from ..client import Client @@ -737,6 +725,106 @@ async def _create_session(self) -> None: self._session_id = struct.unpack_from(">I", response, 9)[0] self._protocol_version = version + # Parse ServerSessionVersion from response payload + self._parse_create_object_response(response[14:]) + + def _parse_create_object_response(self, payload: bytes) -> None: + """Parse CreateObject response to extract ServerSessionVersion (attribute 306).""" + offset = 0 + while offset < len(payload): + tag = payload[offset] + + if tag == ElementID.ATTRIBUTE: + offset += 1 + if offset >= len(payload): + break + attr_id, consumed = decode_uint32_vlq(payload, offset) + offset += consumed + + if attr_id == ObjectId.SERVER_SESSION_VERSION: + if offset + 2 > len(payload): + break + _flags = payload[offset] + datatype = payload[offset + 1] + offset += 2 + if datatype in (DataType.UDINT, DataType.DWORD): + value, consumed = decode_uint32_vlq(payload, offset) + offset += consumed + self._server_session_version = value + logger.info(f"ServerSessionVersion = {value}") + return + else: + # Skip attribute value + if offset + 2 > len(payload): + break + _flags = payload[offset] + _dt = payload[offset + 1] + offset += 2 + # Best-effort skip: advance past common VLQ-encoded values + if offset < len(payload): + _, consumed = decode_uint32_vlq(payload, offset) + offset += consumed + + elif tag == ElementID.START_OF_OBJECT: + offset += 1 + if offset + 4 > len(payload): + break + offset += 4 # RelationId + _, consumed = decode_uint32_vlq(payload, offset) + offset += consumed # ClassId + _, consumed = decode_uint32_vlq(payload, offset) + offset += consumed # ClassFlags + _, consumed = decode_uint32_vlq(payload, offset) + offset += consumed # AttributeId + + elif tag == ElementID.TERMINATING_OBJECT: + offset += 1 + elif tag == 0x00: + offset += 1 + else: + offset += 1 + + logger.debug("ServerSessionVersion not found in CreateObject response") + + async def _setup_session(self) -> bool: + """Echo ServerSessionVersion back to the PLC via SetMultiVariables. + + Without this step, the PLC rejects all subsequent data operations + with ERROR2 (0x05A9). + + Returns: + True if session setup succeeded. + """ + if self._server_session_version is None: + return False + + payload = bytearray() + payload += struct.pack(">I", self._session_id) + payload += encode_uint32_vlq(1) # Item count + payload += encode_uint32_vlq(1) # Total address field count + payload += encode_uint32_vlq(ObjectId.SERVER_SESSION_VERSION) + payload += encode_uint32_vlq(1) # ItemNumber + payload += bytes([0x00, DataType.UDINT]) + payload += encode_uint32_vlq(self._server_session_version) + payload += bytes([0x00]) # Fill byte + payload += encode_object_qualifier() + payload += struct.pack(">I", 0) # Trailing padding + + try: + resp_payload = await self._send_request(FunctionCode.SET_MULTI_VARIABLES, bytes(payload)) + if len(resp_payload) >= 1: + return_value, _ = decode_uint64_vlq(resp_payload, 0) + if return_value != 0: + logger.warning(f"SetupSession: PLC returned error {return_value}") + return False + else: + logger.info("Session setup completed successfully") + return True + return False + except Exception as e: + logger.warning(f"SetupSession failed: {e}") + return False + async def _delete_session(self) -> None: """Send DeleteObject to close the session.""" seq_num = self._next_sequence_number() diff --git a/snap7/s7commplus/client.py b/snap7/s7commplus/client.py index 44112c9c..a3f32ab4 100644 --- a/snap7/s7commplus/client.py +++ b/snap7/s7commplus/client.py @@ -148,44 +148,13 @@ def connect( logger.info("Performing PLC legitimation (password authentication)") self._connection.authenticate(password) - # Probe S7CommPlus data operations with a minimal request - if not self._probe_s7commplus_data(): - logger.info("S7CommPlus data operations not supported, falling back to legacy S7 protocol") + # Check if S7CommPlus session setup succeeded. If the PLC accepted the + # ServerSessionVersion echo, data operations should work. If it returned + # an error (e.g. ERROR2), fall back to legacy S7 protocol. + if not self._connection.session_setup_ok: + logger.info("S7CommPlus session setup failed, falling back to legacy S7 protocol") self._setup_legacy_fallback() - def _probe_s7commplus_data(self) -> bool: - """Test if the PLC supports S7CommPlus data operations. - - Sends a minimal GetMultiVariables request with zero items. If the PLC - responds with ERROR2 or a non-zero return code, data operations are - not supported. - - Returns: - True if S7CommPlus data operations work. - """ - if self._connection is None: - return False - - try: - # Send a minimal GetMultiVariables with 0 items - payload = struct.pack(">I", 0) + encode_uint32_vlq(0) + encode_uint32_vlq(0) - payload += encode_object_qualifier() - payload += struct.pack(">I", 0) - - response = self._connection.send_request(FunctionCode.GET_MULTI_VARIABLES, payload) - - # Check if we got a valid response (return value = 0) - if len(response) < 1: - return False - return_value, _ = decode_uint64_vlq(response, 0) - if return_value != 0: - logger.debug(f"S7CommPlus probe: PLC returned error {return_value}") - return False - return True - except Exception as e: - logger.debug(f"S7CommPlus probe failed: {e}") - return False - def _setup_legacy_fallback(self) -> None: """Establish a secondary legacy S7 connection for data operations.""" from ..client import Client diff --git a/snap7/s7commplus/connection.py b/snap7/s7commplus/connection.py index 1b0a013e..ab4d1b67 100644 --- a/snap7/s7commplus/connection.py +++ b/snap7/s7commplus/connection.py @@ -112,6 +112,7 @@ def __init__( self._tls_active: bool = False self._connected = False self._server_session_version: Optional[int] = None + self._session_setup_ok: bool = False # V2+ IntegrityId tracking self._integrity_id_read: int = 0 @@ -150,6 +151,11 @@ def integrity_id_write(self) -> int: """Current write IntegrityId counter (V2+).""" return self._integrity_id_write + @property + def session_setup_ok(self) -> bool: + """Whether the session setup (ServerSessionVersion echo) succeeded.""" + return self._session_setup_ok + @property def oms_secret(self) -> Optional[bytes]: """OMS exporter secret from TLS session (for legitimation).""" @@ -197,9 +203,10 @@ def connect( # Step 5: Session setup - echo ServerSessionVersion back to PLC if self._server_session_version is not None: - self._setup_session() + self._session_setup_ok = self._setup_session() else: logger.warning("PLC did not provide ServerSessionVersion - session setup incomplete") + self._session_setup_ok = False # Step 6: Version-specific post-setup if self._protocol_version >= ProtocolVersion.V3: @@ -409,6 +416,7 @@ def disconnect(self) -> None: pass self._connected = False + self._session_setup_ok = False self._tls_active = False self._ssl_socket = None self._oms_secret = None @@ -836,17 +844,20 @@ def _skip_typed_value(self, data: bytes, offset: int, datatype: int, flags: int) # Unknown type - can't skip reliably return offset - def _setup_session(self) -> None: + def _setup_session(self) -> bool: """Send SetMultiVariables to echo ServerSessionVersion back to the PLC. This completes the session handshake by writing the ServerSessionVersion attribute back to the session object. Without this step, the PLC rejects all subsequent data operations with ERROR2 (0x05A9). + Returns: + True if session setup succeeded (return_value == 0). + Reference: thomas-v2/S7CommPlusDriver SetSessionSetupData """ if self._server_session_version is None: - return + return False seq_num = self._next_sequence_number() @@ -913,8 +924,11 @@ def _setup_session(self) -> None: return_value, _ = decode_uint64_vlq(resp_payload, 0) if return_value != 0: logger.warning(f"SetupSession: PLC returned error {return_value}") + return False else: logger.info("Session setup completed successfully") + return True + return False def _delete_session(self) -> None: """Send DeleteObject to close the session."""