-
-
Notifications
You must be signed in to change notification settings - Fork 266
Expand file tree
/
Copy pathasync_client.py
More file actions
246 lines (200 loc) · 7.92 KB
/
async_client.py
File metadata and controls
246 lines (200 loc) · 7.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
"""Unified async S7 client with protocol auto-discovery.
Provides a single async client that automatically selects the best protocol
(S7CommPlus or legacy S7) for communicating with Siemens S7 PLCs.
Usage::
from s7 import AsyncClient
async with AsyncClient() as client:
await client.connect("192.168.1.10", 0, 1)
data = await client.db_read(1, 0, 4)
"""
import logging
from typing import Any, Optional
from snap7.async_client import AsyncClient as LegacyAsyncClient
from snap7.s7commplus.async_client import S7CommPlusAsyncClient
from ._protocol import Protocol
logger = logging.getLogger(__name__)
class AsyncClient:
"""Unified async S7 client with protocol auto-discovery.
Async counterpart of :class:`s7.Client`. Automatically selects the
best protocol for the target PLC using asyncio for non-blocking I/O.
Methods not explicitly defined are delegated to the underlying
legacy async client via ``__getattr__``.
Examples::
from s7 import AsyncClient
async with AsyncClient() as client:
await client.connect("192.168.1.10", 0, 1)
data = await client.db_read(1, 0, 4)
print(client.protocol)
"""
def __init__(self) -> None:
self._legacy: Optional[LegacyAsyncClient] = None
self._plus: Optional[S7CommPlusAsyncClient] = None
self._protocol: Protocol = Protocol.AUTO
self._host: str = ""
self._port: int = 102
self._rack: int = 0
self._slot: int = 1
@property
def protocol(self) -> Protocol:
"""The protocol currently in use for DB operations."""
return self._protocol
@property
def connected(self) -> bool:
"""Whether the client is connected to a PLC."""
if self._legacy is not None and self._legacy.connected:
return True
if self._plus is not None and self._plus.connected:
return True
return False
async def connect(
self,
address: str,
rack: int = 0,
slot: int = 1,
tcp_port: int = 102,
*,
protocol: Protocol = Protocol.AUTO,
) -> "AsyncClient":
"""Connect to an S7 PLC.
Args:
address: PLC IP address or hostname.
rack: PLC rack number.
slot: PLC slot number.
tcp_port: TCP port (default 102).
protocol: Protocol selection. AUTO tries S7CommPlus first,
then falls back to legacy S7.
Returns:
self, for method chaining.
"""
self._host = address
self._port = tcp_port
self._rack = rack
self._slot = slot
if protocol in (Protocol.AUTO, Protocol.S7COMMPLUS):
if await self._try_s7commplus(address, tcp_port, rack, slot):
self._protocol = Protocol.S7COMMPLUS
logger.info(f"Async connected to {address}:{tcp_port} using S7CommPlus")
else:
if protocol == Protocol.S7COMMPLUS:
raise RuntimeError(
f"S7CommPlus connection to {address}:{tcp_port} failed and protocol=S7COMMPLUS was explicitly requested"
)
self._protocol = Protocol.LEGACY
logger.info(f"S7CommPlus not available, using legacy S7 for {address}:{tcp_port}")
else:
self._protocol = Protocol.LEGACY
# Always connect legacy client
self._legacy = LegacyAsyncClient()
await self._legacy.connect(address, rack, slot, tcp_port)
logger.info(f"Async legacy S7 connected to {address}:{tcp_port}")
return self
async def _try_s7commplus(
self,
address: str,
tcp_port: int,
rack: int,
slot: int,
) -> bool:
"""Attempt async S7CommPlus connection and probe data operations."""
plus = S7CommPlusAsyncClient()
try:
await plus.connect(host=address, port=tcp_port, rack=rack, slot=slot)
except Exception as e:
logger.debug(f"Async S7CommPlus connection failed: {e}")
return False
if plus.using_legacy_fallback:
logger.debug("S7CommPlus connected but data ops not supported, disconnecting")
try:
await plus.disconnect()
except Exception:
pass
return False
self._plus = plus
return True
async def disconnect(self) -> int:
"""Disconnect from the PLC.
Returns:
0 on success.
"""
if self._plus is not None:
try:
await self._plus.disconnect()
except Exception:
pass
self._plus = None
if self._legacy is not None:
try:
await self._legacy.disconnect()
except Exception:
pass
self._legacy = None
self._protocol = Protocol.AUTO
return 0
async def db_read(self, db_number: int, start: int, size: int) -> bytearray:
"""Read data from a data block.
Args:
db_number: DB number to read from.
start: Start byte offset.
size: Number of bytes to read.
Returns:
Data read from the DB.
"""
if self._protocol == Protocol.S7COMMPLUS and self._plus is not None:
return bytearray(await self._plus.db_read(db_number, start, size))
if self._legacy is not None:
return await self._legacy.db_read(db_number, start, size)
raise RuntimeError("Not connected")
async def db_write(self, db_number: int, start: int, data: bytearray | bytes) -> int:
"""Write data to a data block.
Args:
db_number: DB number to write to.
start: Start byte offset.
data: Data to write.
Returns:
0 on success.
"""
if self._protocol == Protocol.S7COMMPLUS and self._plus is not None:
await self._plus.db_write(db_number, start, bytes(data))
return 0
if self._legacy is not None:
return await self._legacy.db_write(db_number, start, bytearray(data))
raise RuntimeError("Not connected")
async def db_read_multi(self, items: list[tuple[int, int, int]]) -> list[bytearray]:
"""Read multiple data block regions.
Args:
items: List of (db_number, start_offset, size) tuples.
Returns:
List of data for each item.
"""
if self._protocol == Protocol.S7COMMPLUS and self._plus is not None:
return [bytearray(r) for r in await self._plus.db_read_multi(items)]
if self._legacy is not None:
results = []
for db, start, size in items:
results.append(await self._legacy.db_read(db, start, size))
return results
raise RuntimeError("Not connected")
async def explore(self) -> bytes:
"""Browse the PLC object tree (S7CommPlus only).
Returns:
Raw response payload.
"""
if self._plus is not None:
return await self._plus.explore()
raise RuntimeError("explore() requires S7CommPlus protocol")
def __getattr__(self, name: str) -> Any:
"""Delegate unknown attributes to the legacy async client."""
if name.startswith("_"):
raise AttributeError(name)
legacy = self.__dict__.get("_legacy")
if legacy is not None:
return getattr(legacy, name)
raise AttributeError(f"'{type(self).__name__}' has no attribute '{name}'")
async def __aenter__(self) -> "AsyncClient":
return self
async def __aexit__(self, *args: Any) -> None:
await self.disconnect()
def __repr__(self) -> str:
status = "connected" if self.connected else "disconnected"
proto = self._protocol.value if self.connected else "none"
return f"<s7.AsyncClient {self._host}:{self._port} {status} protocol={proto}>"