From a3850c9d8970ba7bc0f745c03570fb64b45af8be Mon Sep 17 00:00:00 2001 From: Gijs Molenaar Date: Thu, 19 Mar 2026 17:00:45 +0200 Subject: [PATCH 1/2] Add S7 routing support for multi-subnet PLC access Implement routing parameters in the COTP Connection Request PDU so clients can reach PLCs behind a gateway on another subnet. The new ISOTCPConnection.set_routing() method appends subnet ID (0xC6) and routing TSAP (0xC7) parameters to the CR, and Client.connect_routed() provides a high-level entry point that mirrors connect() but accepts gateway and destination rack/slot/subnet. Closes #615 Co-Authored-By: Claude Opus 4.6 --- pyproject.toml | 1 + snap7/client.py | 70 +++++++++++++ snap7/connection.py | 37 +++++++ tests/test_routing.py | 225 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 333 insertions(+) create mode 100644 tests/test_routing.py diff --git a/pyproject.toml b/pyproject.toml index b865de58..78f16850 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,6 +58,7 @@ markers =[ "logo", "mainloop", "partner", + "routing", "server", "util", "conformance: protocol conformance tests" diff --git a/snap7/client.py b/snap7/client.py index 234e3eb3..23cf1c33 100644 --- a/snap7/client.py +++ b/snap7/client.py @@ -401,6 +401,76 @@ def connect(self, address: str, rack: int, slot: int, tcp_port: int = 102) -> "C return self + def connect_routed( + self, + host: str, + router_rack: int, + router_slot: int, + subnet: int, + dest_rack: int, + dest_slot: int, + port: int = 102, + timeout: float = 5.0, + ) -> "Client": + """Connect to an S7 PLC via a routing gateway on another subnet. + + The gateway PLC (identified by *host*, *router_rack*, *router_slot*) + forwards the connection to the target PLC (identified by *subnet*, + *dest_rack*, *dest_slot*) through S7 routing parameters embedded in + the COTP Connection Request. + + Args: + host: IP address of the routing gateway PLC + router_rack: Rack number of the gateway PLC + router_slot: Slot number of the gateway PLC + subnet: Subnet ID of the target network (0x0000-0xFFFF) + dest_rack: Rack number of the destination PLC + dest_slot: Slot number of the destination PLC + port: TCP port (default 102) + timeout: Connection timeout in seconds + + Returns: + Self for method chaining + """ + self.host = host + self.port = port + self.rack = router_rack + self.slot = router_slot + self._params[Parameter.RemotePort] = port + + # Remote TSAP targets the gateway rack/slot + self.remote_tsap = 0x0100 | (router_rack << 5) | router_slot + + try: + start_time = time.time() + + self.connection = ISOTCPConnection( + host=host, + port=port, + local_tsap=self.local_tsap, + remote_tsap=self.remote_tsap, + ) + self.connection.set_routing(subnet, dest_rack, dest_slot) + self.connection.connect(timeout=timeout) + + # Setup communication and negotiate PDU length + self._setup_communication() + + self.connected = True + self._exec_time = int((time.time() - start_time) * 1000) + logger.info( + f"Connected (routed) to {host}:{port} via rack {router_rack} slot {router_slot}, " + f"subnet {subnet:#06x} -> rack {dest_rack} slot {dest_slot}" + ) + except Exception as e: + self.disconnect() + if isinstance(e, S7Error): + raise + else: + raise S7ConnectionError(f"Routed connection failed: {e}") + + return self + def disconnect(self) -> int: """Disconnect from S7 PLC. diff --git a/snap7/connection.py b/snap7/connection.py index 466125ff..730da93d 100644 --- a/snap7/connection.py +++ b/snap7/connection.py @@ -61,6 +61,10 @@ class ISOTCPConnection: COTP_PARAM_CALLING_TSAP = 0xC1 COTP_PARAM_CALLED_TSAP = 0xC2 + # S7 routing parameter codes + COTP_PARAM_SUBNET_ID = 0xC6 + COTP_PARAM_ROUTING_TSAP = 0xC7 + def __init__( self, host: str, @@ -94,6 +98,29 @@ def __init__( self.src_ref = 0x0001 # Source reference self.dst_ref = 0x0000 # Destination reference (assigned by peer) + # Routing parameters (set via connect_routed) + self._routing: bool = False + self._subnet_id: int = 0 + self._routing_tsap: int = 0 + + def set_routing(self, subnet_id: int, dest_rack: int, dest_slot: int) -> None: + """Configure S7 routing parameters for multi-subnet access. + + When routing is enabled, the COTP Connection Request includes + additional parameters that instruct the gateway PLC to forward + the connection to a target PLC on another subnet. + + Args: + subnet_id: Subnet ID of the target network (2 bytes) + dest_rack: Rack number of the destination PLC + dest_slot: Slot number of the destination PLC + """ + self._routing = True + self._subnet_id = subnet_id & 0xFFFF + # Routing TSAP encodes the final target rack/slot the same way + # as a normal remote TSAP. + self._routing_tsap = 0x0100 | (dest_rack << 5) | dest_slot + def connect(self, timeout: float = 5.0) -> None: """ Establish ISO on TCP connection. @@ -279,6 +306,16 @@ def _build_cotp_cr(self) -> bytes: parameters = calling_tsap + called_tsap + pdu_size_param + # Append routing parameters when routing is enabled + if self._routing: + subnet_param = struct.pack(">BBH", self.COTP_PARAM_SUBNET_ID, 2, self._subnet_id) + routing_tsap_param = struct.pack(">BBH", self.COTP_PARAM_ROUTING_TSAP, 2, self._routing_tsap) + parameters += subnet_param + routing_tsap_param + logger.debug( + f"COTP CR with routing: subnet={self._subnet_id:#06x}, " + f"routing_tsap={self._routing_tsap:#06x}" + ) + # Update PDU length to include parameters total_length = 6 + len(parameters) pdu = struct.pack(">B", total_length) + base_pdu[1:] + parameters diff --git a/tests/test_routing.py b/tests/test_routing.py new file mode 100644 index 00000000..123fa8c5 --- /dev/null +++ b/tests/test_routing.py @@ -0,0 +1,225 @@ +"""Tests for S7 routing support (multi-subnet PLC access).""" + +import struct + +import pytest + +from snap7.connection import ISOTCPConnection +from snap7.client import Client + +# Use a unique port to avoid conflicts with other test suites +ROUTING_TEST_PORT = 11102 + + +@pytest.mark.routing +class TestRoutingTSAP: + """Test TSAP construction for routed connections.""" + + def test_remote_tsap_encodes_rack_slot(self) -> None: + """Remote TSAP should encode rack and slot per S7 spec.""" + rack, slot = 0, 2 + expected = 0x0100 | (rack << 5) | slot # 0x0102 + conn = ISOTCPConnection("127.0.0.1", remote_tsap=expected) + assert conn.remote_tsap == 0x0102 + + def test_routing_tsap_encodes_dest_rack_slot(self) -> None: + """Routing TSAP should encode destination rack/slot.""" + conn = ISOTCPConnection("127.0.0.1") + conn.set_routing(subnet_id=0x0001, dest_rack=0, dest_slot=3) + assert conn._routing_tsap == 0x0100 | (0 << 5) | 3 # 0x0103 + + def test_routing_tsap_higher_rack(self) -> None: + """Routing TSAP with rack=2, slot=1.""" + conn = ISOTCPConnection("127.0.0.1") + conn.set_routing(subnet_id=0x0002, dest_rack=2, dest_slot=1) + assert conn._routing_tsap == 0x0100 | (2 << 5) | 1 # 0x0141 + + +@pytest.mark.routing +class TestCOTPCRRouting: + """Test COTP Connection Request PDU generation with routing.""" + + def _parse_cotp_cr(self, pdu: bytes) -> dict[str, object]: + """Parse a COTP CR PDU into its components for inspection.""" + result: dict[str, object] = {} + pdu_len = pdu[0] + result["pdu_len"] = pdu_len + result["pdu_type"] = pdu[1] + result["dst_ref"] = struct.unpack(">H", pdu[2:4])[0] + result["src_ref"] = struct.unpack(">H", pdu[4:6])[0] + result["class_opt"] = pdu[6] + + # Parse variable-part parameters + params: dict[int, bytes] = {} + offset = 7 + while offset < len(pdu): + if offset + 2 > len(pdu): + break + code = pdu[offset] + length = pdu[offset + 1] + data = pdu[offset + 2 : offset + 2 + length] + params[code] = data + offset += 2 + length + + result["params"] = params + return result + + def test_standard_cr_has_no_routing_params(self) -> None: + """A non-routed CR should not contain routing parameters.""" + conn = ISOTCPConnection("127.0.0.1") + pdu = conn._build_cotp_cr() + parsed = self._parse_cotp_cr(pdu) + params = parsed["params"] + assert isinstance(params, dict) + assert ISOTCPConnection.COTP_PARAM_SUBNET_ID not in params + assert ISOTCPConnection.COTP_PARAM_ROUTING_TSAP not in params + + def test_routed_cr_contains_subnet_param(self) -> None: + """A routed CR must include the subnet ID parameter (0xC6).""" + conn = ISOTCPConnection("127.0.0.1") + conn.set_routing(subnet_id=0x0001, dest_rack=0, dest_slot=2) + pdu = conn._build_cotp_cr() + parsed = self._parse_cotp_cr(pdu) + params = parsed["params"] + assert isinstance(params, dict) + assert ISOTCPConnection.COTP_PARAM_SUBNET_ID in params + subnet_data = params[ISOTCPConnection.COTP_PARAM_SUBNET_ID] + assert struct.unpack(">H", subnet_data)[0] == 0x0001 + + def test_routed_cr_contains_routing_tsap(self) -> None: + """A routed CR must include the routing TSAP parameter (0xC7).""" + conn = ISOTCPConnection("127.0.0.1") + conn.set_routing(subnet_id=0x0001, dest_rack=0, dest_slot=2) + pdu = conn._build_cotp_cr() + parsed = self._parse_cotp_cr(pdu) + params = parsed["params"] + assert isinstance(params, dict) + assert ISOTCPConnection.COTP_PARAM_ROUTING_TSAP in params + tsap_data = params[ISOTCPConnection.COTP_PARAM_ROUTING_TSAP] + expected_tsap = 0x0100 | (0 << 5) | 2 + assert struct.unpack(">H", tsap_data)[0] == expected_tsap + + def test_routed_cr_pdu_length_is_consistent(self) -> None: + """The PDU length byte must equal len(pdu) - 1.""" + conn = ISOTCPConnection("127.0.0.1") + conn.set_routing(subnet_id=0x00FF, dest_rack=1, dest_slot=1) + pdu = conn._build_cotp_cr() + # The first byte is the length of the rest of the PDU + assert pdu[0] == len(pdu) - 1 + + def test_standard_cr_pdu_length_is_consistent(self) -> None: + """Non-routed PDU length byte must also be consistent.""" + conn = ISOTCPConnection("127.0.0.1") + pdu = conn._build_cotp_cr() + assert pdu[0] == len(pdu) - 1 + + def test_routed_cr_still_has_standard_params(self) -> None: + """Routing should not remove the standard TSAP / PDU size params.""" + conn = ISOTCPConnection("127.0.0.1") + conn.set_routing(subnet_id=0x0001, dest_rack=0, dest_slot=3) + pdu = conn._build_cotp_cr() + parsed = self._parse_cotp_cr(pdu) + params = parsed["params"] + assert isinstance(params, dict) + assert ISOTCPConnection.COTP_PARAM_CALLING_TSAP in params + assert ISOTCPConnection.COTP_PARAM_CALLED_TSAP in params + assert ISOTCPConnection.COTP_PARAM_PDU_SIZE in params + + +@pytest.mark.routing +class TestRoutedFrameValidity: + """Test that routed connections produce valid protocol frames.""" + + def test_routed_cr_wrapped_in_tpkt(self) -> None: + """A routed CR wrapped in TPKT should have correct TPKT header.""" + conn = ISOTCPConnection("127.0.0.1") + conn.set_routing(subnet_id=0x0005, dest_rack=0, dest_slot=1) + cr_pdu = conn._build_cotp_cr() + tpkt = conn._build_tpkt(cr_pdu) + + # TPKT header: version=3, reserved=0, length=total + assert tpkt[0] == 3 + assert tpkt[1] == 0 + total_len = struct.unpack(">H", tpkt[2:4])[0] + assert total_len == len(tpkt) + assert tpkt[4:] == cr_pdu + + def test_subnet_id_truncated_to_16_bits(self) -> None: + """Subnet IDs larger than 16 bits should be masked.""" + conn = ISOTCPConnection("127.0.0.1") + conn.set_routing(subnet_id=0x1FFFF, dest_rack=0, dest_slot=1) + # 0x1FFFF & 0xFFFF == 0xFFFF + assert conn._subnet_id == 0xFFFF + + +@pytest.mark.routing +@pytest.mark.server +class TestClientConnectRouted: + """Test Client.connect_routed against the built-in server.""" + + def test_connect_routed_to_server(self) -> None: + """Client.connect_routed should negotiate PDU with a local server. + + The server does not validate routing parameters in the COTP CR, + so the connection handshake should succeed. + """ + from snap7.server import Server + from snap7.type import SrvArea + from ctypes import c_char + + server = Server() + size = 100 + db_data = bytearray(size) + db_array = (c_char * size).from_buffer(db_data) + server.register_area(SrvArea.DB, 1, db_array) + server.start(tcp_port=ROUTING_TEST_PORT) + + try: + client = Client() + client.connect_routed( + host="127.0.0.1", + router_rack=0, + router_slot=2, + subnet=0x0001, + dest_rack=0, + dest_slot=3, + port=ROUTING_TEST_PORT, + ) + assert client.get_connected() + + # Verify we can do a basic read through the routed connection + data = client.db_read(1, 0, 10) + assert len(data) == 10 + + client.disconnect() + finally: + server.stop() + + def test_connect_routed_returns_self(self) -> None: + """connect_routed should return self for method chaining.""" + from snap7.server import Server + from snap7.type import SrvArea + from ctypes import c_char + + server = Server() + size = 10 + db_data = bytearray(size) + db_array = (c_char * size).from_buffer(db_data) + server.register_area(SrvArea.DB, 1, db_array) + server.start(tcp_port=ROUTING_TEST_PORT + 1) + + try: + client = Client() + result = client.connect_routed( + host="127.0.0.1", + router_rack=0, + router_slot=2, + subnet=0x0002, + dest_rack=0, + dest_slot=1, + port=ROUTING_TEST_PORT + 1, + ) + assert result is client + client.disconnect() + finally: + server.stop() From 0fc7a606cd47b47037b63f9bb1a36f38e53ca5ec Mon Sep 17 00:00:00 2001 From: Gijs Molenaar Date: Thu, 19 Mar 2026 17:26:30 +0200 Subject: [PATCH 2/2] Fix ruff format for routing debug log line Co-Authored-By: Claude Opus 4.6 --- snap7/connection.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/snap7/connection.py b/snap7/connection.py index 730da93d..bd06dc20 100644 --- a/snap7/connection.py +++ b/snap7/connection.py @@ -311,10 +311,7 @@ def _build_cotp_cr(self) -> bytes: subnet_param = struct.pack(">BBH", self.COTP_PARAM_SUBNET_ID, 2, self._subnet_id) routing_tsap_param = struct.pack(">BBH", self.COTP_PARAM_ROUTING_TSAP, 2, self._routing_tsap) parameters += subnet_param + routing_tsap_param - logger.debug( - f"COTP CR with routing: subnet={self._subnet_id:#06x}, " - f"routing_tsap={self._routing_tsap:#06x}" - ) + logger.debug(f"COTP CR with routing: subnet={self._subnet_id:#06x}, routing_tsap={self._routing_tsap:#06x}") # Update PDU length to include parameters total_length = 6 + len(parameters)