-
-
Notifications
You must be signed in to change notification settings - Fork 266
Expand file tree
/
Copy pathconnection.py
More file actions
494 lines (399 loc) · 16.1 KB
/
connection.py
File metadata and controls
494 lines (399 loc) · 16.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
"""
ISO on TCP connection management (RFC 1006).
Implements TPKT (Transport Service on top of TCP) and COTP (Connection Oriented
Transport Protocol) layers for S7 communication.
"""
import socket
import struct
import logging
from enum import IntEnum
from typing import Optional, Type, Union
from types import TracebackType
from .error import S7ConnectionError, S7TimeoutError
class TPDUSize(IntEnum):
    """Maximum TPDU size codes per ISO 8073 / RFC 905.

    Each member's value is a power-of-two exponent rather than a byte
    count: the size in bytes is ``2 ** value`` (for example ``S_1024`` is
    ``10`` because ``2 ** 10 == 1024``).
    """

    S_128 = 7
    S_256 = 8
    S_512 = 9
    S_1024 = 10
    S_2048 = 11
    S_4096 = 12
    S_8192 = 13
# Module-level logger, named after this module so that applications can
# configure its verbosity through the standard logging hierarchy.
logger = logging.getLogger(__name__)
class ISOTCPConnection:
    """
    ISO on TCP connection implementation.

    Handles the transport layer for S7 communication including:
    - TCP socket management
    - TPKT framing (RFC 1006)
    - COTP connection setup and data transfer
    - TPDU size negotiation

    Instances can be used as context managers: leaving the ``with`` block
    calls :meth:`disconnect`. Entering the block does not connect; call
    :meth:`connect` explicitly.
    """

    # COTP PDU types
    COTP_CR = 0xE0  # Connection Request
    COTP_CC = 0xD0  # Connection Confirm
    COTP_DR = 0x80  # Disconnect Request
    COTP_DC = 0xC0  # Disconnect Confirm
    COTP_DT = 0xF0  # Data Transfer
    COTP_ED = 0x10  # Expedited Data
    COTP_AK = 0x60  # Data Acknowledgment
    COTP_EA = 0x20  # Expedited Acknowledgment
    COTP_RJ = 0x50  # Reject
    COTP_ER = 0x70  # Error

    # COTP parameter codes (ISO 8073)
    COTP_PARAM_PDU_SIZE = 0xC0
    COTP_PARAM_CALLING_TSAP = 0xC1
    COTP_PARAM_CALLED_TSAP = 0xC2

    # S7 routing parameter codes
    COTP_PARAM_SUBNET_ID = 0xC6
    COTP_PARAM_ROUTING_TSAP = 0xC7

    # Bit 8 of the third DT header byte: set on the last TPDU of a TSDU.
    _COTP_EOT_FLAG = 0x80

    def __init__(
        self,
        host: str,
        port: int = 102,
        local_tsap: int = 0x0100,
        remote_tsap: Union[int, bytes] = 0x0102,
        tpdu_size: TPDUSize = TPDUSize.S_1024,
    ):
        """
        Initialize ISO TCP connection.

        No network activity happens here; call :meth:`connect` to open the
        connection.

        Args:
            host: Target PLC IP address
            port: TCP port (default 102 for S7)
            local_tsap: Local Transport Service Access Point
            remote_tsap: Remote Transport Service Access Point (int for 2-byte TSAP,
                bytes for variable-length TSAP like b"SIMATIC-ROOT-HMI")
            tpdu_size: TPDU size to request during COTP negotiation
        """
        self.host = host
        self.port = port
        self.local_tsap = local_tsap
        self.remote_tsap = remote_tsap
        self.tpdu_size = tpdu_size
        self.socket: Optional[socket.socket] = None
        self.connected = False
        self.pdu_size = 240  # Default PDU size, negotiated during connection
        self.timeout = 5.0  # Default timeout in seconds

        # Connection parameters
        self.src_ref = 0x0001  # Source reference (chosen by this side)
        self.dst_ref = 0x0000  # Destination reference (assigned by peer)

        # Routing parameters (configured via set_routing)
        self._routing: bool = False
        self._subnet_id: int = 0
        self._routing_tsap: int = 0

    def set_routing(self, subnet_id: int, dest_rack: int, dest_slot: int) -> None:
        """Configure S7 routing parameters for multi-subnet access.

        When routing is enabled, the COTP Connection Request includes
        additional parameters that instruct the gateway PLC to forward
        the connection to a target PLC on another subnet.

        Args:
            subnet_id: Subnet ID of the target network (2 bytes; higher bits
                are masked off)
            dest_rack: Rack number of the destination PLC (0-7)
            dest_slot: Slot number of the destination PLC (0-31)

        Raises:
            ValueError: If ``dest_rack`` or ``dest_slot`` does not fit the
                TSAP encoding.
        """
        # The TSAP packs the slot into bits 0-4 and the rack into bits 5-7.
        # Anything larger would silently alias another rack/slot or spill
        # into the connection-type byte, so reject it up front.
        if not 0 <= dest_rack <= 0x07:
            raise ValueError(f"dest_rack must be in range 0-7, got {dest_rack}")
        if not 0 <= dest_slot <= 0x1F:
            raise ValueError(f"dest_slot must be in range 0-31, got {dest_slot}")

        self._routing = True
        self._subnet_id = subnet_id & 0xFFFF
        # Routing TSAP encodes the final target rack/slot the same way
        # as a normal remote TSAP.
        self._routing_tsap = 0x0100 | (dest_rack << 5) | dest_slot

    def connect(self, timeout: float = 5.0) -> None:
        """
        Establish ISO on TCP connection.

        Opens the TCP socket and then performs the COTP handshake. On any
        failure the socket is closed again before the error propagates.

        Args:
            timeout: Connection timeout in seconds (also used for later
                socket operations)

        Raises:
            S7ConnectionError: If the TCP connection or the COTP handshake fails.
            S7TimeoutError: If the peer does not answer in time.
        """
        self.timeout = timeout
        try:
            # Step 1: TCP connection
            self._tcp_connect()
            # Step 2: ISO connection (COTP handshake)
            self._iso_connect()
            self.connected = True
            logger.info(f"Connected to {self.host}:{self.port}, PDU size: {self.pdu_size}")
        except (S7ConnectionError, S7TimeoutError):
            self.disconnect()
            raise
        except Exception as e:
            self.disconnect()
            # Wrap unexpected errors but keep the original as the cause.
            raise S7ConnectionError(f"Connection failed: {e}") from e

    def disconnect(self) -> None:
        """Disconnect from S7 device.

        Sends a COTP Disconnect Request when the ISO connection was
        established, then closes the socket. This is best effort: errors are
        logged at debug level and never raised. Does nothing when there is
        no socket.
        """
        sock = self.socket
        if sock is None:
            return
        try:
            if self.connected:
                # Send COTP disconnect request
                self._send_cotp_disconnect()
        except Exception:
            logger.debug("Ignoring error while sending COTP disconnect", exc_info=True)
        finally:
            # Always release the socket, even if the disconnect request failed.
            try:
                sock.close()
            except Exception:
                logger.debug("Ignoring error while closing socket", exc_info=True)
            self.socket = None
            self.connected = False
            logger.info(f"Disconnected from {self.host}:{self.port}")

    def send_data(self, data: bytes) -> None:
        """
        Send data over ISO connection.

        The data is wrapped in a COTP Data Transfer PDU and a TPKT frame.

        Args:
            data: S7 PDU data to send

        Raises:
            S7ConnectionError: If not connected or if the socket send fails
                (the connection is then marked as not connected).
        """
        if not self.connected or self.socket is None:
            raise S7ConnectionError("Not connected")

        # COTP Data Transfer PDU wrapped in a TPKT frame
        tpkt_frame = self._build_tpkt(self._build_cotp_dt(data))

        try:
            self.socket.sendall(tpkt_frame)
        except socket.error as e:
            self.connected = False
            raise S7ConnectionError(f"Send failed: {e}") from e

        # Only build the hex dump when it will actually be emitted.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f"Sent {len(tpkt_frame)} bytes: {tpkt_frame.hex(' ')}")

    def receive_data(self) -> bytes:
        """
        Receive data from ISO connection.

        Reads TPKT frames until a COTP Data Transfer PDU with the EOT
        (end of TSDU) bit arrives and returns the concatenated user data, so
        a message the peer segmented over several DT PDUs is returned whole.

        Returns:
            S7 PDU data

        Raises:
            S7ConnectionError: If not connected, the connection is lost, or a
                malformed frame is received.
            S7TimeoutError: If no complete frame arrives within the timeout.
        """
        if not self.connected:
            raise S7ConnectionError("Not connected")

        segments = []
        try:
            while True:
                payload = self._recv_tpkt_payload()
                segments.append(self._parse_cotp_data(payload))
                # _parse_cotp_data has verified that the 3-byte DT header is
                # present, so the EOT byte can be read safely.
                if payload[2] & self._COTP_EOT_FLAG:
                    break
        except socket.timeout as e:
            self.connected = False
            raise S7TimeoutError("Receive timeout") from e
        except socket.error as e:
            self.connected = False
            raise S7ConnectionError(f"Receive failed: {e}") from e
        return b"".join(segments)

    def _recv_tpkt_payload(self, version_error: str = "Invalid TPKT version") -> bytes:
        """Read one TPKT frame from the socket and return its payload.

        Args:
            version_error: Message prefix used when the TPKT version is not 3.

        Returns:
            The frame contents following the 4-byte TPKT header.

        Raises:
            S7ConnectionError: If the version is not 3, the length field does
                not leave room for a payload, or the connection is lost.
            S7TimeoutError: If the frame does not arrive within the timeout.
        """
        # TPKT header (4 bytes): version, reserved, total length
        tpkt_header = self._recv_exact(4)
        version, _reserved, length = struct.unpack(">BBH", tpkt_header)
        if version != 3:
            raise S7ConnectionError(f"{version_error}: {version}")

        # The length field counts the header itself.
        remaining = length - 4
        if remaining <= 0:
            raise S7ConnectionError("Invalid TPKT length")

        payload = self._recv_exact(remaining)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f"Received TPKT: version={version} length={length} payload ({len(payload)} bytes): {payload.hex(' ')}")
        return payload

    def _tcp_connect(self) -> None:
        """Establish TCP connection.

        Creates the socket, applies ``self.timeout`` and connects to
        ``(self.host, self.port)``.

        Raises:
            S7ConnectionError: If the TCP connection cannot be established.
        """
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.settimeout(self.timeout)
        try:
            self.socket.connect((self.host, self.port))
        except socket.error as e:
            raise S7ConnectionError(f"TCP connection failed: {e}") from e
        logger.debug(f"TCP connected to {self.host}:{self.port}")

    def _iso_connect(self) -> None:
        """Establish ISO connection using COTP handshake.

        Sends a Connection Request and parses the Connection Confirm.

        Raises:
            S7ConnectionError: If the socket is missing or the response is
                not a valid Connection Confirm.
            S7TimeoutError: If the response does not arrive within the timeout.
        """
        if self.socket is None:
            raise S7ConnectionError("Socket not initialized")

        # Send Connection Request
        self.socket.sendall(self._build_tpkt(self._build_cotp_cr()))
        logger.debug("Sent COTP Connection Request")

        # Receive Connection Confirm
        payload = self._recv_tpkt_payload("Invalid TPKT version in response")
        self._parse_cotp_cc(payload)
        logger.debug("Received COTP Connection Confirm")

    def _build_tpkt(self, payload: bytes) -> bytes:
        """
        Build TPKT frame.

        TPKT Header (4 bytes):
        - Version (1 byte): Always 3
        - Reserved (1 byte): Always 0
        - Length (2 bytes): Total frame length including header

        Args:
            payload: Bytes to place after the header

        Returns:
            Header followed by ``payload``
        """
        return struct.pack(">BBH", 3, 0, len(payload) + 4) + payload

    def _build_cotp_cr(self) -> bytes:
        """
        Build COTP Connection Request PDU.

        COTP CR format:
        - PDU Length: Length of COTP header (excluding this byte)
        - PDU Type: 0xE0 (Connection Request)
        - Destination Reference: 2 bytes
        - Source Reference: 2 bytes
        - Class/Option: 1 byte
        - Parameters: Variable length

        Parameters are emitted in the order calling TSAP, called TSAP, TPDU
        size, followed by subnet ID and routing TSAP when routing is enabled.

        Returns:
            The encoded Connection Request PDU (without TPKT header)
        """
        # Calling TSAP (local) - always 2 bytes
        calling_tsap = struct.pack(">BBH", self.COTP_PARAM_CALLING_TSAP, 2, self.local_tsap)

        # Called TSAP (remote) - can be 2-byte int or variable-length bytes (e.g. "SIMATIC-ROOT-HMI")
        if isinstance(self.remote_tsap, bytes):
            called_tsap = struct.pack(">BB", self.COTP_PARAM_CALLED_TSAP, len(self.remote_tsap)) + self.remote_tsap
        else:
            called_tsap = struct.pack(">BBH", self.COTP_PARAM_CALLED_TSAP, 2, self.remote_tsap)

        # PDU Size parameter (ISO 8073 code, e.g. 0x0A = 1024 bytes)
        pdu_size_param = struct.pack(">BBB", self.COTP_PARAM_PDU_SIZE, 1, self.tpdu_size)

        parameters = calling_tsap + called_tsap + pdu_size_param

        # Append routing parameters when routing is enabled
        if self._routing:
            parameters += struct.pack(">BBH", self.COTP_PARAM_SUBNET_ID, 2, self._subnet_id)
            parameters += struct.pack(">BBH", self.COTP_PARAM_ROUTING_TSAP, 2, self._routing_tsap)
            logger.debug(f"COTP CR with routing: subnet={self._subnet_id:#06x}, routing_tsap={self._routing_tsap:#06x}")

        # The length indicator covers the 6 fixed header bytes that follow it
        # plus all parameters.
        header = struct.pack(
            ">BBHHB",
            6 + len(parameters),  # PDU length
            self.COTP_CR,  # PDU type
            0x0000,  # Destination reference (0 for CR)
            self.src_ref,  # Source reference
            0x00,  # Class/option (Class 0, no extended formats)
        )
        return header + parameters

    def _parse_cotp_cc(self, data: bytes) -> None:
        """
        Parse COTP Connection Confirm PDU.

        Stores the peer's connection reference in ``self.dst_ref`` and
        processes any parameters that follow the fixed header.

        Args:
            data: TPKT payload containing the Connection Confirm

        Raises:
            S7ConnectionError: If the PDU is shorter than 7 bytes or is not a
                Connection Confirm.
        """
        if len(data) < 7:
            raise S7ConnectionError("Invalid COTP CC: too short")

        _pdu_len, pdu_type, _echoed_ref, peer_ref, _class_opt = struct.unpack(">BBHHB", data[:7])
        if pdu_type != self.COTP_CC:
            raise S7ConnectionError(f"Expected COTP CC, got {pdu_type:#04x}")

        # In a CC the DST-REF field echoes the reference we sent in the CR;
        # the reference chosen by the peer is carried in SRC-REF. That is the
        # value we must address in later PDUs such as the Disconnect Request.
        self.dst_ref = peer_ref

        # Parse parameters if present
        if len(data) > 7:
            self._parse_cotp_parameters(data[7:])

    def _parse_cotp_parameters(self, params: bytes) -> None:
        """Parse COTP parameters from Connection Confirm.

        Walks the code/length/value triples. Only the TPDU size parameter is
        interpreted (updating ``self.pdu_size``); other codes are logged and
        skipped. Parsing stops silently at a truncated parameter.

        Args:
            params: Bytes following the 7-byte Connection Confirm header
        """
        total = len(params)
        offset = 0
        while offset + 2 <= total:
            param_code = params[offset]
            param_len = params[offset + 1]
            value_end = offset + 2 + param_len
            if value_end > total:
                break  # Truncated parameter: ignore the remainder
            param_data = params[offset + 2 : value_end]

            if param_code == self.COTP_PARAM_PDU_SIZE:
                if param_len == 1:
                    # ISO 8073 code: size = 2^code
                    self.pdu_size = 1 << param_data[0]
                elif param_len == 2:
                    # Raw 2-byte value
                    self.pdu_size = struct.unpack(">H", param_data)[0]
                logger.debug(f"Negotiated PDU size: {self.pdu_size}")
            else:
                logger.debug(f"Unsupported COTP parameter: code={param_code:#04x}, length={param_len}")

            offset = value_end

    def _build_cotp_dt(self, data: bytes) -> bytes:
        """
        Build COTP Data Transfer PDU.

        COTP DT format:
        - PDU Length: 2 (fixed for DT)
        - PDU Type: 0xF0 (Data Transfer)
        - EOT + Number: 0x80 (End of TSDU, sequence number 0)
        - Data: Variable length

        Args:
            data: User data to carry

        Returns:
            The 3-byte DT header followed by ``data``
        """
        return struct.pack(">BBB", 2, self.COTP_DT, self._COTP_EOT_FLAG) + data

    def _parse_cotp_data(self, cotp_pdu: bytes) -> bytes:
        """
        Parse COTP Data Transfer PDU and extract S7 data.

        Args:
            cotp_pdu: TPKT payload containing a DT PDU

        Returns:
            The bytes following the 3-byte DT header

        Raises:
            S7ConnectionError: If the PDU is shorter than 3 bytes or is not a
                Data Transfer PDU.
        """
        if len(cotp_pdu) < 3:
            raise S7ConnectionError("Invalid COTP DT: too short")

        pdu_type = cotp_pdu[1]
        if pdu_type != self.COTP_DT:
            raise S7ConnectionError(f"Expected COTP DT, got {pdu_type:#04x}")

        return cotp_pdu[3:]  # Data portion

    def _send_cotp_disconnect(self) -> None:
        """Send COTP Disconnect Request.

        Best effort: does nothing without a socket and ignores socket errors
        while sending.
        """
        if self.socket is None:
            return  # Nothing to disconnect

        # NOTE(review): the length indicator is 6 but seven bytes follow it,
        # so the trailing "additional info" byte lies outside the declared
        # header - confirm against the ISO 8073 DR layout before changing
        # the bytes on the wire.
        dr_pdu = struct.pack(
            ">BBHHBB",
            6,  # PDU length
            self.COTP_DR,  # PDU type
            self.dst_ref,  # Destination reference (peer's reference)
            self.src_ref,  # Source reference
            0x00,  # Reason code 0
            0x00,  # Additional info
        )
        tpkt_frame = self._build_tpkt(dr_pdu)
        try:
            self.socket.sendall(tpkt_frame)
        except socket.error:
            pass  # Ignore errors during disconnect

    def _recv_exact(self, size: int) -> bytes:
        """
        Receive exactly the specified number of bytes.

        Keeps calling ``recv`` until ``size`` bytes have been collected. Any
        failure marks the connection as not connected.

        Args:
            size: Number of bytes to receive

        Returns:
            Received data

        Raises:
            S7ConnectionError: If the socket is missing or the connection is lost
            S7TimeoutError: If timeout occurs
        """
        if self.socket is None:
            raise S7ConnectionError("Socket not initialized")

        buffer = bytearray()
        while len(buffer) < size:
            try:
                chunk = self.socket.recv(size - len(buffer))
            except socket.timeout as e:
                self.connected = False
                raise S7TimeoutError("Receive timeout") from e
            except socket.error as e:
                self.connected = False
                raise S7ConnectionError(f"Receive error: {e}") from e
            if not chunk:
                # An empty read means the peer closed the connection.
                self.connected = False
                raise S7ConnectionError("Connection closed by peer")
            buffer.extend(chunk)
        return bytes(buffer)

    def check_connection(self) -> bool:
        """Check if the TCP connection is still alive.

        Uses a non-blocking socket peek to detect broken connections; no
        data is consumed. The socket's timeout is restored afterwards.

        Returns:
            True if the socket still appears connected, False otherwise
            (``self.connected`` is cleared when the peek shows a closed or
            failed socket).
        """
        if not self.connected or self.socket is None:
            return False
        try:
            original_timeout = self.socket.gettimeout()
            self.socket.settimeout(0)
            try:
                data = self.socket.recv(1, socket.MSG_PEEK)
            except BlockingIOError:
                # No data available but connection is still alive
                return True
            except OSError:
                self.connected = False
                return False
            finally:
                self.socket.settimeout(original_timeout)
            if not data:
                # Empty read: the peer has closed the connection.
                self.connected = False
                return False
            return True
        except Exception:
            return False

    def __enter__(self) -> "ISOTCPConnection":
        """Context manager entry; returns this connection without connecting."""
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Context manager exit; disconnects and lets any exception propagate."""
        self.disconnect()