Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ markers =[
"logo",
"mainloop",
"partner",
"routing",
"server",
"util",
"conformance: protocol conformance tests"
Expand Down
70 changes: 70 additions & 0 deletions snap7/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,76 @@ def connect(self, address: str, rack: int, slot: int, tcp_port: int = 102) -> "C

return self

def connect_routed(
self,
host: str,
router_rack: int,
router_slot: int,
subnet: int,
dest_rack: int,
dest_slot: int,
port: int = 102,
timeout: float = 5.0,
) -> "Client":
"""Connect to an S7 PLC via a routing gateway on another subnet.

The gateway PLC (identified by *host*, *router_rack*, *router_slot*)
forwards the connection to the target PLC (identified by *subnet*,
*dest_rack*, *dest_slot*) through S7 routing parameters embedded in
the COTP Connection Request.

Args:
host: IP address of the routing gateway PLC
router_rack: Rack number of the gateway PLC
router_slot: Slot number of the gateway PLC
subnet: Subnet ID of the target network (0x0000-0xFFFF)
dest_rack: Rack number of the destination PLC
dest_slot: Slot number of the destination PLC
port: TCP port (default 102)
timeout: Connection timeout in seconds

Returns:
Self for method chaining
"""
self.host = host
self.port = port
self.rack = router_rack
self.slot = router_slot
self._params[Parameter.RemotePort] = port

# Remote TSAP targets the gateway rack/slot
self.remote_tsap = 0x0100 | (router_rack << 5) | router_slot

try:
start_time = time.time()

self.connection = ISOTCPConnection(
host=host,
port=port,
local_tsap=self.local_tsap,
remote_tsap=self.remote_tsap,
)
self.connection.set_routing(subnet, dest_rack, dest_slot)
self.connection.connect(timeout=timeout)

# Setup communication and negotiate PDU length
self._setup_communication()

self.connected = True
self._exec_time = int((time.time() - start_time) * 1000)
logger.info(
f"Connected (routed) to {host}:{port} via rack {router_rack} slot {router_slot}, "
f"subnet {subnet:#06x} -> rack {dest_rack} slot {dest_slot}"
)
except Exception as e:
self.disconnect()
if isinstance(e, S7Error):
raise
else:
raise S7ConnectionError(f"Routed connection failed: {e}")

return self

def disconnect(self) -> int:
"""Disconnect from S7 PLC.

Expand Down
34 changes: 34 additions & 0 deletions snap7/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ class ISOTCPConnection:
COTP_PARAM_CALLING_TSAP = 0xC1
COTP_PARAM_CALLED_TSAP = 0xC2

# S7 routing parameter codes
COTP_PARAM_SUBNET_ID = 0xC6
COTP_PARAM_ROUTING_TSAP = 0xC7

def __init__(
self,
host: str,
Expand Down Expand Up @@ -94,6 +98,29 @@ def __init__(
self.src_ref = 0x0001 # Source reference
self.dst_ref = 0x0000 # Destination reference (assigned by peer)

# Routing parameters (set via connect_routed)
self._routing: bool = False
self._subnet_id: int = 0
self._routing_tsap: int = 0

def set_routing(self, subnet_id: int, dest_rack: int, dest_slot: int) -> None:
"""Configure S7 routing parameters for multi-subnet access.

When routing is enabled, the COTP Connection Request includes
additional parameters that instruct the gateway PLC to forward
the connection to a target PLC on another subnet.

Args:
subnet_id: Subnet ID of the target network (2 bytes)
dest_rack: Rack number of the destination PLC
dest_slot: Slot number of the destination PLC
"""
self._routing = True
self._subnet_id = subnet_id & 0xFFFF
# Routing TSAP encodes the final target rack/slot the same way
# as a normal remote TSAP.
self._routing_tsap = 0x0100 | (dest_rack << 5) | dest_slot

def connect(self, timeout: float = 5.0) -> None:
"""
Establish ISO on TCP connection.
Expand Down Expand Up @@ -279,6 +306,13 @@ def _build_cotp_cr(self) -> bytes:

parameters = calling_tsap + called_tsap + pdu_size_param

# Append routing parameters when routing is enabled
if self._routing:
subnet_param = struct.pack(">BBH", self.COTP_PARAM_SUBNET_ID, 2, self._subnet_id)
routing_tsap_param = struct.pack(">BBH", self.COTP_PARAM_ROUTING_TSAP, 2, self._routing_tsap)
parameters += subnet_param + routing_tsap_param
logger.debug(f"COTP CR with routing: subnet={self._subnet_id:#06x}, routing_tsap={self._routing_tsap:#06x}")

# Update PDU length to include parameters
total_length = 6 + len(parameters)
pdu = struct.pack(">B", total_length) + base_pdu[1:] + parameters
Expand Down
225 changes: 225 additions & 0 deletions tests/test_routing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
"""Tests for S7 routing support (multi-subnet PLC access)."""

import struct

import pytest

from snap7.connection import ISOTCPConnection
from snap7.client import Client

# Use a unique port to avoid conflicts with other test suites
ROUTING_TEST_PORT = 11102


@pytest.mark.routing
class TestRoutingTSAP:
"""Test TSAP construction for routed connections."""

def test_remote_tsap_encodes_rack_slot(self) -> None:
"""Remote TSAP should encode rack and slot per S7 spec."""
rack, slot = 0, 2
expected = 0x0100 | (rack << 5) | slot # 0x0102
conn = ISOTCPConnection("127.0.0.1", remote_tsap=expected)
assert conn.remote_tsap == 0x0102

def test_routing_tsap_encodes_dest_rack_slot(self) -> None:
"""Routing TSAP should encode destination rack/slot."""
conn = ISOTCPConnection("127.0.0.1")
conn.set_routing(subnet_id=0x0001, dest_rack=0, dest_slot=3)
assert conn._routing_tsap == 0x0100 | (0 << 5) | 3 # 0x0103

def test_routing_tsap_higher_rack(self) -> None:
"""Routing TSAP with rack=2, slot=1."""
conn = ISOTCPConnection("127.0.0.1")
conn.set_routing(subnet_id=0x0002, dest_rack=2, dest_slot=1)
assert conn._routing_tsap == 0x0100 | (2 << 5) | 1 # 0x0141


@pytest.mark.routing
class TestCOTPCRRouting:
"""Test COTP Connection Request PDU generation with routing."""

def _parse_cotp_cr(self, pdu: bytes) -> dict[str, object]:
"""Parse a COTP CR PDU into its components for inspection."""
result: dict[str, object] = {}
pdu_len = pdu[0]
result["pdu_len"] = pdu_len
result["pdu_type"] = pdu[1]
result["dst_ref"] = struct.unpack(">H", pdu[2:4])[0]
result["src_ref"] = struct.unpack(">H", pdu[4:6])[0]
result["class_opt"] = pdu[6]

# Parse variable-part parameters
params: dict[int, bytes] = {}
offset = 7
while offset < len(pdu):
if offset + 2 > len(pdu):
break
code = pdu[offset]
length = pdu[offset + 1]
data = pdu[offset + 2 : offset + 2 + length]
params[code] = data
offset += 2 + length

result["params"] = params
return result

def test_standard_cr_has_no_routing_params(self) -> None:
"""A non-routed CR should not contain routing parameters."""
conn = ISOTCPConnection("127.0.0.1")
pdu = conn._build_cotp_cr()
parsed = self._parse_cotp_cr(pdu)
params = parsed["params"]
assert isinstance(params, dict)
assert ISOTCPConnection.COTP_PARAM_SUBNET_ID not in params
assert ISOTCPConnection.COTP_PARAM_ROUTING_TSAP not in params

def test_routed_cr_contains_subnet_param(self) -> None:
"""A routed CR must include the subnet ID parameter (0xC6)."""
conn = ISOTCPConnection("127.0.0.1")
conn.set_routing(subnet_id=0x0001, dest_rack=0, dest_slot=2)
pdu = conn._build_cotp_cr()
parsed = self._parse_cotp_cr(pdu)
params = parsed["params"]
assert isinstance(params, dict)
assert ISOTCPConnection.COTP_PARAM_SUBNET_ID in params
subnet_data = params[ISOTCPConnection.COTP_PARAM_SUBNET_ID]
assert struct.unpack(">H", subnet_data)[0] == 0x0001

def test_routed_cr_contains_routing_tsap(self) -> None:
"""A routed CR must include the routing TSAP parameter (0xC7)."""
conn = ISOTCPConnection("127.0.0.1")
conn.set_routing(subnet_id=0x0001, dest_rack=0, dest_slot=2)
pdu = conn._build_cotp_cr()
parsed = self._parse_cotp_cr(pdu)
params = parsed["params"]
assert isinstance(params, dict)
assert ISOTCPConnection.COTP_PARAM_ROUTING_TSAP in params
tsap_data = params[ISOTCPConnection.COTP_PARAM_ROUTING_TSAP]
expected_tsap = 0x0100 | (0 << 5) | 2
assert struct.unpack(">H", tsap_data)[0] == expected_tsap

def test_routed_cr_pdu_length_is_consistent(self) -> None:
"""The PDU length byte must equal len(pdu) - 1."""
conn = ISOTCPConnection("127.0.0.1")
conn.set_routing(subnet_id=0x00FF, dest_rack=1, dest_slot=1)
pdu = conn._build_cotp_cr()
# The first byte is the length of the rest of the PDU
assert pdu[0] == len(pdu) - 1

def test_standard_cr_pdu_length_is_consistent(self) -> None:
"""Non-routed PDU length byte must also be consistent."""
conn = ISOTCPConnection("127.0.0.1")
pdu = conn._build_cotp_cr()
assert pdu[0] == len(pdu) - 1

def test_routed_cr_still_has_standard_params(self) -> None:
"""Routing should not remove the standard TSAP / PDU size params."""
conn = ISOTCPConnection("127.0.0.1")
conn.set_routing(subnet_id=0x0001, dest_rack=0, dest_slot=3)
pdu = conn._build_cotp_cr()
parsed = self._parse_cotp_cr(pdu)
params = parsed["params"]
assert isinstance(params, dict)
assert ISOTCPConnection.COTP_PARAM_CALLING_TSAP in params
assert ISOTCPConnection.COTP_PARAM_CALLED_TSAP in params
assert ISOTCPConnection.COTP_PARAM_PDU_SIZE in params


@pytest.mark.routing
class TestRoutedFrameValidity:
"""Test that routed connections produce valid protocol frames."""

def test_routed_cr_wrapped_in_tpkt(self) -> None:
"""A routed CR wrapped in TPKT should have correct TPKT header."""
conn = ISOTCPConnection("127.0.0.1")
conn.set_routing(subnet_id=0x0005, dest_rack=0, dest_slot=1)
cr_pdu = conn._build_cotp_cr()
tpkt = conn._build_tpkt(cr_pdu)

# TPKT header: version=3, reserved=0, length=total
assert tpkt[0] == 3
assert tpkt[1] == 0
total_len = struct.unpack(">H", tpkt[2:4])[0]
assert total_len == len(tpkt)
assert tpkt[4:] == cr_pdu

def test_subnet_id_truncated_to_16_bits(self) -> None:
"""Subnet IDs larger than 16 bits should be masked."""
conn = ISOTCPConnection("127.0.0.1")
conn.set_routing(subnet_id=0x1FFFF, dest_rack=0, dest_slot=1)
# 0x1FFFF & 0xFFFF == 0xFFFF
assert conn._subnet_id == 0xFFFF


@pytest.mark.routing
@pytest.mark.server
class TestClientConnectRouted:
"""Test Client.connect_routed against the built-in server."""

def test_connect_routed_to_server(self) -> None:
"""Client.connect_routed should negotiate PDU with a local server.

The server does not validate routing parameters in the COTP CR,
so the connection handshake should succeed.
"""
from snap7.server import Server
from snap7.type import SrvArea
from ctypes import c_char

server = Server()
size = 100
db_data = bytearray(size)
db_array = (c_char * size).from_buffer(db_data)
server.register_area(SrvArea.DB, 1, db_array)
server.start(tcp_port=ROUTING_TEST_PORT)

try:
client = Client()
client.connect_routed(
host="127.0.0.1",
router_rack=0,
router_slot=2,
subnet=0x0001,
dest_rack=0,
dest_slot=3,
port=ROUTING_TEST_PORT,
)
assert client.get_connected()

# Verify we can do a basic read through the routed connection
data = client.db_read(1, 0, 10)
assert len(data) == 10

client.disconnect()
finally:
server.stop()

def test_connect_routed_returns_self(self) -> None:
"""connect_routed should return self for method chaining."""
from snap7.server import Server
from snap7.type import SrvArea
from ctypes import c_char

server = Server()
size = 10
db_data = bytearray(size)
db_array = (c_char * size).from_buffer(db_data)
server.register_area(SrvArea.DB, 1, db_array)
server.start(tcp_port=ROUTING_TEST_PORT + 1)

try:
client = Client()
result = client.connect_routed(
host="127.0.0.1",
router_rack=0,
router_slot=2,
subnet=0x0002,
dest_rack=0,
dest_slot=1,
port=ROUTING_TEST_PORT + 1,
)
assert result is client
client.disconnect()
finally:
server.stop()
Loading