Skip to content

Commit 5988e8b

Browse files
authored
subdata initialization: wait for all retained topics (#3015)
* subdata initialization: wait for all retained topics * clean up * fix * fix pytest * clean up
1 parent a316977 commit 5988e8b

7 files changed

Lines changed: 42 additions & 72 deletions

File tree

packages/helpermodules/command_test.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@
1515

1616

1717
@pytest.fixture
18-
def subdata_fixture() -> None:
19-
SubData(*([Mock()]*16))
18+
def subdata_fixture(monkeypatch) -> None:
19+
monkeypatch.setattr(SubData, "initialize", lambda x: None)
20+
SubData(*([Mock()]*15))
2021
SubData.cp_data = {"cp0": Mock(spec=ChargepointStateUpdate, chargepoint=Mock(
2122
spec=Chargepoint, chargepoint_module=Mock(spec=ChargepointModulePro)))}
2223

packages/helpermodules/setdata.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,15 @@ class SetData:
2828
def __init__(self,
2929
event_ev_template: Event,
3030
event_cp_config: Event,
31-
event_soc: Event,
32-
event_subdata_initialized: Event):
31+
event_soc: Event):
3332
self.event_ev_template = event_ev_template
3433
self.event_cp_config = event_cp_config
3534
self.event_soc = event_soc
36-
self.event_subdata_initialized = event_subdata_initialized
3735
self.heartbeat = False
3836

3937
def set_data(self):
4038
self.internal_broker_client = BrokerClient("mqttset", self.on_connect, self.on_message)
41-
self.event_subdata_initialized.wait()
42-
log.debug("Subdata initialization completed. Starting setdata loop to broker.")
39+
log.debug("Starting setdata loop to broker.")
4340
self.internal_broker_client.start_infinite_loop()
4441

4542
def disconnect(self) -> None:

packages/helpermodules/subdata.py

Lines changed: 31 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import logging
55
from pathlib import Path
66
from threading import Event
7+
import time
78
from typing import Dict, Union
89
import re
910
import subprocess
@@ -23,7 +24,6 @@
2324
from helpermodules import graph, system
2425
from helpermodules.broker import BrokerClient
2526
from helpermodules.messaging import MessageType, pub_system_message
26-
from helpermodules.utils import ProcessingCounter
2727
from helpermodules.utils.run_command import run_command
2828
from helpermodules.utils.topic_parser import decode_payload, get_index, get_second_index
2929
from helpermodules.pub import Pub
@@ -39,6 +39,8 @@
3939
log = logging.getLogger(__name__)
4040
mqtt_log = logging.getLogger("mqtt")
4141

42+
QUIET_TIME = 3
43+
4244

4345
class SubData:
4446
""" Klasse, die die benötigten Topics abonniert, die Instanzen erstellt, wenn z.b. ein Modul neu konfiguriert
@@ -77,7 +79,6 @@ def __init__(self,
7779
event_copy_data: Event,
7880
event_global_data_initialized: Event,
7981
event_command_completed: Event,
80-
event_subdata_initialized: Event,
8182
event_vehicle_update_completed: Event,
8283
event_start_internal_chargepoint: Event,
8384
event_stop_internal_chargepoint: Event,
@@ -93,7 +94,6 @@ def __init__(self,
9394
self.event_copy_data = event_copy_data
9495
self.event_global_data_initialized = event_global_data_initialized
9596
self.event_command_completed = event_command_completed
96-
self.event_subdata_initialized = event_subdata_initialized
9797
self.event_vehicle_update_completed = event_vehicle_update_completed
9898
self.event_start_internal_chargepoint = event_start_internal_chargepoint
9999
self.event_stop_internal_chargepoint = event_stop_internal_chargepoint
@@ -104,13 +104,28 @@ def __init__(self,
104104
self.event_modbus_server = event_modbus_server
105105
self.event_restart_gpio = event_restart_gpio
106106
self.heartbeat = False
107-
# Immer wenn ein Subscribe hinzugefügt wird, wird der Zähler hinzugefügt und subdata_initialized gepublished.
108-
# Wenn subdata_initialized empfangen wird, wird der Zäheler runtergezählt. Erst wenn alle subdata_initialized
109-
# empfangen wurden, wurden auch die vorher subskribierten Topics empfangen und der Algorithmus kann starten.
110-
self.processing_counter = ProcessingCounter(self.event_subdata_initialized)
107+
self.last_msg_time = None
108+
self.initialized = False
109+
self.internal_broker_client = BrokerClient("mqttsub", self.on_connect, self.on_message)
110+
self.initialize()
111+
112+
def initialize(self):
113+
try:
114+
self.internal_broker_client.client.loop_start()
115+
log.debug("Warte auf retained Topics ...")
116+
while self.last_msg_time is None:
117+
time.sleep(0.05)
118+
# quiet time detector: mqtt schickt alle retained messages direkt nach dem subscribe
119+
while time.time() - self.last_msg_time < QUIET_TIME:
120+
time.sleep(0.05)
121+
self.internal_broker_client.client.loop_stop()
122+
log.debug("Alle retained Topics empfangen.")
123+
self.initialized = True
124+
except Exception:
125+
log.exception("Fehler beim Initialisieren des Subdata-Moduls")
111126

112127
def sub_topics(self):
113-
self.internal_broker_client = BrokerClient("mqttsub", self.on_connect, self.on_message)
128+
log.debug("Starte Subdata-MQTT-Loop")
114129
self.internal_broker_client.start_infinite_loop()
115130

116131
def disconnect(self) -> None:
@@ -149,15 +164,14 @@ def on_connect(self, client: mqtt.Client, userdata, flags: dict, rc: int):
149164
("openWB/LegacySmartHome/Status/wattnichtHaus", 2),
150165
("openWB/io/#", 2),
151166
])
152-
self.processing_counter.add_task()
153-
Pub().pub("openWB/system/subdata_initialized", True)
154167

155168
def on_message(self, client: mqtt.Client, userdata, msg: mqtt.MQTTMessage):
156169
""" wait for incoming topics.
157170
"""
158-
mqtt_log.debug("Topic: "+str(msg.topic) +
159-
", Payload: "+str(msg.payload.decode("utf-8")))
171+
mqtt_log.debug(f"Topic: {msg.topic}, Payload: {msg.payload.decode('utf-8')}")
160172
self.heartbeat = True
173+
if self.initialized is False:
174+
self.last_msg_time = time.time()
161175
if "openWB/vehicle/template/charge_template/" in msg.topic:
162176
self.process_vehicle_charge_template_topic(
163177
self.ev_charge_template_data, msg)
@@ -311,8 +325,6 @@ def process_vehicle_topic(self, client: mqtt.Client, var: Dict[str, ev.Ev], msg:
311325
var["ev"+index].soc_module = mod.create_vehicle(config, index)
312326
client.subscribe(f"openWB/vehicle/{index}/soc_module/calculated_soc_state", 2)
313327
client.subscribe(f"openWB/vehicle/{index}/soc_module/general_config", 2)
314-
self.processing_counter.add_task()
315-
Pub().pub("openWB/system/subdata_initialized", True)
316328
self.event_soc.set()
317329
else:
318330
# temporäres ChargeTemplate aktualisieren, wenn dem Fahrzeug ein anderes Ladeprofil zugeordnet
@@ -610,10 +622,7 @@ def process_general_topic(self, var: general.General, msg: mqtt.MQTTMessage):
610622
if decode_payload(msg.payload) and self.general_data.data.extern:
611623
self.event_modbus_server.set()
612624
elif "openWB/general/http_api" == msg.topic:
613-
if (
614-
self.event_subdata_initialized.is_set() and
615-
self.general_data.data.http_api != decode_payload(msg.payload)
616-
):
625+
if self.initialized and self.general_data.data.http_api != decode_payload(msg.payload):
617626
pub_system_message(
618627
msg.payload,
619628
"Bitte die openWB <a href=\"/openWB/web/settings/#/System/SystemConfiguration\">"
@@ -750,7 +759,7 @@ def process_optional_topic(self, var: optional.Optional, msg: mqtt.MQTTMessage):
750759
var.data.ocpp = dataclass_from_dict(Ocpp, config_dict)
751760
elif re.search("/optional/monitoring/", msg.topic) is not None:
752761
# do not reconfigure monitoring if topic is received on startup
753-
if self.event_subdata_initialized.is_set():
762+
if self.initialized:
754763
config = decode_payload(msg.payload)
755764
if config["type"] is None:
756765
var.monitoring_stop()
@@ -834,8 +843,6 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes
834843
# Durch das erneute Subscribe werden die Komponenten mit dem aktualisierten TCP-Client angelegt.
835844
client.subscribe(f"openWB/system/device/{index}/component/+/config", 2)
836845
client.subscribe(f"openWB/system/device/{index}/error_timestamp", 2)
837-
self.processing_counter.add_task()
838-
Pub().pub("openWB/system/subdata_initialized", True)
839846
elif re.search("^.+/device/[0-9]+/component/[0-9]+/simulation$", msg.topic) is not None:
840847
index = get_index(msg.topic)
841848
index_second = get_second_index(msg.topic)
@@ -872,11 +879,9 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes
872879
config = dataclass_from_dict(component.component_descriptor.configuration_factory, component_config)
873880
var["device"+index].add_component(config)
874881
client.subscribe(f"openWB/system/device/{index}/component/{index_second}/simulation", 2)
875-
self.processing_counter.add_task()
876-
Pub().pub("openWB/system/subdata_initialized", True)
877882
elif "mqtt" and "bridge" in msg.topic:
878883
# do not reconfigure mqtt bridges if topic is received on startup
879-
if self.event_subdata_initialized.is_set():
884+
if self.initialized:
880885
index = get_index(msg.topic)
881886
parent_file = Path(__file__).resolve().parents[2]
882887
try:
@@ -904,7 +909,7 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes
904909
run_command([str(Path(__file__).resolve().parents[2] / "runs" / "start_remote_support.sh"),
905910
token, port, user], process_exception=True)
906911
elif "openWB/system/backup_password" in msg.topic:
907-
if self.event_subdata_initialized.is_set():
912+
if self.initialized:
908913
key_file = Path.home() / "backup.key"
909914
payload = decode_payload(msg.payload)
910915
if payload is None or payload == "":
@@ -927,7 +932,7 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes
927932
self.set_json_payload(var["system"].data["backup_cloud"], msg)
928933
elif ("openWB/system/dataprotection_acknowledged" == msg.topic and
929934
decode_payload(msg.payload) is False):
930-
if self.event_subdata_initialized.is_set():
935+
if self.initialized:
931936
Pub().pub("openWB/set/command/removeCloudBridge/todo",
932937
{"command": "removeCloudBridge"})
933938
else:
@@ -967,10 +972,6 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes
967972
"openWB/system/time" == msg.topic):
968973
# Logged in update.log, not used in data.data and removed due to readability purposes of main.log.
969974
return
970-
elif "openWB/system/subdata_initialized" == msg.topic:
971-
if decode_payload(msg.payload) != "":
972-
Pub().pub("openWB/system/subdata_initialized", "")
973-
self.processing_counter.task_done()
974975
elif "openWB/system/update_config_completed" == msg.topic:
975976
if decode_payload(msg.payload) != "":
976977
Pub().pub("openWB/system/update_config_completed", "")
Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,2 @@
11
from helpermodules.utils._get_default import get_default
22
from helpermodules.utils._thread_handler import joined_thread_handler, thread_handler
3-
from helpermodules.utils.processing_counter import ProcessingCounter

packages/helpermodules/utils/processing_counter.py

Lines changed: 0 additions & 26 deletions
This file was deleted.

packages/main.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import time
2020
from threading import Event, Thread, enumerate
2121
import traceback
22-
from control.chargelog.chargelog import calc_energy_costs, calculate_charged_energy_by_source
22+
from control.chargelog.chargelog import calc_energy_costs
2323

2424
from control import data, prepare, process
2525
from control.algorithm import algorithm
@@ -303,7 +303,6 @@ def schedule_jobs():
303303
event_global_data_initialized = Event()
304304
event_command_completed = Event()
305305
event_command_completed.set()
306-
event_subdata_initialized = Event()
307306
event_update_config_completed = Event()
308307
event_modbus_server = Event()
309308
event_jobs_running = Event()
@@ -314,12 +313,11 @@ def schedule_jobs():
314313
prep = prepare.Prepare()
315314
soc = update_soc.UpdateSoc(event_update_soc)
316315
set = setdata.SetData(event_ev_template,
317-
event_cp_config, event_soc,
318-
event_subdata_initialized)
316+
event_cp_config, event_soc)
319317
sub = subdata.SubData(event_ev_template,
320318
event_cp_config, loadvars_.event_module_update_completed,
321319
event_copy_data, event_global_data_initialized, event_command_completed,
322-
event_subdata_initialized, soc.event_vehicle_update_completed,
320+
soc.event_vehicle_update_completed,
323321
general_internal_chargepoint_handler.event_start,
324322
general_internal_chargepoint_handler.event_stop,
325323
event_update_config_completed,
@@ -348,7 +346,6 @@ def schedule_jobs():
348346
Thread(target=start_modbus_server, args=(event_modbus_server,), name="Modbus Control Server").start()
349347
# Warten, damit subdata Zeit hat, alle Topics auf dem Broker zu empfangen.
350348
event_update_config_completed.wait(300)
351-
event_subdata_initialized.wait(300)
352349
Pub().pub("openWB/set/system/boot_done", True)
353350
Path(Path(__file__).resolve().parents[1]/"ramdisk"/"bootdone").touch()
354351
schedule_jobs()

packages/modules/update_soc_test.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@
1919

2020

2121
@pytest.fixture(autouse=True)
22-
def mock_data() -> None:
22+
def mock_data(monkeypatch) -> None:
2323
data.data_init(Mock())
2424

25-
SubData(*([Mock()]*16))
25+
monkeypatch.setattr(SubData, "initialize", lambda x: None)
26+
SubData(*([Mock()]*15))
2627
SubData.cp_data = {"cp0": Mock(spec=ChargepointStateUpdate, chargepoint=Mock(
2728
spec=Chargepoint,
2829
id=id,

0 commit comments

Comments
 (0)