Skip to content

Commit f208352

Browse files
author
sidey79
committed
feat: Implementiere MQTT Command-Handling und Architektur-Refactoring
Integriert einen dedizierten MQTT Command-Listener und refaktoriert die Publisher/Controller-Beziehung für direktes Command-Handling. - feat(mqtt): MqttPublisher verarbeitet Commands nun intern über `_handle_command` - refactor(controller): Controller injiziert MqttPublisher für Heartbeats und direkten Zugriff - perf(protocols): Optimierung der Demodulationsschleife für inaktive Protokolle in `message_unsynced.py` - fix(main): Auflösung der zirkulären Abhängigkeit zwischen Controller und Publisher - test: Anpassung der Testsuite an neue Architektur und Mocking-Strategie
1 parent b500c3b commit f208352

12 files changed

Lines changed: 309 additions & 180 deletions

File tree

.devcontainer/devcontainer.json

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@
33
{
44
"name": "Python 3",
55
// Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile
6-
"image": "mcr.microsoft.com/devcontainers/python:2-3-bookworm",
6+
"dockerComposeFile": "docker-compose.yml",
7+
"workspaceFolder": "/workspaces/PySignalduino",
8+
"service": "devcontainer",
9+
// Features to add to the dev container. More info: https://containers.dev/features.
10+
// "features": {},
711
"features": {
812
"ghcr.io/devcontainers/features/node:1": {
913
"nodeGypDependencies": true,
@@ -18,32 +22,15 @@
1822
}
1923
//"ghcr.io/hspaans/devcontainer-features/pytest:2": {}
2024
},
21-
22-
// Features to add to the dev container. More info: https://containers.dev/features.
23-
// "features": {},
24-
2525
// Use 'forwardPorts' to make a list of ports inside the container available locally.
2626
// "forwardPorts": [],
27-
28-
// Use 'postCreateCommand' to run commands after the container is created.
29-
"postCreateCommand": "pip3 install --user -r requirements-dev.txt -r requirements.txt || exit 0",
3027
"customizations": {
3128
"vscode": {
3229
"extensions": [
3330
"RooVeterinaryInc.roo-cline"
3431
]
3532
}
3633
},
37-
"runArgs": [
38-
"--env-file",
39-
".devcontainer/devcontainer.env",
40-
"--network=bridge",
41-
"--memory=4gb"
42-
],
43-
44-
// Configure tool-specific properties.
45-
// "customizations": {},
46-
47-
// Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root.
48-
// "remoteUser": "root"
34+
// Use 'postCreateCommand' to run commands after the container is created.
35+
"postCreateCommand": "pip3 install --user -r requirements-dev.txt -r requirements.txt || exit 0"
4936
}

.devcontainer/docker-compose.yml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
services:
2+
devcontainer:
3+
# Build the image from the existing devcontainer setup
4+
image: mcr.microsoft.com/devcontainers/python:3
5+
# The current working directory is mounted automatically
6+
volumes:
7+
- ..:/workspaces/PySignalduino
8+
9+
# Use the existing settings from devcontainer.json
10+
# Overriding the entrypoint is necessary when using a non-Compose devcontainer base image
11+
command: /bin/bash -c "sleep infinity"
12+
13+
# Environment variables from .devcontainer/devcontainer.env
14+
env_file:
15+
- ./devcontainer.env
16+
17+
# This ensures services in the compose file can be reached by their service name
18+
# The default bridge network is sufficient for this purpose
19+
20+
mqtt:
21+
image: eclipse-mosquitto:latest
22+
container_name: mosquitto-dev-broker
23+
#ports:
24+
# Expose port 1883 on the host, so other local clients can also connect if needed
25+
#- "1883:1883"
26+
volumes:
27+
- ./mosquitto/config:/mosquitto/config
28+
- ./mosquitto/data:/mosquitto/data
29+
- ./mosquitto/log:/mosquitto/log
30+
command: mosquitto -c /mosquitto/config/mosquitto.conf
31+
restart: unless-stopped
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
listener 1883 0.0.0.0
2+
allow_anonymous true
3+
4+
# Mosquitto Standard-Pfade
5+
persistence true
6+
persistence_location /mosquitto/data/
7+
log_dest file /mosquitto/log/mosquitto.log

.vscode/settings.json

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,14 @@
4545
"tests"
4646
],
4747
"python.testing.unittestEnabled": false,
48-
"python.testing.pytestEnabled": true
48+
"python.testing.pytestEnabled": true,
49+
"vsmqtt.brokerProfiles": [
50+
{
51+
"name": "devmqtt",
52+
"host": "mqtt",
53+
"port": 1883,
54+
"clientId": "vsmqtt_client_db93",
55+
"savedSubscriptions": ['signalduino/']
56+
}
57+
]
4958
}

AGENTS.md

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -253,22 +253,32 @@ Dieser Architecture-First Development Process ist für **alle** neuen Funktionen
253253

254254
Die Einhaltung dieses Prozesses gewährleistet, dass Design-Entscheidungen bewusst getroffen, dokumentiert und nachvollziehbar sind, was die langfristige Wartbarkeit, Skalierbarkeit und Qualität des PySignalduino-Projekts sicherstellt.
255255

256-
## Fehlerbehebungsprozess für fehlende Abhängigkeiten
257-
256+
## Fehlerbehebungsprozess
258257
### Problemidentifikation
259258
1. **Symptom:** ImportError oder ModuleNotFoundError während der Testausführung
260259
2. **Ursachenanalyse:**
261260
- Überprüfen der Traceback-Meldung auf fehlende Module
262261
- Vergleich mit requirements.txt und requirements-dev.txt
263262
- Prüfen der Dokumentation auf Installationsanweisungen
264263

265-
### Lösungsimplementierung
264+
### Lösungsimplementierung (Abhängigkeiten)
266265
1. **requirements-dev.txt aktualisieren:**
267266
- Modulname zur Datei hinzufügen
268267
- Commit mit Conventional Commits Syntax erstellen (z.B. "fix: add <module> to requirements-dev.txt")
269268
2. **Dokumentation prüfen:**
270269
- Sicherstellen, dass Installationsanweisungen in README.md und docs/ aktuell sind
271270

271+
### Problemidentifikation (Hohe CPU-Last im Parser)
272+
1. **Symptom:** Anhaltende 100% CPU-Auslastung auf einem oder mehreren Kernen während des Parsens von MU/MC-Nachrichten.
273+
2. **Ursachenanalyse:**
274+
- **Parser-Architektur prüfen:** Der gesamte Parservorgang sollte in [`signalduino/controller.py`](signalduino/controller.py) über `asyncio.to_thread` abgewickelt werden.
275+
- **Protokoll-Ineffizienz:** Die synchrone Demodulationsschleife in [`sd_protocols/message_unsynced.py`](sd_protocols/message_unsynced.py) oder [`sd_protocols/manchester.py`](sd_protocols/manchester.py) blockiert den Worker-Thread zu lange.
276+
3. **Validierung:** Temporäres Hinzufügen von Zeit-Logging (z.B. mit `time.perf_counter()`) in der Protokollschleife in `demodulate_mu` zur Identifizierung des blockierenden Protokolls.
277+
278+
### Lösungsimplementierung (Parser-Performance)
279+
1. **Backtracking-Hölle vermeiden:** Wenn ein Protokoll eine sehr lange Demodulationszeit (z.B. > 10ms) aufweist, liegt wahrscheinlich ein Catastrophic Backtracking in einem regulären Ausdruck vor.
280+
2 **Deaktivierung inaktiver Protokolle:** Die `active`-Prüfung in `demodulate_mu` sollte verwendet werden, um inaktive Protokolle auszuschließen.
281+
272282
### Verifikation
273283
1. **Installation testen:**
274284
```bash

docs/ASYNCIO_MIGRATION.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,30 @@ Vergessene `await`‑Schlüsselwörter führen zu `RuntimeWarning` oder hängen
194194

195195
Wenn Sie Threads und asyncio mischen müssen (z.B. für Legacy‑Code), verwenden Sie `asyncio.run_coroutine_threadsafe()` oder `loop.call_soon_threadsafe()`.
196196

197+
### 4. Async-Busy-Loops und CPU-Auslastung (100%)
198+
199+
Wenn `asyncio.Queue.get()` in einer `while True`-Schleife ständig Elemente zurückgibt (z.B. bei hohem Nachrichtenaufkommen), kann die Co-Routine den Event-Loop dominieren, selbst wenn die schwere Arbeit in einem Thread-Pool ausgelagert wird. Dies führt zu hoher CPU-Auslastung und sporadischer Bearbeitung anderer Async-Tasks.
200+
201+
**Lösung:** Stellen Sie in schnell laufenden Verarbeitungsschleifen sicher, dass ein expliziter Yield-Punkt vorhanden ist, um anderen Tasks die Kontrolle zu übergeben.
202+
203+
```python
204+
# Falsch (potenzielle Busy-Loop bei vollem Buffer)
205+
# while not self._stop_event.is_set():
206+
# item = await queue.get()
207+
# process_item(item) # Wenn schnell, dominiert diese Task
208+
209+
# Korrekt
210+
while not self._stop_event.is_set():
211+
try:
212+
line = await self._raw_message_queue.get()
213+
# ... Verarbeitung (kann await asyncio.to_thread enthalten) ...
214+
215+
# Sicherstellen, dass andere Tasks Zeit bekommen
216+
await asyncio.sleep(0.01)
217+
except Exception:
218+
break
219+
```
220+
197221
## Vollständiges Migrationsbeispiel
198222

199223
Hier ein komplettes Beispiel, das einen einfachen MQTT‑Bridge‑Service migriert:

main.py

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from signalduino.constants import SDUINO_CMD_TIMEOUT
1111
from signalduino.controller import SignalduinoController
1212
from signalduino.exceptions import SignalduinoConnectionError, SignalduinoCommandTimeout
13+
from signalduino.mqtt import MqttPublisher
1314
from signalduino.transport import SerialTransport, TCPTransport
1415
from signalduino.types import DecodedMessage, RawFrame # NEU: RawFrame
1516

@@ -58,7 +59,7 @@ async def _async_run(args: argparse.Namespace):
5859
logger.info(f"Initialisiere serielle Verbindung auf {args.serial} mit {args.baud} Baud...")
5960
transport = SerialTransport(port=args.serial, baudrate=args.baud)
6061
elif args.tcp:
61-
logger.info(f"Initialisiere TCP Verbindung zu {args.tcp}:{args.port}...")
62+
logger.info(f"Initializing TCP connection to {args.tcp}:{args.port}...")
6263
transport = TCPTransport(host=args.tcp, port=args.port)
6364

6465
# Wenn weder --serial noch --tcp (oder deren ENV-Defaults) gesetzt sind
@@ -67,18 +68,42 @@ async def _async_run(args: argparse.Namespace):
6768
sys.exit(1)
6869

6970
# Controller initialisieren
71+
# Wir initialisieren den Controller zuerst (mit mqtt_publisher=None),
72+
# um ihn als Argument an MqttPublisher übergeben zu können (zirkuläre Abhängigkeit).
7073
controller = SignalduinoController(
7174
transport=transport,
7275
message_callback=message_callback,
73-
logger=logger
76+
logger=logger,
77+
mqtt_publisher=None # Wird später zugewiesen
7478
)
79+
80+
# MQTT Publisher explizit initialisieren, falls Host in Argumenten gesetzt
81+
mqtt_publisher = None
82+
# args.mqtt_host ist gesetzt, wenn es entweder als CLI-Argument übergeben wurde
83+
# oder wenn es als Umgebungsvariable gesetzt war (Standardwert in main()).
84+
# Wir prüfen hier nur, ob ein Wert vorhanden ist, da der Controller sonst
85+
# intern die Umgebungsvariable MQTT_HOST prüft.
86+
if args.mqtt_host:
87+
logger.info(f"Initializing MQTT publisher for host: {args.mqtt_host}")
88+
mqtt_publisher = MqttPublisher(
89+
logger=logger,
90+
controller=controller, # Korrektur: Fügt das fehlende 'controller'-Argument hinzu
91+
host=args.mqtt_host,
92+
port=args.mqtt_port,
93+
username=args.mqtt_username,
94+
password=args.mqtt_password,
95+
topic=args.mqtt_topic,
96+
)
97+
# Weisen Sie den Publisher dem Controller nachträglich zu.
98+
controller.mqtt_publisher = mqtt_publisher # NEU: Nachträgliche Zuweisung
99+
75100

76101
# Starten
77102
try:
78-
logger.info("Verbinde zum Signalduino...")
103+
logger.info("Connecting to Signalduino...")
79104
# NEU: Verwende async with Block
80105
async with controller:
81-
logger.info("Verbunden! Starte Initialisierung und Hauptschleife...")
106+
logger.info("Connected! Starting initialization and main loop...")
82107

83108
# Starte die Hauptschleife, warte auf deren Beendigung oder ein Timeout
84109
await controller.run(timeout=args.timeout)

sd_protocols/message_unsynced.py

Lines changed: 27 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ def demodulate_mu(self, msg_data: Dict[str, Any], msg_type: str = "MU") -> List[
4545
mu_protocols = self.get_keys('clockabs')
4646

4747
for pid in mu_protocols:
48+
if not self.check_property(pid, 'active', True):
49+
continue
4850
self._logging(f"MU checking PID {pid}", 5)
4951
# Prepare working copy of raw_data and patterns
5052
# (Perl does this per protocol iteration because filterfunc might modify them)
@@ -144,63 +146,40 @@ def demodulate_mu(self, msg_data: Dict[str, Any], msg_type: str = "MU") -> List[
144146
# Construct Regex
145147
# Perl: $regex="(?:$startStr)($signalRegex)"; where signalRegex is (one|zero|float){min,}
146148

147-
signal_or_group = "|".join(signal_regex_parts)
148-
if self.get_property(pid, 'reconstructBit'):
149-
# Add endPatternLookup keys
150-
extras = [re.escape(k) for k in end_pattern_lookup.keys()]
151-
if extras:
152-
signal_or_group += "|" + "|".join(extras)
153-
154-
length_min = self.check_property(pid, 'length_min', 0)
155-
# length_max = self.check_property(pid, 'length_max', '')
156-
157-
# Python re doesn't support variable length lookbehind or similar easily,
158-
# but here we are matching forward.
159-
# Perl loop: while ( $rawData =~ m/$regex/g)
160-
# regex = (?:$startStr)((?:p1|p2|...){min,})
161-
162-
# We already sliced raw_data to start at startStr if present.
163-
# So startStr is at the beginning of current_raw_data.
164-
# However, if startStr was found, it is consumed?
165-
# Perl: $rawData = substr($rawData, $message_start);
166-
# regex = "(?:$startStr)($signalRegex)";
167-
# So it matches startStr again at the beginning?
168-
# Wait, if we sliced it, the first chars ARE startStr.
169-
170-
# Let's try to match iteratively
171-
172-
full_regex_str = f"(?:{re.escape(start_str)})((?:{signal_or_group}){{ {length_min}, }})"
173-
if self.get_property(pid, 'reconstructBit'):
174-
# Perl: $signalRegex .= '(?:' . join('|',keys %endPatternLookupHash) . ')?';
175-
# This is appended to the repeating group? No.
176-
# Perl code:
177-
# $signalRegex .= qq[{$length_min,}];
178-
# if (defined(...reconstructBit...)) { $signalRegex .= '(?:' . join('|',keys %endPatternLookupHash) . ')?'; }
179-
# So it's ((?:p1|p2){min,}(?:partial)?)
180-
pass # Logic handled below manually or we construct regex precisely
181-
182-
# It seems cleaner to just use the regex to find the data part
183-
# Constructing complex regex in Python from dynamic parts
184-
185-
# Simplified approach:
186-
# 1. We are at start of potential message (startStr)
187-
# 2. Extract as many valid chunks as possible
188-
189-
# Re-implementing Perl's while loop over matches
190-
# The regex matches the *entire* message (start + data).
149+
# Build the base repeating pattern (signal_group_inner)
150+
# Optimization for catastrophic backtracking (e.g., P61: '12|11' -> '1(2|1)')
151+
# Only apply if all parts share the same length and single-character prefix.
191152

192-
# Adjust signal_or_group for the repeating part
193-
signal_group_inner = "|".join(signal_regex_parts)
153+
unescaped_parts = list(pattern_lookup.keys())
154+
signal_group_inner = "|".join(signal_regex_parts) # Default: unoptimized
194155

156+
try:
157+
# Check if optimization is possible (all same length, same prefix, length > 1)
158+
if unescaped_parts and all(len(p) == len(unescaped_parts[0]) for p in unescaped_parts) and len(unescaped_parts[0]) > 1:
159+
first_part = unescaped_parts[0]
160+
prefix = first_part[0]
161+
162+
if all(p.startswith(prefix) for p in unescaped_parts):
163+
suffixes = [p[1:] for p in unescaped_parts]
164+
165+
# Reconstruct the inner group: prefix(?:suffix1|suffix2|...)
166+
# Note: re.escape is safe even for single characters
167+
signal_group_inner = re.escape(prefix) + "(?:" + "|".join(re.escape(s) for s in suffixes) + ")"
168+
self._logging(f"MU Demod: Optimized repeating pattern for PID {pid}: {signal_group_inner}", 5)
169+
except Exception:
170+
# Fallback to default in case of unexpected pattern data
171+
pass
172+
195173
# Handle reconstructBit logic for regex end
196174
reconstruct_part = ""
197175
if self.get_property(pid, 'reconstructBit') and end_pattern_lookup:
198176
reconstruct_part = "(?:" + "|".join([re.escape(k) for k in end_pattern_lookup.keys()]) + ")?"
199177

200-
# We need to compile this regex
178+
length_min = self.check_property(pid, 'length_min', 0)
179+
201180
# Note: Python f-string braces need escaping
202181
regex_pattern = f"(?:{re.escape(start_str)})((?:{signal_group_inner}){{{length_min},}}{reconstruct_part})"
203-
182+
204183
try:
205184
# print(f"DEBUG: Compiling regex for {pid}: {regex_pattern[:50]}...")
206185
matcher = re.compile(regex_pattern)

signalduino/controller.py

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from datetime import datetime, timedelta, timezone
88
from typing import Any, Awaitable, Callable, List, Optional, Dict, Tuple, Pattern
99

10-
from .commands import SignalduinoCommands, MqttCommandDispatcher
10+
from .commands import SignalduinoCommands
1111
from .constants import (
1212
SDUINO_CMD_TIMEOUT,
1313
SDUINO_INIT_MAXRETRY,
@@ -56,7 +56,7 @@ def __init__(
5656
# NEU: Automatische Initialisierung des MqttPublisher, wenn keine Instanz übergeben wird und
5757
# die Umgebungsvariable MQTT_HOST gesetzt ist.
5858
if mqtt_publisher is None and os.environ.get("MQTT_HOST"):
59-
self.mqtt_publisher = MqttPublisher(logger=self.logger)
59+
self.mqtt_publisher = MqttPublisher(controller=self, logger=self.logger)
6060
else:
6161
self.mqtt_publisher = mqtt_publisher
6262

@@ -78,8 +78,10 @@ def __init__(
7878

7979
mqtt_topic_root = self.mqtt_publisher.base_topic if self.mqtt_publisher else None
8080
self.commands = SignalduinoCommands(self.send_command, mqtt_topic_root)
81-
if mqtt_publisher:
82-
self.mqtt_dispatcher = MqttCommandDispatcher(self)
81+
82+
def get_version(self) -> Optional[str]:
83+
"""Returns the cached firmware version string."""
84+
return self.init_version_response
8385

8486
async def send_command(
8587
self,
@@ -148,7 +150,7 @@ async def _reader_task(self) -> None:
148150
self.logger.debug(f"Reader task received line: {line}")
149151
await self._raw_message_queue.put(line)
150152

151-
await asyncio.sleep(0) # Ensure yield, even if readline returns immediately without data
153+
await asyncio.sleep(0.01) # Ensure minimal yield time to prevent 100% CPU usage
152154
except Exception as e:
153155
self.logger.error(f"Reader task error: {e}")
154156
break
@@ -167,6 +169,9 @@ async def _parser_task(self) -> None:
167169
# Verwende die neue MqttPublisher.publish(message: DecodedMessage) Signatur
168170
await self.mqtt_publisher.publish(decoded[0])
169171
await self._handle_as_command_response(line)
172+
173+
# Ensure a minimal yield time for other tasks when the queue is rapidly processed.
174+
await asyncio.sleep(0.01)
170175
except Exception as e:
171176
self.logger.error(f"Parser task error: {e}")
172177
break
@@ -369,12 +374,4 @@ async def _publish_status_heartbeat(self) -> None:
369374
"version": self.init_version_response,
370375
"connected": not self.transport.closed()
371376
}
372-
await self.mqtt_publisher.publish_simple("status/heartbeat", json.dumps(status))
373-
374-
async def _handle_mqtt_command(self, topic: str, payload: str) -> None:
375-
"""Handle incoming MQTT commands."""
376-
if self.mqtt_dispatcher:
377-
try:
378-
await self.mqtt_dispatcher.dispatch(topic, payload)
379-
except CommandValidationError as e:
380-
self.logger.error(f"Invalid MQTT command: {e}")
377+
await self.mqtt_publisher.publish_simple("status/heartbeat", json.dumps(status))

0 commit comments

Comments
 (0)