-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathcommands.py
More file actions
400 lines (321 loc) · 14.8 KB
/
commands.py
File metadata and controls
400 lines (321 loc) · 14.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
# Need to figure out how to resolve the 'Untyped decorator makes function "..." untyped' errors in mypy when using click decorators
# mypy: disable-error-code="misc"
from __future__ import annotations
import asyncio
from pathlib import Path
from typing import TYPE_CHECKING, cast
import click
from pyomnilogic_local.cli import ensure_connection
from pyomnilogic_local.cli.pcap_utils import parse_pcap_file, process_pcap_messages
from pyomnilogic_local.cli.utils import async_get_filter_diagnostics
if TYPE_CHECKING:
from pyomnilogic_local.api.api import OmniLogicAPI
from pyomnilogic_local.models.telemetry import TelemetryChlorinator
@click.group()
@click.option("--raw/--no-raw", default=False, help="Output the raw XML from the OmniLogic, do not parse the response")
@click.pass_context
def debug(ctx: click.Context, raw: bool) -> None:
    """Debug commands for low-level controller access.

    These commands provide direct access to controller data and debugging utilities
    including configuration, telemetry, diagnostics, and PCAP file analysis.
    """
    # No connection is opened at the group level: parse_pcap does not need one,
    # and the remaining subcommands call ensure_connection themselves.
    ctx.ensure_object(dict)
    # Publish the --raw/--no-raw choice on the shared context object for subcommands to read.
    ctx.obj.update({"RAW": raw})
@debug.command()
@click.pass_context
def get_mspconfig(ctx: click.Context) -> None:
    """Retrieve the MSP configuration from the controller.

    The MSP configuration contains all pool equipment definitions, system IDs,
    and configuration parameters. Use --raw to see the unprocessed XML.

    Example:
        omnilogic debug get-mspconfig
        omnilogic debug --raw get-mspconfig
    """
    # The API client is stored on the context under "OMNI" once the connection is ensured.
    ensure_connection(ctx)
    api: OmniLogicAPI = ctx.obj["OMNI"]

    # Drive the coroutine to completion, honouring the group-level --raw flag,
    # and print whatever comes back.
    request = api.async_get_mspconfig(raw=ctx.obj["RAW"])
    click.echo(asyncio.run(request))
@debug.command()
@click.pass_context
def get_telemetry(ctx: click.Context) -> None:
    """Retrieve current telemetry data from the controller.

    Telemetry includes real-time sensor readings, equipment states, temperatures,
    and other operational data. Use --raw to see the unprocessed XML.

    Example:
        omnilogic debug get-telemetry
        omnilogic debug --raw get-telemetry
    """
    # The API client is stored on the context under "OMNI" once the connection is ensured.
    ensure_connection(ctx)
    api: OmniLogicAPI = ctx.obj["OMNI"]

    # Run the telemetry request synchronously, honouring the group-level --raw flag,
    # and print the result.
    request = api.async_get_telemetry(raw=ctx.obj["RAW"])
    click.echo(asyncio.run(request))
@debug.command()
@click.option(
    "--pool-id", required=True, type=int, help="System ID of the Body Of Water the filter is associated with. Example: --pool-id 1"
)
@click.option("--filter-id", required=True, type=int, help="System ID of the filter to request diagnostics for. Example: --filter-id 5")
@click.pass_context
def get_filter_diagnostics(ctx: click.Context, pool_id: int, filter_id: int) -> None:
    """Get diagnostic information for a specific filter/pump.

    This command retrieves detailed diagnostic data including firmware versions,
    power consumption, and error status for a filter or pump.

    Example:
        omnilogic debug get-filter-diagnostics --pool-id 1 --filter-id 5
    """
    # pool_id / filter_id come from the required --pool-id / --filter-id options.
    # In raw mode the helper's result is echoed untouched; otherwise a four-line
    # summary is built from the named diagnostic parameters.
    ensure_connection(ctx)
    raw_output = ctx.obj["RAW"]
    filter_diags = asyncio.run(async_get_filter_diagnostics(ctx.obj["OMNI"], pool_id, filter_id, raw_output))

    if raw_output:
        click.echo(filter_diags)
        return

    def revision_chars(component: str) -> list[str]:
        # Each firmware revision is read from four parameters (<component>FWRevisionB1..B4),
        # each of which is turned into a single character with chr().
        return [chr(filter_diags.get_param_by_name(f"{component}FWRevisionB{index}")) for index in range(1, 5)]

    drv1, drv2, drv3, drv4 = revision_chars("Drive")
    dfw1, dfw2, dfw3, dfw4 = revision_chars("Display")
    pow1 = filter_diags.get_param_by_name("PowerMSB")
    pow2 = filter_diags.get_param_by_name("PowerLSB")
    errs = filter_diags.get_param_by_name("ErrorStatus")

    # The low byte is zero-padded to two hex digits: without the padding MSB=0x12 / LSB=0x05
    # would be printed as "125W" instead of "1205W", making the reading ambiguous.
    # NOTE(review): assumes PowerMSB/PowerLSB are the two bytes of one value - confirm against the protocol.
    # NOTE(review): the DISPLAY line has an extra "." between the last two characters compared
    # with the DRIVE line; left unchanged - confirm which layout is intended.
    click.echo(
        f"DRIVE FW REV: {drv1}{drv2}.{drv3}{drv4}\n"
        f"DISPLAY FW REV: {dfw1}{dfw2}.{dfw3}.{dfw4}\n"
        f"POWER: {pow1:x}{pow2:02x}W\n"
        f"ERROR STATUS: {errs}"
    )
@debug.command()
@click.argument("pcap_file", type=click.Path(exists=True, path_type=Path))
def parse_pcap(pcap_file: Path) -> None:
    """Parse a PCAP file and reconstruct OmniLogic protocol communication.

    Analyzes network packet captures to decode OmniLogic protocol messages.
    Automatically reassembles multi-part messages (LeadMessage + BlockMessages)
    and decompresses payloads.

    The PCAP file should contain UDP traffic captured from OmniLogic controller
    communication (typically on port 10444).

    Example:
        omnilogic debug parse-pcap /path/to/capture.pcap
        tcpdump -i eth0 -w pool.pcap udp port 10444
        omnilogic debug parse-pcap pool.pcap
    """
    # Step 1: load the capture. Any failure is reported on stderr and aborts the command.
    try:
        captured = parse_pcap_file(str(pcap_file))
    except Exception as exc:
        click.echo(f"Error reading PCAP file: {exc}", err=True)
        raise click.Abort from exc

    # Step 2: turn the packets into (source, destination, message, decoded payload) tuples.
    conversation = process_pcap_messages(captured)

    # Step 3: print one header line per message, followed by its payload when there is one.
    for sender, receiver, message, payload in conversation:
        click.echo(f"\n{sender} sent {message.type.name} to {receiver}")
        if not payload:
            continue
        click.echo("Decoded message content:")
        click.echo(payload)
        click.echo()  # Blank separator line after each decoded payload
@debug.command()
@click.argument("bow_id", type=int)
@click.argument("equip_id", type=int)
@click.argument("is_on")
@click.pass_context
def set_equipment(ctx: click.Context, bow_id: int, equip_id: int, is_on: str) -> None:
    """Control equipment by turning it on/off or setting a value.

    BOW_ID: The Body of Water (pool/spa) system ID
    EQUIP_ID: The equipment system ID to control
    IS_ON: Equipment state - can be:
        - Boolean: true/false, on/off, yes/no, 1/0
        - Integer: 0-100 for variable speed equipment (0=off, 1-100=speed percentage)

    For most equipment (relays, lights), use true/false or 1/0.
    For variable speed pumps/filters, use 0-100 to set speed percentage.

    Examples:
        # Turn on a relay
        omnilogic --host 192.168.1.100 debug set-equipment 7 10 true
        # Turn off a light
        omnilogic --host 192.168.1.100 debug set-equipment 7 15 false
        # Set pump to 50% speed
        omnilogic --host 192.168.1.100 debug set-equipment 7 8 50
        # Turn off pump (0% speed)
        omnilogic --host 192.168.1.100 debug set-equipment 7 8 0
    """
    # Parse IS_ON before touching the network so a typo fails fast.
    # Boolean-like words (case-insensitive, including "1"/"0") map to True/False;
    # anything else must be an integer and is sent as an int, never as the raw string.
    is_on_value: int | bool
    is_on_lower = is_on.lower()
    if is_on_lower in ("true", "on", "yes", "1"):
        is_on_value = True
    elif is_on_lower in ("false", "off", "no", "0"):
        is_on_value = False
    else:
        try:
            is_on_value = int(is_on)
        except ValueError as e:
            click.echo(f"Error: is_on must be true/false, on/off, yes/no, or an integer, got {is_on!r}", err=True)
            raise click.Abort from e

    ensure_connection(ctx)
    omni: OmniLogicAPI = ctx.obj["OMNI"]

    # Execute the command; any failure is reported on stderr and aborts with a non-zero exit.
    try:
        asyncio.run(omni.async_set_equipment(bow_id, equip_id, is_on_value))
    except Exception as e:
        click.echo(f"Error setting equipment: {e}", err=True)
        raise click.Abort from e
    else:
        # bool must be tested explicitly: it is reported as ON/OFF, whereas a
        # plain integer is reported as a percentage.
        if isinstance(is_on_value, bool):
            state = "ON" if is_on_value else "OFF"
            click.echo(f"Successfully set equipment {equip_id} in BOW {bow_id} to {state}")
        else:
            click.echo(f"Successfully set equipment {equip_id} in BOW {bow_id} to {is_on_value}%")
@debug.command()
@click.argument("bow_id", type=int)
@click.argument("equip_id", type=int)
@click.argument("timed_percent", type=int)
@click.argument("op_mode", type=int)
@click.pass_context
def set_chlor_params(ctx: click.Context, bow_id: int, equip_id: int, timed_percent: int, op_mode: int) -> None:
    """Set chlorinator parameters with explicit control over configuration.

    This command sets chlorinator parameters using the current chlorinator's
    configuration for cell_type, sc_timeout, bow_type, and orp_timeout, while
    allowing you to specify timed_percent and op_mode. The cfg_state is derived
    from the chlorinator's current on/off state.

    BOW_ID: The Body of Water (pool/spa) system ID
    EQUIP_ID: The chlorinator equipment system ID
    TIMED_PERCENT: Chlorine generation percentage (0-100)
    OP_MODE: Operating mode (0=DISABLED, 1=TIMED, 2=ORP_AUTO)

    Examples:
        # Set chlorinator to 75% in TIMED mode
        omnilogic --host 192.168.1.100 debug set-chlor-params 7 12 75 1
        # Set to ORP AUTO mode with 50% generation
        omnilogic --host 192.168.1.100 debug set-chlor-params 7 12 50 2
    """
    # Validate the arguments first so bad input is rejected without opening a connection.
    # TODO(review): the upper bound is temporarily 1000 (instead of the documented 100)
    # to test ORP behavior; restore it to 100 once that testing is finished.
    if not 0 <= timed_percent <= 1000:
        click.echo(f"Error: timed_percent must be between 0-100, got {timed_percent}", err=True)
        raise click.Abort

    # Valid modes are 0, 1 and 2; the error message states exactly that range.
    if not 0 <= op_mode <= 2:
        click.echo(f"Error: op_mode must be between 0-2, got {op_mode}", err=True)
        raise click.Abort

    ensure_connection(ctx)
    omni: OmniLogicAPI = ctx.obj["OMNI"]

    # Fetch the parsed (non-raw) MSPConfig and telemetry needed to locate the chlorinator.
    try:
        mspconfig = asyncio.run(omni.async_get_mspconfig(raw=False))
        telemetry = asyncio.run(omni.async_get_telemetry(raw=False))
    except Exception as e:
        click.echo(f"Error retrieving configuration: {e}", err=True)
        raise click.Abort from e

    # Find the BOW; `or ()` covers a backyard whose bow list is missing or empty.
    bow = next((candidate for candidate in mspconfig.backyard.bow or () if candidate.system_id == bow_id), None)
    if bow is None:
        click.echo(f"Error: Body of Water with ID {bow_id} not found", err=True)
        raise click.Abort

    # The BOW's chlorinator must exist and carry the requested system ID.
    chlorinator = bow.chlorinator
    if chlorinator is None or chlorinator.system_id != equip_id:
        click.echo(f"Error: Chlorinator with ID {equip_id} not found in BOW {bow_id}", err=True)
        raise click.Abort

    # cfg_state is 3 when the chlorinator is enabled and 2 when it is not;
    # without telemetry we warn and fall back to 3 (on).
    chlorinator_telemetry = telemetry.get_telem_by_systemid(equip_id)
    if chlorinator_telemetry is None:
        click.echo(f"Warning: No telemetry found for chlorinator {equip_id}, defaulting cfg_state to 3 (on)", err=True)
        cfg_state = 3
    else:
        # Cast to TelemetryChlorinator to access the enable attribute
        chlorinator_telem = cast("TelemetryChlorinator", chlorinator_telemetry)
        cfg_state = 3 if chlorinator_telem.enable else 2

    # Determine bow_type from equipment type (0=pool, 1=spa)
    bow_type = 0 if bow.equip_type == "BOW_POOL" else 1

    # The remaining parameters are taken unchanged from the chlorinator's current configuration.
    cell_type = chlorinator.cell_type.value
    sc_timeout = chlorinator.superchlor_timeout
    orp_timeout = chlorinator.orp_timeout

    # Execute the command; any failure is reported on stderr and aborts with a non-zero exit.
    try:
        asyncio.run(
            omni.async_set_chlorinator_params(
                pool_id=bow_id,
                equipment_id=equip_id,
                timed_percent=timed_percent,
                cell_type=cell_type,
                op_mode=op_mode,
                sc_timeout=sc_timeout,
                bow_type=bow_type,
                orp_timeout=orp_timeout,
                cfg_state=cfg_state,
            )
        )
    except Exception as e:
        click.echo(f"Error setting chlorinator parameters: {e}", err=True)
        raise click.Abort from e
    else:
        click.echo(
            f"Sent command to chlorinator {equip_id} in BOW {bow_id}:\n"
            f"  Timed Percent: {timed_percent}%\n"
            f"  Operating Mode: {op_mode}\n"
            f"  Config State: {cfg_state} ({'on' if cfg_state == 3 else 'off'})\n"
            f"  Cell Type: {cell_type}\n"
            f"  SC Timeout: {sc_timeout}\n"
            f"  BOW Type: {bow_type}\n"
            f"  ORP Timeout: {orp_timeout}"
        )
@debug.command()
@click.argument("bow_id", type=int)
@click.argument("csad_id", type=int)
@click.argument("target", type=float)
@click.pass_context
def set_csad_ph(ctx: click.Context, bow_id: int, csad_id: int, target: float) -> None:
    """Set the pH target value for a CSAD (Chemical Sense and Dispense).

    This command sets the target pH level that the CSAD will attempt to maintain.

    BOW_ID: The Body of Water (pool/spa) system ID
    CSAD_ID: The CSAD equipment system ID
    TARGET: Target pH value (typically 7.0-8.0)

    Examples:
        # Set pH target to 7.4
        omnilogic --host 192.168.1.100 debug set-csad-ph 7 20 7.4
        # Set pH target to 7.2
        omnilogic --host 192.168.1.100 debug set-csad-ph 7 20 7.2
    """
    # Validate before connecting so an out-of-range value is rejected without any network traffic.
    # Typical targets are 6.8-8.2, but the whole 0-14 pH scale is accepted.
    # The chained comparison also rejects NaN, since every comparison with NaN is False.
    if not 0.0 <= target <= 14.0:
        click.echo(f"Error: pH target must be between 0.0-14.0, got {target}", err=True)
        raise click.Abort

    ensure_connection(ctx)
    omni: OmniLogicAPI = ctx.obj["OMNI"]

    # Execute the command; any failure is reported on stderr and aborts with a non-zero exit.
    try:
        asyncio.run(omni.async_set_csad_target_value(pool_id=bow_id, csad_id=csad_id, ph_target=target))
    except Exception as e:
        click.echo(f"Error setting CSAD pH target: {e}", err=True)
        raise click.Abort from e
    else:
        click.echo(f"Successfully set CSAD {csad_id} in BOW {bow_id} pH target to {target}")
@debug.command()
@click.argument("bow_id", type=int)
@click.argument("csad_id", type=int)
@click.argument("target", type=int)
@click.pass_context
def set_csad_orp(ctx: click.Context, bow_id: int, csad_id: int, target: int) -> None:
    """Set the ORP target level for a CSAD (Chemical Sense and Dispense).

    This command sets the target ORP (Oxidation-Reduction Potential) level in
    millivolts that the CSAD will attempt to maintain.

    BOW_ID: The Body of Water (pool/spa) system ID
    CSAD_ID: The CSAD equipment system ID
    TARGET: Target ORP value in millivolts (typically 600-800 mV)

    Examples:
        # Set ORP target to 700 mV
        omnilogic --host 192.168.1.100 debug set-csad-orp 7 20 700
        # Set ORP target to 650 mV
        omnilogic --host 192.168.1.100 debug set-csad-orp 7 20 650
    """
    # Validate before connecting so an out-of-range value is rejected without any network traffic.
    # Typical targets are 400-900 mV, but anything in 0-1000 is accepted.
    if not 0 <= target <= 1000:
        click.echo(f"Error: ORP target must be between 0-1000 mV, got {target}", err=True)
        raise click.Abort

    ensure_connection(ctx)
    omni: OmniLogicAPI = ctx.obj["OMNI"]

    # Execute the command; any failure is reported on stderr and aborts with a non-zero exit.
    try:
        asyncio.run(omni.async_set_csad_orp_target_level(pool_id=bow_id, csad_id=csad_id, orp_target=target))
    except Exception as e:
        click.echo(f"Error setting CSAD ORP target: {e}", err=True)
        raise click.Abort from e
    else:
        click.echo(f"Successfully set CSAD {csad_id} in BOW {bow_id} ORP target to {target} mV")